use std::io::Write;
use clap::Args;
use serde::Serialize;
use tabwriter::TabWriter;
use uuid::Uuid;
use crate::{
command::{Executable, default_tracing},
common::{CoordinatorOptions, rpc},
formatting::OutputFormat,
};
use dora_message::{
cli_to_coordinator::CoordinatorControlClient, coordinator_to_cli::NodeInfo, tarpc,
};
// Command-line arguments of the node listing command.
//
// NOTE: plain `//` comments are used on purpose here; clap's derive turns
// `///` doc comments into help text, which would change the CLI output.
#[derive(Debug, Args)]
#[clap(verbatim_doc_comment)]
pub struct List {
// `--dataflow` / `-d`: optional filter. The value is first tried as a UUID
// and otherwise compared against the dataflow name (see `list`).
#[clap(long, short = 'd', value_name = "NAME_OR_UUID")]
pub dataflow: Option<String>,
// `--format`: output format, defaults to `OutputFormat::Table`.
#[clap(long, value_name = "FORMAT", default_value_t = OutputFormat::Table)]
pub format: OutputFormat,
// Coordinator connection options, flattened into this command's own flags.
#[clap(flatten)]
coordinator: CoordinatorOptions,
}
impl Executable for List {
    /// Runs the command: sets up tracing, opens an RPC connection to the
    /// coordinator and prints the node listing in the requested format.
    ///
    /// # Errors
    ///
    /// Propagates any error from tracing setup, from connecting to the
    /// coordinator, or from [`list`] itself.
    async fn execute(self) -> eyre::Result<()> {
        default_tracing()?;

        // Split the arguments apart once so each piece is moved exactly where it is needed.
        let Self {
            dataflow,
            format,
            coordinator,
        } = self;

        let rpc_client = coordinator.connect_rpc().await?;
        list(&rpc_client, dataflow, format).await
    }
}
/// One row of the node listing, with every value already rendered as text.
///
/// The same struct backs both the table output and the JSON output.
#[derive(Serialize)]
struct OutputEntry {
/// Node identifier, rendered via its `to_string()`.
node: String,
/// `"Running"` when metrics are available for the node, `"Unknown"` otherwise.
status: String,
/// Process id taken from the node metrics, or `"-"` when no metrics are available.
pid: String,
/// CPU usage formatted with one decimal and a `%` suffix, or `"-"` without metrics.
cpu: String,
/// Memory usage formatted without decimals and an ` MB` suffix, or `"-"` without metrics.
memory: String,
/// Dataflow name (falling back to the dataflow id); `None` when the listing
/// is filtered to a single dataflow, in which case the key is omitted from
/// the serialized output.
#[serde(skip_serializing_if = "Option::is_none")]
dataflow: Option<String>,
}
/// Converts a [`NodeInfo`] into a printable [`OutputEntry`].
///
/// When the node carries metrics it is reported as `Running` together with its
/// pid, CPU and memory figures; otherwise the status is `Unknown` and the
/// metric columns are filled with `-`. The dataflow column is only populated
/// when `show_dataflow` is set, using the dataflow name or, if the dataflow is
/// unnamed, its id.
fn output_entry_from_node(node: NodeInfo, show_dataflow: bool) -> OutputEntry {
    let (status, pid, cpu, memory) = if let Some(metrics) = node.metrics {
        (
            "Running".to_string(),
            metrics.pid.to_string(),
            format!("{:.1}%", metrics.cpu_usage),
            format!("{:.0} MB", metrics.memory_mb),
        )
    } else {
        (
            "Unknown".to_string(),
            "-".to_string(),
            "-".to_string(),
            "-".to_string(),
        )
    };

    OutputEntry {
        node: node.node_id.to_string(),
        status,
        pid,
        cpu,
        memory,
        dataflow: if show_dataflow {
            Some(
                node.dataflow_name
                    .unwrap_or_else(|| node.dataflow_id.to_string()),
            )
        } else {
            None
        },
    }
}

/// Queries the coordinator for node information and prints it to stdout.
///
/// # Arguments
///
/// * `client` - RPC client used to issue the `get_node_info` request.
/// * `dataflow_filter` - Optional dataflow selector. If it parses as a UUID,
///   nodes are matched on their dataflow id; otherwise on their dataflow name.
///   When a filter is given, the dataflow column/key is left out of the output.
/// * `format` - `Table` prints an aligned, tab-separated table with a header;
///   `Json` prints one JSON object per line.
///
/// # Errors
///
/// Returns an error if the RPC call fails, if an entry cannot be serialized to
/// JSON, or if writing to stdout fails. Write failures are propagated in both
/// output formats instead of panicking.
async fn list(
    client: &CoordinatorControlClient,
    dataflow_filter: Option<String>,
    format: OutputFormat,
) -> eyre::Result<()> {
    let node_infos = rpc(
        "get node info",
        client.get_node_info(tarpc::context::current()),
    )
    .await?;

    // The dataflow column is redundant once the listing is restricted to one dataflow.
    let show_dataflow = dataflow_filter.is_none();

    let filtered_nodes: Vec<NodeInfo> = match dataflow_filter.as_deref() {
        Some(filter) => {
            // A filter that parses as a UUID is treated as an id, anything else as a name.
            let filter_uuid = Uuid::parse_str(filter).ok();
            node_infos
                .into_iter()
                .filter(|node| match filter_uuid {
                    Some(uuid) => node.dataflow_id == uuid,
                    None => node.dataflow_name.as_deref() == Some(filter),
                })
                .collect()
        }
        None => node_infos,
    };

    let entries: Vec<OutputEntry> = filtered_nodes
        .into_iter()
        .map(|node| output_entry_from_node(node, show_dataflow))
        .collect();

    match format {
        OutputFormat::Table => {
            let mut tw = TabWriter::new(std::io::stdout().lock());
            if show_dataflow {
                tw.write_all(b"NODE\tSTATUS\tPID\tCPU\tMEMORY\tDATAFLOW\n")?;
            } else {
                tw.write_all(b"NODE\tSTATUS\tPID\tCPU\tMEMORY\n")?;
            }
            for entry in &entries {
                // Write straight into the tab writer instead of building a temporary String.
                write!(
                    tw,
                    "{}\t{}\t{}\t{}\t{}",
                    entry.node, entry.status, entry.pid, entry.cpu, entry.memory
                )?;
                if let Some(dataflow) = &entry.dataflow {
                    write!(tw, "\t{}", dataflow)?;
                }
                writeln!(tw)?;
            }
            // Column alignment is only emitted on flush.
            tw.flush()?;
        }
        OutputFormat::Json => {
            // Lock stdout once and propagate write errors; `println!` would
            // panic if stdout cannot be written to (e.g. a closed pipe).
            let mut out = std::io::stdout().lock();
            for entry in &entries {
                writeln!(out, "{}", serde_json::to_string(entry)?)?;
            }
        }
    }
    Ok(())
}