use std::collections::BTreeMap;
use anyhow::{bail, Result};
use serde_json::{json, Value};
use comfy_table::{Cell, CellAlignment};
use crate::api::client::ApiClient;
use crate::api::error::ApiError;
use crate::config::credentials::resolve_api_key;
use crate::config::manager::ConfigManager;
use crate::model::loader::Models;
use super::dispatch::{print_output_with_opts, OutputOpts};
/// Scope a metric is valid in, and the scope a metrics command is running under.
///
/// It appears as the third field of every `METRIC_MAP` entry and as the
/// `scope` argument of `resolve_metric_scoped` / `build_request_body`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricScope {
/// Cluster-level. `resolve_metric_scoped` rejects a `Cluster` entry when the
/// requested scope is `Collection`.
Cluster,
/// Collection-level.
Collection,
/// Treated as compatible with every other scope by `scope_compatible`.
Both,
}
/// Lookup table of supported metrics.
///
/// Each entry is `(cli_name, backend_name, scope)`:
/// * `cli_name` is the upper-case name matched (case-insensitively) against
///   the user's `--metric` value by `resolve_metric_scoped`;
/// * `backend_name` is the string placed in the request body in its stead;
/// * `scope` restricts where the metric may be requested.
pub const METRIC_MAP: &[(&str, &str, MetricScope)] = &[
// Cluster-scoped entries.
("CU_COMPUTATION", "CU_COMPUTATION", MetricScope::Cluster),
("CU_CAPACITY", "CU_CAPACITY", MetricScope::Cluster),
("CU_SIZE", "CU_SIZE", MetricScope::Cluster),
("REPLICA_COUNT", "REPLICA_COUNT", MetricScope::Cluster),
("STORAGE", "STORAGE_USE", MetricScope::Cluster),
// `*_QPS` CLI names map to `REQ_*_COUNT` backend names.
("SEARCH_QPS", "REQ_SEARCH_COUNT", MetricScope::Both),
(
"HYBRID_SEARCH_QPS",
"REQ_HYBRID_SEARCH_COUNT",
MetricScope::Both,
),
("QUERY_QPS", "REQ_QUERY_COUNT", MetricScope::Both),
("INSERT_QPS", "REQ_INSERT_COUNT", MetricScope::Both),
("UPSERT_QPS", "REQ_UPSERT_COUNT", MetricScope::Both),
("DELETE_QPS", "REQ_DELETE_COUNT", MetricScope::Both),
(
"BULK_INSERT_QPS",
"REQ_BULK_INSERT_COUNT",
MetricScope::Both,
),
// `*_LATENCY_{AVG,P99}` CLI names map to `REQ_*_LATENCY_{AVG,P99}`.
(
"SEARCH_LATENCY_AVG",
"REQ_SEARCH_LATENCY_AVG",
MetricScope::Both,
),
(
"SEARCH_LATENCY_P99",
"REQ_SEARCH_LATENCY_P99",
MetricScope::Both,
),
(
"HYBRID_SEARCH_LATENCY_AVG",
"REQ_HYBRID_SEARCH_LATENCY_AVG",
MetricScope::Both,
),
(
"HYBRID_SEARCH_LATENCY_P99",
"REQ_HYBRID_SEARCH_LATENCY_P99",
MetricScope::Both,
),
(
"QUERY_LATENCY_AVG",
"REQ_QUERY_LATENCY_AVG",
MetricScope::Both,
),
(
"QUERY_LATENCY_P99",
"REQ_QUERY_LATENCY_P99",
MetricScope::Both,
),
(
"INSERT_LATENCY_AVG",
"REQ_INSERT_LATENCY_AVG",
MetricScope::Both,
),
(
"INSERT_LATENCY_P99",
"REQ_INSERT_LATENCY_P99",
MetricScope::Both,
),
(
"UPSERT_LATENCY_AVG",
"REQ_UPSERT_LATENCY_AVG",
MetricScope::Both,
),
(
"UPSERT_LATENCY_P99",
"REQ_UPSERT_LATENCY_P99",
MetricScope::Both,
),
(
"DELETE_LATENCY_AVG",
"REQ_DELETE_LATENCY_AVG",
MetricScope::Both,
),
(
"DELETE_LATENCY_P99",
"REQ_DELETE_LATENCY_P99",
MetricScope::Both,
),
// `*_VPS` CLI names map to `VECTOR_REQ_*_COUNT` backend names.
("SEARCH_VPS", "VECTOR_REQ_SEARCH_COUNT", MetricScope::Both),
("INSERT_VPS", "VECTOR_REQ_INSERT_COUNT", MetricScope::Both),
("UPSERT_VPS", "VECTOR_REQ_UPSERT_COUNT", MetricScope::Both),
("DELETE_VPS", "VECTOR_REQ_DELETE_COUNT", MetricScope::Both),
(
"BULK_INSERT_VPS",
"VECTOR_REQ_BULK_INSERT_COUNT",
MetricScope::Both,
),
// `*_FAIL_RATE` CLI names map to `REQ_FAIL_RATE_*` (operation moves to the end).
(
"SEARCH_FAIL_RATE",
"REQ_FAIL_RATE_SEARCH",
MetricScope::Both,
),
(
"HYBRID_SEARCH_FAIL_RATE",
"REQ_FAIL_RATE_HYBRID_SEARCH",
MetricScope::Both,
),
("QUERY_FAIL_RATE", "REQ_FAIL_RATE_QUERY", MetricScope::Both),
(
"INSERT_FAIL_RATE",
"REQ_FAIL_RATE_INSERT",
MetricScope::Both,
),
(
"UPSERT_FAIL_RATE",
"REQ_FAIL_RATE_UPSERT",
MetricScope::Both,
),
(
"DELETE_FAIL_RATE",
"REQ_FAIL_RATE_DELETE",
MetricScope::Both,
),
(
"BULK_INSERT_FAIL_RATE",
"REQ_FAIL_RATE_BULK_INSERT",
MetricScope::Both,
),
// Entity counters (valid in both scopes).
("ENTITIES", "ENTITIES_COUNT", MetricScope::Both),
("ENTITIES_LOADED", "ENTITIES_LOADED", MetricScope::Both),
("ENTITIES_INDEXED", "ENTITIES_INDEXED", MetricScope::Both),
// Remaining cluster-scoped entries.
("COLLECTIONS", "COLLECTIONS_COUNT", MetricScope::Cluster),
("SLOW_QUERIES", "SLOW_QUERY_COUNT", MetricScope::Cluster),
("READ_VCU", "READ_VCU", MetricScope::Cluster),
("WRITE_VCU", "WRITE_VCU", MetricScope::Cluster),
];
/// Returns `true` when a table entry declared with `entry_scope` may be used
/// under the `requested` scope.
///
/// `MetricScope::Both` on either side always matches; otherwise the two
/// scopes must be identical.
fn scope_compatible(entry_scope: MetricScope, requested: MetricScope) -> bool {
    let either_is_both = entry_scope == MetricScope::Both || requested == MetricScope::Both;
    either_is_both || entry_scope == requested
}
/// Translates a user-supplied metric name into its backend identifier,
/// enforcing that the metric is usable under `scope`.
///
/// The lookup against `METRIC_MAP` is case-insensitive (the input is
/// upper-cased before comparison).
///
/// # Errors
///
/// * The metric exists but its declared scope is incompatible with `scope`.
/// * The name is not in `METRIC_MAP`; the message lists every metric that is
///   valid for `scope`.
pub fn resolve_metric_scoped(name: &str, scope: MetricScope) -> Result<String> {
    let wanted = name.to_uppercase();
    let hit = METRIC_MAP
        .iter()
        .find(|(cli_name, _, _)| *cli_name == wanted);

    if let Some((cli_name, backend_name, entry_scope)) = hit {
        if !scope_compatible(*entry_scope, scope) {
            if scope == MetricScope::Collection {
                bail!(
                    "Metric '{}' is cluster-scope only and cannot be used with --collection-name.",
                    cli_name
                );
            } else if scope == MetricScope::Cluster {
                bail!("Metric '{}' is collection-scope only.", cli_name);
            }
            // A requested scope of `Both` is compatible with everything, so
            // there is nothing to report in that case.
        }
        return Ok(backend_name.to_string());
    }

    // Unknown name: list only the metrics that would have been accepted.
    let mut valid: Vec<&str> = Vec::new();
    for (cli_name, _, entry_scope) in METRIC_MAP {
        if scope_compatible(*entry_scope, scope) {
            valid.push(*cli_name);
        }
    }
    bail!(
        "Unknown metric '{}'. Valid metrics:\n {}",
        name,
        valid.join(", ")
    );
}
/// Resolves a metric name without any scope restriction.
///
/// Delegates to `resolve_metric_scoped` with `MetricScope::Both`, so the only
/// failure mode is an unknown metric name.
pub fn resolve_metric(name: &str) -> Result<String> {
resolve_metric_scoped(name, MetricScope::Both)
}
/// Converts a short human duration such as `10m`, `6h` or `3d` into an
/// ISO 8601 duration string (`PT10M`, `PT6H`, `PT72H`).
///
/// The input is trimmed; the final character is the unit (case-insensitive)
/// and everything before it must parse as an unsigned integer. Days are
/// expressed in hours (`Nd` becomes `PT{N*24}H`).
///
/// # Errors
///
/// * The input is empty after trimming.
/// * The numeric part is not a valid `u64`.
/// * The unit is not one of `s`, `m`, `h`, `d`.
/// * A day count is so large that converting it to hours overflows `u64`.
pub fn human_to_iso_duration(input: &str) -> Result<String> {
    let input = input.trim();

    // Split on the last *character*, not the last byte: slicing at
    // `len() - 1` would panic when the input ends in a multi-byte character.
    let (unit_pos, unit_char) = match input.char_indices().next_back() {
        Some(pair) => pair,
        None => bail!("Empty duration string"),
    };
    let unit = unit_char.to_ascii_lowercase();

    let num: u64 = input[..unit_pos]
        .parse()
        .map_err(|_| anyhow::anyhow!("Invalid duration '{}': not a number", input))?;

    match unit {
        's' => Ok(format!("PT{}S", num)),
        'm' => Ok(format!("PT{}M", num)),
        'h' => Ok(format!("PT{}H", num)),
        'd' => {
            // Guard the multiplication so absurd inputs produce an error
            // instead of a debug panic / release wrap-around.
            let hours = num.checked_mul(24).ok_or_else(|| {
                anyhow::anyhow!("Invalid duration '{}': value is too large", input)
            })?;
            Ok(format!("PT{}H", hours))
        }
        _ => bail!(
            "Invalid duration unit '{}'. Use s (seconds), m (minutes), h (hours), d (days)",
            unit
        ),
    }
}
/// Normalizes a user-supplied timestamp towards RFC 3339 form.
///
/// * A bare date (`YYYY-MM-DD`, detected as 10 bytes with `-` as the fifth
///   character) becomes midnight UTC: `YYYY-MM-DDT00:00:00Z`.
/// * A value already ending in `Z`/`z` is returned as-is.
/// * A value whose time part already carries a numeric offset
///   (`+HH:MM` / `-HH:MM`) is returned as-is; appending `Z` to it would
///   produce an invalid timestamp.
/// * Anything else gets `Z` appended.
///
/// Surrounding whitespace is trimmed first. No validation is performed.
pub fn normalize_timestamp(ts: &str) -> String {
    let ts = ts.trim();
    if ts.len() == 10 && ts.chars().nth(4) == Some('-') {
        return format!("{}T00:00:00Z", ts);
    }
    if ts.ends_with('Z') || ts.ends_with('z') {
        return ts.to_string();
    }

    // Look for a sign *after* the date/time separator so the dashes inside
    // the date itself are not mistaken for an offset. The separator is ASCII,
    // so `sep + 1` is always a valid char boundary.
    let has_offset = ts
        .find(|c: char| matches!(c, 'T' | 't' | ' '))
        .map_or(false, |sep| {
            ts[sep + 1..].contains(|c: char| c == '+' || c == '-')
        });
    if has_offset {
        return ts.to_string();
    }

    format!("{}Z", ts)
}
/// Picks a default sampling granularity (ISO 8601 duration) for a query
/// window of `hours` hours.
///
/// The first threshold that `hours` does not exceed wins; windows longer than
/// 168 hours (and NaN, which compares false against every threshold) fall
/// through to `PT6H`.
pub fn default_granularity(hours: f64) -> &'static str {
    // (inclusive upper bound in hours, granularity)
    const STEPS: [(f64, &str); 4] = [
        (1.0, "PT1M"),
        (6.0, "PT5M"),
        (24.0, "PT15M"),
        (168.0, "PT1H"),
    ];

    STEPS
        .iter()
        .find(|(limit, _)| hours <= *limit)
        .map_or("PT6H", |(_, step)| *step)
}
/// Default query period as an ISO 8601 duration, used when neither a period
/// nor a start/end range is supplied. The scope is currently ignored.
fn default_period_iso(_scope: MetricScope) -> &'static str {
    const DEFAULT_PERIOD: &str = "PT1H";
    DEFAULT_PERIOD
}
/// Default query period expressed in hours, used to derive a granularity
/// when no period or range is supplied. The scope is currently ignored.
fn default_period_hours(_scope: MetricScope) -> f64 {
    const DEFAULT_HOURS: f64 = 1.0;
    DEFAULT_HOURS
}
/// Length of the `[start, end]` window in hours.
///
/// Both bounds are passed through `normalize_timestamp` and parsed as
/// RFC 3339. Returns `None` when either bound fails to parse or when the
/// window is not strictly positive (measured in whole seconds).
fn range_hours(start: &str, end: &str) -> Option<f64> {
    let parse = |raw: &str| chrono::DateTime::parse_from_rfc3339(&normalize_timestamp(raw)).ok();

    let begin = parse(start)?;
    let finish = parse(end)?;

    let elapsed_secs = (finish - begin).num_seconds();
    if elapsed_secs > 0 {
        Some(elapsed_secs as f64 / 3600.0)
    } else {
        None
    }
}
/// Converts a simple ISO 8601 time duration (`PT<n>H`, `PT<n>M`, `PT<n>S`,
/// case-insensitive unit) into hours.
///
/// Unparseable numbers fall back to the equivalent of one hour for the
/// given unit, and an unrecognised or missing unit yields `1.0`.
fn pt_to_hours(pt: &str) -> f64 {
    let body = pt.trim_start_matches("PT").trim_start_matches("pt");

    // Peel the unit off the end; what is left in `rest` is the number.
    let mut rest = body.chars();
    let (fallback, per_hour) = match rest.next_back() {
        Some('H') | Some('h') => (1.0, 1.0),
        Some('M') | Some('m') => (60.0, 60.0),
        Some('S') | Some('s') => (3600.0, 3600.0),
        _ => return 1.0,
    };

    rest.as_str().parse::<f64>().unwrap_or(fallback) / per_hour
}
/// Consumes the value that follows the flag at `args[*i]`.
///
/// `*i` is advanced by one (even on failure) so that on success it points at
/// the consumed value.
///
/// # Errors
///
/// Fails when there is no following argument, or when the following
/// argument starts with `-` and therefore looks like another flag.
fn take_value(args: &[String], i: &mut usize, flag: &str) -> Result<String> {
    *i += 1;
    match args.get(*i) {
        None => Err(anyhow::anyhow!("Flag `{}` requires a value.", flag)),
        Some(candidate) if candidate.starts_with('-') => Err(anyhow::anyhow!(
            "Flag `{}` requires a value, got `{}` (looks like another flag).",
            flag,
            candidate
        )),
        Some(candidate) => Ok(candidate.clone()),
    }
}
/// Entry point for the cluster-level metrics command.
///
/// Parses `raw_args` and runs the query with `MetricScope::Cluster` by
/// delegating to `run_from_args_scoped`.
pub async fn run_from_args(
models: &Models,
config_mgr: &ConfigManager,
raw_args: &[String],
output_opts: &OutputOpts<'_>,
) -> Result<()> {
run_from_args_scoped(
models,
config_mgr,
raw_args,
output_opts,
MetricScope::Cluster,
)
.await
}
/// Entry point for the collection-level metrics command.
///
/// Parses `raw_args` and runs the query with `MetricScope::Collection` by
/// delegating to `run_from_args_scoped`, which makes `--collection-name`
/// mandatory.
pub async fn run_from_args_for_collection(
models: &Models,
config_mgr: &ConfigManager,
raw_args: &[String],
output_opts: &OutputOpts<'_>,
) -> Result<()> {
run_from_args_scoped(
models,
config_mgr,
raw_args,
output_opts,
MetricScope::Collection,
)
.await
}
/// Shared implementation behind the cluster and collection metrics commands.
///
/// Steps:
/// 1. If `-h` / `--help` appears anywhere in `raw_args`, print the help text
///    for `scope` and return without doing anything else.
/// 2. Hand-parse the recognised flags from `raw_args`.
/// 3. Validate `--collection-name` against `scope` (required for
///    `Collection`, rejected otherwise).
/// 4. Forward everything to `run`.
///
/// # Errors
///
/// Returns an error when a flag is missing its value, when the
/// collection-name rule is violated, or when `run` fails.
async fn run_from_args_scoped(
models: &Models,
config_mgr: &ConfigManager,
raw_args: &[String],
output_opts: &OutputOpts<'_>,
scope: MetricScope,
) -> Result<()> {
// Help wins over every other argument, wherever it appears.
if raw_args.iter().any(|a| a == "-h" || a == "--help") {
match scope {
MetricScope::Collection => {
// Each option row is "<flag padded to 28><type padded to 30><description>".
print!(
"Show collection metrics.\n\n\
Usage: zilliz collection metrics [OPTIONS]\n\n\
Options:\n\
\x20 {:28}{:30}Cluster ID (overrides context if provided)\n\
\x20 {:28}{:30}Collection name (required)\n\
\x20 {:28}{:30}Metric name (required, repeatable)\n\
\x20 {:28}{:30}Time period: 10m, 1h, 6h, 24h, 3d, 7d [default: 1h]\n\
\x20 {:28}{:30}Start time (ISO 8601)\n\
\x20 {:28}{:30}End time (ISO 8601)\n\
\x20 {:28}{:30}Data granularity: 1m, 5m, 1h, 1d\n",
"--cluster-id",
"<string>",
"--collection-name, -c",
"<string>",
"--metric, -m",
"<string>",
"--period",
"<string>",
"--start",
"<string>",
"--end",
"<string>",
"--granularity, -g",
"<string>",
);
}
// Any non-collection scope gets the cluster help text.
_ => {
// Same row layout as above, with the flag column padded to 24.
print!(
"Show cluster metrics.\n\n\
Usage: zilliz cluster metrics [OPTIONS]\n\n\
Options:\n\
\x20 {:24}{:30}Cluster ID (uses context if omitted)\n\
\x20 {:24}{:30}Metric name (required, repeatable)\n\
\x20 {:24}{:30}Time period: 10m, 1h, 6h, 24h, 3d, 7d [default: 1h]\n\
\x20 {:24}{:30}Start time (ISO 8601)\n\
\x20 {:24}{:30}End time (ISO 8601)\n\
\x20 {:24}{:30}Data granularity: 1m, 5m, 1h, 1d\n",
"--cluster-id",
"<string>",
"--metric, -m",
"<string>",
"--period",
"<string>",
"--start",
"<string>",
"--end",
"<string>",
"--granularity, -g",
"<string>",
);
}
}
return Ok(());
}
// Parsed flag values; single-valued flags keep the last occurrence,
// `--metric` accumulates.
let mut cluster_id = None;
let mut collection_name: Option<String> = None;
let mut metrics_list = Vec::new();
let mut period = None;
let mut start = None;
let mut end = None;
let mut granularity = None;
// Manual cursor: `take_value` advances `i` onto the flag's value, and the
// `i += 1` at the bottom of the loop then steps past it.
let mut i = 0;
while i < raw_args.len() {
let flag = raw_args[i].as_str();
match flag {
"--cluster-id" => {
cluster_id = Some(take_value(raw_args, &mut i, flag)?);
}
"--collection-name" | "-c" => {
collection_name = Some(take_value(raw_args, &mut i, flag)?);
}
"--metric" | "-m" => {
metrics_list.push(take_value(raw_args, &mut i, flag)?);
}
"--period" => {
period = Some(take_value(raw_args, &mut i, flag)?);
}
"--start" => {
start = Some(take_value(raw_args, &mut i, flag)?);
}
"--end" => {
end = Some(take_value(raw_args, &mut i, flag)?);
}
"--granularity" | "-g" => {
granularity = Some(take_value(raw_args, &mut i, flag)?);
}
// Unrecognised arguments are silently skipped.
// NOTE(review): presumably so flags handled elsewhere pass through,
// but it also hides typos such as `--metrics` - confirm intent.
_ => {}
}
i += 1;
}
if scope == MetricScope::Collection {
// Collection scope: a non-blank collection name is mandatory; store it trimmed.
let name = collection_name
.as_ref()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.ok_or_else(|| {
anyhow::anyhow!(
"--collection-name / -c is required for `zilliz collection metrics`."
)
})?;
collection_name = Some(name);
} else {
// Other scopes: a non-blank collection name is an error; a blank one is dropped.
if collection_name
.as_ref()
.map(|s| !s.trim().is_empty())
.unwrap_or(false)
{
bail!("--collection-name / -c is only valid for `zilliz collection metrics`.");
}
collection_name = None;
}
run(
models,
config_mgr,
cluster_id,
collection_name,
metrics_list,
period,
start,
end,
granularity,
scope,
output_opts,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn run(
models: &Models,
config_mgr: &ConfigManager,
cluster_id: Option<String>,
collection_name: Option<String>,
metrics_list: Vec<String>,
period: Option<String>,
start: Option<String>,
end: Option<String>,
granularity: Option<String>,
scope: MetricScope,
output_opts: &OutputOpts<'_>,
) -> Result<()> {
let cluster_id = match cluster_id {
Some(id) => id,
None => {
let ctx = config_mgr.get_context();
ctx.cluster_id.ok_or_else(|| {
anyhow::anyhow!(
"No --cluster-id provided and no cluster context set.\n\
Run: zilliz context set --cluster-id <id>"
)
})?
}
};
if metrics_list.is_empty() {
bail!(
"At least one --metric / -m is required.\n\
Valid metrics: {}",
METRIC_MAP
.iter()
.filter(|(_, _, s)| scope_compatible(*s, scope))
.map(|(k, _, _)| *k)
.collect::<Vec<_>>()
.join(", ")
);
}
if period.is_some() && (start.is_some() || end.is_some()) {
bail!("--period and --start/--end are mutually exclusive.");
}
if start.is_some() != end.is_some() {
bail!("--start and --end must be used together.");
}
let body = build_request_body(
&metrics_list,
collection_name.as_deref(),
period.as_deref(),
start.as_deref(),
end.as_deref(),
granularity.as_deref(),
scope,
)?;
let api_key =
resolve_api_key(output_opts.api_key, config_mgr).ok_or_else(|| ApiError::NoApiKey)?;
let base_url =
super::endpoint::resolve_control_plane_url(config_mgr, &models.control_plane, None);
let client = ApiClient::new(api_key, base_url);
let path = format!(
"/v2/clusters/{}/metrics/query",
urlencoding::encode(&cluster_id)
);
let result = client.call("POST", &path, None, Some(&body)).await?;
if output_opts.format == "table" && output_opts.query.is_none() {
print_metrics_table(&result, &metrics_list, output_opts.no_header);
} else {
print_output_with_opts(&result, output_opts, None);
}
Ok(())
}
/// Builds the JSON body for the metrics query endpoint.
///
/// * `metricQueries`: one `{"name": <backend name>}` object per requested
///   metric, resolved through `resolve_metric_scoped`.
/// * `granularity`: the explicit value converted with
///   `human_to_iso_duration`, or a default derived from the window length
///   (period, then start/end range, then the scope default).
/// * `collectionName`: present only when a collection name is given.
/// * Time window: `period` if given, else `start` + `end` if both are given,
///   else the default period.
///
/// # Errors
///
/// Fails when a metric cannot be resolved for `scope` or when `period` /
/// `granularity` is not a valid human duration.
pub fn build_request_body(
    metrics_list: &[String],
    collection_name: Option<&str>,
    period: Option<&str>,
    start: Option<&str>,
    end: Option<&str>,
    granularity: Option<&str>,
    scope: MetricScope,
) -> Result<Value> {
    let mut metric_queries: Vec<Value> = Vec::with_capacity(metrics_list.len());
    for metric in metrics_list {
        let backend = resolve_metric_scoped(metric, scope)?;
        metric_queries.push(json!({"name": backend}));
    }

    let granularity_iso = match granularity {
        Some(explicit) => human_to_iso_duration(explicit)?,
        None => {
            // No explicit granularity: size it from the length of the window.
            let window_hours = match (period, start, end) {
                (Some(p), _, _) => pt_to_hours(&human_to_iso_duration(p)?),
                (None, Some(s), Some(e)) => range_hours(s, e).unwrap_or(1.0),
                _ => default_period_hours(scope),
            };
            default_granularity(window_hours).to_string()
        }
    };

    let mut body = json!({
        "metricQueries": metric_queries,
        "granularity": granularity_iso,
    });

    if let Some(name) = collection_name {
        body["collectionName"] = json!(name);
    }

    match (period, start, end) {
        (Some(p), _, _) => {
            body["period"] = json!(human_to_iso_duration(p)?);
        }
        (None, Some(s), Some(e)) => {
            body["start"] = json!(normalize_timestamp(s));
            body["end"] = json!(normalize_timestamp(e));
        }
        _ => {
            body["period"] = json!(default_period_iso(scope));
        }
    }

    Ok(body)
}
/// Renders a metric value for table display.
///
/// * Non-finite values (`NaN`, `inf`, `-inf`) use their standard `Display`
///   form.
/// * Whole numbers with magnitude below `1e9` are printed as plain integers.
/// * Values with magnitude below `0.01` are printed with four decimals.
/// * Everything else is printed with two decimals and `,` thousands
///   separators in the integer part.
///
/// The integer fast path checks the *magnitude* of the value, so large
/// negative whole numbers take the grouped-decimal path (like their positive
/// counterparts) instead of being pushed through a saturating `as i64` cast.
pub fn format_metric_value(val: f64) -> String {
    // Without this guard NaN/inf would fall into the decimal path and come
    // out with a stray trailing dot, and -inf would be cast to i64::MIN.
    if !val.is_finite() {
        return val.to_string();
    }
    if val == val.floor() && val.abs() < 1e9 {
        // Safe: |val| < 1e9 fits comfortably in an i64.
        return (val as i64).to_string();
    }
    if val.abs() < 0.01 {
        return format!("{:.4}", val);
    }

    let fixed = format!("{:.2}", val);
    let (sign, unsigned) = match fixed.strip_prefix('-') {
        Some(rest) => ("-", rest),
        None => ("", fixed.as_str()),
    };
    let (int_digits, frac_digits) = unsigned.split_once('.').unwrap_or((unsigned, ""));

    let mut out = String::with_capacity(fixed.len() + int_digits.len() / 3);
    out.push_str(sign);
    for (idx, digit) in int_digits.chars().enumerate() {
        // Insert a separator whenever a multiple of three digits remains.
        if idx > 0 && (int_digits.len() - idx) % 3 == 0 {
            out.push(',');
        }
        out.push(digit);
    }
    out.push('.');
    out.push_str(frac_digits);
    out
}
fn print_metrics_table(result: &Value, metric_names: &[String], no_header: bool) {
let results = match result.get("results").and_then(|v| v.as_array()) {
Some(r) => r,
None => {
println!(
"{}",
serde_json::to_string_pretty(result).unwrap_or_default()
);
return;
}
};
let mut pivot: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
let mut metric_columns: Vec<String> = Vec::new();
for (i, metric_result) in results.iter().enumerate() {
let name = metric_result
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let unit = metric_result
.get("unit")
.and_then(|v| v.as_str())
.unwrap_or("");
let display_name = metric_names
.get(i)
.map(|n| n.to_string())
.unwrap_or_else(|| name.to_string());
let col_name = if unit.is_empty() {
display_name.clone()
} else {
format!("{} ({})", display_name, unit)
};
metric_columns.push(col_name.clone());
if let Some(values) = metric_result.get("values").and_then(|v| v.as_array()) {
for point in values {
let ts = point
.get("timestamp")
.and_then(|v| v.as_str())
.unwrap_or("");
let val = point
.get("value")
.and_then(|v| {
if v.is_null() {
return None;
}
v.as_f64()
.or_else(|| v.as_str().and_then(|s| s.parse::<f64>().ok()))
})
.map(format_metric_value)
.unwrap_or_else(|| "-".to_string());
pivot
.entry(ts.to_string())
.or_default()
.insert(col_name.clone(), val);
}
}
}
if pivot.is_empty() {
println!("No metric data returned.");
return;
}
let mut headers = vec!["Timestamp".to_string()];
headers.extend(metric_columns.iter().cloned());
let mut table = super::formatter::create_table(&headers, no_header);
for (ts, values) in &pivot {
let display_ts = format_timestamp(ts);
let mut cells = vec![Cell::new(display_ts)];
for col in &metric_columns {
let val = values.get(col).cloned().unwrap_or_else(|| "-".to_string());
cells.push(Cell::new(val).set_alignment(CellAlignment::Right));
}
table.add_row(cells);
}
println!("{}", table);
}
/// Shortens an ISO-style timestamp (`YYYY-MM-DDTHH:MM:SS...`) to
/// `MM-DD HH:MM:SS` for display.
///
/// Bytes 5..10 are taken as the date part and bytes 11..19 as the time part.
/// When the input is too short, or those byte offsets do not fall on
/// character boundaries (non-ASCII input), the input is returned unchanged
/// instead of panicking.
pub fn format_timestamp(ts: &str) -> String {
    // `str::get` returns `None` for out-of-range or non-boundary slices,
    // where direct indexing would panic.
    match (ts.get(5..10), ts.get(11..19)) {
        (Some(date_part), Some(time_part)) => format!("{} {}", date_part, time_part),
        _ => ts.to_string(),
    }
}