use std::collections::BTreeMap;
use anyhow::{bail, Result};
use serde_json::{json, Value};
use comfy_table::{Cell, CellAlignment};
use crate::api::client::ApiClient;
use crate::api::error::ApiError;
use crate::config::credentials::resolve_api_key;
use crate::config::manager::ConfigManager;
use super::dispatch::{print_output_with_opts, OutputOpts};
pub const METRIC_MAP: &[(&str, &str)] = &[
("CU_COMPUTATION", "CU_COMPUTATION"),
("CU_CAPACITY", "CU_CAPACITY"),
("CU_SIZE", "CU_SIZE"),
("REPLICA_COUNT", "REPLICA_COUNT"),
("STORAGE", "STORAGE_USE"),
("SEARCH_QPS", "REQ_SEARCH_COUNT"),
("QUERY_QPS", "REQ_QUERY_COUNT"),
("INSERT_QPS", "REQ_INSERT_COUNT"),
("UPSERT_QPS", "REQ_UPSERT_COUNT"),
("DELETE_QPS", "REQ_DELETE_COUNT"),
("BULK_INSERT_QPS", "REQ_BULK_INSERT_COUNT"),
("SEARCH_LATENCY_AVG", "REQ_SEARCH_LATENCY_AVG"),
("SEARCH_LATENCY_P99", "REQ_SEARCH_LATENCY_P99"),
("QUERY_LATENCY_AVG", "REQ_QUERY_LATENCY_AVG"),
("QUERY_LATENCY_P99", "REQ_QUERY_LATENCY_P99"),
("INSERT_LATENCY_AVG", "REQ_INSERT_LATENCY_AVG"),
("INSERT_LATENCY_P99", "REQ_INSERT_LATENCY_P99"),
("UPSERT_LATENCY_AVG", "REQ_UPSERT_LATENCY_AVG"),
("UPSERT_LATENCY_P99", "REQ_UPSERT_LATENCY_P99"),
("DELETE_LATENCY_AVG", "REQ_DELETE_LATENCY_AVG"),
("DELETE_LATENCY_P99", "REQ_DELETE_LATENCY_P99"),
("SEARCH_VPS", "VECTOR_REQ_SEARCH_COUNT"),
("INSERT_VPS", "VECTOR_REQ_INSERT_COUNT"),
("UPSERT_VPS", "VECTOR_REQ_UPSERT_COUNT"),
("DELETE_VPS", "VECTOR_REQ_DELETE_COUNT"),
("BULK_INSERT_VPS", "VECTOR_REQ_BULK_INSERT_COUNT"),
("SEARCH_FAIL_RATE", "REQ_FAIL_RATE_SEARCH"),
("QUERY_FAIL_RATE", "REQ_FAIL_RATE_QUERY"),
("INSERT_FAIL_RATE", "REQ_FAIL_RATE_INSERT"),
("UPSERT_FAIL_RATE", "REQ_FAIL_RATE_UPSERT"),
("DELETE_FAIL_RATE", "REQ_FAIL_RATE_DELETE"),
("BULK_INSERT_FAIL_RATE", "REQ_FAIL_RATE_BULK_INSERT"),
("ENTITIES", "ENTITIES_COUNT"),
("ENTITIES_LOADED", "ENTITIES_LOADED"),
("ENTITIES_INDEXED", "ENTITIES_INDEXED"),
("COLLECTIONS", "COLLECTIONS_COUNT"),
("SLOW_QUERIES", "SLOW_QUERY_COUNT"),
("READ_VCU", "READ_VCU"),
("WRITE_VCU", "WRITE_VCU"),
];
pub fn resolve_metric(name: &str) -> Result<String> {
let upper = name.to_uppercase();
for (cli_name, backend_name) in METRIC_MAP {
if *cli_name == upper {
return Ok(backend_name.to_string());
}
}
let valid: Vec<&str> = METRIC_MAP.iter().map(|(k, _)| *k).collect();
bail!(
"Unknown metric '{}'. Valid metrics:\n {}",
name,
valid.join(", ")
);
}
pub fn human_to_iso_duration(input: &str) -> Result<String> {
let input = input.trim();
if input.is_empty() {
bail!("Empty duration string");
}
let unit = input.chars().last().unwrap().to_ascii_lowercase();
let num_str = &input[..input.len() - 1];
let num: u64 = num_str
.parse()
.map_err(|_| anyhow::anyhow!("Invalid duration '{}': not a number", input))?;
match unit {
's' => Ok(format!("PT{}S", num)),
'm' => Ok(format!("PT{}M", num)),
'h' => Ok(format!("PT{}H", num)),
'd' => Ok(format!("PT{}H", num * 24)),
_ => bail!(
"Invalid duration unit '{}'. Use s (seconds), m (minutes), h (hours), d (days)",
unit
),
}
}
pub fn normalize_timestamp(ts: &str) -> String {
let ts = ts.trim();
if ts.len() == 10 && ts.chars().nth(4) == Some('-') {
return format!("{}T00:00:00Z", ts);
}
if ts.ends_with('Z') || ts.ends_with('z') {
return ts.to_string();
}
format!("{}Z", ts)
}
pub fn default_granularity(hours: f64) -> &'static str {
if hours <= 1.0 {
"PT1M"
} else if hours <= 6.0 {
"PT5M"
} else if hours <= 24.0 {
"PT15M"
} else if hours <= 168.0 {
"PT1H"
} else {
"PT6H"
}
}
fn pt_to_hours(pt: &str) -> f64 {
let s = pt.trim_start_matches("PT").trim_start_matches("pt");
if let Some(h) = s.strip_suffix('H').or_else(|| s.strip_suffix('h')) {
h.parse::<f64>().unwrap_or(1.0)
} else if let Some(m) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
m.parse::<f64>().unwrap_or(60.0) / 60.0
} else if let Some(sec) = s.strip_suffix('S').or_else(|| s.strip_suffix('s')) {
sec.parse::<f64>().unwrap_or(3600.0) / 3600.0
} else {
1.0
}
}
pub async fn run_from_args(
config_mgr: &ConfigManager,
raw_args: &[String],
output_opts: &OutputOpts<'_>,
) -> Result<()> {
if raw_args.iter().any(|a| a == "-h" || a == "--help") {
print!(
"Show cluster metrics.\n\n\
Usage: zz cluster metrics [OPTIONS]\n\n\
Options:\n\
\x20 {:24}{:30}Cluster ID (uses context if omitted)\n\
\x20 {:24}{:30}Metric name (repeatable). Omit for all metrics\n\
\x20 {:24}{:30}Time period: 10m, 1h, 6h, 24h, 3d, 7d [default: 6h]\n\
\x20 {:24}{:30}Start time (ISO 8601)\n\
\x20 {:24}{:30}End time (ISO 8601)\n\
\x20 {:24}{:30}Data granularity: 1m, 5m, 1h, 1d\n",
"--cluster-id", "<string>",
"--metric, -m", "<string>",
"--period", "<string>",
"--start", "<string>",
"--end", "<string>",
"--granularity, -g", "<string>",
);
return Ok(());
}
let mut cluster_id = None;
let mut metrics_list = Vec::new();
let mut period = None;
let mut start = None;
let mut end = None;
let mut granularity = None;
let mut i = 0;
while i < raw_args.len() {
match raw_args[i].as_str() {
"--cluster-id" => {
i += 1;
cluster_id = raw_args.get(i).cloned();
}
"--metric" | "-m" => {
i += 1;
if let Some(v) = raw_args.get(i) {
metrics_list.push(v.clone());
}
}
"--period" => {
i += 1;
period = raw_args.get(i).cloned();
}
"--start" => {
i += 1;
start = raw_args.get(i).cloned();
}
"--end" => {
i += 1;
end = raw_args.get(i).cloned();
}
"--granularity" | "-g" => {
i += 1;
granularity = raw_args.get(i).cloned();
}
_ => {}
}
i += 1;
}
run(config_mgr, cluster_id, metrics_list, period, start, end, granularity, output_opts).await
}
#[allow(clippy::too_many_arguments)]
pub async fn run(
config_mgr: &ConfigManager,
cluster_id: Option<String>,
metrics_list: Vec<String>,
period: Option<String>,
start: Option<String>,
end: Option<String>,
granularity: Option<String>,
output_opts: &OutputOpts<'_>,
) -> Result<()> {
let cluster_id = match cluster_id {
Some(id) => id,
None => {
let ctx = config_mgr.get_context();
ctx.cluster_id
.ok_or_else(|| anyhow::anyhow!(
"No --cluster-id provided and no cluster context set.\n\
Run: zilliz context set --cluster-id <id>"
))?
}
};
if metrics_list.is_empty() {
bail!(
"At least one --metric / -m is required.\n\
Valid metrics: {}",
METRIC_MAP
.iter()
.map(|(k, _)| *k)
.collect::<Vec<_>>()
.join(", ")
);
}
if period.is_some() && (start.is_some() || end.is_some()) {
bail!("--period and --start/--end are mutually exclusive.");
}
if start.is_some() != end.is_some() {
bail!("--start and --end must be used together.");
}
let metric_queries: Vec<Value> = metrics_list
.iter()
.map(|m| {
let backend = resolve_metric(m)?;
Ok(json!({"name": backend}))
})
.collect::<Result<Vec<_>>>()?;
let granularity_iso = if let Some(g) = &granularity {
human_to_iso_duration(g)?
} else {
let hours = if let Some(ref p) = period {
pt_to_hours(&human_to_iso_duration(p)?)
} else {
1.0 };
default_granularity(hours).to_string()
};
let mut body = json!({
"metricQueries": metric_queries,
"granularity": granularity_iso,
});
if let Some(ref p) = period {
body["period"] = json!(human_to_iso_duration(p)?);
} else if let (Some(ref s), Some(ref e)) = (&start, &end) {
body["start"] = json!(normalize_timestamp(s));
body["end"] = json!(normalize_timestamp(e));
} else {
body["period"] = json!("PT1H");
}
let api_key = resolve_api_key(output_opts.api_key, config_mgr).ok_or_else(|| ApiError::NoApiKey)?;
let client = ApiClient::new(api_key, "https://api.cloud.zilliz.com".to_string());
let path = format!(
"/v2/clusters/{}/metrics/query",
urlencoding::encode(&cluster_id)
);
let result = client.call("POST", &path, None, Some(&body)).await?;
if output_opts.format == "table" && output_opts.query.is_none() {
print_metrics_table(&result, &metrics_list, output_opts.no_header);
} else {
print_output_with_opts(&result, output_opts, None);
}
Ok(())
}
pub fn format_metric_value(val: f64) -> String {
if val == val.floor() && val < 1e9 {
format!("{}", val as i64)
} else if val.abs() < 0.01 {
format!("{:.4}", val)
} else {
let formatted = format!("{:.2}", val);
let parts: Vec<&str> = formatted.split('.').collect();
let int_part = parts[0];
let dec_part = parts.get(1).unwrap_or(&"");
let negative = int_part.starts_with('-');
let digits: &str = if negative { &int_part[1..] } else { int_part };
let with_commas: String = digits
.chars()
.rev()
.enumerate()
.map(|(i, c)| {
if i > 0 && i % 3 == 0 {
format!("{},", c)
} else {
c.to_string()
}
})
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect();
if negative {
format!("-{}.{}", with_commas, dec_part)
} else {
format!("{}.{}", with_commas, dec_part)
}
}
}
fn print_metrics_table(result: &Value, metric_names: &[String], no_header: bool) {
let results = match result.get("results").and_then(|v| v.as_array()) {
Some(r) => r,
None => {
println!("{}", serde_json::to_string_pretty(result).unwrap_or_default());
return;
}
};
let mut pivot: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
let mut metric_columns: Vec<String> = Vec::new();
for (i, metric_result) in results.iter().enumerate() {
let name = metric_result
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let unit = metric_result
.get("unit")
.and_then(|v| v.as_str())
.unwrap_or("");
let display_name = metric_names
.get(i)
.map(|n| n.to_string())
.unwrap_or_else(|| name.to_string());
let col_name = if unit.is_empty() {
display_name.clone()
} else {
format!("{} ({})", display_name, unit)
};
metric_columns.push(col_name.clone());
if let Some(values) = metric_result.get("values").and_then(|v| v.as_array()) {
for point in values {
let ts = point
.get("timestamp")
.and_then(|v| v.as_str())
.unwrap_or("");
let val = point
.get("value")
.and_then(|v| {
v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse::<f64>().ok()))
})
.map(format_metric_value)
.unwrap_or_default();
pivot
.entry(ts.to_string())
.or_default()
.insert(col_name.clone(), val);
}
}
}
if pivot.is_empty() {
println!("No metric data returned.");
return;
}
let mut headers = vec!["Timestamp".to_string()];
headers.extend(metric_columns.iter().cloned());
let mut table = super::formatter::create_table(&headers, no_header);
for (ts, values) in &pivot {
let display_ts = format_timestamp(ts);
let mut cells = vec![Cell::new(display_ts)];
for col in &metric_columns {
let val = values.get(col).cloned().unwrap_or_default();
cells.push(Cell::new(val).set_alignment(CellAlignment::Right));
}
table.add_row(cells);
}
println!("{}", table);
}
pub fn format_timestamp(ts: &str) -> String {
if ts.len() >= 19 {
let date_part = &ts[5..10]; let time_part = &ts[11..19]; format!("{} {}", date_part, time_part)
} else {
ts.to_string()
}
}