use std::collections::BTreeMap;
use std::io::Write;
use anyhow::{Context, Result};
use chrono::{DateTime, TimeZone, Utc};
use clap::Parser;
use crate::cli::datadog::format::{output_as, OutputFormat};
use crate::cli::datadog::helpers::create_client;
use crate::datadog::client::DatadogClient;
use crate::datadog::metrics_api::MetricsApi;
use crate::datadog::time::parse_time_range;
use crate::datadog::types::{MetricQueryResponse, MetricSeries};
#[derive(Parser)]
pub struct QueryCommand {
#[arg(long)]
pub query: String,
#[arg(long)]
pub from: String,
#[arg(long)]
pub to: Option<String>,
#[arg(short = 'o', long, value_enum, default_value_t = OutputFormat::Table)]
pub output: OutputFormat,
}
impl QueryCommand {
pub async fn execute(self) -> Result<()> {
let (from_ts, to_ts) = parse_time_range(&self.from, self.to.as_deref())?;
let (client, _site) = create_client()?;
run_query(&client, &self.query, from_ts, to_ts, &self.output).await
}
}
async fn run_query(
client: &DatadogClient,
query: &str,
from: i64,
to: i64,
output: &OutputFormat,
) -> Result<()> {
let response = MetricsApi::new(client).point_query(query, from, to).await?;
if output_as(&response, output)? {
return Ok(());
}
let stdout = std::io::stdout();
let mut handle = stdout.lock();
render_table(&response, &mut handle)
}
pub(crate) fn render_table(response: &MetricQueryResponse, out: &mut dyn Write) -> Result<()> {
if response.series.is_empty() {
writeln!(out, "No series returned.").context("Failed to write empty-table message")?;
return Ok(());
}
let timestamps = collect_timestamps(&response.series);
let labels: Vec<String> = response
.series
.iter()
.map(|s| s.label().to_string())
.collect();
let rendered_values = render_values(&response.series, ×tamps);
let ts_label = format_timestamp(*timestamps.first().unwrap_or(&0.0));
let ts_width = ts_label.len().max("TIMESTAMP".len());
let col_widths: Vec<usize> = labels
.iter()
.enumerate()
.map(|(i, label)| {
let data_max = rendered_values
.iter()
.map(|row| row[i].len())
.max()
.unwrap_or(0);
label.len().max(data_max)
})
.collect();
let header_cells: Vec<&str> = labels.iter().map(String::as_str).collect();
write_row(out, "TIMESTAMP", ts_width, &header_cells, &col_widths)?;
let ts_sep = "-".repeat(ts_width);
let seps: Vec<String> = col_widths.iter().map(|w| "-".repeat(*w)).collect();
let sep_cells: Vec<&str> = seps.iter().map(String::as_str).collect();
write_row(out, &ts_sep, ts_width, &sep_cells, &col_widths)?;
for (i, ts) in timestamps.iter().enumerate() {
let ts_str = format_timestamp(*ts);
let row: Vec<&str> = rendered_values[i].iter().map(String::as_str).collect();
write_row(out, &ts_str, ts_width, &row, &col_widths)?;
}
Ok(())
}
fn collect_timestamps(series: &[MetricSeries]) -> Vec<f64> {
let mut seen: BTreeMap<u64, f64> = BTreeMap::new();
for s in series {
for (ts, _) in &s.pointlist {
seen.entry(ts.to_bits()).or_insert(*ts);
}
}
let mut ts: Vec<f64> = seen.into_values().collect();
ts.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
ts
}
fn render_values(series: &[MetricSeries], timestamps: &[f64]) -> Vec<Vec<String>> {
timestamps
.iter()
.map(|ts| {
series
.iter()
.map(|s| match find_value(s, *ts) {
Some(v) => format_value(v),
None => "-".to_string(),
})
.collect()
})
.collect()
}
fn find_value(series: &MetricSeries, ts: f64) -> Option<f64> {
series
.pointlist
.iter()
.find(|(t, _)| t.to_bits() == ts.to_bits())
.and_then(|(_, v)| *v)
}
fn format_value(v: f64) -> String {
if v.fract() == 0.0 && v.abs() < 1e15 {
format!("{v:.0}")
} else {
format!("{v:.6}")
}
}
fn format_timestamp(ms: f64) -> String {
let secs = (ms / 1_000.0) as i64;
let nsec = ((ms.rem_euclid(1_000.0)) * 1_000_000.0) as u32;
let dt: DateTime<Utc> = Utc
.timestamp_opt(secs, nsec)
.single()
.unwrap_or_else(Utc::now);
dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
}
fn write_row(
out: &mut dyn Write,
ts_cell: &str,
ts_width: usize,
cells: &[&str],
col_widths: &[usize],
) -> Result<()> {
write!(out, "{ts_cell:<ts_width$}").context("Failed to write timestamp cell")?;
for (cell, width) in cells.iter().zip(col_widths.iter()) {
write!(out, " {cell:<width$}").context("Failed to write series cell")?;
}
writeln!(out).context("Failed to terminate row")?;
Ok(())
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::cli::datadog::format::write_output;
use crate::datadog::types::MetricSeries;
struct FailingWriter;
impl Write for FailingWriter {
fn write(&mut self, _: &[u8]) -> std::io::Result<usize> {
Err(std::io::Error::other("test forced write failure"))
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
fn sample_body() -> serde_json::Value {
serde_json::json!({
"status": "ok",
"from_date": 1_700_000_000_000_i64,
"to_date": 1_700_000_060_000_i64,
"series": [
{
"metric": "avg:system.cpu.user{*}",
"display_name": "avg:system.cpu.user{*}",
"expression": "avg:system.cpu.user{*}",
"pointlist": [
[1_700_000_000_000_i64, 0.5_f64],
[1_700_000_030_000_i64, 0.6_f64]
]
},
{
"metric": "avg:system.cpu.idle{*}",
"display_name": "avg:system.cpu.idle{*}",
"expression": "avg:system.cpu.idle{*}",
"pointlist": [
[1_700_000_030_000_i64, 99.4_f64],
[1_700_000_060_000_i64, null]
]
}
]
})
}
fn sample_response() -> MetricQueryResponse {
serde_json::from_value(sample_body()).unwrap()
}
#[test]
fn format_value_integers_drop_fraction() {
assert_eq!(format_value(1.0), "1");
assert_eq!(format_value(-3.0), "-3");
}
#[test]
fn format_value_fractional_has_six_places() {
assert_eq!(format_value(1.5), "1.500000");
}
#[test]
fn format_value_large_integer_falls_back_to_fractional() {
let s = format_value(1e16);
assert!(s.contains('.'));
}
#[test]
fn format_value_negative_fractional_kept_at_six_places() {
assert_eq!(format_value(-0.25), "-0.250000");
}
#[test]
fn format_timestamp_is_rfc3339_utc() {
let s = format_timestamp(1_700_000_000_000.0);
assert_eq!(s, "2023-11-14T22:13:20Z");
}
#[test]
fn format_timestamp_out_of_range_falls_back_to_now() {
let s = format_timestamp(f64::MAX);
assert!(s.ends_with('Z'));
assert!(s.contains('T'));
}
#[test]
fn collect_timestamps_dedups_and_sorts() {
let series = vec![
MetricSeries {
metric: "a".into(),
display_name: None,
scope: None,
expression: None,
pointlist: vec![(2.0, None), (1.0, None)],
},
MetricSeries {
metric: "b".into(),
display_name: None,
scope: None,
expression: None,
pointlist: vec![(1.0, None), (3.0, None)],
},
];
assert_eq!(collect_timestamps(&series), vec![1.0, 2.0, 3.0]);
}
#[test]
fn collect_timestamps_tolerates_nan() {
let series = vec![MetricSeries {
metric: "a".into(),
display_name: None,
scope: None,
expression: None,
pointlist: vec![(1.0, None), (f64::NAN, None), (2.0, None)],
}];
let ts = collect_timestamps(&series);
assert_eq!(ts.len(), 3);
assert!(ts.iter().any(|t| t.is_nan()));
}
#[test]
fn find_value_collapses_gap_and_missing_point() {
let s = MetricSeries {
metric: "m".into(),
display_name: None,
scope: None,
expression: None,
pointlist: vec![(10.0, None), (20.0, Some(0.5))],
};
assert_eq!(find_value(&s, 10.0), None);
assert_eq!(find_value(&s, 20.0), Some(0.5));
assert_eq!(find_value(&s, 30.0), None);
}
#[test]
fn render_table_includes_both_series_and_union_timestamps() {
let resp = sample_response();
let mut buf = Vec::new();
render_table(&resp, &mut buf).unwrap();
let out = String::from_utf8(buf).unwrap();
assert!(out.contains("TIMESTAMP"));
assert!(out.contains("avg:system.cpu.user{*}"));
assert!(out.contains("avg:system.cpu.idle{*}"));
assert!(out.contains("2023-11-14T22:13:20Z"));
assert!(out.contains("2023-11-14T22:13:50Z"));
assert!(out.contains("2023-11-14T22:14:20Z"));
let last_line = out.lines().last().unwrap();
assert!(last_line.contains(" -"));
}
#[test]
fn render_table_fills_missing_series_cells_with_dash() {
let resp = sample_response();
let mut buf = Vec::new();
render_table(&resp, &mut buf).unwrap();
let out = String::from_utf8(buf).unwrap();
let first_data_line = out.lines().nth(2).unwrap();
assert!(first_data_line.contains("2023-11-14T22:13:20Z"));
assert!(first_data_line.contains("0.500000"));
assert!(first_data_line.contains(" -"));
}
#[test]
fn render_table_on_empty_series_prints_message() {
let resp = MetricQueryResponse {
status: "ok".into(),
from_date: 0,
to_date: 0,
series: vec![],
};
let mut buf = Vec::new();
render_table(&resp, &mut buf).unwrap();
assert_eq!(String::from_utf8(buf).unwrap(), "No series returned.\n");
}
#[test]
fn render_table_propagates_write_errors() {
let resp = sample_response();
let err = render_table(&resp, &mut FailingWriter).unwrap_err();
assert!(err.to_string().contains("Failed to write"));
}
#[test]
fn render_table_empty_series_propagates_write_errors() {
let resp = MetricQueryResponse {
status: "ok".into(),
from_date: 0,
to_date: 0,
series: vec![],
};
let err = render_table(&resp, &mut FailingWriter).unwrap_err();
assert!(err.to_string().contains("empty-table message"));
}
#[test]
fn render_table_with_series_but_no_points_still_renders_headers() {
let resp = MetricQueryResponse {
status: "ok".into(),
from_date: 0,
to_date: 0,
series: vec![MetricSeries {
metric: "m".into(),
display_name: Some("label".into()),
scope: None,
expression: None,
pointlist: vec![],
}],
};
let mut buf = Vec::new();
render_table(&resp, &mut buf).unwrap();
let out = String::from_utf8(buf).unwrap();
assert!(out.contains("TIMESTAMP"));
assert!(out.contains("label"));
assert_eq!(out.lines().count(), 2);
}
#[test]
fn write_output_json_emits_full_response() {
let resp = sample_response();
let mut buf = Vec::new();
let wrote = write_output(&resp, &OutputFormat::Json, &mut buf).unwrap();
assert!(wrote);
let out = String::from_utf8(buf).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&out).unwrap();
assert_eq!(parsed["status"], "ok");
assert_eq!(parsed["series"].as_array().unwrap().len(), 2);
}
#[test]
fn write_output_yaml_emits_document() {
let resp = sample_response();
let mut buf = Vec::new();
write_output(&resp, &OutputFormat::Yaml, &mut buf).unwrap();
let out = String::from_utf8(buf).unwrap();
assert!(out.contains("status: ok"));
assert!(out.contains("series:"));
}
#[test]
fn write_output_jsonl_emits_single_object_line() {
let resp = sample_response();
let mut buf = Vec::new();
write_output(&resp, &OutputFormat::Jsonl, &mut buf).unwrap();
let out = String::from_utf8(buf).unwrap();
assert_eq!(out.matches('\n').count(), 1);
let parsed: serde_json::Value = serde_json::from_str(out.trim_end()).unwrap();
assert_eq!(parsed["series"].as_array().unwrap().len(), 2);
}
#[test]
fn write_output_yamls_emits_single_document() {
let resp = sample_response();
let mut buf = Vec::new();
write_output(&resp, &OutputFormat::Yamls, &mut buf).unwrap();
let out = String::from_utf8(buf).unwrap();
assert!(out.starts_with("---\n"));
assert!(out.contains("status: ok"));
}
#[test]
fn write_output_table_returns_false() {
let resp = sample_response();
let mut buf = Vec::new();
let wrote = write_output(&resp, &OutputFormat::Table, &mut buf).unwrap();
assert!(!wrote);
assert!(buf.is_empty());
}
#[tokio::test]
async fn run_query_table_path_writes_headers_to_stdout_returns_ok() {
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/api/v1/query"))
.and(wiremock::matchers::query_param("from", "100"))
.and(wiremock::matchers::query_param("to", "200"))
.respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
.expect(1)
.mount(&server)
.await;
let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
run_query(
&client,
"avg:system.cpu.user{*}",
100,
200,
&OutputFormat::Table,
)
.await
.unwrap();
}
#[tokio::test]
async fn run_query_json_path_returns_ok() {
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/api/v1/query"))
.respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
.mount(&server)
.await;
let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
run_query(&client, "m", 0, 1, &OutputFormat::Json)
.await
.unwrap();
}
#[tokio::test]
async fn run_query_propagates_api_errors() {
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/api/v1/query"))
.respond_with(wiremock::ResponseTemplate::new(403).set_body_string("Forbidden"))
.mount(&server)
.await;
let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
let err = run_query(&client, "m", 0, 1, &OutputFormat::Table)
.await
.unwrap_err();
assert!(err.to_string().contains("403"));
}
#[tokio::test]
async fn query_command_execute_rejects_invalid_time_range() {
let cmd = QueryCommand {
query: "m".into(),
from: "garbage".into(),
to: None,
output: OutputFormat::Table,
};
let err = cmd.execute().await.unwrap_err();
assert!(err.to_string().contains("Invalid time range"));
}
#[tokio::test]
async fn query_command_execute_errors_when_credentials_missing() {
use crate::datadog::test_support::{with_empty_home, EnvGuard};
let guard = EnvGuard::take();
let _dir = with_empty_home(&guard);
let cmd = QueryCommand {
query: "m".into(),
from: "1h".into(),
to: Some("now".into()),
output: OutputFormat::Table,
};
let err = cmd.execute().await.unwrap_err();
assert!(err.to_string().contains("not configured"));
}
#[tokio::test]
async fn query_command_execute_end_to_end_via_api_url_override() {
use std::fs;
use crate::datadog::auth::{DATADOG_API_KEY, DATADOG_API_URL, DATADOG_APP_KEY};
use crate::datadog::test_support::{with_empty_home, EnvGuard};
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.and(wiremock::matchers::path("/api/v1/query"))
.and(wiremock::matchers::query_param(
"query",
"avg:system.cpu.user{*}",
))
.respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
.expect(1)
.mount(&server)
.await;
let guard = EnvGuard::take();
let dir = with_empty_home(&guard);
let omni_dir = dir.path().join(".omni-dev");
fs::create_dir_all(&omni_dir).unwrap();
fs::write(
omni_dir.join("settings.json"),
r#"{"env":{"DATADOG_API_KEY":"api","DATADOG_APP_KEY":"app","DATADOG_SITE":"datadoghq.com"}}"#,
)
.unwrap();
std::env::set_var(DATADOG_API_KEY, "api");
std::env::set_var(DATADOG_APP_KEY, "app");
std::env::set_var(DATADOG_API_URL, server.uri());
let cmd = QueryCommand {
query: "avg:system.cpu.user{*}".into(),
from: "2023-11-14T22:00:00Z".into(),
to: Some("2023-11-14T23:00:00Z".into()),
output: OutputFormat::Json,
};
cmd.execute().await.unwrap();
}
}