dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22// Import commonly used items to avoid verbose prefixes
23use prometheus_names::{
24    COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
25    nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
26};
27
28// Pipeline imports for endpoint creation
29use crate::pipeline::{
30    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31    network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
/// When `true`, metrics are labeled with the namespace, component, and endpoint labels.
/// These labels are prefixed with "dynamo_" to avoid collisions with Kubernetes and other monitoring system labels.
39pub const USE_AUTO_LABELS: bool = true;
40
41// Prometheus imports
42use prometheus::Encoder;
43
44/// Validate that a label slice has no duplicate keys.
45/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
46fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47    let mut seen_keys = std::collections::HashSet::new();
48    for (key, _) in labels {
49        if !seen_keys.insert(*key) {
50            return Err(anyhow::anyhow!(
51                "Duplicate label key '{}' found in labels",
52                key
53            ));
54        }
55    }
56    Ok(())
57}
58
59/// ==============================
60/// Prometheus section
61/// ==============================
62/// Trait that defines common behavior for Prometheus metric types
63pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
64    /// Create a new metric with the given options
65    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
66    where
67        Self: Sized;
68
69    /// Create a new metric with histogram options and custom buckets
70    /// This is a default implementation that will panic for non-histogram metrics
71    fn with_histogram_opts_and_buckets(
72        _opts: prometheus::HistogramOpts,
73        _buckets: Option<Vec<f64>>,
74    ) -> Result<Self, prometheus::Error>
75    where
76        Self: Sized,
77    {
78        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
79    }
80
81    /// Create a new metric with counter options and label names (for CounterVec)
82    /// This is a default implementation that will panic for non-countervec metrics
83    fn with_opts_and_label_names(
84        _opts: prometheus::Opts,
85        _label_names: &[&str],
86    ) -> Result<Self, prometheus::Error>
87    where
88        Self: Sized,
89    {
90        panic!("with_opts_and_label_names is not implemented for this metric type");
91    }
92}
93
94// Implement the trait for Counter, IntCounter, and Gauge
95impl PrometheusMetric for prometheus::Counter {
96    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
97        prometheus::Counter::with_opts(opts)
98    }
99}
100
101impl PrometheusMetric for prometheus::IntCounter {
102    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
103        prometheus::IntCounter::with_opts(opts)
104    }
105}
106
107impl PrometheusMetric for prometheus::Gauge {
108    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
109        prometheus::Gauge::with_opts(opts)
110    }
111}
112
113impl PrometheusMetric for prometheus::IntGauge {
114    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
115        prometheus::IntGauge::with_opts(opts)
116    }
117}
118
119impl PrometheusMetric for prometheus::GaugeVec {
120    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
121        Err(prometheus::Error::Msg(
122            "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
123        ))
124    }
125
126    fn with_opts_and_label_names(
127        opts: prometheus::Opts,
128        label_names: &[&str],
129    ) -> Result<Self, prometheus::Error> {
130        prometheus::GaugeVec::new(opts, label_names)
131    }
132}
133
134impl PrometheusMetric for prometheus::IntGaugeVec {
135    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
136        Err(prometheus::Error::Msg(
137            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
138        ))
139    }
140
141    fn with_opts_and_label_names(
142        opts: prometheus::Opts,
143        label_names: &[&str],
144    ) -> Result<Self, prometheus::Error> {
145        prometheus::IntGaugeVec::new(opts, label_names)
146    }
147}
148
149impl PrometheusMetric for prometheus::IntCounterVec {
150    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
151        Err(prometheus::Error::Msg(
152            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
153        ))
154    }
155
156    fn with_opts_and_label_names(
157        opts: prometheus::Opts,
158        label_names: &[&str],
159    ) -> Result<Self, prometheus::Error> {
160        prometheus::IntCounterVec::new(opts, label_names)
161    }
162}
163
164// Implement the trait for Histogram
165impl PrometheusMetric for prometheus::Histogram {
166    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
167        // Convert Opts to HistogramOpts
168        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
169        prometheus::Histogram::with_opts(histogram_opts)
170    }
171
172    fn with_histogram_opts_and_buckets(
173        mut opts: prometheus::HistogramOpts,
174        buckets: Option<Vec<f64>>,
175    ) -> Result<Self, prometheus::Error> {
176        if let Some(custom_buckets) = buckets {
177            opts = opts.buckets(custom_buckets);
178        }
179        prometheus::Histogram::with_opts(opts)
180    }
181}
182
183// Implement the trait for CounterVec
184impl PrometheusMetric for prometheus::CounterVec {
185    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
186        // This will panic - CounterVec needs label names
187        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
188    }
189
190    fn with_opts_and_label_names(
191        opts: prometheus::Opts,
192        label_names: &[&str],
193    ) -> Result<Self, prometheus::Error> {
194        prometheus::CounterVec::new(opts, label_names)
195    }
196}
197
198/// ==============================
199/// Metrics section
200/// ==============================
201/// Public helper function to create metrics - accessible for Python bindings
202pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
203    hierarchy: &H,
204    metric_name: &str,
205    metric_desc: &str,
206    labels: &[(&str, &str)],
207    buckets: Option<Vec<f64>>,
208    const_labels: Option<&[&str]>,
209) -> anyhow::Result<T> {
210    // Validate that user-provided labels don't have duplicate keys
211    validate_no_duplicate_label_keys(labels)?;
212    // Note: stored labels functionality has been removed
213
214    let basename = hierarchy.basename();
215    let parent_hierarchies = hierarchy.parent_hierarchies();
216
217    // Build hierarchy path as vector of strings: parent names + [basename]
218    let mut hierarchy_names: Vec<String> =
219        parent_hierarchies.iter().map(|p| p.basename()).collect();
220    hierarchy_names.push(basename.clone());
221
222    let metric_name = build_component_metric_name(metric_name);
223
224    // Build updated_labels: auto-labels first, then `labels` + stored labels
225    let mut updated_labels: Vec<(String, String)> = Vec::new();
226
227    if USE_AUTO_LABELS {
228        // Validate that user-provided labels don't conflict with auto-generated labels
229        for (key, _) in labels {
230            if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231                return Err(anyhow::anyhow!(
232                    "Label '{}' is automatically added by auto_label feature and cannot be manually set",
233                    key
234                ));
235            }
236        }
237
238        // Add auto-generated labels with sanitized values
239        if hierarchy_names.len() > 1 {
240            let namespace = &hierarchy_names[1];
241            if !namespace.is_empty() {
242                let valid_namespace = sanitize_prometheus_label(namespace)?;
243                if !valid_namespace.is_empty() {
244                    updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
245                }
246            }
247        }
248        if hierarchy_names.len() > 2 {
249            let component = &hierarchy_names[2];
250            if !component.is_empty() {
251                let valid_component = sanitize_prometheus_label(component)?;
252                if !valid_component.is_empty() {
253                    updated_labels.push((labels::COMPONENT.to_string(), valid_component));
254                }
255            }
256        }
257        if hierarchy_names.len() > 3 {
258            let endpoint = &hierarchy_names[3];
259            if !endpoint.is_empty() {
260                let valid_endpoint = sanitize_prometheus_label(endpoint)?;
261                if !valid_endpoint.is_empty() {
262                    updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
263                }
264            }
265        }
266    }
267
268    // Add user labels
269    updated_labels.extend(
270        labels
271            .iter()
272            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273    );
274    // Note: stored labels functionality has been removed
275
276    // Handle different metric types
277    let prometheus_metric = if std::any::TypeId::of::<T>()
278        == std::any::TypeId::of::<prometheus::CounterVec>()
279    {
280        // Special handling for CounterVec with label names
281        // const_labels parameter is required for CounterVec
282        if buckets.is_some() {
283            return Err(anyhow::anyhow!(
284                "buckets parameter is not valid for CounterVec"
285            ));
286        }
287        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288        for (key, value) in &updated_labels {
289            opts = opts.const_label(key.clone(), value.clone());
290        }
291        let label_names = const_labels
292            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293        T::with_opts_and_label_names(opts, label_names)?
294    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295        // Special handling for GaugeVec with label names
296        // const_labels parameter is required for GaugeVec
297        if buckets.is_some() {
298            return Err(anyhow::anyhow!(
299                "buckets parameter is not valid for GaugeVec"
300            ));
301        }
302        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303        for (key, value) in &updated_labels {
304            opts = opts.const_label(key.clone(), value.clone());
305        }
306        let label_names = const_labels
307            .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308        T::with_opts_and_label_names(opts, label_names)?
309    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310        // Special handling for Histogram with custom buckets
311        // buckets parameter is valid for Histogram, const_labels is not used
312        if const_labels.is_some() {
313            return Err(anyhow::anyhow!(
314                "const_labels parameter is not valid for Histogram"
315            ));
316        }
317        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318        for (key, value) in &updated_labels {
319            opts = opts.const_label(key.clone(), value.clone());
320        }
321        T::with_histogram_opts_and_buckets(opts, buckets)?
322    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323        // Special handling for IntCounterVec with label names
324        // const_labels parameter is required for IntCounterVec
325        if buckets.is_some() {
326            return Err(anyhow::anyhow!(
327                "buckets parameter is not valid for IntCounterVec"
328            ));
329        }
330        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331        for (key, value) in &updated_labels {
332            opts = opts.const_label(key.clone(), value.clone());
333        }
334        let label_names = const_labels
335            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336        T::with_opts_and_label_names(opts, label_names)?
337    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338        // Special handling for IntGaugeVec with label names
339        // const_labels parameter is required for IntGaugeVec
340        if buckets.is_some() {
341            return Err(anyhow::anyhow!(
342                "buckets parameter is not valid for IntGaugeVec"
343            ));
344        }
345        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346        for (key, value) in &updated_labels {
347            opts = opts.const_label(key.clone(), value.clone());
348        }
349        let label_names = const_labels
350            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351        T::with_opts_and_label_names(opts, label_names)?
352    } else {
353        // Standard handling for Counter, IntCounter, Gauge, IntGauge
354        // buckets and const_labels parameters are not valid for these types
355        if buckets.is_some() {
356            return Err(anyhow::anyhow!(
357                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358            ));
359        }
360        if const_labels.is_some() {
361            return Err(anyhow::anyhow!(
362                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363            ));
364        }
365        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366        for (key, value) in &updated_labels {
367            opts = opts.const_label(key.clone(), value.clone());
368        }
369        T::with_opts(opts)?
370    };
371
372    // Register the metric at all hierarchy levels (parents + self)
373    // First register at all parent levels
374    for parent in parent_hierarchies {
375        let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
376        parent.get_metrics_registry().add_metric(collector)?;
377    }
378
379    // Then register at this level
380    let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
381    hierarchy.get_metrics_registry().add_metric(collector)?;
382
383    Ok(prometheus_metric)
384}
385
386/// Wrapper struct that provides access to metrics functionality
387/// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint
388pub struct Metrics<H: MetricsHierarchy> {
389    hierarchy: H,
390}
391
392impl<H: MetricsHierarchy> Metrics<H> {
393    pub fn new(hierarchy: H) -> Self {
394        Self { hierarchy }
395    }
396
397    // TODO: Add support for additional Prometheus metric types:
398    // - Counter: ✅ IMPLEMENTED - create_counter()
399    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
400    // - Gauge: ✅ IMPLEMENTED - create_gauge()
401    // - GaugeVec: ✅ IMPLEMENTED - create_gaugevec()
402    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
403    // - Histogram: ✅ IMPLEMENTED - create_histogram()
404    // - HistogramVec with custom buckets: create_histogram_with_buckets()
405    // - Info: create_info() - for info metrics with labels
406    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
407    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
408    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
409    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
410    // - Stateset: create_stateset() - for state-based metrics
411    // - Summary: create_summary() - for quantiles and sum/count metrics
412    // - SummaryVec: create_summary_vec() - for labeled summaries
413    // - Untyped: create_untyped() - for untyped metrics
414    //
415    // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
416    // Keep them synchronized when adding new metric types
417
418    /// Create a Counter metric
419    pub fn create_counter(
420        &self,
421        name: &str,
422        description: &str,
423        labels: &[(&str, &str)],
424    ) -> anyhow::Result<prometheus::Counter> {
425        create_metric(&self.hierarchy, name, description, labels, None, None)
426    }
427
428    /// Create a CounterVec metric with label names (for dynamic labels)
429    pub fn create_countervec(
430        &self,
431        name: &str,
432        description: &str,
433        const_labels: &[&str],
434        const_label_values: &[(&str, &str)],
435    ) -> anyhow::Result<prometheus::CounterVec> {
436        create_metric(
437            &self.hierarchy,
438            name,
439            description,
440            const_label_values,
441            None,
442            Some(const_labels),
443        )
444    }
445
446    /// Create a Gauge metric
447    pub fn create_gauge(
448        &self,
449        name: &str,
450        description: &str,
451        labels: &[(&str, &str)],
452    ) -> anyhow::Result<prometheus::Gauge> {
453        create_metric(&self.hierarchy, name, description, labels, None, None)
454    }
455
456    /// Create a GaugeVec metric with label names (for dynamic labels)
457    pub fn create_gaugevec(
458        &self,
459        name: &str,
460        description: &str,
461        const_labels: &[&str],
462        const_label_values: &[(&str, &str)],
463    ) -> anyhow::Result<prometheus::GaugeVec> {
464        create_metric(
465            &self.hierarchy,
466            name,
467            description,
468            const_label_values,
469            None,
470            Some(const_labels),
471        )
472    }
473
474    /// Create a Histogram metric with custom buckets
475    pub fn create_histogram(
476        &self,
477        name: &str,
478        description: &str,
479        labels: &[(&str, &str)],
480        buckets: Option<Vec<f64>>,
481    ) -> anyhow::Result<prometheus::Histogram> {
482        create_metric(&self.hierarchy, name, description, labels, buckets, None)
483    }
484
485    /// Create an IntCounter metric
486    pub fn create_intcounter(
487        &self,
488        name: &str,
489        description: &str,
490        labels: &[(&str, &str)],
491    ) -> anyhow::Result<prometheus::IntCounter> {
492        create_metric(&self.hierarchy, name, description, labels, None, None)
493    }
494
495    /// Create an IntCounterVec metric with label names (for dynamic labels)
496    pub fn create_intcountervec(
497        &self,
498        name: &str,
499        description: &str,
500        const_labels: &[&str],
501        const_label_values: &[(&str, &str)],
502    ) -> anyhow::Result<prometheus::IntCounterVec> {
503        create_metric(
504            &self.hierarchy,
505            name,
506            description,
507            const_label_values,
508            None,
509            Some(const_labels),
510        )
511    }
512
513    /// Create an IntGauge metric
514    pub fn create_intgauge(
515        &self,
516        name: &str,
517        description: &str,
518        labels: &[(&str, &str)],
519    ) -> anyhow::Result<prometheus::IntGauge> {
520        create_metric(&self.hierarchy, name, description, labels, None, None)
521    }
522
523    /// Create an IntGaugeVec metric with label names (for dynamic labels)
524    pub fn create_intgaugevec(
525        &self,
526        name: &str,
527        description: &str,
528        const_labels: &[&str],
529        const_label_values: &[(&str, &str)],
530    ) -> anyhow::Result<prometheus::IntGaugeVec> {
531        create_metric(
532            &self.hierarchy,
533            name,
534            description,
535            const_label_values,
536            None,
537            Some(const_labels),
538        )
539    }
540
541    /// Get metrics in Prometheus text format
542    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
543        // Execute callbacks first to ensure any new metrics are added to the registry
544        let callback_results = self
545            .hierarchy
546            .get_metrics_registry()
547            .execute_update_callbacks();
548
549        // Log any callback errors but continue
550        for result in callback_results {
551            if let Err(e) = result {
552                tracing::error!("Error executing metrics callback: {}", e);
553            }
554        }
555
556        // Get the Prometheus registry for this hierarchy
557        let prometheus_registry = self
558            .hierarchy
559            .get_metrics_registry()
560            .get_prometheus_registry();
561
562        // Encode metrics from the registry
563        let metric_families = prometheus_registry.gather();
564        let encoder = prometheus::TextEncoder::new();
565        let mut buffer = Vec::new();
566        encoder.encode(&metric_families, &mut buffer)?;
567        let mut result = String::from_utf8(buffer)?;
568
569        // Execute and append exposition text callback results
570        let expfmt = self
571            .hierarchy
572            .get_metrics_registry()
573            .execute_expfmt_callbacks();
574        if !expfmt.is_empty() {
575            if !result.ends_with('\n') {
576                result.push('\n');
577            }
578            result.push_str(&expfmt);
579        }
580
581        Ok(result)
582    }
583}
584
585/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
586/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
587/// generating output in Prometheus text format.
588use crate::traits::DistributedRuntimeProvider;
589
590pub trait MetricsHierarchy: Send + Sync {
591    // ========================================================================
592    // Required methods - must be implemented by all types
593    // ========================================================================
594
595    /// Get the name of this hierarchy (without any hierarchy prefix)
596    fn basename(&self) -> String;
597
598    /// Get the parent hierarchies as actual objects (not strings)
599    /// Returns a vector of hierarchy references, ordered from root to immediate parent.
600    /// For example, an Endpoint would return [DRT, Namespace, Component].
601    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
602
603    /// Get a reference to this hierarchy's metrics registry
604    fn get_metrics_registry(&self) -> &MetricsRegistry;
605
606    // ========================================================================
607    // Provided methods - have default implementations
608    // ========================================================================
609
610    /// Access the metrics interface for this hierarchy
611    /// This is a provided method that works for any type implementing MetricsHierarchy
612    fn metrics(&self) -> Metrics<&Self>
613    where
614        Self: Sized,
615    {
616        Metrics::new(self)
617    }
618}
619
620// Blanket implementation for references to types that implement MetricsHierarchy
621impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
622    fn basename(&self) -> String {
623        (**self).basename()
624    }
625
626    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
627        (**self).parent_hierarchies()
628    }
629
630    fn get_metrics_registry(&self) -> &MetricsRegistry {
631        (**self).get_metrics_registry()
632    }
633}
634
635/// Type alias for runtime callback functions to reduce complexity
636///
637/// This type represents an Arc-wrapped callback function that can be:
638/// - Shared efficiently across multiple threads and contexts
639/// - Cloned without duplicating the underlying closure
640/// - Used in generic contexts requiring 'static lifetime
641///
642/// The Arc wrapper is included in the type to make sharing explicit.
643pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
644
645/// Type alias for exposition text callback functions that return Prometheus text
646pub type PrometheusExpositionFormatCallback =
647    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
648
649/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
650pub struct MetricsRegistry {
651    /// The Prometheus registry for this hierarchy (with interior mutability for thread-safe access)
652    pub prometheus_registry: std::sync::RwLock<prometheus::Registry>,
653
654    /// Update callbacks invoked before metrics are scraped.
655    /// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
656    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
657
658    /// Callbacks that return Prometheus exposition text appended to metrics output.
659    /// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT).
660    pub prometheus_expfmt_callbacks:
661        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
662}
663
664impl std::fmt::Debug for MetricsRegistry {
665    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
666        f.debug_struct("MetricsRegistry")
667            .field("prometheus_registry", &"<RwLock<Registry>>")
668            .field(
669                "prometheus_update_callbacks",
670                &format!(
671                    "<RwLock<Vec<Callback>>> with {} callbacks",
672                    self.prometheus_update_callbacks.read().unwrap().len()
673                ),
674            )
675            .field(
676                "prometheus_expfmt_callbacks",
677                &format!(
678                    "<RwLock<Vec<Callback>>> with {} callbacks",
679                    self.prometheus_expfmt_callbacks.read().unwrap().len()
680                ),
681            )
682            .finish()
683    }
684}
685
686impl Clone for MetricsRegistry {
687    fn clone(&self) -> Self {
688        Self {
689            prometheus_registry: std::sync::RwLock::new(
690                self.prometheus_registry.read().unwrap().clone(),
691            ),
692            // Clone the Arc to share callbacks across all clones (prevents callback loss).
693            // Previously used Vec::new() here, which caused vllm: metrics to disappear.
694            prometheus_update_callbacks: Arc::clone(&self.prometheus_update_callbacks),
695            prometheus_expfmt_callbacks: Arc::clone(&self.prometheus_expfmt_callbacks),
696        }
697    }
698}
699
700impl MetricsRegistry {
701    /// Create a new metrics registry with an empty Prometheus registry and callback lists
702    pub fn new() -> Self {
703        Self {
704            prometheus_registry: std::sync::RwLock::new(prometheus::Registry::new()),
705            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
706            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
707        }
708    }
709
710    /// Add a callback function that receives a reference to any MetricsHierarchy
711    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
712        self.prometheus_update_callbacks
713            .write()
714            .unwrap()
715            .push(callback);
716    }
717
718    /// Add an exposition text callback that returns Prometheus text
719    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
720        self.prometheus_expfmt_callbacks
721            .write()
722            .unwrap()
723            .push(callback);
724    }
725
726    /// Execute all update callbacks and return their results
727    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
728        self.prometheus_update_callbacks
729            .read()
730            .unwrap()
731            .iter()
732            .map(|callback| callback())
733            .collect()
734    }
735
736    /// Execute all exposition text callbacks and return their concatenated text
737    pub fn execute_expfmt_callbacks(&self) -> String {
738        let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
739        let mut result = String::new();
740        for callback in callbacks.iter() {
741            match callback() {
742                Ok(text) => {
743                    if !text.is_empty() {
744                        if !result.is_empty() && !result.ends_with('\n') {
745                            result.push('\n');
746                        }
747                        result.push_str(&text);
748                    }
749                }
750                Err(e) => {
751                    tracing::error!("Error executing exposition text callback: {}", e);
752                }
753            }
754        }
755        result
756    }
757
758    /// Add a Prometheus metric collector to this registry
759    pub fn add_metric(
760        &self,
761        collector: Box<dyn prometheus::core::Collector>,
762    ) -> anyhow::Result<()> {
763        self.prometheus_registry
764            .write()
765            .unwrap()
766            .register(collector)
767            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
768    }
769
770    /// Get a read guard to the Prometheus registry for scraping
771    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
772        self.prometheus_registry.read().unwrap()
773    }
774
775    /// Returns true if a metric with the given name already exists in the Prometheus registry
776    pub fn has_metric_named(&self, metric_name: &str) -> bool {
777        self.prometheus_registry
778            .read()
779            .unwrap()
780            .gather()
781            .iter()
782            .any(|mf| mf.name() == metric_name)
783    }
784}
785
786impl Default for MetricsRegistry {
787    fn default() -> Self {
788        Self::new()
789    }
790}
791
792#[cfg(test)]
793mod test_helpers {
794    use super::prometheus_names::name_prefix;
795    use super::prometheus_names::{nats_client, nats_service};
796    use super::*;
797
/// Collect, in order, the lines of `input` accepted by `predicate`.
/// Each kept line is returned as an owned `String` without its line terminator.
fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
where
    F: FnMut(&str) -> bool,
{
    let mut kept = Vec::new();
    for line in input.lines() {
        if predicate(line) {
            kept.push(line.to_owned());
        }
    }
    kept
}
810
811    /// Filters out all NATS metrics from Prometheus output for test comparisons.
812    pub fn remove_nats_lines(input: &str) -> Vec<String> {
813        filter_prometheus_lines(input, |line| {
814            !line.contains(&format!(
815                "{}_{}",
816                name_prefix::COMPONENT,
817                nats_client::PREFIX
818            )) && !line.contains(&format!(
819                "{}_{}",
820                name_prefix::COMPONENT,
821                nats_service::PREFIX
822            )) && !line.trim().is_empty()
823        })
824    }
825
826    /// Filters to only include NATS metrics from Prometheus output for test comparisons.
827    pub fn extract_nats_lines(input: &str) -> Vec<String> {
828        filter_prometheus_lines(input, |line| {
829            line.contains(&format!(
830                "{}_{}",
831                name_prefix::COMPONENT,
832                nats_client::PREFIX
833            )) || line.contains(&format!(
834                "{}_{}",
835                name_prefix::COMPONENT,
836                nats_service::PREFIX
837            ))
838        })
839    }
840
841    /// Extracts all component metrics (excluding help text and type definitions).
842    /// Returns only the actual metric lines with values.
843    pub fn extract_metrics(input: &str) -> Vec<String> {
844        filter_prometheus_lines(input, |line| {
845            line.starts_with(&format!("{}_", name_prefix::COMPONENT))
846                && !line.starts_with("#")
847                && !line.trim().is_empty()
848        })
849    }
850
/// Parses a Prometheus metric line and extracts the name, labels, and value.
/// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
///
/// Label values are read with quote awareness, so blanks and commas inside a quoted value are
/// preserved, and the escape sequences `\\`, `\"` and `\n` are decoded. Anything after the
/// value (such as a timestamp) is ignored.
///
/// Returns `None` for blank lines, comment lines (`# ...`), lines without a value, lines whose
/// value is not a valid `f64`, and lines whose label set is never closed by `}`.
///
/// # Example
/// ```ignore
/// let line = "http_requests_total{method=\"GET\"} 1234";
/// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
/// assert_eq!(name, "http_requests_total");
/// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
/// assert_eq!(value, 1234.0);
/// ```
pub fn parse_prometheus_metric(
    line: &str,
) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
    let line = line.trim();
    if line.is_empty() || line.starts_with('#') {
        return None;
    }

    // The metric name runs up to the label set or, without labels, up to the first blank.
    // A line with neither has no value and is rejected by the `?`.
    let name_end = line.find(|c: char| c == '{' || c.is_whitespace())?;
    let (name, after_name) = line.split_at(name_end);

    let mut labels = std::collections::HashMap::new();
    let after_labels = match after_name.strip_prefix('{') {
        Some(label_body) => {
            let close = parse_label_set(label_body, &mut labels)?;
            // `}` is a single byte, so the value section starts right behind it.
            &label_body[close + 1..]
        }
        None => after_name,
    };

    // First token after the name / label set is the sample value.
    let value: f64 = after_labels.split_whitespace().next()?.parse().ok()?;

    Some((name.to_string(), labels, value))
}

/// Scans the text following an opening `{` and stores every `key=value` pair in `labels`.
///
/// Returns the byte offset of the closing `}` within `body`, or `None` when the label set
/// (or a quoted value / escape sequence inside it) is left unterminated.
fn parse_label_set(
    body: &str,
    labels: &mut std::collections::HashMap<String, String>,
) -> Option<usize> {
    let mut key = String::new();
    let mut value = String::new();
    let mut in_value = false; // true once the `=` of the current pair has been seen
    let mut in_quotes = false; // true while inside a double-quoted label value

    let mut chars = body.char_indices();
    while let Some((idx, ch)) = chars.next() {
        if in_quotes {
            match ch {
                // Decode the escapes defined by the text exposition format; keep others as-is.
                '\\' => match chars.next()?.1 {
                    'n' => value.push('\n'),
                    escaped @ ('\\' | '"') => value.push(escaped),
                    other => {
                        value.push('\\');
                        value.push(other);
                    }
                },
                '"' => in_quotes = false,
                other => value.push(other),
            }
            continue;
        }

        match ch {
            '"' if in_value => in_quotes = true,
            '=' if !in_value => in_value = true,
            ',' | '}' => {
                if in_value {
                    labels.insert(std::mem::take(&mut key), std::mem::take(&mut value));
                } else {
                    // A fragment without `=` is not a label pair; drop it.
                    key.clear();
                }
                in_value = false;
                if ch == '}' {
                    return Some(idx);
                }
            }
            // Blanks between tokens carry no meaning outside of quotes.
            blank if blank.is_whitespace() => {}
            other if in_value => value.push(other),
            other => key.push(other),
        }
    }

    None
}
897}
898
899#[cfg(test)]
900mod test_metricsregistry_units {
901    use super::*;
902
#[test]
fn test_build_component_metric_name_with_prefix() {
    // Every bare metric name must come back with the `dynamo_component_` prefix prepended.
    let cases = [
        ("requests", "dynamo_component_requests"),
        ("counter", "dynamo_component_counter"),
    ];

    for (metric, expected) in cases {
        assert_eq!(build_component_metric_name(metric), expected);
    }
}
912
#[test]
fn test_parse_prometheus_metric() {
    use super::test_helpers::parse_prometheus_metric;
    use std::collections::HashMap;

    // Sample with two labels and an integer value.
    let (name, labels, value) =
        parse_prometheus_metric("http_requests_total{method=\"GET\",status=\"200\"} 1234")
            .unwrap();
    let expected_labels = HashMap::from([
        ("method".to_string(), "GET".to_string()),
        ("status".to_string(), "200".to_string()),
    ]);
    assert_eq!(name, "http_requests_total");
    assert_eq!(labels, expected_labels);
    assert_eq!(value, 1234.0);

    // Sample without any labels.
    let (name, labels, value) = parse_prometheus_metric("cpu_usage 98.5").unwrap();
    assert_eq!(name, "cpu_usage");
    assert!(labels.is_empty());
    assert_eq!(value, 98.5);

    // Sample with a single label and a fractional value.
    let (name, labels, value) =
        parse_prometheus_metric("response_time{service=\"api\"} 0.123").unwrap();
    let expected_labels = HashMap::from([("service".to_string(), "api".to_string())]);
    assert_eq!(name, "response_time");
    assert_eq!(labels, expected_labels);
    assert_eq!(value, 0.123);

    // Lines that are not samples: empty line, help text, type definition, missing value.
    let rejected_lines = [
        "",
        "# HELP metric description",
        "# TYPE metric counter",
        "metric_name",
    ];
    for rejected in rejected_lines {
        assert!(parse_prometheus_metric(rejected).is_none());
    }

    println!("✓ Prometheus metric parsing works correctly!");
}
965
#[test]
fn test_metrics_registry_entry_callbacks() {
    use crate::MetricsRegistry;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Scenario 1: every registered callback runs on each execution, and a cloned registry
    // drives the very same callbacks.
    {
        let registry = MetricsRegistry::new();
        let total = Arc::new(AtomicUsize::new(0));

        // Three callbacks adding 1, 10 and 100 respectively.
        for step in [1, 10, 100] {
            let total = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                total.fetch_add(step, Ordering::SeqCst);
                Ok(())
            }));
        }

        // Registering alone must not run anything.
        assert_eq!(total.load(Ordering::SeqCst), 0);

        // First pass: 1 + 10 + 100.
        let first_pass = registry.execute_update_callbacks();
        assert_eq!(first_pass.len(), 3);
        assert!(first_pass.iter().all(|outcome| outcome.is_ok()));
        assert_eq!(total.load(Ordering::SeqCst), 111);

        // Second pass: callbacks are reusable, so another 111 is added.
        let second_pass = registry.execute_update_callbacks();
        assert_eq!(second_pass.len(), 3);
        assert_eq!(total.load(Ordering::SeqCst), 222);

        // A clone executes the same three callbacks against the same counter.
        let registry_clone = registry.clone();
        assert_eq!(registry_clone.execute_update_callbacks().len(), 3);
        assert_eq!(total.load(Ordering::SeqCst), 333);

        // The original keeps its callbacks after being cloned.
        registry.execute_update_callbacks();
        assert_eq!(total.load(Ordering::SeqCst), 444);
    }

    // Scenario 2: a failing callback is reported in place and does not stop the others.
    {
        let registry = MetricsRegistry::new();
        let total = Arc::new(AtomicUsize::new(0));

        let add_one = Arc::clone(&total);
        registry.add_update_callback(Arc::new(move || {
            add_one.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }));

        registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));

        let add_ten = Arc::clone(&total);
        registry.add_update_callback(Arc::new(move || {
            add_ten.fetch_add(10, Ordering::SeqCst);
            Ok(())
        }));

        // Results come back in registration order: ok, error, ok.
        let outcomes = registry.execute_update_callbacks();
        assert_eq!(outcomes.len(), 3);
        assert!(outcomes[0].is_ok());
        assert!(outcomes[1].is_err());
        assert!(outcomes[2].is_ok());

        let failure = outcomes[1].as_ref().unwrap_err();
        assert_eq!(failure.to_string(), "Simulated error");

        // Both successful callbacks ran despite the failure in between: 1 + 10.
        assert_eq!(total.load(Ordering::SeqCst), 11);

        // Running again fails in the same slot and adds another 11.
        let outcomes = registry.execute_update_callbacks();
        assert!(outcomes[1].is_err());
        assert_eq!(total.load(Ordering::SeqCst), 22);
    }

    // Scenario 3: a registry without callbacks yields no results.
    {
        let registry = MetricsRegistry::new();
        assert_eq!(registry.execute_update_callbacks().len(), 0);
    }
}
1060}
1061
1062#[cfg(feature = "integration")]
1063#[cfg(test)]
1064mod test_metricsregistry_prefixes {
1065    use super::*;
1066    use crate::distributed::distributed_test_utils::create_test_drt_async;
1067    use prometheus::core::Collector;
1068
1069    #[tokio::test]
1070    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
1071        let drt = create_test_drt_async().await;
1072
1073        const DRT_NAME: &str = "";
1074        const NAMESPACE_NAME: &str = "ns901";
1075        const COMPONENT_NAME: &str = "comp901";
1076        const ENDPOINT_NAME: &str = "ep901";
1077        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
1078        let component = namespace.component(COMPONENT_NAME).unwrap();
1079        let endpoint = component.endpoint(ENDPOINT_NAME);
1080
1081        // DRT
1082        assert_eq!(drt.basename(), DRT_NAME);
1083        assert_eq!(drt.parent_hierarchies().len(), 0);
1084        // DRT hierarchy is just its basename (empty string)
1085
1086        // Namespace
1087        assert_eq!(namespace.basename(), NAMESPACE_NAME);
1088        assert_eq!(namespace.parent_hierarchies().len(), 1);
1089        assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
1090        // Namespace hierarchy is just its basename since parent is empty
1091
1092        // Component
1093        assert_eq!(component.basename(), COMPONENT_NAME);
1094        assert_eq!(component.parent_hierarchies().len(), 2);
1095        assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
1096        assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1097        // Component hierarchy structure is validated by the individual assertions above
1098
1099        // Endpoint
1100        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
1101        assert_eq!(endpoint.parent_hierarchies().len(), 3);
1102        assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
1103        assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1104        assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
1105        // Endpoint hierarchy structure is validated by the individual assertions above
1106
1107        // Relationships
1108        assert!(
1109            namespace
1110                .parent_hierarchies()
1111                .iter()
1112                .any(|h| h.basename() == drt.basename())
1113        );
1114        assert!(
1115            component
1116                .parent_hierarchies()
1117                .iter()
1118                .any(|h| h.basename() == namespace.basename())
1119        );
1120        assert!(
1121            endpoint
1122                .parent_hierarchies()
1123                .iter()
1124                .any(|h| h.basename() == component.basename())
1125        );
1126
1127        // Depth
1128        assert_eq!(drt.parent_hierarchies().len(), 0);
1129        assert_eq!(namespace.parent_hierarchies().len(), 1);
1130        assert_eq!(component.parent_hierarchies().len(), 2);
1131        assert_eq!(endpoint.parent_hierarchies().len(), 3);
1132
1133        // Invalid namespace behavior - sanitizes to "_123" and succeeds
1134        // @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't turn it on,
1135        // so invalid characters are sanitized in MetricsRegistry rather than rejected.
1136        let invalid_namespace = drt.namespace("@@123").unwrap();
1137        let result =
1138            invalid_namespace
1139                .metrics()
1140                .create_counter("test_counter", "A test counter", &[]);
1141        assert!(result.is_ok());
1142        if let Ok(counter) = &result {
1143            // Verify the namespace was sanitized to "_123" in the label
1144            let desc = counter.desc();
1145            let namespace_label = desc[0]
1146                .const_label_pairs
1147                .iter()
1148                .find(|l| l.name() == "dynamo_namespace")
1149                .expect("Should have dynamo_namespace label");
1150            assert_eq!(namespace_label.value(), "_123");
1151        }
1152
1153        // Valid namespace works
1154        let valid_namespace = drt.namespace("ns567").unwrap();
1155        assert!(
1156            valid_namespace
1157                .metrics()
1158                .create_counter("test_counter", "A test counter", &[])
1159                .is_ok()
1160        );
1161    }
1162
1163    #[tokio::test]
1164    async fn test_recursive_namespace() {
1165        // Create a distributed runtime for testing
1166        let drt = create_test_drt_async().await;
1167
1168        // Create a deeply chained namespace: ns1.ns2.ns3
1169        let ns1 = drt.namespace("ns1").unwrap();
1170        let ns2 = ns1.namespace("ns2").unwrap();
1171        let ns3 = ns2.namespace("ns3").unwrap();
1172
1173        // Create a component in the deepest namespace
1174        let component = ns3.component("test-component").unwrap();
1175
1176        // Verify the hierarchy structure
1177        assert_eq!(ns1.basename(), "ns1");
1178        assert_eq!(ns1.parent_hierarchies().len(), 1);
1179        assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
1180        // ns1 hierarchy is just its basename since parent is empty
1181
1182        assert_eq!(ns2.basename(), "ns2");
1183        assert_eq!(ns2.parent_hierarchies().len(), 2);
1184        assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
1185        assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
1186        // ns2 hierarchy structure validated by parent assertions above
1187
1188        assert_eq!(ns3.basename(), "ns3");
1189        assert_eq!(ns3.parent_hierarchies().len(), 3);
1190        assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
1191        assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
1192        assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
1193        // ns3 hierarchy structure validated by parent assertions above
1194
1195        assert_eq!(component.basename(), "test-component");
1196        assert_eq!(component.parent_hierarchies().len(), 4);
1197        assert_eq!(component.parent_hierarchies()[0].basename(), "");
1198        assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
1199        assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
1200        assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
1201        // component hierarchy structure validated by parent assertions above
1202
1203        println!("✓ Chained namespace test passed - all prefixes correct");
1204    }
1205}
1206
1207#[cfg(feature = "integration")]
1208#[cfg(test)]
1209mod test_metricsregistry_prometheus_fmt_outputs {
1210    use super::prometheus_names::name_prefix;
1211    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1212    use super::prometheus_names::{nats_client, nats_service};
1213    use super::*;
1214    use crate::distributed::distributed_test_utils::create_test_drt_async;
1215    use prometheus::Counter;
1216    use std::sync::Arc;
1217
1218    #[tokio::test]
1219    async fn test_prometheusfactory_using_metrics_registry_trait() {
1220        // Setup real DRT and registry using the test-friendly constructor
1221        let drt = create_test_drt_async().await;
1222
1223        // Use a simple constant namespace name
1224        let namespace_name = "ns345";
1225
1226        let namespace = drt.namespace(namespace_name).unwrap();
1227        let component = namespace.component("comp345").unwrap();
1228        let endpoint = component.endpoint("ep345");
1229
1230        // Test Counter creation
1231        let counter = endpoint
1232            .metrics()
1233            .create_counter("testcounter", "A test counter", &[])
1234            .unwrap();
1235        counter.inc_by(123.456789);
1236        let epsilon = 0.01;
1237        assert!((counter.get() - 123.456789).abs() < epsilon);
1238
1239        let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1240        println!("Endpoint output:");
1241        println!("{}", endpoint_output_raw);
1242
1243        // Filter out NATS service metrics for test comparison
1244        let endpoint_output =
1245            super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
1246
1247        let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
1248# TYPE dynamo_component_testcounter counter
1249dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
1250
1251        assert_eq!(
1252            endpoint_output, expected_endpoint_output,
1253            "\n=== ENDPOINT COMPARISON FAILED ===\n\
1254             Expected:\n{}\n\
1255             Actual:\n{}\n\
1256             ==============================",
1257            expected_endpoint_output, endpoint_output
1258        );
1259
1260        // Test Gauge creation
1261        let gauge = component
1262            .metrics()
1263            .create_gauge("testgauge", "A test gauge", &[])
1264            .unwrap();
1265        gauge.set(50000.0);
1266        assert_eq!(gauge.get(), 50000.0);
1267
1268        // Test Prometheus format output for Component (gauge + histogram)
1269        let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1270        println!("Component output:");
1271        println!("{}", component_output_raw);
1272
1273        // Filter out NATS service metrics for test comparison
1274        let component_output =
1275            super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
1276
1277        let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1278# TYPE dynamo_component_testcounter counter
1279dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1280# HELP dynamo_component_testgauge A test gauge
1281# TYPE dynamo_component_testgauge gauge
1282dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1283
1284        assert_eq!(
1285            component_output, expected_component_output,
1286            "\n=== COMPONENT COMPARISON FAILED ===\n\
1287             Expected:\n{}\n\
1288             Actual:\n{}\n\
1289             ==============================",
1290            expected_component_output, component_output
1291        );
1292
1293        let intcounter = namespace
1294            .metrics()
1295            .create_intcounter("testintcounter", "A test int counter", &[])
1296            .unwrap();
1297        intcounter.inc_by(12345);
1298        assert_eq!(intcounter.get(), 12345);
1299
1300        // Test Prometheus format output for Namespace (int_counter + gauge + histogram)
1301        let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1302        println!("Namespace output:");
1303        println!("{}", namespace_output_raw);
1304
1305        // Filter out NATS service metrics for test comparison
1306        let namespace_output =
1307            super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
1308
1309        let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1310# TYPE dynamo_component_testcounter counter
1311dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1312# HELP dynamo_component_testgauge A test gauge
1313# TYPE dynamo_component_testgauge gauge
1314dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1315# HELP dynamo_component_testintcounter A test int counter
1316# TYPE dynamo_component_testintcounter counter
1317dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1318
1319        assert_eq!(
1320            namespace_output, expected_namespace_output,
1321            "\n=== NAMESPACE COMPARISON FAILED ===\n\
1322             Expected:\n{}\n\
1323             Actual:\n{}\n\
1324             ==============================",
1325            expected_namespace_output, namespace_output
1326        );
1327
1328        // Test IntGauge creation
1329        let intgauge = namespace
1330            .metrics()
1331            .create_intgauge("testintgauge", "A test int gauge", &[])
1332            .unwrap();
1333        intgauge.set(42);
1334        assert_eq!(intgauge.get(), 42);
1335
1336        // Test IntGaugeVec creation
1337        let intgaugevec = namespace
1338            .metrics()
1339            .create_intgaugevec(
1340                "testintgaugevec",
1341                "A test int gauge vector",
1342                &["instance", "status"],
1343                &[("service", "api")],
1344            )
1345            .unwrap();
1346        intgaugevec
1347            .with_label_values(&["server1", "active"])
1348            .set(10);
1349        intgaugevec
1350            .with_label_values(&["server2", "inactive"])
1351            .set(0);
1352
1353        // Test CounterVec creation
1354        let countervec = endpoint
1355            .metrics()
1356            .create_countervec(
1357                "testcountervec",
1358                "A test counter vector",
1359                &["method", "status"],
1360                &[("service", "api")],
1361            )
1362            .unwrap();
1363        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1364        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1365
1366        // Test Histogram creation
1367        let histogram = component
1368            .metrics()
1369            .create_histogram("testhistogram", "A test histogram", &[], None)
1370            .unwrap();
1371        histogram.observe(1.0);
1372        histogram.observe(2.5);
1373        histogram.observe(4.0);
1374
1375        // Test Prometheus format output for DRT (all metrics combined)
1376        let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1377        println!("DRT output:");
1378        println!("{}", drt_output_raw);
1379
1380        // Filter out all NATS metrics for comparison
1381        let filtered_drt_output =
1382            super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
1383
1384        let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
1385# TYPE dynamo_component_testcounter counter
1386dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1387# HELP dynamo_component_testcountervec A test counter vector
1388# TYPE dynamo_component_testcountervec counter
1389dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1390dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1391# HELP dynamo_component_testgauge A test gauge
1392# TYPE dynamo_component_testgauge gauge
1393dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1394# HELP dynamo_component_testhistogram A test histogram
1395# TYPE dynamo_component_testhistogram histogram
1396dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1397dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1398dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1399dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1400dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1401dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1402dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1403dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1404dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1405dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1406dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1407dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1408dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1409dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1410# HELP dynamo_component_testintcounter A test int counter
1411# TYPE dynamo_component_testintcounter counter
1412dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1413# HELP dynamo_component_testintgauge A test int gauge
1414# TYPE dynamo_component_testintgauge gauge
1415dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1416# HELP dynamo_component_testintgaugevec A test int gauge vector
1417# TYPE dynamo_component_testintgaugevec gauge
1418dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1419dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
1420# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
1421# TYPE dynamo_component_uptime_seconds gauge
1422dynamo_component_uptime_seconds 0"#.to_string();
1423
1424        assert_eq!(
1425            filtered_drt_output, expected_drt_output,
1426            "\n=== DRT COMPARISON FAILED ===\n\
1427             Expected:\n{}\n\
1428             Actual (filtered):\n{}\n\
1429             ==============================",
1430            expected_drt_output, filtered_drt_output
1431        );
1432
1433        println!("✓ All Prometheus format outputs verified successfully!");
1434    }
1435
#[test]
fn test_refactored_filter_functions() {
    use super::test_helpers::{extract_metrics, extract_nats_lines, remove_nats_lines};

    // Twelve lines mixing regular metrics, NATS metrics, and HELP/TYPE comments.
    let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_nats_client_connection_state Connection state
# TYPE dynamo_component_nats_client_connection_state gauge
dynamo_component_nats_client_connection_state 1
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_nats_service_requests_total 100
dynamo_component_nats_service_errors_total 5"#;

    // remove_nats_lines: the 7 non-NATS lines survive, HELP/TYPE lines included.
    let non_nats = remove_nats_lines(test_input);
    assert_eq!(non_nats.len(), 7);
    for line in &non_nats {
        assert!(!line.contains("nats"));
    }

    // extract_nats_lines: the 5 NATS lines survive, HELP/TYPE lines included.
    let nats_only = extract_nats_lines(test_input);
    assert_eq!(nats_only.len(), 5);
    for line in &nats_only {
        assert!(line.contains("nats"));
    }

    // extract_metrics: the 6 sample lines survive, HELP/TYPE lines are excluded.
    let samples = extract_metrics(test_input);
    assert_eq!(samples.len(), 6);
    for line in &samples {
        assert!(line.starts_with("dynamo_component") && !line.starts_with("#"));
    }

    println!("✓ All refactored filter functions work correctly!");
}
1473}
1474
1475#[cfg(feature = "integration")]
1476#[cfg(test)]
1477mod test_metricsregistry_nats {
1478    use super::prometheus_names::name_prefix;
1479    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1480    use super::prometheus_names::{nats_client, nats_service};
1481    use super::*;
1482    use crate::distributed::distributed_test_utils::create_test_drt_async;
1483    use crate::pipeline::PushRouter;
1484    use crate::{DistributedRuntime, Runtime};
1485    use tokio::time::{Duration, sleep};
1486    #[tokio::test]
1487    async fn test_drt_nats_metrics() {
1488        // Setup real DRT and registry using the test-friendly constructor
1489        let drt = create_test_drt_async().await;
1490
1491        // Get DRT output which should include NATS client metrics
1492        let drt_output = drt.metrics().prometheus_expfmt().unwrap();
1493        println!("DRT output with NATS metrics:");
1494        println!("{}", drt_output);
1495
1496        // Additional checks for NATS client metrics (without checking specific values)
1497        let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
1498
1499        // Check that NATS client metrics are present
1500        assert!(
1501            !drt_nats_metrics.is_empty(),
1502            "NATS client metrics should be present"
1503        );
1504
1505        // Check for specific NATS client metric names (without values)
1506        // Extract only the metric lines from the already-filtered NATS metrics
1507        let drt_nats_metric_lines =
1508            super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
1509        let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
1510            .iter()
1511            .map(|line| {
1512                let without_labels = line.split('{').next().unwrap_or(line);
1513                // Remove the value part (everything after the last space)
1514                without_labels.split(' ').next().unwrap_or(without_labels)
1515            })
1516            .collect();
1517
1518        let expect_drt_nats_metrics_sorted = {
1519            let mut temp = DRT_NATS_METRICS
1520                .iter()
1521                .map(|metric| build_component_metric_name(metric))
1522                .collect::<Vec<_>>();
1523            temp.sort();
1524            temp
1525        };
1526
1527        // Print both lists for comparison
1528        println!(
1529            "actual_drt_nats_metrics_sorted: {:?}",
1530            actual_drt_nats_metrics_sorted
1531        );
1532        println!(
1533            "expect_drt_nats_metrics_sorted: {:?}",
1534            expect_drt_nats_metrics_sorted
1535        );
1536
1537        // Compare the sorted lists
1538        assert_eq!(
1539            actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
1540            "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1541        );
1542
1543        println!("✓ DistributedRuntime NATS metrics integration test passed!");
1544    }
1545
1546    #[tokio::test]
1547    async fn test_nats_metric_names() {
1548        // This test only tests the existence of the NATS metrics. It does not check
1549        // the values of the metrics.
1550
1551        // Setup real DRT and registry using the test-friendly constructor
1552        let drt = create_test_drt_async().await;
1553
1554        // Create a namespace and component from the DRT
1555        let namespace = drt.namespace("ns789").unwrap();
1556        let mut component = namespace.component("comp789").unwrap();
1557
1558        // Create a service to trigger metrics callback registration
1559        component.add_stats_service().await.unwrap();
1560
1561        // Get component output which should include NATS client metrics
1562        // Additional checks for NATS client metrics (without checking specific values)
1563        let component_nats_metrics = super::test_helpers::extract_nats_lines(
1564            &component.metrics().prometheus_expfmt().unwrap(),
1565        );
1566        println!(
1567            "Component NATS metrics count: {}",
1568            component_nats_metrics.len()
1569        );
1570
1571        // Check that NATS client metrics are present
1572        assert!(
1573            !component_nats_metrics.is_empty(),
1574            "NATS client metrics should be present"
1575        );
1576
1577        // Check for specific NATS client metric names (without values)
1578        let component_metrics =
1579            super::test_helpers::extract_metrics(&component.metrics().prometheus_expfmt().unwrap());
1580        let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
1581            .iter()
1582            .map(|line| {
1583                let without_labels = line.split('{').next().unwrap_or(line);
1584                // Remove the value part (everything after the last space)
1585                without_labels.split(' ').next().unwrap_or(without_labels)
1586            })
1587            .collect();
1588
1589        let expect_component_nats_metrics_sorted = {
1590            let mut temp = COMPONENT_NATS_METRICS
1591                .iter()
1592                .map(|metric| build_component_metric_name(metric))
1593                .collect::<Vec<_>>();
1594            temp.sort();
1595            temp
1596        };
1597
1598        // Print both lists for comparison
1599        println!(
1600            "actual_component_nats_metrics_sorted: {:?}",
1601            actual_component_nats_metrics_sorted
1602        );
1603        println!(
1604            "expect_component_nats_metrics_sorted: {:?}",
1605            expect_component_nats_metrics_sorted
1606        );
1607
1608        // Compare the sorted lists
1609        assert_eq!(
1610            actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
1611            "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1612        );
1613
1614        // Get both DRT and component output and filter for NATS metrics only
1615        let drt_output = drt.metrics().prometheus_expfmt().unwrap();
1616        let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
1617        let drt_and_component_nats_metrics =
1618            super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
1619        println!(
1620            "DRT and component NATS metrics count: {}",
1621            drt_and_component_nats_metrics.len()
1622        );
1623
1624        // Check that the NATS metrics are present in the component output
1625        assert_eq!(
1626            drt_and_component_nats_metrics.len(),
1627            DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
1628            "DRT at this point should have both the DRT and component NATS metrics"
1629        );
1630
1631        // Check that the NATS metrics are present in the component output
1632        println!("✓ Component NATS metrics integration test passed!");
1633    }
1634
1635    /// Tests NATS metrics values before and after endpoint activity with large message processing.
1636    /// Creates endpoint, sends test messages + 10k byte message, validates metrics (NATS + work handler)
1637    /// at initial state and post-activity state. Ensures byte thresholds, message counts, and processing
1638    /// times are within expected ranges. Tests end-to-end client-server communication and metrics collection.
1639    #[tokio::test]
1640    async fn test_nats_metrics_values() -> anyhow::Result<()> {
1641        struct MessageHandler {}
1642        impl MessageHandler {
1643            fn new() -> std::sync::Arc<Self> {
1644                std::sync::Arc::new(Self {})
1645            }
1646        }
1647
1648        #[async_trait]
1649        impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
1650            async fn generate(
1651                &self,
1652                input: SingleIn<String>,
1653            ) -> Result<ManyOut<Annotated<String>>, Error> {
1654                let (data, ctx) = input.into_parts();
1655                let response = data.to_string();
1656                let stream = stream::iter(vec![Annotated::from_data(response)]);
1657                Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
1658            }
1659        }
1660
1661        println!("\n=== Initializing DistributedRuntime ===");
1662        let runtime = Runtime::from_current()?;
1663        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
1664        let namespace = drt.namespace("ns123").unwrap();
1665        let mut component = namespace.component("comp123").unwrap();
1666        let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
1667
1668        let _backend_handle = tokio::spawn(async move {
1669            component.add_stats_service().await.unwrap();
1670            let endpoint = component
1671                .endpoint("echo")
1672                .endpoint_builder()
1673                .handler(ingress);
1674            endpoint.start().await.unwrap();
1675        });
1676
1677        sleep(Duration::from_millis(500)).await;
1678        println!("✓ Launched endpoint service in background successfully");
1679
1680        let drt_output = drt.metrics().prometheus_expfmt().unwrap();
1681        let parsed_metrics: Vec<_> = drt_output
1682            .lines()
1683            .filter_map(super::test_helpers::parse_prometheus_metric)
1684            .collect();
1685
1686        println!("=== Initial DRT metrics output ===");
1687        println!("{}", drt_output);
1688
1689        println!("\n=== Checking Initial Metric Values ===");
1690
1691        let initial_expected_metric_values = [
1692            // DRT NATS metrics (ordered to match DRT_NATS_METRICS)
1693            (
1694                build_component_metric_name(nats_client::CONNECTION_STATE),
1695                1.0,
1696                1.0,
1697            ), // Should be connected
1698            (
1699                build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1700                1.0,
1701                1.0,
1702            ), // Should have 1 connection
1703            (
1704                build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1705                800.0,
1706                4000.0,
1707            ), // Wide range around observed value of 1888
1708            (
1709                build_component_metric_name(nats_client::IN_MESSAGES),
1710                0.0,
1711                5.0,
1712            ), // Wide range around 2
1713            (
1714                build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1715                1500.0,
1716                5000.0,
1717            ), // Wide range around observed value of 2752
1718            (
1719                build_component_metric_name(nats_client::OUT_MESSAGES),
1720                0.0,
1721                5.0,
1722            ), // Wide range around 2
1723            // Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
1724            (
1725                build_component_metric_name(nats_service::PROCESSING_MS_AVG),
1726                0.0,
1727                0.0,
1728            ), // No processing yet
1729            (
1730                build_component_metric_name(nats_service::ERRORS_TOTAL),
1731                0.0,
1732                0.0,
1733            ), // No errors yet
1734            (
1735                build_component_metric_name(nats_service::REQUESTS_TOTAL),
1736                0.0,
1737                0.0,
1738            ), // No requests yet
1739            (
1740                build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
1741                0.0,
1742                0.0,
1743            ), // No processing yet
1744            (
1745                build_component_metric_name(nats_service::ACTIVE_SERVICES),
1746                0.0,
1747                2.0,
1748            ), // Service may not be fully active yet
1749            (
1750                build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1751                0.0,
1752                2.0,
1753            ), // Endpoint may not be fully active yet
1754        ];
1755
1756        for (metric_name, min_value, max_value) in &initial_expected_metric_values {
1757            let actual_value = parsed_metrics
1758                .iter()
1759                .find(|(name, _, _)| name == metric_name)
1760                .map(|(_, _, value)| *value)
1761                .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
1762
1763            assert!(
1764                actual_value >= *min_value && actual_value <= *max_value,
1765                "Initial metric {} should be between {} and {}, but got {}",
1766                metric_name,
1767                min_value,
1768                max_value,
1769                actual_value
1770            );
1771        }
1772
1773        println!("\n=== Client Runtime to hit the endpoint ===");
1774        let client_runtime = Runtime::from_current()?;
1775        let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
1776        let namespace = client_distributed.namespace("ns123")?;
1777        let component = namespace.component("comp123")?;
1778        let client = component.endpoint("echo").client().await?;
1779
1780        client.wait_for_instances().await?;
1781        println!("✓ Connected to endpoint, waiting for instances...");
1782
1783        let router =
1784            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
1785                .await?;
1786
1787        for i in 0..10 {
1788            let msg = i.to_string().repeat(2000); // 2k bytes message
1789            let mut stream = router.random(msg.clone().into()).await?;
1790            while let Some(resp) = stream.next().await {
1791                // Check if response matches the original message
1792                if let Some(data) = &resp.data {
1793                    let is_same = data == &msg;
1794                    println!(
1795                        "Response {}: {} bytes, matches original: {}",
1796                        i,
1797                        data.len(),
1798                        is_same
1799                    );
1800                }
1801            }
1802        }
1803        println!("✓ Sent messages and received responses successfully");
1804
1805        println!("\n=== Waiting 500ms for metrics to update ===");
1806        sleep(Duration::from_millis(500)).await;
1807        println!("✓ Wait complete, getting final metrics...");
1808
1809        let final_drt_output = drt.metrics().prometheus_expfmt().unwrap();
1810        println!("\n=== Final Prometheus DRT output ===");
1811        println!("{}", final_drt_output);
1812
1813        let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
1814        println!("\n=== Filtered NATS metrics from final DRT output ===");
1815        for line in &final_drt_nats_output {
1816            println!("{}", line);
1817        }
1818
1819        let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
1820            .iter()
1821            .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
1822            .collect();
1823
1824        let post_expected_metric_values = [
1825            // DRT NATS metrics
1826            (
1827                build_component_metric_name(nats_client::CONNECTION_STATE),
1828                1.0,
1829                1.0,
1830            ), // Connected
1831            (
1832                build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1833                1.0,
1834                1.0,
1835            ), // 1 connection
1836            (
1837                build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1838                20000.0,
1839                32000.0,
1840            ), // Wide range around 26117
1841            (
1842                build_component_metric_name(nats_client::IN_MESSAGES),
1843                8.0,
1844                20.0,
1845            ), // Wide range around 16
1846            (
1847                build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1848                2500.0,
1849                8000.0,
1850            ), // Wide range around 5524
1851            (
1852                build_component_metric_name(nats_client::OUT_MESSAGES),
1853                8.0,
1854                20.0,
1855            ), // Wide range around 16
1856            // Component NATS metrics
1857            (
1858                build_component_metric_name(nats_service::PROCESSING_MS_AVG),
1859                0.0,
1860                1.0,
1861            ), // Low processing time
1862            (
1863                build_component_metric_name(nats_service::ERRORS_TOTAL),
1864                0.0,
1865                0.0,
1866            ), // No errors
1867            (
1868                build_component_metric_name(nats_service::REQUESTS_TOTAL),
1869                0.0,
1870                10.0,
1871            ), // NATS service stats requests (may differ from work handler count)
1872            (
1873                build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
1874                0.0,
1875                5.0,
1876            ), // Low total processing time
1877            (
1878                build_component_metric_name(nats_service::ACTIVE_SERVICES),
1879                0.0,
1880                2.0,
1881            ), // Service may not be fully active
1882            (
1883                build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1884                0.0,
1885                2.0,
1886            ), // Endpoint may not be fully active
1887            // Work handler metrics
1888            (
1889                build_component_metric_name(work_handler::REQUESTS_TOTAL),
1890                10.0,
1891                10.0,
1892            ), // 10 messages
1893            (
1894                build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
1895                21000.0,
1896                26000.0,
1897            ), // ~75-125% of 23520
1898            (
1899                build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
1900                18000.0,
1901                23000.0,
1902            ), // ~75-125% of 20660
1903            (
1904                build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
1905                0.0,
1906                1.0,
1907            ), // 0 or very low
1908            // Histograms have _{count,sum} suffixes
1909            (
1910                format!(
1911                    "{}_count",
1912                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1913                ),
1914                10.0,
1915                10.0,
1916            ), // 10 messages
1917            (
1918                format!(
1919                    "{}_sum",
1920                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1921                ),
1922                0.0001,
1923                1.0,
1924            ), // Processing time sum (wide range)
1925        ];
1926
1927        println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
1928        for (metric_name, min_value, max_value) in &post_expected_metric_values {
1929            let actual_value = final_parsed_metrics
1930                .iter()
1931                .find(|(name, _, _)| name == metric_name)
1932                .map(|(_, _, value)| *value)
1933                .unwrap_or_else(|| {
1934                    panic!(
1935                        "Could not find expected post-activity metric: {}",
1936                        metric_name
1937                    )
1938                });
1939
1940            assert!(
1941                actual_value >= *min_value && actual_value <= *max_value,
1942                "Post-activity metric {} should be between {} and {}, but got {}",
1943                metric_name,
1944                min_value,
1945                max_value,
1946                actual_value
1947            );
1948            println!(
1949                "✓ {}: {} (range: {} to {})",
1950                metric_name, actual_value, min_value, max_value
1951            );
1952        }
1953
1954        println!("✓ All NATS and component metrics parsed successfully!");
1955        println!("✓ Byte metrics verified to be >= 100 bytes!");
1956        println!("✓ Post-activity metrics verified with higher thresholds!");
1957        println!("✓ Work handler metrics reflect increased activity!");
1958
1959        Ok(())
1960    }
1961}