dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod prometheus_names;
10
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22// Import commonly used items to avoid verbose prefixes
23use prometheus_names::{
24    COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
25    nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
26};
27
28// Pipeline imports for endpoint creation
29use crate::pipeline::{
30    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31    network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
/// Switch for automatic label injection on metrics created through this module.
///
/// If set to true, then metrics will be labeled with the namespace, component, and endpoint labels
/// derived from the registry hierarchy, and callers are rejected if they pass one of those label
/// keys themselves.
/// These labels are prefixed with "dynamo_" to avoid collisions with Kubernetes and other monitoring system labels.
pub const USE_AUTO_LABELS: bool = true;
40
41// Prometheus imports
42use prometheus::Encoder;
43
44/// Validate that a label slice has no duplicate keys.
45/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
46fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47    let mut seen_keys = std::collections::HashSet::new();
48    for (key, _) in labels {
49        if !seen_keys.insert(*key) {
50            return Err(anyhow::anyhow!(
51                "Duplicate label key '{}' found in labels",
52                key
53            ));
54        }
55    }
56    Ok(())
57}
58
59/// Trait that defines common behavior for Prometheus metric types
60pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
61    /// Create a new metric with the given options
62    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
63    where
64        Self: Sized;
65
66    /// Create a new metric with histogram options and custom buckets
67    /// This is a default implementation that will panic for non-histogram metrics
68    fn with_histogram_opts_and_buckets(
69        _opts: prometheus::HistogramOpts,
70        _buckets: Option<Vec<f64>>,
71    ) -> Result<Self, prometheus::Error>
72    where
73        Self: Sized,
74    {
75        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
76    }
77
78    /// Create a new metric with counter options and label names (for CounterVec)
79    /// This is a default implementation that will panic for non-countervec metrics
80    fn with_opts_and_label_names(
81        _opts: prometheus::Opts,
82        _label_names: &[&str],
83    ) -> Result<Self, prometheus::Error>
84    where
85        Self: Sized,
86    {
87        panic!("with_opts_and_label_names is not implemented for this metric type");
88    }
89}
90
// Implement the trait for Counter, IntCounter, and Gauge
// Only `with_opts` is provided here; the histogram and label-name constructors keep the
// trait's default implementations.
impl PrometheusMetric for prometheus::Counter {
    /// Builds the counter by delegating to `prometheus::Counter::with_opts`.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        prometheus::Counter::with_opts(opts)
    }
}
96}
97
// Only `with_opts` is provided here; the histogram and label-name constructors keep the
// trait's default implementations.
impl PrometheusMetric for prometheus::IntCounter {
    /// Builds the integer counter by delegating to `prometheus::IntCounter::with_opts`.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        prometheus::IntCounter::with_opts(opts)
    }
}
103
// Only `with_opts` is provided here; the histogram and label-name constructors keep the
// trait's default implementations.
impl PrometheusMetric for prometheus::Gauge {
    /// Builds the gauge by delegating to `prometheus::Gauge::with_opts`.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        prometheus::Gauge::with_opts(opts)
    }
}
109
// Only `with_opts` is provided here; the histogram and label-name constructors keep the
// trait's default implementations.
impl PrometheusMetric for prometheus::IntGauge {
    /// Builds the integer gauge by delegating to `prometheus::IntGauge::with_opts`.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        prometheus::IntGauge::with_opts(opts)
    }
}
115
116impl PrometheusMetric for prometheus::IntGaugeVec {
117    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
118        Err(prometheus::Error::Msg(
119            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
120        ))
121    }
122
123    fn with_opts_and_label_names(
124        opts: prometheus::Opts,
125        label_names: &[&str],
126    ) -> Result<Self, prometheus::Error> {
127        prometheus::IntGaugeVec::new(opts, label_names)
128    }
129}
130
131impl PrometheusMetric for prometheus::IntCounterVec {
132    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
133        Err(prometheus::Error::Msg(
134            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
135        ))
136    }
137
138    fn with_opts_and_label_names(
139        opts: prometheus::Opts,
140        label_names: &[&str],
141    ) -> Result<Self, prometheus::Error> {
142        prometheus::IntCounterVec::new(opts, label_names)
143    }
144}
145
146// Implement the trait for Histogram
147impl PrometheusMetric for prometheus::Histogram {
148    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
149        // Convert Opts to HistogramOpts
150        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
151        prometheus::Histogram::with_opts(histogram_opts)
152    }
153
154    fn with_histogram_opts_and_buckets(
155        mut opts: prometheus::HistogramOpts,
156        buckets: Option<Vec<f64>>,
157    ) -> Result<Self, prometheus::Error> {
158        if let Some(custom_buckets) = buckets {
159            opts = opts.buckets(custom_buckets);
160        }
161        prometheus::Histogram::with_opts(opts)
162    }
163}
164
165// Implement the trait for CounterVec
166impl PrometheusMetric for prometheus::CounterVec {
167    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
168        // This will panic - CounterVec needs label names
169        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
170    }
171
172    fn with_opts_and_label_names(
173        opts: prometheus::Opts,
174        label_names: &[&str],
175    ) -> Result<Self, prometheus::Error> {
176        prometheus::CounterVec::new(opts, label_names)
177    }
178}
179
180/// Private helper function to create metrics - not accessible to trait implementors
181fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
182    registry: &R,
183    metric_name: &str,
184    metric_desc: &str,
185    labels: &[(&str, &str)],
186    buckets: Option<Vec<f64>>,
187    const_labels: Option<&[&str]>,
188) -> anyhow::Result<T> {
189    // Validate that user-provided labels don't have duplicate keys
190    validate_no_duplicate_label_keys(labels)?;
191    // Note: stored labels functionality has been removed
192
193    let basename = registry.basename();
194    let parent_hierarchy = registry.parent_hierarchy();
195
196    // Build hierarchy: parent_hierarchy + [basename]
197    let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
198
199    let metric_name = build_component_metric_name(metric_name);
200
201    // Build updated_labels: auto-labels first, then `labels` + stored labels
202    let mut updated_labels: Vec<(String, String)> = Vec::new();
203
204    if USE_AUTO_LABELS {
205        // Validate that user-provided labels don't conflict with auto-generated labels
206        for (key, _) in labels {
207            if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
208                return Err(anyhow::anyhow!(
209                    "Label '{}' is automatically added by auto_label feature and cannot be manually set",
210                    key
211                ));
212            }
213        }
214
215        // Add auto-generated labels with sanitized values
216        if hierarchy.len() > 1 {
217            let namespace = &hierarchy[1];
218            if !namespace.is_empty() {
219                let valid_namespace = sanitize_prometheus_label(namespace)?;
220                if !valid_namespace.is_empty() {
221                    updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
222                }
223            }
224        }
225        if hierarchy.len() > 2 {
226            let component = &hierarchy[2];
227            if !component.is_empty() {
228                let valid_component = sanitize_prometheus_label(component)?;
229                if !valid_component.is_empty() {
230                    updated_labels.push((labels::COMPONENT.to_string(), valid_component));
231                }
232            }
233        }
234        if hierarchy.len() > 3 {
235            let endpoint = &hierarchy[3];
236            if !endpoint.is_empty() {
237                let valid_endpoint = sanitize_prometheus_label(endpoint)?;
238                if !valid_endpoint.is_empty() {
239                    updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
240                }
241            }
242        }
243    }
244
245    // Add user labels
246    updated_labels.extend(
247        labels
248            .iter()
249            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
250    );
251    // Note: stored labels functionality has been removed
252
253    // Handle different metric types
254    let prometheus_metric = if std::any::TypeId::of::<T>()
255        == std::any::TypeId::of::<prometheus::Histogram>()
256    {
257        // Special handling for Histogram with custom buckets
258        // buckets parameter is valid for Histogram, const_labels is not used
259        if const_labels.is_some() {
260            return Err(anyhow::anyhow!(
261                "const_labels parameter is not valid for Histogram"
262            ));
263        }
264        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
265        for (key, value) in &updated_labels {
266            opts = opts.const_label(key.clone(), value.clone());
267        }
268        T::with_histogram_opts_and_buckets(opts, buckets)?
269    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::CounterVec>() {
270        // Special handling for CounterVec with label names
271        // const_labels parameter is required for CounterVec
272        if buckets.is_some() {
273            return Err(anyhow::anyhow!(
274                "buckets parameter is not valid for CounterVec"
275            ));
276        }
277        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
278        for (key, value) in &updated_labels {
279            opts = opts.const_label(key.clone(), value.clone());
280        }
281        let label_names = const_labels
282            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
283        T::with_opts_and_label_names(opts, label_names)?
284    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
285        // Special handling for IntGaugeVec with label names
286        // const_labels parameter is required for IntGaugeVec
287        if buckets.is_some() {
288            return Err(anyhow::anyhow!(
289                "buckets parameter is not valid for IntGaugeVec"
290            ));
291        }
292        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
293        for (key, value) in &updated_labels {
294            opts = opts.const_label(key.clone(), value.clone());
295        }
296        let label_names = const_labels
297            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
298        T::with_opts_and_label_names(opts, label_names)?
299    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
300        // Special handling for IntCounterVec with label names
301        // const_labels parameter is required for IntCounterVec
302        if buckets.is_some() {
303            return Err(anyhow::anyhow!(
304                "buckets parameter is not valid for IntCounterVec"
305            ));
306        }
307        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
308        for (key, value) in &updated_labels {
309            opts = opts.const_label(key.clone(), value.clone());
310        }
311        let label_names = const_labels
312            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
313        T::with_opts_and_label_names(opts, label_names)?
314    } else {
315        // Standard handling for Counter, IntCounter, Gauge, IntGauge
316        // buckets and const_labels parameters are not valid for these types
317        if buckets.is_some() {
318            return Err(anyhow::anyhow!(
319                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
320            ));
321        }
322        if const_labels.is_some() {
323            return Err(anyhow::anyhow!(
324                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
325            ));
326        }
327        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
328        for (key, value) in &updated_labels {
329            opts = opts.const_label(key.clone(), value.clone());
330        }
331        T::with_opts(opts)?
332    };
333
334    // Iterate over the DRT's registry and register this metric across all hierarchical levels.
335    // The accumulated hierarchy is structured as: ["", "testnamespace", "testnamespace_testcomponent", "testnamespace_testcomponent_testendpoint"]
336    // This accumulation is essential to differentiate between the names of children and grandchildren.
337    // Build accumulated hierarchy and register metrics in a single loop
338    // current_prefix accumulates the hierarchical path as we iterate through hierarchy
339    // For example, if hierarchy = ["", "testnamespace", "testcomponent"], then:
340    // - Iteration 1: current_prefix = "" (empty string from DRT)
341    // - Iteration 2: current_prefix = "testnamespace"
342    // - Iteration 3: current_prefix = "testnamespace_testcomponent"
343    let mut current_hierarchy = String::new();
344    for name in &hierarchy {
345        if !current_hierarchy.is_empty() && !name.is_empty() {
346            current_hierarchy.push('_');
347        }
348        current_hierarchy.push_str(name);
349
350        // Register metric at this hierarchical level using the new helper function
351        let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
352        registry
353            .drt()
354            .add_prometheus_metric(&current_hierarchy, collector)?;
355    }
356
357    Ok(prometheus_metric)
358}
359
360/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
361/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
362/// generating output in Prometheus text format.
363use crate::traits::DistributedRuntimeProvider;
364
365pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
366    // Get the name of this registry (without any hierarchy prefix)
367    fn basename(&self) -> String;
368
369    /// Retrieve the complete hierarchy and basename for this registry. Currently, the hierarchy for drt is an empty string,
370    /// so we must account for the leading underscore. The existing code remains unchanged to accommodate any future
371    /// scenarios where drt's prefix might be assigned a value.
372    fn hierarchy(&self) -> String {
373        [self.parent_hierarchy(), vec![self.basename()]]
374            .concat()
375            .join("_")
376            .trim_start_matches('_')
377            .to_string()
378    }
379
380    // Get the parent hierarchy for this registry (just the base names, NOT the flattened hierarchy key)
381    fn parent_hierarchy(&self) -> Vec<String>;
382
383    // TODO: Add support for additional Prometheus metric types:
384    // - Counter: ✅ IMPLEMENTED - create_counter()
385    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
386    // - Gauge: ✅ IMPLEMENTED - create_gauge()
387    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
388    // - Histogram: ✅ IMPLEMENTED - create_histogram()
389    // - HistogramVec with custom buckets: create_histogram_with_buckets()
390    // - Info: create_info() - for info metrics with labels
391    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
392    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
393    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
394    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
395    // - Stateset: create_stateset() - for state-based metrics
396    // - Summary: create_summary() - for quantiles and sum/count metrics
397    // - SummaryVec: create_summary_vec() - for labeled summaries
398    // - Untyped: create_untyped() - for untyped metrics
399
400    /// Create a Counter metric
401    fn create_counter(
402        &self,
403        name: &str,
404        description: &str,
405        labels: &[(&str, &str)],
406    ) -> anyhow::Result<prometheus::Counter> {
407        create_metric(self, name, description, labels, None, None)
408    }
409
410    /// Create a CounterVec metric with label names (for dynamic labels)
411    fn create_countervec(
412        &self,
413        name: &str,
414        description: &str,
415        const_labels: &[&str],
416        const_label_values: &[(&str, &str)],
417    ) -> anyhow::Result<prometheus::CounterVec> {
418        create_metric(
419            self,
420            name,
421            description,
422            const_label_values,
423            None,
424            Some(const_labels),
425        )
426    }
427
428    /// Create a Gauge metric
429    fn create_gauge(
430        &self,
431        name: &str,
432        description: &str,
433        labels: &[(&str, &str)],
434    ) -> anyhow::Result<prometheus::Gauge> {
435        create_metric(self, name, description, labels, None, None)
436    }
437
438    /// Create a Histogram metric with custom buckets
439    fn create_histogram(
440        &self,
441        name: &str,
442        description: &str,
443        labels: &[(&str, &str)],
444        buckets: Option<Vec<f64>>,
445    ) -> anyhow::Result<prometheus::Histogram> {
446        create_metric(self, name, description, labels, buckets, None)
447    }
448
449    /// Create an IntCounter metric
450    fn create_intcounter(
451        &self,
452        name: &str,
453        description: &str,
454        labels: &[(&str, &str)],
455    ) -> anyhow::Result<prometheus::IntCounter> {
456        create_metric(self, name, description, labels, None, None)
457    }
458
459    /// Create an IntCounterVec metric with label names (for dynamic labels)
460    fn create_intcountervec(
461        &self,
462        name: &str,
463        description: &str,
464        const_labels: &[&str],
465        const_label_values: &[(&str, &str)],
466    ) -> anyhow::Result<prometheus::IntCounterVec> {
467        create_metric(
468            self,
469            name,
470            description,
471            const_label_values,
472            None,
473            Some(const_labels),
474        )
475    }
476
477    /// Create an IntGauge metric
478    fn create_intgauge(
479        &self,
480        name: &str,
481        description: &str,
482        labels: &[(&str, &str)],
483    ) -> anyhow::Result<prometheus::IntGauge> {
484        create_metric(self, name, description, labels, None, None)
485    }
486
487    /// Create an IntGaugeVec metric with label names (for dynamic labels)
488    fn create_intgaugevec(
489        &self,
490        name: &str,
491        description: &str,
492        const_labels: &[&str],
493        const_label_values: &[(&str, &str)],
494    ) -> anyhow::Result<prometheus::IntGaugeVec> {
495        create_metric(
496            self,
497            name,
498            description,
499            const_label_values,
500            None,
501            Some(const_labels),
502        )
503    }
504
505    /// Get metrics in Prometheus text format
506    fn prometheus_metrics_fmt(&self) -> anyhow::Result<String> {
507        // Execute callbacks first to ensure any new metrics are added to the registry
508        let callback_results = self.drt().execute_metrics_callbacks(&self.hierarchy());
509
510        // Log any callback errors but continue
511        for result in callback_results {
512            if let Err(e) = result {
513                tracing::error!("Error executing metrics callback: {}", e);
514            }
515        }
516
517        // Get the Prometheus registry for this hierarchy
518        let prometheus_registry = {
519            let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap();
520            registry_entry
521                .entry(self.hierarchy())
522                .or_default()
523                .prometheus_registry
524                .clone()
525        };
526        let metric_families = prometheus_registry.gather();
527        let encoder = prometheus::TextEncoder::new();
528        let mut buffer = Vec::new();
529        encoder.encode(&metric_families, &mut buffer)?;
530        Ok(String::from_utf8(buffer)?)
531    }
532}
533
534#[cfg(test)]
535mod test_helpers {
536    use super::prometheus_names::name_prefix;
537    use super::prometheus_names::{nats_client, nats_service};
538    use super::*;
539
540    /// Base function to filter Prometheus output lines based on a predicate.
541    /// Returns lines that match the predicate, converted to String.
542    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
543    where
544        F: FnMut(&str) -> bool,
545    {
546        input
547            .lines()
548            .filter(|line| predicate(line))
549            .map(|line| line.to_string())
550            .collect::<Vec<_>>()
551    }
552
553    /// Filters out all NATS metrics from Prometheus output for test comparisons.
554    pub fn remove_nats_lines(input: &str) -> Vec<String> {
555        filter_prometheus_lines(input, |line| {
556            !line.contains(&format!(
557                "{}_{}",
558                name_prefix::COMPONENT,
559                nats_client::PREFIX
560            )) && !line.contains(&format!(
561                "{}_{}",
562                name_prefix::COMPONENT,
563                nats_service::PREFIX
564            )) && !line.trim().is_empty()
565        })
566    }
567
568    /// Filters to only include NATS metrics from Prometheus output for test comparisons.
569    pub fn extract_nats_lines(input: &str) -> Vec<String> {
570        filter_prometheus_lines(input, |line| {
571            line.contains(&format!(
572                "{}_{}",
573                name_prefix::COMPONENT,
574                nats_client::PREFIX
575            )) || line.contains(&format!(
576                "{}_{}",
577                name_prefix::COMPONENT,
578                nats_service::PREFIX
579            ))
580        })
581    }
582
583    /// Extracts all component metrics (excluding help text and type definitions).
584    /// Returns only the actual metric lines with values.
585    pub fn extract_metrics(input: &str) -> Vec<String> {
586        filter_prometheus_lines(input, |line| {
587            line.starts_with(&format!("{}_", name_prefix::COMPONENT))
588                && !line.starts_with("#")
589                && !line.trim().is_empty()
590        })
591    }
592
593    /// Parses a Prometheus metric line and extracts the name, labels, and value.
594    /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
595    ///
596    /// # Example
597    /// ```
598    /// let line = "http_requests_total{method=\"GET\"} 1234";
599    /// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
600    /// assert_eq!(name, "http_requests_total");
601    /// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
602    /// assert_eq!(value, 1234.0);
603    /// ```
604    pub fn parse_prometheus_metric(
605        line: &str,
606    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
607        if line.trim().is_empty() || line.starts_with('#') {
608            return None;
609        }
610
611        let parts: Vec<&str> = line.split_whitespace().collect();
612        if parts.len() < 2 {
613            return None;
614        }
615
616        let metric_part = parts[0];
617        let value: f64 = parts[1].parse().ok()?;
618
619        let (name, labels) = if metric_part.contains('{') {
620            let brace_start = metric_part.find('{').unwrap();
621            let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
622            let name = &metric_part[..brace_start];
623            let labels_str = &metric_part[brace_start + 1..brace_end];
624
625            let mut labels = std::collections::HashMap::new();
626            for pair in labels_str.split(',') {
627                if let Some((k, v)) = pair.split_once('=') {
628                    let v = v.trim_matches('"');
629                    labels.insert(k.trim().to_string(), v.to_string());
630                }
631            }
632            (name.to_string(), labels)
633        } else {
634            (metric_part.to_string(), std::collections::HashMap::new())
635        };
636
637        Some((name, labels, value))
638    }
639}
640
641#[cfg(test)]
642mod test_metricsregistry_units {
643    use super::*;
644
645    #[test]
646    fn test_build_component_metric_name_with_prefix() {
647        // Test that build_component_metric_name correctly prepends the dynamo_component prefix
648        let result = build_component_metric_name("requests");
649        assert_eq!(result, "dynamo_component_requests");
650
651        let result = build_component_metric_name("counter");
652        assert_eq!(result, "dynamo_component_counter");
653    }
654
655    #[test]
656    fn test_parse_prometheus_metric() {
657        use super::test_helpers::parse_prometheus_metric;
658        use std::collections::HashMap;
659
660        // Test parsing a metric with labels
661        let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
662        let parsed = parse_prometheus_metric(line);
663        assert!(parsed.is_some());
664
665        let (name, labels, value) = parsed.unwrap();
666        assert_eq!(name, "http_requests_total");
667
668        let mut expected_labels = HashMap::new();
669        expected_labels.insert("method".to_string(), "GET".to_string());
670        expected_labels.insert("status".to_string(), "200".to_string());
671        assert_eq!(labels, expected_labels);
672
673        assert_eq!(value, 1234.0);
674
675        // Test parsing a metric without labels
676        let line = "cpu_usage 98.5";
677        let parsed = parse_prometheus_metric(line);
678        assert!(parsed.is_some());
679
680        let (name, labels, value) = parsed.unwrap();
681        assert_eq!(name, "cpu_usage");
682        assert!(labels.is_empty());
683        assert_eq!(value, 98.5);
684
685        // Test parsing a metric with float value
686        let line = "response_time{service=\"api\"} 0.123";
687        let parsed = parse_prometheus_metric(line);
688        assert!(parsed.is_some());
689
690        let (name, labels, value) = parsed.unwrap();
691        assert_eq!(name, "response_time");
692
693        let mut expected_labels = HashMap::new();
694        expected_labels.insert("service".to_string(), "api".to_string());
695        assert_eq!(labels, expected_labels);
696
697        assert_eq!(value, 0.123);
698
699        // Test parsing invalid lines
700        assert!(parse_prometheus_metric("").is_none()); // Empty line
701        assert!(parse_prometheus_metric("# HELP metric description").is_none()); // Help text
702        assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); // Type definition
703        assert!(parse_prometheus_metric("metric_name").is_none()); // No value
704
705        println!("✓ Prometheus metric parsing works correctly!");
706    }
707
708    #[test]
709    fn test_metrics_registry_entry_callbacks() {
710        use crate::MetricsRegistryEntry;
711        use std::sync::atomic::{AtomicUsize, Ordering};
712
713        // Test 1: Basic callback execution with counter increments
714        {
715            let mut entry = MetricsRegistryEntry::new();
716            let counter = Arc::new(AtomicUsize::new(0));
717
718            // Add callbacks with different increment values
719            for increment in [1, 10, 100] {
720                let counter_clone = counter.clone();
721                entry.add_callback(Arc::new(move || {
722                    counter_clone.fetch_add(increment, Ordering::SeqCst);
723                    Ok(())
724                }));
725            }
726
727            // Verify counter starts at 0
728            assert_eq!(counter.load(Ordering::SeqCst), 0);
729
730            // First execution
731            let results = entry.execute_callbacks();
732            assert_eq!(results.len(), 3);
733            assert!(results.iter().all(|r| r.is_ok()));
734            assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100
735
736            // Second execution - callbacks should be reusable
737            let results = entry.execute_callbacks();
738            assert_eq!(results.len(), 3);
739            assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111
740
741            // Test cloning - cloned entry should have no callbacks
742            let cloned = entry.clone();
743            assert_eq!(cloned.execute_callbacks().len(), 0);
744            assert_eq!(counter.load(Ordering::SeqCst), 222); // No change
745
746            // Original still has callbacks
747            entry.execute_callbacks();
748            assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111
749        }
750
751        // Test 2: Mixed success and error callbacks
752        {
753            let mut entry = MetricsRegistryEntry::new();
754            let counter = Arc::new(AtomicUsize::new(0));
755
756            // Successful callback
757            let counter_clone = counter.clone();
758            entry.add_callback(Arc::new(move || {
759                counter_clone.fetch_add(1, Ordering::SeqCst);
760                Ok(())
761            }));
762
763            // Error callback
764            entry.add_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
765
766            // Another successful callback
767            let counter_clone = counter.clone();
768            entry.add_callback(Arc::new(move || {
769                counter_clone.fetch_add(10, Ordering::SeqCst);
770                Ok(())
771            }));
772
773            // Execute and verify mixed results
774            let results = entry.execute_callbacks();
775            assert_eq!(results.len(), 3);
776            assert!(results[0].is_ok());
777            assert!(results[1].is_err());
778            assert!(results[2].is_ok());
779
780            // Verify error message
781            assert_eq!(
782                results[1].as_ref().unwrap_err().to_string(),
783                "Simulated error"
784            );
785
786            // Verify successful callbacks still executed
787            assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10
788
789            // Execute again - errors should be consistent
790            let results = entry.execute_callbacks();
791            assert!(results[1].is_err());
792            assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11
793        }
794
795        // Test 3: Empty registry
796        {
797            let entry = MetricsRegistryEntry::new();
798            let results = entry.execute_callbacks();
799            assert_eq!(results.len(), 0);
800        }
801    }
802}
803
804#[cfg(feature = "integration")]
805#[cfg(test)]
806mod test_metricsregistry_prefixes {
807    use super::*;
808    use crate::distributed::distributed_test_utils::create_test_drt_async;
809    use prometheus::core::Collector;
810
811    #[tokio::test]
812    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
813        let drt = create_test_drt_async().await;
814
815        const DRT_NAME: &str = "";
816        const NAMESPACE_NAME: &str = "ns901";
817        const COMPONENT_NAME: &str = "comp901";
818        const ENDPOINT_NAME: &str = "ep901";
819        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
820        let component = namespace.component(COMPONENT_NAME).unwrap();
821        let endpoint = component.endpoint(ENDPOINT_NAME);
822
823        // DRT
824        assert_eq!(drt.basename(), DRT_NAME);
825        assert_eq!(drt.parent_hierarchy(), Vec::<String>::new());
826        assert_eq!(drt.hierarchy(), DRT_NAME);
827
828        // Namespace
829        assert_eq!(namespace.basename(), NAMESPACE_NAME);
830        assert_eq!(namespace.parent_hierarchy(), vec!["".to_string()]);
831        assert_eq!(namespace.hierarchy(), NAMESPACE_NAME);
832
833        // Component
834        assert_eq!(component.basename(), COMPONENT_NAME);
835        assert_eq!(
836            component.parent_hierarchy(),
837            vec!["".to_string(), NAMESPACE_NAME.to_string()]
838        );
839        assert_eq!(
840            component.hierarchy(),
841            format!("{}_{}", NAMESPACE_NAME, COMPONENT_NAME)
842        );
843
844        // Endpoint
845        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
846        assert_eq!(
847            endpoint.parent_hierarchy(),
848            vec![
849                "".to_string(),
850                NAMESPACE_NAME.to_string(),
851                COMPONENT_NAME.to_string(),
852            ]
853        );
854        assert_eq!(
855            endpoint.hierarchy(),
856            format!("{}_{}_{}", NAMESPACE_NAME, COMPONENT_NAME, ENDPOINT_NAME)
857        );
858
859        // Relationships
860        assert!(namespace.parent_hierarchy().contains(&drt.basename()));
861        assert!(component.parent_hierarchy().contains(&namespace.basename()));
862        assert!(endpoint.parent_hierarchy().contains(&component.basename()));
863
864        // Depth
865        assert_eq!(drt.parent_hierarchy().len(), 0);
866        assert_eq!(namespace.parent_hierarchy().len(), 1);
867        assert_eq!(component.parent_hierarchy().len(), 2);
868        assert_eq!(endpoint.parent_hierarchy().len(), 3);
869
870        // Invalid namespace behavior - sanitizes to "_123" and succeeds
871        // @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't turn it on,
872        // so invalid characters are sanitized in MetricsRegistry rather than rejected.
873        let invalid_namespace = drt.namespace("@@123").unwrap();
874        let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
875        assert!(result.is_ok());
876        if let Ok(counter) = &result {
877            // Verify the namespace was sanitized to "_123" in the label
878            let desc = counter.desc();
879            let namespace_label = desc[0]
880                .const_label_pairs
881                .iter()
882                .find(|l| l.name() == "dynamo_namespace")
883                .expect("Should have dynamo_namespace label");
884            assert_eq!(namespace_label.value(), "_123");
885        }
886
887        // Valid namespace works
888        let valid_namespace = drt.namespace("ns567").unwrap();
889        assert!(
890            valid_namespace
891                .create_counter("test_counter", "A test counter", &[])
892                .is_ok()
893        );
894    }
895
896    #[tokio::test]
897    async fn test_recursive_namespace() {
898        // Create a distributed runtime for testing
899        let drt = create_test_drt_async().await;
900
901        // Create a deeply chained namespace: ns1.ns2.ns3
902        let ns1 = drt.namespace("ns1").unwrap();
903        let ns2 = ns1.namespace("ns2").unwrap();
904        let ns3 = ns2.namespace("ns3").unwrap();
905
906        // Create a component in the deepest namespace
907        let component = ns3.component("test-component").unwrap();
908
909        // Verify the hierarchy structure
910        assert_eq!(ns1.basename(), "ns1");
911        assert_eq!(ns1.parent_hierarchy(), vec!("".to_string()));
912        assert_eq!(ns1.hierarchy(), "ns1");
913
914        assert_eq!(ns2.basename(), "ns2");
915        assert_eq!(
916            ns2.parent_hierarchy(),
917            vec!["".to_string(), "ns1".to_string()]
918        );
919        assert_eq!(ns2.hierarchy(), "ns1_ns2");
920
921        assert_eq!(ns3.basename(), "ns3");
922        assert_eq!(
923            ns3.parent_hierarchy(),
924            vec!["".to_string(), "ns1".to_string(), "ns2".to_string()]
925        );
926        assert_eq!(ns3.hierarchy(), "ns1_ns2_ns3");
927
928        assert_eq!(component.basename(), "test-component");
929        assert_eq!(
930            component.parent_hierarchy(),
931            vec![
932                "".to_string(),
933                "ns1".to_string(),
934                "ns2".to_string(),
935                "ns3".to_string()
936            ]
937        );
938        assert_eq!(component.hierarchy(), "ns1_ns2_ns3_test-component");
939
940        println!("✓ Chained namespace test passed - all prefixes correct");
941    }
942}
943
944#[cfg(feature = "integration")]
945#[cfg(test)]
946mod test_metricsregistry_prometheus_fmt_outputs {
947    use super::prometheus_names::name_prefix;
948    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
949    use super::prometheus_names::{nats_client, nats_service};
950    use super::*;
951    use crate::distributed::distributed_test_utils::create_test_drt_async;
952    use prometheus::Counter;
953    use std::sync::Arc;
954
    /// Creates metrics of several kinds on an endpoint, a component, and a namespace, and
    /// compares the Prometheus text output of each level (with NATS lines removed) against
    /// exact expected strings.
    ///
    /// The order of statements matters: each expected string lists only the metrics that
    /// have been created up to that point in the test.
    #[tokio::test]
    async fn test_prometheusfactory_using_metrics_registry_trait() {
        // Setup real DRT and registry using the test-friendly constructor
        let drt = create_test_drt_async().await;

        // Use a simple constant namespace name
        let namespace_name = "ns345";

        let namespace = drt.namespace(namespace_name).unwrap();
        let component = namespace.component("comp345").unwrap();
        let endpoint = component.endpoint("ep345");

        // Test Counter creation
        let counter = endpoint
            .create_counter("testcounter", "A test counter", &[])
            .unwrap();
        counter.inc_by(123.456789);
        // Float counter: compare with a tolerance rather than exact equality
        let epsilon = 0.01;
        assert!((counter.get() - 123.456789).abs() < epsilon);

        // Test Prometheus format output for Endpoint (counter only)
        let endpoint_output_raw = endpoint.prometheus_metrics_fmt().unwrap();
        println!("Endpoint output:");
        println!("{}", endpoint_output_raw);

        // Filter out NATS service metrics for test comparison
        let endpoint_output =
            super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");

        let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();

        assert_eq!(
            endpoint_output, expected_endpoint_output,
            "\n=== ENDPOINT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            expected_endpoint_output, endpoint_output
        );

        // Test Gauge creation
        let gauge = component
            .create_gauge("testgauge", "A test gauge", &[])
            .unwrap();
        gauge.set(50000.0);
        assert_eq!(gauge.get(), 50000.0);

        // Test Prometheus format output for Component (endpoint counter + gauge;
        // the histogram is only created further below)
        let component_output_raw = component.prometheus_metrics_fmt().unwrap();
        println!("Component output:");
        println!("{}", component_output_raw);

        // Filter out NATS service metrics for test comparison
        let component_output =
            super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");

        let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();

        assert_eq!(
            component_output, expected_component_output,
            "\n=== COMPONENT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            expected_component_output, component_output
        );

        // Test IntCounter creation
        let intcounter = namespace
            .create_intcounter("testintcounter", "A test int counter", &[])
            .unwrap();
        intcounter.inc_by(12345);
        assert_eq!(intcounter.get(), 12345);

        // Test Prometheus format output for Namespace (endpoint counter + component gauge +
        // int_counter; the histogram is only created further below)
        let namespace_output_raw = namespace.prometheus_metrics_fmt().unwrap();
        println!("Namespace output:");
        println!("{}", namespace_output_raw);

        // Filter out NATS service metrics for test comparison
        let namespace_output =
            super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");

        let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();

        assert_eq!(
            namespace_output, expected_namespace_output,
            "\n=== NAMESPACE COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            expected_namespace_output, namespace_output
        );

        // Test IntGauge creation
        let intgauge = namespace
            .create_intgauge("testintgauge", "A test int gauge", &[])
            .unwrap();
        intgauge.set(42);
        assert_eq!(intgauge.get(), 42);

        // Test IntGaugeVec creation (two variable labels plus one constant label)
        let intgaugevec = namespace
            .create_intgaugevec(
                "testintgaugevec",
                "A test int gauge vector",
                &["instance", "status"],
                &[("service", "api")],
            )
            .unwrap();
        intgaugevec
            .with_label_values(&["server1", "active"])
            .set(10);
        intgaugevec
            .with_label_values(&["server2", "inactive"])
            .set(0);

        // Test CounterVec creation (two variable labels plus one constant label)
        let countervec = endpoint
            .create_countervec(
                "testcountervec",
                "A test counter vector",
                &["method", "status"],
                &[("service", "api")],
            )
            .unwrap();
        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);

        // Test Histogram creation (`None` buckets; the expected output below lists
        // boundaries from 0.005 to 10 plus +Inf)
        let histogram = component
            .create_histogram("testhistogram", "A test histogram", &[], None)
            .unwrap();
        histogram.observe(1.0);
        histogram.observe(2.5);
        histogram.observe(4.0);

        // Test Prometheus format output for DRT (all metrics combined)
        let drt_output_raw = drt.prometheus_metrics_fmt().unwrap();
        println!("DRT output:");
        println!("{}", drt_output_raw);

        // Filter out all NATS metrics for comparison
        let filtered_drt_output =
            super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");

        // NOTE(review): the last line expects the uptime gauge to render as exactly 0 —
        // presumably the gauge has not been updated by the time of this scrape; confirm.
        let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#.to_string();

        assert_eq!(
            filtered_drt_output, expected_drt_output,
            "\n=== DRT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual (filtered):\n{}\n\
             ==============================",
            expected_drt_output, filtered_drt_output
        );

        println!("✓ All Prometheus format outputs verified successfully!");
    }
1165
1166    #[test]
1167    fn test_refactored_filter_functions() {
1168        // Test data with mixed content
1169        let test_input = r#"# HELP dynamo_component_requests Total requests
1170# TYPE dynamo_component_requests counter
1171dynamo_component_requests 42
1172# HELP dynamo_component_nats_client_connection_state Connection state
1173# TYPE dynamo_component_nats_client_connection_state gauge
1174dynamo_component_nats_client_connection_state 1
1175# HELP dynamo_component_latency Response latency
1176# TYPE dynamo_component_latency histogram
1177dynamo_component_latency_bucket{le="0.1"} 10
1178dynamo_component_latency_bucket{le="0.5"} 25
1179dynamo_component_nats_service_total_requests 100
1180dynamo_component_nats_service_total_errors 5"#;
1181
1182        // Test remove_nats_lines (excludes NATS lines but keeps help/type)
1183        let filtered_out = super::test_helpers::remove_nats_lines(test_input);
1184        assert_eq!(filtered_out.len(), 7); // 7 non-NATS lines
1185        assert!(!filtered_out.iter().any(|line| line.contains("nats")));
1186
1187        // Test extract_nats_lines (includes all NATS lines including help/type)
1188        let filtered_only = super::test_helpers::extract_nats_lines(test_input);
1189        assert_eq!(filtered_only.len(), 5); // 5 NATS lines
1190        assert!(filtered_only.iter().all(|line| line.contains("nats")));
1191
1192        // Test extract_metrics (only actual metric lines, excluding help/type)
1193        let metrics_only = super::test_helpers::extract_metrics(test_input);
1194        assert_eq!(metrics_only.len(), 6); // 6 actual metric lines (excluding help/type)
1195        assert!(
1196            metrics_only
1197                .iter()
1198                .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1199        );
1200
1201        println!("✓ All refactored filter functions work correctly!");
1202    }
1203}
1204
1205#[cfg(feature = "integration")]
1206#[cfg(test)]
1207mod test_metricsregistry_nats {
1208    use super::prometheus_names::name_prefix;
1209    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1210    use super::prometheus_names::{nats_client, nats_service};
1211    use super::*;
1212    use crate::distributed::distributed_test_utils::create_test_drt_async;
1213    use crate::pipeline::PushRouter;
1214    use crate::{DistributedRuntime, Runtime};
1215    use tokio::time::{Duration, sleep};
1216    #[tokio::test]
1217    async fn test_drt_nats_metrics() {
1218        // Setup real DRT and registry using the test-friendly constructor
1219        let drt = create_test_drt_async().await;
1220
1221        // Get DRT output which should include NATS client metrics
1222        let drt_output = drt.prometheus_metrics_fmt().unwrap();
1223        println!("DRT output with NATS metrics:");
1224        println!("{}", drt_output);
1225
1226        // Additional checks for NATS client metrics (without checking specific values)
1227        let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
1228
1229        // Check that NATS client metrics are present
1230        assert!(
1231            !drt_nats_metrics.is_empty(),
1232            "NATS client metrics should be present"
1233        );
1234
1235        // Check for specific NATS client metric names (without values)
1236        // Extract only the metric lines from the already-filtered NATS metrics
1237        let drt_nats_metric_lines =
1238            super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
1239        let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
1240            .iter()
1241            .map(|line| {
1242                let without_labels = line.split('{').next().unwrap_or(line);
1243                // Remove the value part (everything after the last space)
1244                without_labels.split(' ').next().unwrap_or(without_labels)
1245            })
1246            .collect();
1247
1248        let expect_drt_nats_metrics_sorted = {
1249            let mut temp = DRT_NATS_METRICS
1250                .iter()
1251                .map(|metric| build_component_metric_name(metric))
1252                .collect::<Vec<_>>();
1253            temp.sort();
1254            temp
1255        };
1256
1257        // Print both lists for comparison
1258        println!(
1259            "actual_drt_nats_metrics_sorted: {:?}",
1260            actual_drt_nats_metrics_sorted
1261        );
1262        println!(
1263            "expect_drt_nats_metrics_sorted: {:?}",
1264            expect_drt_nats_metrics_sorted
1265        );
1266
1267        // Compare the sorted lists
1268        assert_eq!(
1269            actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
1270            "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1271        );
1272
1273        println!("✓ DistributedRuntime NATS metrics integration test passed!");
1274    }
1275
1276    #[tokio::test]
1277    async fn test_nats_metric_names() {
1278        // This test only tests the existence of the NATS metrics. It does not check
1279        // the values of the metrics.
1280
1281        // Setup real DRT and registry using the test-friendly constructor
1282        let drt = create_test_drt_async().await;
1283
1284        // Create a namespace and components from the DRT
1285        let namespace = drt.namespace("ns789").unwrap();
1286        let components = namespace.component("comp789").unwrap();
1287
1288        // Create a service to trigger metrics callback registration
1289        let _service = components.service_builder().create().await.unwrap();
1290
1291        // Get components output which should include NATS client metrics
1292        // Additional checks for NATS client metrics (without checking specific values)
1293        let component_nats_metrics =
1294            super::test_helpers::extract_nats_lines(&components.prometheus_metrics_fmt().unwrap());
1295        println!(
1296            "Component NATS metrics count: {}",
1297            component_nats_metrics.len()
1298        );
1299
1300        // Check that NATS client metrics are present
1301        assert!(
1302            !component_nats_metrics.is_empty(),
1303            "NATS client metrics should be present"
1304        );
1305
1306        // Check for specific NATS client metric names (without values)
1307        let component_metrics =
1308            super::test_helpers::extract_metrics(&components.prometheus_metrics_fmt().unwrap());
1309        let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
1310            .iter()
1311            .map(|line| {
1312                let without_labels = line.split('{').next().unwrap_or(line);
1313                // Remove the value part (everything after the last space)
1314                without_labels.split(' ').next().unwrap_or(without_labels)
1315            })
1316            .collect();
1317
1318        let expect_component_nats_metrics_sorted = {
1319            let mut temp = COMPONENT_NATS_METRICS
1320                .iter()
1321                .map(|metric| build_component_metric_name(metric))
1322                .collect::<Vec<_>>();
1323            temp.sort();
1324            temp
1325        };
1326
1327        // Print both lists for comparison
1328        println!(
1329            "actual_component_nats_metrics_sorted: {:?}",
1330            actual_component_nats_metrics_sorted
1331        );
1332        println!(
1333            "expect_component_nats_metrics_sorted: {:?}",
1334            expect_component_nats_metrics_sorted
1335        );
1336
1337        // Compare the sorted lists
1338        assert_eq!(
1339            actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
1340            "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1341        );
1342
1343        // Get both DRT and component output and filter for NATS metrics only
1344        let drt_output = drt.prometheus_metrics_fmt().unwrap();
1345        let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
1346        let drt_and_component_nats_metrics =
1347            super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
1348        println!(
1349            "DRT and component NATS metrics count: {}",
1350            drt_and_component_nats_metrics.len()
1351        );
1352
1353        // Check that the NATS metrics are present in the component output
1354        assert_eq!(
1355            drt_and_component_nats_metrics.len(),
1356            DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
1357            "DRT at this point should have both the DRT and component NATS metrics"
1358        );
1359
1360        // Check that the NATS metrics are present in the component output
1361        println!("✓ Component NATS metrics integration test passed!");
1362    }
1363
1364    /// Tests NATS metrics values before and after endpoint activity with large message processing.
1365    /// Creates endpoint, sends test messages + 10k byte message, validates metrics (NATS + work handler)
1366    /// at initial state and post-activity state. Ensures byte thresholds, message counts, and processing
1367    /// times are within expected ranges. Tests end-to-end client-server communication and metrics collection.
1368    #[tokio::test]
1369    async fn test_nats_metrics_values() -> anyhow::Result<()> {
1370        struct MessageHandler {}
1371        impl MessageHandler {
1372            fn new() -> std::sync::Arc<Self> {
1373                std::sync::Arc::new(Self {})
1374            }
1375        }
1376
1377        #[async_trait]
1378        impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
1379            async fn generate(
1380                &self,
1381                input: SingleIn<String>,
1382            ) -> Result<ManyOut<Annotated<String>>, Error> {
1383                let (data, ctx) = input.into_parts();
1384                let response = data.to_string();
1385                let stream = stream::iter(vec![Annotated::from_data(response)]);
1386                Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
1387            }
1388        }
1389
1390        println!("\n=== Initializing DistributedRuntime ===");
1391        let runtime = Runtime::from_current()?;
1392        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
1393        let namespace = drt.namespace("ns123").unwrap();
1394        let component = namespace.component("comp123").unwrap();
1395        let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
1396
1397        let _backend_handle = tokio::spawn(async move {
1398            let service = component.service_builder().create().await.unwrap();
1399            let endpoint = service.endpoint("echo").endpoint_builder().handler(ingress);
1400            endpoint.start().await.unwrap();
1401        });
1402
1403        sleep(Duration::from_millis(500)).await;
1404        println!("✓ Launched endpoint service in background successfully");
1405
1406        let drt_output = drt.prometheus_metrics_fmt().unwrap();
1407        let parsed_metrics: Vec<_> = drt_output
1408            .lines()
1409            .filter_map(super::test_helpers::parse_prometheus_metric)
1410            .collect();
1411
1412        println!("=== Initial DRT metrics output ===");
1413        println!("{}", drt_output);
1414
1415        println!("\n=== Checking Initial Metric Values ===");
1416
1417        let initial_expected_metric_values = [
1418            // DRT NATS metrics (ordered to match DRT_NATS_METRICS)
1419            (
1420                build_component_metric_name(nats_client::CONNECTION_STATE),
1421                1.0,
1422                1.0,
1423            ), // Should be connected
1424            (build_component_metric_name(nats_client::CONNECTS), 1.0, 1.0), // Should have 1 connection
1425            (
1426                build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1427                800.0,
1428                4000.0,
1429            ), // Wide range around observed value of 1888
1430            (
1431                build_component_metric_name(nats_client::IN_MESSAGES),
1432                0.0,
1433                5.0,
1434            ), // Wide range around 2
1435            (
1436                build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1437                1500.0,
1438                5000.0,
1439            ), // Wide range around observed value of 2752
1440            (
1441                build_component_metric_name(nats_client::OUT_MESSAGES),
1442                0.0,
1443                5.0,
1444            ), // Wide range around 2
1445            // Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
1446            (
1447                build_component_metric_name(nats_service::AVG_PROCESSING_MS),
1448                0.0,
1449                0.0,
1450            ), // No processing yet
1451            (
1452                build_component_metric_name(nats_service::TOTAL_ERRORS),
1453                0.0,
1454                0.0,
1455            ), // No errors yet
1456            (
1457                build_component_metric_name(nats_service::TOTAL_REQUESTS),
1458                0.0,
1459                0.0,
1460            ), // No requests yet
1461            (
1462                build_component_metric_name(nats_service::TOTAL_PROCESSING_MS),
1463                0.0,
1464                0.0,
1465            ), // No processing yet
1466            (
1467                build_component_metric_name(nats_service::ACTIVE_SERVICES),
1468                0.0,
1469                2.0,
1470            ), // Service may not be fully active yet
1471            (
1472                build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1473                0.0,
1474                2.0,
1475            ), // Endpoint may not be fully active yet
1476        ];
1477
1478        for (metric_name, min_value, max_value) in &initial_expected_metric_values {
1479            let actual_value = parsed_metrics
1480                .iter()
1481                .find(|(name, _, _)| name == metric_name)
1482                .map(|(_, _, value)| *value)
1483                .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
1484
1485            assert!(
1486                actual_value >= *min_value && actual_value <= *max_value,
1487                "Initial metric {} should be between {} and {}, but got {}",
1488                metric_name,
1489                min_value,
1490                max_value,
1491                actual_value
1492            );
1493        }
1494
1495        println!("\n=== Client Runtime to hit the endpoint ===");
1496        let client_runtime = Runtime::from_current()?;
1497        let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
1498        let namespace = client_distributed.namespace("ns123")?;
1499        let component = namespace.component("comp123")?;
1500        let client = component.endpoint("echo").client().await?;
1501
1502        client.wait_for_instances().await?;
1503        println!("✓ Connected to endpoint, waiting for instances...");
1504
1505        let router =
1506            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
1507                .await?;
1508
1509        for i in 0..10 {
1510            let msg = i.to_string().repeat(2000); // 2k bytes message
1511            let mut stream = router.random(msg.clone().into()).await?;
1512            while let Some(resp) = stream.next().await {
1513                // Check if response matches the original message
1514                if let Some(data) = &resp.data {
1515                    let is_same = data == &msg;
1516                    println!(
1517                        "Response {}: {} bytes, matches original: {}",
1518                        i,
1519                        data.len(),
1520                        is_same
1521                    );
1522                }
1523            }
1524        }
1525        println!("✓ Sent messages and received responses successfully");
1526
1527        println!("\n=== Waiting 500ms for metrics to update ===");
1528        sleep(Duration::from_millis(500)).await;
1529        println!("✓ Wait complete, getting final metrics...");
1530
1531        let final_drt_output = drt.prometheus_metrics_fmt().unwrap();
1532        println!("\n=== Final Prometheus DRT output ===");
1533        println!("{}", final_drt_output);
1534
1535        let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
1536        println!("\n=== Filtered NATS metrics from final DRT output ===");
1537        for line in &final_drt_nats_output {
1538            println!("{}", line);
1539        }
1540
1541        let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
1542            .iter()
1543            .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
1544            .collect();
1545
1546        let post_expected_metric_values = [
1547            // DRT NATS metrics
1548            (
1549                build_component_metric_name(nats_client::CONNECTION_STATE),
1550                1.0,
1551                1.0,
1552            ), // Connected
1553            (build_component_metric_name(nats_client::CONNECTS), 1.0, 1.0), // 1 connection
1554            (
1555                build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1556                20000.0,
1557                32000.0,
1558            ), // Wide range around 26117
1559            (
1560                build_component_metric_name(nats_client::IN_MESSAGES),
1561                8.0,
1562                20.0,
1563            ), // Wide range around 16
1564            (
1565                build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1566                2500.0,
1567                8000.0,
1568            ), // Wide range around 5524
1569            (
1570                build_component_metric_name(nats_client::OUT_MESSAGES),
1571                8.0,
1572                20.0,
1573            ), // Wide range around 16
1574            // Component NATS metrics
1575            (
1576                build_component_metric_name(nats_service::AVG_PROCESSING_MS),
1577                0.0,
1578                1.0,
1579            ), // Low processing time
1580            (
1581                build_component_metric_name(nats_service::TOTAL_ERRORS),
1582                0.0,
1583                0.0,
1584            ), // No errors
1585            (
1586                build_component_metric_name(nats_service::TOTAL_REQUESTS),
1587                0.0,
1588                0.0,
1589            ), // No work handler requests
1590            (
1591                build_component_metric_name(nats_service::TOTAL_PROCESSING_MS),
1592                0.0,
1593                5.0,
1594            ), // Low total processing time
1595            (
1596                build_component_metric_name(nats_service::ACTIVE_SERVICES),
1597                0.0,
1598                2.0,
1599            ), // Service may not be fully active
1600            (
1601                build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1602                0.0,
1603                2.0,
1604            ), // Endpoint may not be fully active
1605            // Work handler metrics
1606            (
1607                build_component_metric_name(work_handler::REQUESTS_TOTAL),
1608                10.0,
1609                10.0,
1610            ), // 10 messages
1611            (
1612                build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
1613                21000.0,
1614                26000.0,
1615            ), // ~75-125% of 23520
1616            (
1617                build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
1618                18000.0,
1619                23000.0,
1620            ), // ~75-125% of 20660
1621            (
1622                build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
1623                0.0,
1624                1.0,
1625            ), // 0 or very low
1626            // Histograms have _{count,sum} suffixes
1627            (
1628                format!(
1629                    "{}_count",
1630                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1631                ),
1632                10.0,
1633                10.0,
1634            ), // 10 messages
1635            (
1636                format!(
1637                    "{}_sum",
1638                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1639                ),
1640                0.0001,
1641                1.0,
1642            ), // Processing time sum (wide range)
1643        ];
1644
1645        println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
1646        for (metric_name, min_value, max_value) in &post_expected_metric_values {
1647            let actual_value = final_parsed_metrics
1648                .iter()
1649                .find(|(name, _, _)| name == metric_name)
1650                .map(|(_, _, value)| *value)
1651                .unwrap_or_else(|| {
1652                    panic!(
1653                        "Could not find expected post-activity metric: {}",
1654                        metric_name
1655                    )
1656                });
1657
1658            assert!(
1659                actual_value >= *min_value && actual_value <= *max_value,
1660                "Post-activity metric {} should be between {} and {}, but got {}",
1661                metric_name,
1662                min_value,
1663                max_value,
1664                actual_value
1665            );
1666            println!(
1667                "✓ {}: {} (range: {} to {})",
1668                metric_name, actual_value, min_value, max_value
1669            );
1670        }
1671
1672        println!("✓ All NATS and component metrics parsed successfully!");
1673        println!("✓ Byte metrics verified to be >= 100 bytes!");
1674        println!("✓ Post-activity metrics verified with higher thresholds!");
1675        println!("✓ Work handler metrics reflect increased activity!");
1676
1677        Ok(())
1678    }
1679}