1use std::io::{self, Write};
14
15use crate::model::error::ObzError;
16use crate::model::response::{
17 ExtensionData, LabelValuesData, LogSearchData, MetricInfoData, MetricQueryData, QueryMetadata,
18 Response, ScalarData, SeriesListData, StringListData, TimeRange, TraceDetailData,
19 TraceSearchData, RESULT_TYPE_LABEL_LIST, RESULT_TYPE_LABEL_VALUES, RESULT_TYPE_LOG_ENTRIES,
20 RESULT_TYPE_METRIC_INFO, RESULT_TYPE_METRIC_LIST, RESULT_TYPE_SERIES, RESULT_TYPE_SPANS,
21 RESULT_TYPE_TRACE_DETAIL,
22};
23use crate::output::{format_and_print, OutputFormat};
24use crate::provider::params::{
25 ExtensionParams, LabelValuesParams, LogSearchParams, MetricInfoParams, MetricMetadataParams,
26 MetricQueryParams, TraceGetParams, TraceSearchParams,
27};
28use crate::provider::results::MetricResultType;
29use crate::registry::BuiltProvider;
30
31#[derive(Debug, thiserror::Error)]
36pub enum ExecuteError {
37 #[error(transparent)]
39 Obz(#[from] ObzError),
40 #[error("output error: {0}")]
42 Io(#[from] io::Error),
43}
44
45pub async fn execute_metric_query(
52 provider: &BuiltProvider,
53 params: &MetricQueryParams,
54 limit: usize,
55 output: OutputFormat,
56 fields: Option<&[String]>,
57 truncate: Option<usize>,
58 writer: &mut impl Write,
59) -> Result<(), ExecuteError> {
60 let mp = provider
61 .metric
62 .as_deref()
63 .ok_or_else(|| unsupported(provider.name, "metric"))?;
64
65 let result = mp.query(params).await?;
66 let time_range = TimeRange {
67 start: params.start,
68 end: params.end,
69 };
70
71 if let Some((ts, val)) = result.scalar {
72 let resp = Response::success(
73 build_metric_metadata(provider, 1, Some(¶ms.query), Some(time_range)),
74 ScalarData {
75 result_type: MetricResultType::Scalar,
76 scalar: (ts, val),
77 },
78 );
79 format_and_print(&resp, output, fields, truncate, writer)?;
80 } else {
81 let total = result.total_count;
82 let mut series = result.series;
83 if series.len() > limit {
85 series.truncate(limit);
86 }
87 let mut metadata =
88 build_metric_metadata(provider, total, Some(¶ms.query), Some(time_range));
89 if series.len() < total {
92 metadata.returned_series = Some(series.len());
93 }
94 let resp = Response::success(
95 metadata,
96 MetricQueryData {
97 result_type: result.result_type,
98 series,
99 },
100 );
101 format_and_print(&resp, output, fields, truncate, writer)?;
102 }
103 Ok(())
104}
105
106pub async fn execute_metric_list(
112 provider: &BuiltProvider,
113 params: &MetricMetadataParams,
114 output: OutputFormat,
115 fields: Option<&[String]>,
116 truncate: Option<usize>,
117 writer: &mut impl Write,
118) -> Result<(), ExecuteError> {
119 let mp = provider
120 .metric
121 .as_deref()
122 .ok_or_else(|| unsupported(provider.name, "metric"))?;
123
124 let items = mp.list(params).await?;
125 let time_range = match (params.start, params.end) {
126 (Some(s), Some(e)) => Some(TimeRange { start: s, end: e }),
127 _ => None,
128 };
129 let resp = Response::success(
130 build_metric_metadata(
131 provider,
132 items.len(),
133 params.match_expr.as_deref(),
134 time_range,
135 ),
136 StringListData {
137 result_type: RESULT_TYPE_METRIC_LIST.to_string(),
138 items,
139 },
140 );
141 format_and_print(&resp, output, fields, truncate, writer)?;
142 Ok(())
143}
144
145pub async fn execute_metric_info(
151 provider: &BuiltProvider,
152 params: &MetricInfoParams,
153 output: OutputFormat,
154 fields: Option<&[String]>,
155 truncate: Option<usize>,
156 writer: &mut impl Write,
157) -> Result<(), ExecuteError> {
158 let mp = provider
159 .metric
160 .as_deref()
161 .ok_or_else(|| unsupported(provider.name, "metric"))?;
162
163 let entries = mp.info(params).await?;
164 let info = entries.into_iter().next();
165 let resp = Response::success(
166 build_metric_metadata(
167 provider,
168 usize::from(info.is_some()),
169 Some(¶ms.metric_name),
170 None,
171 ),
172 MetricInfoData {
173 result_type: RESULT_TYPE_METRIC_INFO.to_string(),
174 info,
175 },
176 );
177 format_and_print(&resp, output, fields, truncate, writer)?;
178 Ok(())
179}
180
181pub async fn execute_metric_labels(
187 provider: &BuiltProvider,
188 params: &MetricMetadataParams,
189 output: OutputFormat,
190 fields: Option<&[String]>,
191 truncate: Option<usize>,
192 writer: &mut impl Write,
193) -> Result<(), ExecuteError> {
194 let mp = provider
195 .metric
196 .as_deref()
197 .ok_or_else(|| unsupported(provider.name, "metric"))?;
198
199 let items = mp.labels(params).await?;
200 let resp = Response::success(
201 build_metric_metadata(provider, items.len(), params.match_expr.as_deref(), None),
202 StringListData {
203 result_type: RESULT_TYPE_LABEL_LIST.to_string(),
204 items,
205 },
206 );
207 format_and_print(&resp, output, fields, truncate, writer)?;
208 Ok(())
209}
210
211pub async fn execute_metric_label_values(
217 provider: &BuiltProvider,
218 params: &LabelValuesParams,
219 output: OutputFormat,
220 fields: Option<&[String]>,
221 truncate: Option<usize>,
222 writer: &mut impl Write,
223) -> Result<(), ExecuteError> {
224 let mp = provider
225 .metric
226 .as_deref()
227 .ok_or_else(|| unsupported(provider.name, "metric"))?;
228
229 let label_name = params.label_name.clone();
230 let items = mp.label_values(params).await?;
231 let resp = Response::success(
232 build_metric_metadata(provider, items.len(), params.match_expr.as_deref(), None),
233 LabelValuesData {
234 result_type: RESULT_TYPE_LABEL_VALUES.to_string(),
235 label: label_name,
236 items,
237 },
238 );
239 format_and_print(&resp, output, fields, truncate, writer)?;
240 Ok(())
241}
242
243pub async fn execute_metric_series(
249 provider: &BuiltProvider,
250 params: &MetricMetadataParams,
251 output: OutputFormat,
252 fields: Option<&[String]>,
253 truncate: Option<usize>,
254 writer: &mut impl Write,
255) -> Result<(), ExecuteError> {
256 let mp = provider
257 .metric
258 .as_deref()
259 .ok_or_else(|| unsupported(provider.name, "metric"))?;
260
261 let series = mp.series(params).await?;
262 let time_range = match (params.start, params.end) {
263 (Some(s), Some(e)) => Some(TimeRange { start: s, end: e }),
264 _ => None,
265 };
266 let resp = Response::success(
267 build_metric_metadata(
268 provider,
269 series.len(),
270 params.match_expr.as_deref(),
271 time_range,
272 ),
273 SeriesListData {
274 result_type: RESULT_TYPE_SERIES.to_string(),
275 series,
276 },
277 );
278 format_and_print(&resp, output, fields, truncate, writer)?;
279 Ok(())
280}
281
282pub async fn execute_log_search(
288 provider: &BuiltProvider,
289 params: &LogSearchParams,
290 output: OutputFormat,
291 fields: Option<&[String]>,
292 truncate: Option<usize>,
293 writer: &mut impl Write,
294) -> Result<(), ExecuteError> {
295 let lp = provider
296 .log
297 .as_deref()
298 .ok_or_else(|| unsupported(provider.name, "log"))?;
299
300 let result = lp.search(params).await?;
301 let resp = Response::success(
302 QueryMetadata {
303 provider: provider.name.to_string(),
304 provider_type: None,
305 query_language: provider.log_query_language.map(str::to_string),
306 query: Some(params.query.clone()),
307 time_range: Some(TimeRange {
308 start: params.start,
309 end: params.end,
310 }),
311 total_count: result.total_count,
312 returned_series: None,
313 is_complete: result.is_complete,
314 cursor: result.cursor,
315 },
316 LogSearchData {
317 result_type: RESULT_TYPE_LOG_ENTRIES.to_string(),
318 entries: result.entries,
319 },
320 );
321 format_and_print(&resp, output, fields, truncate, writer)?;
322 Ok(())
323}
324
325fn build_metric_metadata(
329 provider: &BuiltProvider,
330 total_count: usize,
331 query: Option<&str>,
332 time_range: Option<TimeRange>,
333) -> QueryMetadata {
334 QueryMetadata {
335 provider: provider.name.to_string(),
336 provider_type: None,
337 query_language: provider.metric_query_language.map(str::to_string),
338 query: query.map(str::to_string),
339 time_range,
340 total_count,
341 returned_series: None,
342 is_complete: None,
343 cursor: None,
344 }
345}
346
347fn infer_count(data: &serde_json::Value) -> usize {
355 match data {
356 serde_json::Value::Array(arr) => arr.len(),
357 _ => 0,
358 }
359}
360
361fn unsupported(provider_name: &str, signal: &str) -> ObzError {
362 ObzError::Unsupported {
363 message: format!(
364 "provider '{}' does not support {} queries",
365 provider_name, signal
366 ),
367 provider: Some(provider_name.to_string()),
368 suggestion: Some(
369 "This may be a configuration issue — some providers require \
370 additional fields (e.g. project, logstore) to enable all signals"
371 .to_string(),
372 ),
373 }
374}
375
376pub async fn execute_trace_search(
382 provider: &BuiltProvider,
383 params: &TraceSearchParams,
384 output: OutputFormat,
385 fields: Option<&[String]>,
386 truncate: Option<usize>,
387 writer: &mut impl Write,
388) -> Result<(), ExecuteError> {
389 let tp = provider
390 .trace
391 .as_deref()
392 .ok_or_else(|| unsupported(provider.name, "trace"))?;
393
394 let result = tp.search(params).await?;
395 let total = result.total_count;
396 let resp = Response::success(
397 QueryMetadata {
398 provider: provider.name.to_string(),
399 provider_type: None,
400 query_language: None,
401 query: Some(params.query.clone()),
402 time_range: Some(TimeRange {
403 start: params.start,
404 end: params.end,
405 }),
406 total_count: total,
407 returned_series: None,
408 is_complete: result.is_complete,
409 cursor: result.cursor,
410 },
411 TraceSearchData {
412 result_type: RESULT_TYPE_SPANS.to_string(),
413 spans: result.spans,
414 },
415 );
416 format_and_print(&resp, output, fields, truncate, writer)?;
417 Ok(())
418}
419
420pub async fn execute_trace_get(
426 provider: &BuiltProvider,
427 params: &TraceGetParams,
428 output: OutputFormat,
429 fields: Option<&[String]>,
430 truncate: Option<usize>,
431 writer: &mut impl Write,
432) -> Result<(), ExecuteError> {
433 let tp = provider
434 .trace
435 .as_deref()
436 .ok_or_else(|| unsupported(provider.name, "trace"))?;
437
438 let detail = tp.get_trace(params).await?;
439 let span_count = detail.span_count;
440 let resp = Response::success(
441 QueryMetadata {
442 provider: provider.name.to_string(),
443 provider_type: None,
444 query_language: None,
445 query: Some(params.trace_id.clone()),
446 time_range: Some(TimeRange {
447 start: params.start,
448 end: params.end,
449 }),
450 total_count: span_count,
451 returned_series: None,
452 is_complete: None,
453 cursor: None,
454 },
455 TraceDetailData {
456 result_type: RESULT_TYPE_TRACE_DETAIL.to_string(),
457 detail,
458 },
459 );
460 format_and_print(&resp, output, fields, truncate, writer)?;
461 Ok(())
462}
463
464pub async fn execute_extension_command(
471 provider: &BuiltProvider,
472 command: &str,
473 params: &ExtensionParams,
474 output: OutputFormat,
475 fields: Option<&[String]>,
476 truncate: Option<usize>,
477 writer: &mut impl Write,
478) -> Result<(), ExecuteError> {
479 let ep = provider
480 .extension
481 .as_deref()
482 .ok_or_else(|| ObzError::Unsupported {
483 message: format!(
484 "provider '{}' does not support extension commands",
485 provider.name
486 ),
487 provider: Some(provider.name.to_string()),
488 suggestion: None,
489 })?;
490
491 let result = ep.execute(command, params).await?;
492 let total = result
493 .total_count
494 .unwrap_or_else(|| infer_count(&result.data));
495 let resp = Response::success(
496 QueryMetadata {
497 provider: provider.name.to_string(),
498 provider_type: None,
499 query_language: None,
500 query: None,
501 time_range: None,
502 total_count: total,
503 returned_series: None,
504 is_complete: None,
505 cursor: None,
506 },
507 ExtensionData {
508 result_type: command.to_string(),
509 data: result.data,
510 },
511 );
512 format_and_print(&resp, output, fields, truncate, writer)?;
513 Ok(())
514}
515
516#[cfg(test)]
517mod tests {
518 use super::*;
519 use crate::model::log::LogEntry;
520 use crate::model::metric::{DataPoint, MetricSeries};
521 use crate::model::trace::{Span, SpanStatus, TraceDetail};
522 use crate::provider::results::{
523 ExtensionResult, LogSearchResult, MetricQueryResult, ProviderResult, TraceSearchResult,
524 };
525 use crate::provider::traits::{ExtensionProvider, LogProvider, MetricProvider, TraceProvider};
526 use async_trait::async_trait;
527 use std::collections::BTreeMap;
528 struct MockMetricProvider {
534 result: MetricQueryResult,
535 }
536
537 #[async_trait]
538 impl MetricProvider for MockMetricProvider {
539 async fn query(&self, _params: &MetricQueryParams) -> ProviderResult<MetricQueryResult> {
540 Ok(MetricQueryResult {
542 result_type: self.result.result_type,
543 series: self.result.series.clone(),
544 scalar: self.result.scalar,
545 total_count: self.result.total_count,
546 })
547 }
548 async fn list(&self, _params: &MetricMetadataParams) -> ProviderResult<Vec<String>> {
549 Ok(vec!["cpu_usage".to_string(), "mem_usage".to_string()])
550 }
551 async fn info(
552 &self,
553 _params: &crate::provider::params::MetricInfoParams,
554 ) -> ProviderResult<Vec<crate::model::metric::MetricInfoDetail>> {
555 Ok(vec![])
556 }
557 async fn labels(&self, _params: &MetricMetadataParams) -> ProviderResult<Vec<String>> {
558 Ok(vec!["__name__".to_string(), "instance".to_string()])
559 }
560 async fn label_values(&self, _params: &LabelValuesParams) -> ProviderResult<Vec<String>> {
561 Ok(vec!["value1".to_string(), "value2".to_string()])
562 }
563 async fn series(
564 &self,
565 _params: &MetricMetadataParams,
566 ) -> ProviderResult<Vec<BTreeMap<String, String>>> {
567 Ok(vec![BTreeMap::from([
568 ("__name__".to_string(), "up".to_string()),
569 ("instance".to_string(), "localhost:9090".to_string()),
570 ])])
571 }
572 }
573
574 struct MockLogProvider {
576 is_complete: Option<bool>,
577 cursor: Option<String>,
578 }
579
580 #[async_trait]
581 impl LogProvider for MockLogProvider {
582 async fn search(&self, _params: &LogSearchParams) -> ProviderResult<LogSearchResult> {
583 Ok(LogSearchResult {
584 entries: vec![LogEntry {
585 timestamp: 1_700_000_000,
586 message: "test log message".to_string(),
587 severity: None,
588 source: None,
589 service: None,
590 id: None,
591 attributes: None,
592 resource: None,
593 trace_id: None,
594 span_id: None,
595 extensions: None,
596 }],
597 total_count: 1,
598 is_complete: self.is_complete,
599 cursor: self.cursor.clone(),
600 })
601 }
602 }
603
604 struct MockTraceProvider;
606
607 fn make_span(trace_id: &str, span_id: &str, name: &str) -> Span {
608 Span {
609 trace_id: trace_id.to_string(),
610 span_id: span_id.to_string(),
611 parent_span_id: None,
612 name: name.to_string(),
613 service: "test-service".to_string(),
614 kind: None,
615 status: SpanStatus::Ok,
616 start_time: 1_700_000_000_i64,
617 duration_us: 1000,
618 attributes: None,
619 events: None,
620 resource: None,
621 extensions: None,
622 }
623 }
624
625 #[async_trait]
626 impl TraceProvider for MockTraceProvider {
627 async fn search(&self, _params: &TraceSearchParams) -> ProviderResult<TraceSearchResult> {
628 Ok(TraceSearchResult {
629 spans: vec![make_span("abc123", "span1", "GET /api")],
630 total_count: 1,
631 is_complete: None,
632 cursor: None,
633 })
634 }
635 async fn get_trace(&self, params: &TraceGetParams) -> ProviderResult<TraceDetail> {
636 let spans = vec![make_span(¶ms.trace_id, "span1", "GET /api")];
637 Ok(TraceDetail::from_spans(params.trace_id.clone(), spans))
638 }
639 }
640
641 struct MockExtensionProvider;
643
644 #[async_trait]
645 impl ExtensionProvider for MockExtensionProvider {
646 async fn execute(
647 &self,
648 _command: &str,
649 _params: &ExtensionParams,
650 ) -> ProviderResult<ExtensionResult> {
651 Ok(ExtensionResult::from_strings(vec![
652 "service-a".to_string(),
653 "service-b".to_string(),
654 ]))
655 }
656 }
657
658 struct MockStructuredExtensionProvider;
660
661 #[async_trait]
662 impl ExtensionProvider for MockStructuredExtensionProvider {
663 async fn execute(
664 &self,
665 _command: &str,
666 _params: &ExtensionParams,
667 ) -> ProviderResult<ExtensionResult> {
668 Ok(ExtensionResult::from_value(serde_json::json!([
669 {"name": "cart", "spans": 100},
670 {"name": "payment", "spans": 50}
671 ])))
672 }
673 }
674
675 fn make_series(name: &str, points: Vec<DataPoint>) -> MetricSeries {
680 MetricSeries {
681 name: name.to_string(),
682 labels: BTreeMap::new(),
683 points,
684 stats: None,
685 extensions: None,
686 }
687 }
688
689 fn make_provider_metric_only(mp: impl MetricProvider + 'static) -> BuiltProvider {
690 BuiltProvider {
691 name: "test",
692 metric_query_language: Some("TestQL"),
693 log_query_language: None,
694 metric: Some(Box::new(mp)),
695 log: None,
696 trace: None,
697 extension: None,
698 }
699 }
700
701 fn make_provider_log_only(lp: impl LogProvider + 'static) -> BuiltProvider {
702 BuiltProvider {
703 name: "test",
704 metric_query_language: None,
705 log_query_language: Some("TestLogQL"),
706 metric: None,
707 log: Some(Box::new(lp)),
708 trace: None,
709 extension: None,
710 }
711 }
712
713 fn make_provider_trace_only(tp: impl TraceProvider + 'static) -> BuiltProvider {
714 BuiltProvider {
715 name: "test",
716 metric_query_language: None,
717 log_query_language: None,
718 metric: None,
719 log: None,
720 trace: Some(Box::new(tp)),
721 extension: None,
722 }
723 }
724
725 fn make_provider_extension_only(ep: impl ExtensionProvider + 'static) -> BuiltProvider {
726 BuiltProvider {
727 name: "test",
728 metric_query_language: None,
729 log_query_language: None,
730 metric: None,
731 log: None,
732 trace: None,
733 extension: Some(Box::new(ep)),
734 }
735 }
736
737 fn empty_provider() -> BuiltProvider {
738 BuiltProvider {
739 name: "empty",
740 metric_query_language: None,
741 log_query_language: None,
742 metric: None,
743 log: None,
744 trace: None,
745 extension: None,
746 }
747 }
748
749 fn default_metric_params() -> MetricQueryParams {
750 MetricQueryParams {
751 query: "up".to_string(),
752 is_range: false,
753 start: 1_700_000_000,
754 end: 1_700_003_600,
755 step: None,
756 limit: None,
757 timeout: None,
758 }
759 }
760
761 #[test]
766 fn build_metric_metadata_includes_query_language() {
767 let provider = make_provider_metric_only(MockMetricProvider {
768 result: MetricQueryResult {
769 result_type: MetricResultType::Vector,
770 series: vec![],
771 scalar: None,
772 total_count: 0,
773 },
774 });
775 let md = build_metric_metadata(&provider, 5, Some("up"), None);
776 assert_eq!(md.provider, "test");
777 assert_eq!(md.query_language.as_deref(), Some("TestQL"));
778 assert_eq!(md.query.as_deref(), Some("up"));
779 assert_eq!(md.total_count, 5);
780 assert!(md.returned_series.is_none());
781 assert!(md.time_range.is_none());
782 }
783
784 #[test]
785 fn truncation_logic_client_side() {
786 let mut series: Vec<MetricSeries> = (0..10)
788 .map(|i| {
789 make_series(
790 &format!("s{i}"),
791 vec![DataPoint {
792 timestamp: 100,
793 value: 1.0,
794 }],
795 )
796 })
797 .collect();
798 let total = 10;
799 let limit = 5;
800
801 if series.len() > limit {
802 series.truncate(limit);
803 }
804 assert_eq!(series.len(), 5, "should truncate to limit");
805
806 let returned_series = if series.len() < total {
807 Some(series.len())
808 } else {
809 None
810 };
811 assert_eq!(returned_series, Some(5), "should report truncated count");
812 }
813
814 #[test]
815 fn truncation_logic_no_truncation_needed() {
816 let mut series: Vec<MetricSeries> = (0..3)
817 .map(|i| {
818 make_series(
819 &format!("s{i}"),
820 vec![DataPoint {
821 timestamp: 100,
822 value: 1.0,
823 }],
824 )
825 })
826 .collect();
827 let total = 3;
828 let limit = 100;
829
830 if series.len() > limit {
831 series.truncate(limit);
832 }
833 assert_eq!(series.len(), 3, "should not truncate");
834
835 let returned_series = if series.len() < total {
836 Some(series.len())
837 } else {
838 None
839 };
840 assert!(
841 returned_series.is_none(),
842 "should be None when no truncation"
843 );
844 }
845
846 #[test]
847 fn truncation_logic_server_side() {
848 let mut series: Vec<MetricSeries> = (0..5)
850 .map(|i| {
851 make_series(
852 &format!("s{i}"),
853 vec![DataPoint {
854 timestamp: 100,
855 value: 1.0,
856 }],
857 )
858 })
859 .collect();
860 let total = 20;
861 let limit = 100;
862
863 if series.len() > limit {
864 series.truncate(limit);
865 }
866 assert_eq!(series.len(), 5);
867
868 let returned_series = if series.len() < total {
869 Some(series.len())
870 } else {
871 None
872 };
873 assert_eq!(
874 returned_series,
875 Some(5),
876 "should report server-truncated count"
877 );
878 }
879
880 fn parse_output(buf: &[u8]) -> serde_json::Value {
886 serde_json::from_slice(buf).expect("output should be valid JSON")
887 }
888
889 #[tokio::test]
890 async fn metric_query_series_path() {
891 let series = vec![
892 make_series(
893 "s1",
894 vec![DataPoint {
895 timestamp: 100,
896 value: 1.0,
897 }],
898 ),
899 make_series(
900 "s2",
901 vec![DataPoint {
902 timestamp: 100,
903 value: 2.0,
904 }],
905 ),
906 ];
907 let provider = make_provider_metric_only(MockMetricProvider {
908 result: MetricQueryResult {
909 result_type: MetricResultType::Vector,
910 series,
911 scalar: None,
912 total_count: 2,
913 },
914 });
915
916 let mut buf = Vec::new();
917 execute_metric_query(
918 &provider,
919 &default_metric_params(),
920 100,
921 OutputFormat::Json,
922 None,
923 None,
924 &mut buf,
925 )
926 .await
927 .unwrap();
928
929 let json = parse_output(&buf);
930 assert_eq!(json["status"], "success");
931 assert_eq!(json["metadata"]["provider"], "test");
932 assert_eq!(json["metadata"]["total_count"], 2);
933 assert_eq!(json["data"]["series"].as_array().unwrap().len(), 2);
934 assert_eq!(json["data"]["series"][0]["name"], "s1");
935 }
936
937 #[tokio::test]
938 async fn metric_query_scalar_path() {
939 let provider = make_provider_metric_only(MockMetricProvider {
940 result: MetricQueryResult {
941 result_type: MetricResultType::Scalar,
942 series: vec![],
943 scalar: Some((1_700_000_000, 42.0)),
944 total_count: 1,
945 },
946 });
947
948 let mut buf = Vec::new();
949 execute_metric_query(
950 &provider,
951 &default_metric_params(),
952 100,
953 OutputFormat::Json,
954 None,
955 None,
956 &mut buf,
957 )
958 .await
959 .unwrap();
960
961 let json = parse_output(&buf);
962 assert_eq!(json["status"], "success");
963 assert_eq!(json["data"]["result_type"], "scalar");
964 assert_eq!(json["data"]["scalar"][1], 42.0);
965 }
966
967 #[tokio::test]
968 async fn metric_query_client_side_truncation() {
969 let series: Vec<MetricSeries> = (0..10)
970 .map(|i| {
971 make_series(
972 &format!("s{i}"),
973 vec![DataPoint {
974 timestamp: 100,
975 value: 1.0,
976 }],
977 )
978 })
979 .collect();
980 let provider = make_provider_metric_only(MockMetricProvider {
981 result: MetricQueryResult {
982 result_type: MetricResultType::Vector,
983 series,
984 scalar: None,
985 total_count: 10,
986 },
987 });
988
989 let mut buf = Vec::new();
990 execute_metric_query(
991 &provider,
992 &default_metric_params(),
993 5,
994 OutputFormat::Json,
995 None,
996 None,
997 &mut buf,
998 )
999 .await
1000 .unwrap();
1001
1002 let json = parse_output(&buf);
1003 assert_eq!(json["metadata"]["total_count"], 10);
1004 assert_eq!(json["metadata"]["returned_series"], 5);
1005 assert_eq!(json["data"]["series"].as_array().unwrap().len(), 5);
1006 }
1007
1008 #[tokio::test]
1009 async fn metric_query_no_truncation() {
1010 let series: Vec<MetricSeries> = (0..3)
1011 .map(|i| {
1012 make_series(
1013 &format!("s{i}"),
1014 vec![DataPoint {
1015 timestamp: 100,
1016 value: 1.0,
1017 }],
1018 )
1019 })
1020 .collect();
1021 let provider = make_provider_metric_only(MockMetricProvider {
1022 result: MetricQueryResult {
1023 result_type: MetricResultType::Vector,
1024 series,
1025 scalar: None,
1026 total_count: 3,
1027 },
1028 });
1029
1030 let result = execute_metric_query(
1031 &provider,
1032 &default_metric_params(),
1033 100,
1034 OutputFormat::Json,
1035 None,
1036 None,
1037 &mut io::sink(),
1038 )
1039 .await;
1040 assert!(result.is_ok());
1041 }
1042
1043 #[tokio::test]
1044 async fn metric_query_unsupported_provider() {
1045 let provider = empty_provider();
1046 let result = execute_metric_query(
1047 &provider,
1048 &default_metric_params(),
1049 100,
1050 OutputFormat::Json,
1051 None,
1052 None,
1053 &mut io::sink(),
1054 )
1055 .await;
1056
1057 match result {
1058 Err(ExecuteError::Obz(ObzError::Unsupported {
1059 message,
1060 suggestion,
1061 ..
1062 })) => {
1063 assert!(message.contains("metric"));
1064 assert!(message.contains("empty"));
1065 assert!(
1066 suggestion.is_some(),
1067 "unsupported should include suggestion"
1068 );
1069 let hint = suggestion.unwrap();
1070 assert!(
1071 !hint.contains("obz "),
1072 "core suggestion must not reference CLI commands, got: {hint}"
1073 );
1074 }
1075 other => panic!("expected Unsupported error, got {other:?}"),
1076 }
1077 }
1078
1079 #[tokio::test]
1084 async fn metric_list_success() {
1085 let provider = make_provider_metric_only(MockMetricProvider {
1086 result: MetricQueryResult {
1087 result_type: MetricResultType::Vector,
1088 series: vec![],
1089 scalar: None,
1090 total_count: 0,
1091 },
1092 });
1093
1094 let result = execute_metric_list(
1095 &provider,
1096 &MetricMetadataParams {
1097 match_expr: Some("cpu".to_string()),
1098 match_exprs: vec![],
1099 start: Some(100),
1100 end: Some(200),
1101 limit: Some(100),
1102 },
1103 OutputFormat::Json,
1104 None,
1105 None,
1106 &mut io::sink(),
1107 )
1108 .await;
1109 assert!(result.is_ok());
1110 }
1111
1112 #[tokio::test]
1113 async fn metric_list_partial_time_range() {
1114 let provider = make_provider_metric_only(MockMetricProvider {
1115 result: MetricQueryResult {
1116 result_type: MetricResultType::Vector,
1117 series: vec![],
1118 scalar: None,
1119 total_count: 0,
1120 },
1121 });
1122
1123 let result = execute_metric_list(
1124 &provider,
1125 &MetricMetadataParams {
1126 match_expr: None,
1127 match_exprs: vec![],
1128 start: Some(1_700_000_000),
1129 end: None,
1130 limit: Some(100),
1131 },
1132 OutputFormat::Json,
1133 None,
1134 None,
1135 &mut io::sink(),
1136 )
1137 .await;
1138 assert!(result.is_ok());
1139 }
1140
1141 #[tokio::test]
1142 async fn metric_info_success() {
1143 let provider = make_provider_metric_only(MockMetricProvider {
1144 result: MetricQueryResult {
1145 result_type: MetricResultType::Vector,
1146 series: vec![],
1147 scalar: None,
1148 total_count: 0,
1149 },
1150 });
1151
1152 let result = execute_metric_info(
1153 &provider,
1154 &MetricInfoParams {
1155 metric_name: "cpu_usage".to_string(),
1156 },
1157 OutputFormat::Json,
1158 None,
1159 None,
1160 &mut io::sink(),
1161 )
1162 .await;
1163 assert!(result.is_ok());
1164 }
1165
1166 #[tokio::test]
1167 async fn metric_labels_success() {
1168 let provider = make_provider_metric_only(MockMetricProvider {
1169 result: MetricQueryResult {
1170 result_type: MetricResultType::Vector,
1171 series: vec![],
1172 scalar: None,
1173 total_count: 0,
1174 },
1175 });
1176
1177 let result = execute_metric_labels(
1178 &provider,
1179 &MetricMetadataParams {
1180 match_expr: None,
1181 match_exprs: vec![],
1182 start: None,
1183 end: None,
1184 limit: None,
1185 },
1186 OutputFormat::Json,
1187 None,
1188 None,
1189 &mut io::sink(),
1190 )
1191 .await;
1192 assert!(result.is_ok());
1193 }
1194
1195 #[tokio::test]
1196 async fn metric_label_values_success() {
1197 let provider = make_provider_metric_only(MockMetricProvider {
1198 result: MetricQueryResult {
1199 result_type: MetricResultType::Vector,
1200 series: vec![],
1201 scalar: None,
1202 total_count: 0,
1203 },
1204 });
1205
1206 let result = execute_metric_label_values(
1207 &provider,
1208 &LabelValuesParams {
1209 label_name: "instance".to_string(),
1210 match_expr: None,
1211 start: None,
1212 end: None,
1213 limit: Some(100),
1214 },
1215 OutputFormat::Json,
1216 None,
1217 None,
1218 &mut io::sink(),
1219 )
1220 .await;
1221 assert!(result.is_ok());
1222 }
1223
1224 #[tokio::test]
1225 async fn metric_series_success() {
1226 let provider = make_provider_metric_only(MockMetricProvider {
1227 result: MetricQueryResult {
1228 result_type: MetricResultType::Vector,
1229 series: vec![],
1230 scalar: None,
1231 total_count: 0,
1232 },
1233 });
1234
1235 let result = execute_metric_series(
1236 &provider,
1237 &MetricMetadataParams {
1238 match_expr: None,
1239 match_exprs: vec!["{__name__=\"up\"}".to_string()],
1240 start: Some(100),
1241 end: Some(200),
1242 limit: Some(1000),
1243 },
1244 OutputFormat::Json,
1245 None,
1246 None,
1247 &mut io::sink(),
1248 )
1249 .await;
1250 assert!(result.is_ok());
1251 }
1252
1253 #[tokio::test]
1258 async fn log_search_metadata_propagation() {
1259 let provider = make_provider_log_only(MockLogProvider {
1260 is_complete: Some(true),
1261 cursor: Some("next-page-token".to_string()),
1262 });
1263
1264 let mut buf = Vec::new();
1265 execute_log_search(
1266 &provider,
1267 &LogSearchParams {
1268 query: "error".to_string(),
1269 start: 1_700_000_000,
1270 end: 1_700_003_600,
1271 limit: 100,
1272 },
1273 OutputFormat::Json,
1274 None,
1275 None,
1276 &mut buf,
1277 )
1278 .await
1279 .unwrap();
1280
1281 let json = parse_output(&buf);
1282 assert_eq!(json["status"], "success");
1283 assert_eq!(json["metadata"]["cursor"], "next-page-token");
1284 assert_eq!(json["metadata"]["is_complete"], true);
1285 assert_eq!(json["metadata"]["query_language"], "TestLogQL");
1286 assert_eq!(json["data"]["entries"].as_array().unwrap().len(), 1);
1287 }
1288
1289 #[tokio::test]
1290 async fn log_search_unsupported_provider() {
1291 let provider = empty_provider();
1292 let result = execute_log_search(
1293 &provider,
1294 &LogSearchParams {
1295 query: "*".to_string(),
1296 start: 0,
1297 end: 0,
1298 limit: 10,
1299 },
1300 OutputFormat::Json,
1301 None,
1302 None,
1303 &mut io::sink(),
1304 )
1305 .await;
1306
1307 match result {
1308 Err(ExecuteError::Obz(ObzError::Unsupported {
1309 message,
1310 suggestion,
1311 ..
1312 })) => {
1313 assert!(message.contains("log"));
1314 assert!(
1315 suggestion.is_some(),
1316 "log unsupported should include suggestion"
1317 );
1318 let hint = suggestion.unwrap();
1319 assert!(
1320 !hint.contains("obz "),
1321 "core suggestion must not reference CLI commands, got: {hint}"
1322 );
1323 }
1324 other => panic!("expected Unsupported error, got {other:?}"),
1325 }
1326 }
1327
1328 #[tokio::test]
1333 async fn trace_search_success() {
1334 let provider = make_provider_trace_only(MockTraceProvider);
1335 let result = execute_trace_search(
1336 &provider,
1337 &TraceSearchParams {
1338 query: "cart".to_string(),
1339 start: 1_700_000_000,
1340 end: 1_700_003_600,
1341 limit: 20,
1342 },
1343 OutputFormat::Json,
1344 None,
1345 None,
1346 &mut io::sink(),
1347 )
1348 .await;
1349 assert!(result.is_ok());
1350 }
1351
1352 #[tokio::test]
1353 async fn trace_search_unsupported_provider() {
1354 let provider = empty_provider();
1355 let result = execute_trace_search(
1356 &provider,
1357 &TraceSearchParams {
1358 query: "cart".to_string(),
1359 start: 0,
1360 end: 0,
1361 limit: 10,
1362 },
1363 OutputFormat::Json,
1364 None,
1365 None,
1366 &mut io::sink(),
1367 )
1368 .await;
1369
1370 match result {
1371 Err(ExecuteError::Obz(ObzError::Unsupported {
1372 message,
1373 suggestion,
1374 ..
1375 })) => {
1376 assert!(message.contains("trace"));
1377 assert!(
1378 suggestion.is_some(),
1379 "trace unsupported should include suggestion"
1380 );
1381 let hint = suggestion.unwrap();
1382 assert!(
1383 !hint.contains("obz "),
1384 "core suggestion must not reference CLI commands, got: {hint}"
1385 );
1386 }
1387 other => panic!("expected Unsupported error, got {other:?}"),
1388 }
1389 }
1390
1391 #[tokio::test]
1396 async fn trace_get_pass_through() {
1397 let provider = make_provider_trace_only(MockTraceProvider);
1398 let result = execute_trace_get(
1399 &provider,
1400 &TraceGetParams {
1401 trace_id: "abc123".to_string(),
1402 start: 1_700_000_000,
1403 end: 1_700_003_600,
1404 },
1405 OutputFormat::Json,
1406 None,
1407 None,
1408 &mut io::sink(),
1409 )
1410 .await;
1411 assert!(result.is_ok());
1412 }
1413
1414 #[tokio::test]
1419 async fn extension_command_unsupported() {
1420 let provider = empty_provider();
1421 let result = execute_extension_command(
1422 &provider,
1423 "services",
1424 &ExtensionParams {
1425 start: None,
1426 end: None,
1427 signal: String::new(),
1428 args: Vec::new(),
1429 },
1430 OutputFormat::Json,
1431 None,
1432 None,
1433 &mut io::sink(),
1434 )
1435 .await;
1436
1437 match result {
1438 Err(ExecuteError::Obz(ObzError::Unsupported { message, .. })) => {
1439 assert!(message.contains("extension"));
1440 }
1441 other => panic!("expected Unsupported error, got {other:?}"),
1442 }
1443 }
1444
1445 #[tokio::test]
1446 async fn extension_command_success() {
1447 let provider = make_provider_extension_only(MockExtensionProvider);
1448 let result = execute_extension_command(
1449 &provider,
1450 "services",
1451 &ExtensionParams {
1452 start: None,
1453 end: None,
1454 signal: String::new(),
1455 args: Vec::new(),
1456 },
1457 OutputFormat::Json,
1458 None,
1459 None,
1460 &mut io::sink(),
1461 )
1462 .await;
1463 assert!(result.is_ok());
1464 }
1465
1466 #[tokio::test]
1467 async fn extension_command_structured_data() {
1468 let provider = make_provider_extension_only(MockStructuredExtensionProvider);
1469 let mut buf = Vec::new();
1470 execute_extension_command(
1471 &provider,
1472 "services",
1473 &ExtensionParams {
1474 start: None,
1475 end: None,
1476 signal: String::new(),
1477 args: Vec::new(),
1478 },
1479 OutputFormat::Json,
1480 None,
1481 None,
1482 &mut buf,
1483 )
1484 .await
1485 .unwrap();
1486
1487 let json = parse_output(&buf);
1488 assert_eq!(json["status"], "success");
1489 assert_eq!(json["metadata"]["total_count"], 2);
1491 assert_eq!(json["data"]["result_type"], "services");
1492 let data = json["data"]["data"].as_array().unwrap();
1493 assert_eq!(data.len(), 2);
1494 assert_eq!(data[0]["name"], "cart");
1495 assert_eq!(data[1]["spans"], 50);
1496 }
1497
1498 #[test]
1499 fn infer_count_array() {
1500 assert_eq!(infer_count(&serde_json::json!([1, 2, 3])), 3);
1501 }
1502
1503 #[test]
1504 fn infer_count_object() {
1505 assert_eq!(infer_count(&serde_json::json!({"key": "val"})), 0);
1507 }
1508
1509 #[test]
1510 fn infer_count_empty_array() {
1511 assert_eq!(infer_count(&serde_json::json!([])), 0);
1512 }
1513
1514 #[test]
1515 fn infer_count_primitive() {
1516 assert_eq!(infer_count(&serde_json::json!("hello")), 0);
1517 assert_eq!(infer_count(&serde_json::json!(null)), 0);
1518 }
1519}