use std::io::Write;
use super::{Executable, default_tracing};
use crate::{
LOCALHOST,
common::{connect_and_check_version, query_running_dataflows, rpc},
formatting::OutputFormat,
};
use clap::Args;
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
use dora_message::{
cli_to_coordinator::CoordinatorControlClient, coordinator_to_cli::DataflowStatus, tarpc,
};
use eyre::{Context, eyre};
use serde::Serialize;
use tabwriter::TabWriter;
use uuid::Uuid;
// CLI arguments for `dora list`.
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap-derive fields would become `--help` text and change CLI output.
#[derive(Debug, Args)]
pub struct ListArgs {
    // Coordinator control address to connect to (defaults to localhost).
    #[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
    pub coordinator_addr: std::net::IpAddr,
    // Coordinator control port (defaults to the dora-wide default).
    #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
    pub coordinator_port: u16,
    // Output format: table (default) or JSON lines.
    #[clap(long, value_name = "FORMAT", default_value_t = OutputFormat::Table)]
    pub format: OutputFormat,
    // Case-insensitive status prefix filter ("run" matches "running").
    #[clap(long, value_name = "STATUS")]
    pub status: Option<String>,
    // Case-insensitive substring filter on the dataflow name.
    #[clap(long, value_name = "PATTERN")]
    pub name: Option<String>,
    // Sort field; only "cpu" and "memory" are recognized (descending).
    #[clap(long, value_name = "FIELD")]
    pub sort_by: Option<String>,
}
impl Executable for ListArgs {
async fn execute(self) -> eyre::Result<()> {
default_tracing()?;
let client = connect_and_check_version(self.coordinator_addr, self.coordinator_port)
.await
.wrap_err("failed to connect to dora coordinator")?;
list(&client, self.format, self.status, self.name, self.sort_by).await
}
}
/// One row of `dora list` output, serialized directly for the JSON format.
#[derive(Serialize)]
struct OutputEntry {
    /// Unique id of the dataflow.
    uuid: Uuid,
    /// Dataflow name; empty string if the dataflow is unnamed.
    name: String,
    /// Current status as reported by the coordinator.
    status: DataflowStatus,
    /// Number of nodes belonging to this dataflow.
    nodes: usize,
    /// Summed CPU usage across nodes, in percent (0.0 if below the noise floor).
    cpu: f64,
    /// Summed memory across nodes, in GB (0.0 if below the noise floor).
    memory: f64,
}
/// Per-dataflow aggregate built by summing the coordinator's per-node infos.
#[derive(Default)]
struct DataflowMetrics {
    /// How many nodes reported for this dataflow.
    node_count: usize,
    /// Sum of per-node CPU usage, in percent.
    total_cpu: f64,
    /// Sum of per-node memory usage, in MB (converted to GB only for display).
    total_memory_mb: f64,
}
/// Queries the coordinator for all dataflows plus per-node metrics, applies the
/// optional status/name filters and sort, and prints the result.
///
/// * `status_filter` — case-insensitive prefix match against the status name
///   ("run" matches "running").
/// * `name_filter` — case-insensitive substring match against the dataflow name.
/// * `sort_by` — "cpu" or "memory" (descending); anything else prints a warning
///   to stderr and leaves the order unchanged.
async fn list(
    client: &CoordinatorControlClient,
    format: OutputFormat,
    status_filter: Option<String>,
    name_filter: Option<String>,
    sort_by: Option<String>,
) -> eyre::Result<()> {
    let list = query_running_dataflows(client).await?;
    let node_infos = rpc(
        "get node info",
        client.get_node_info(tarpc::context::current()),
    )
    .await?;

    // Aggregate node count / CPU / memory per dataflow id.
    let mut dataflow_metrics: std::collections::BTreeMap<Uuid, DataflowMetrics> =
        std::collections::BTreeMap::new();
    for node_info in node_infos {
        let metrics = dataflow_metrics.entry(node_info.dataflow_id).or_default();
        metrics.node_count += 1;
        // Nodes without metrics still count toward node_count.
        if let Some(node_metrics) = node_info.metrics {
            metrics.total_cpu += node_metrics.cpu_usage as f64;
            metrics.total_memory_mb += node_metrics.memory_mb;
        }
    }

    let mut entries: Vec<OutputEntry> = list
        .0
        .into_iter()
        .map(|entry| {
            let uuid = entry.id.uuid;
            let (nodes, cpu, memory) = if let Some(m) = dataflow_metrics.get(&uuid) {
                (
                    m.node_count,
                    // Clamp sub-0.1 readings to 0 so measurement noise does not
                    // show up as phantom usage.
                    if m.total_cpu >= 0.1 { m.total_cpu } else { 0.0 },
                    if m.total_memory_mb >= 0.1 {
                        // MB -> GB for display.
                        m.total_memory_mb / 1024.0
                    } else {
                        0.0
                    },
                )
            } else {
                // No node reported for this dataflow (e.g. already finished).
                (0, 0.0, 0.0)
            };
            OutputEntry {
                uuid,
                name: entry.id.name.unwrap_or_default(),
                status: entry.status,
                nodes,
                cpu,
                memory,
            }
        })
        .collect();

    // Status filter: prefix match on the lowercase status name.
    if let Some(ref status_str) = status_filter {
        let status_lower = status_str.to_lowercase();
        entries.retain(|entry| {
            let entry_status = match entry.status {
                DataflowStatus::Running => "running",
                DataflowStatus::Finished => "finished",
                DataflowStatus::Failed => "failed",
            };
            entry_status.starts_with(&status_lower)
        });
    }

    // Name filter: case-insensitive substring match.
    if let Some(ref name_pattern) = name_filter {
        let pattern_lower = name_pattern.to_lowercase();
        entries.retain(|entry| entry.name.to_lowercase().contains(&pattern_lower));
    }

    // Optional sort, descending; f64 NaN compares as Equal to avoid a panic.
    if let Some(ref sort_field) = sort_by {
        match sort_field.to_lowercase().as_str() {
            "cpu" => {
                entries.sort_by(|a, b| {
                    b.cpu
                        .partial_cmp(&a.cpu)
                        .unwrap_or(std::cmp::Ordering::Equal)
                });
            }
            "memory" => {
                entries.sort_by(|a, b| {
                    b.memory
                        .partial_cmp(&a.memory)
                        .unwrap_or(std::cmp::Ordering::Equal)
                });
            }
            _ => {
                // Unknown field: warn but still print the (unsorted) list.
                eprintln!(
                    "Unknown sort field: {}. Valid options: cpu, memory",
                    sort_field
                );
            }
        }
    }

    match format {
        OutputFormat::Table => {
            let mut tw = TabWriter::new(std::io::stdout().lock());
            tw.write_all(b"UUID\tName\tStatus\tNodes\tCPU\tMemory\n")?;
            for entry in entries {
                // Table output uses "Succeeded" where the wire enum says
                // Finished; the status *filter* above intentionally matches
                // the wire name "finished".
                let status = match entry.status {
                    DataflowStatus::Running => "Running",
                    DataflowStatus::Finished => "Succeeded",
                    DataflowStatus::Failed => "Failed",
                };
                writeln!(
                    tw,
                    "{}\t{}\t{}\t{}\t{:.1}%\t{:.1} GB",
                    entry.uuid, entry.name, status, entry.nodes, entry.cpu, entry.memory
                )?;
            }
            // TabWriter buffers everything to compute column widths; flush
            // explicitly so errors are reported instead of swallowed on drop.
            tw.flush()?;
        }
        OutputFormat::Json => {
            // One JSON object per line (JSON Lines), not a single array.
            for entry in entries {
                println!("{}", serde_json::to_string(&entry)?);
            }
        }
    }
    Ok(())
}