dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22// Import commonly used items to avoid verbose prefixes
23use prometheus_names::{
24    build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
25    sanitize_prometheus_name, work_handler,
26};
27
28// Pipeline imports for endpoint creation
29use crate::pipeline::{
30    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31    network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
/// When `true`, [`create_metric`] attaches the namespace, component and endpoint auto-labels
/// derived from the metric's hierarchy.
///
/// The label keys are prefixed with `dynamo_` so they do not collide with Kubernetes or other
/// monitoring-system labels.
pub const USE_AUTO_LABELS: bool = true;
40
41// Prometheus imports
42use prometheus::Encoder;
43
44/// Validate that a label slice has no duplicate keys.
45/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
46fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47    let mut seen_keys = std::collections::HashSet::new();
48    for (key, _) in labels {
49        if !seen_keys.insert(*key) {
50            return Err(anyhow::anyhow!(
51                "Duplicate label key '{}' found in labels",
52                key
53            ));
54        }
55    }
56    Ok(())
57}
58
59/// ==============================
60/// Prometheus section
61/// ==============================
62/// Trait that defines common behavior for Prometheus metric types
63pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
64    /// Create a new metric with the given options
65    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
66    where
67        Self: Sized;
68
69    /// Create a new metric with histogram options and custom buckets
70    /// This is a default implementation that will panic for non-histogram metrics
71    fn with_histogram_opts_and_buckets(
72        _opts: prometheus::HistogramOpts,
73        _buckets: Option<Vec<f64>>,
74    ) -> Result<Self, prometheus::Error>
75    where
76        Self: Sized,
77    {
78        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
79    }
80
81    /// Create a new metric with counter options and label names (for CounterVec)
82    /// This is a default implementation that will panic for non-countervec metrics
83    fn with_opts_and_label_names(
84        _opts: prometheus::Opts,
85        _label_names: &[&str],
86    ) -> Result<Self, prometheus::Error>
87    where
88        Self: Sized,
89    {
90        panic!("with_opts_and_label_names is not implemented for this metric type");
91    }
92}
93
94// Implement the trait for Counter, IntCounter, and Gauge
95impl PrometheusMetric for prometheus::Counter {
96    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
97        prometheus::Counter::with_opts(opts)
98    }
99}
100
101impl PrometheusMetric for prometheus::IntCounter {
102    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
103        prometheus::IntCounter::with_opts(opts)
104    }
105}
106
107impl PrometheusMetric for prometheus::Gauge {
108    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
109        prometheus::Gauge::with_opts(opts)
110    }
111}
112
113impl PrometheusMetric for prometheus::IntGauge {
114    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
115        prometheus::IntGauge::with_opts(opts)
116    }
117}
118
119impl PrometheusMetric for prometheus::GaugeVec {
120    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
121        Err(prometheus::Error::Msg(
122            "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
123        ))
124    }
125
126    fn with_opts_and_label_names(
127        opts: prometheus::Opts,
128        label_names: &[&str],
129    ) -> Result<Self, prometheus::Error> {
130        prometheus::GaugeVec::new(opts, label_names)
131    }
132}
133
134impl PrometheusMetric for prometheus::IntGaugeVec {
135    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
136        Err(prometheus::Error::Msg(
137            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
138        ))
139    }
140
141    fn with_opts_and_label_names(
142        opts: prometheus::Opts,
143        label_names: &[&str],
144    ) -> Result<Self, prometheus::Error> {
145        prometheus::IntGaugeVec::new(opts, label_names)
146    }
147}
148
149impl PrometheusMetric for prometheus::IntCounterVec {
150    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
151        Err(prometheus::Error::Msg(
152            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
153        ))
154    }
155
156    fn with_opts_and_label_names(
157        opts: prometheus::Opts,
158        label_names: &[&str],
159    ) -> Result<Self, prometheus::Error> {
160        prometheus::IntCounterVec::new(opts, label_names)
161    }
162}
163
164// Implement the trait for Histogram
165impl PrometheusMetric for prometheus::Histogram {
166    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
167        // Convert Opts to HistogramOpts
168        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
169        prometheus::Histogram::with_opts(histogram_opts)
170    }
171
172    fn with_histogram_opts_and_buckets(
173        mut opts: prometheus::HistogramOpts,
174        buckets: Option<Vec<f64>>,
175    ) -> Result<Self, prometheus::Error> {
176        if let Some(custom_buckets) = buckets {
177            opts = opts.buckets(custom_buckets);
178        }
179        prometheus::Histogram::with_opts(opts)
180    }
181}
182
183// Implement the trait for CounterVec
184impl PrometheusMetric for prometheus::CounterVec {
185    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
186        // This will panic - CounterVec needs label names
187        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
188    }
189
190    fn with_opts_and_label_names(
191        opts: prometheus::Opts,
192        label_names: &[&str],
193    ) -> Result<Self, prometheus::Error> {
194        prometheus::CounterVec::new(opts, label_names)
195    }
196}
197
198/// ==============================
199/// Metrics section
200/// ==============================
201/// Public helper function to create metrics - accessible for Python bindings
202pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
203    hierarchy: &H,
204    metric_name: &str,
205    metric_desc: &str,
206    labels: &[(&str, &str)],
207    buckets: Option<Vec<f64>>,
208    const_labels: Option<&[&str]>,
209) -> anyhow::Result<T> {
210    // Validate that user-provided labels don't have duplicate keys
211    validate_no_duplicate_label_keys(labels)?;
212    // Note: stored labels functionality has been removed
213
214    let basename = hierarchy.basename();
215    let parent_hierarchies = hierarchy.parent_hierarchies();
216
217    // Build hierarchy path as vector of strings: parent names + [basename]
218    let mut hierarchy_names: Vec<String> =
219        parent_hierarchies.iter().map(|p| p.basename()).collect();
220    hierarchy_names.push(basename.clone());
221
222    let metric_name = build_component_metric_name(metric_name);
223
224    // Build updated_labels: auto-labels first, then `labels` + stored labels
225    let mut updated_labels: Vec<(String, String)> = Vec::new();
226
227    if USE_AUTO_LABELS {
228        // Validate that user-provided labels don't conflict with auto-generated labels
229        for (key, _) in labels {
230            if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231                return Err(anyhow::anyhow!(
232                    "Label '{}' is automatically added by auto_label feature and cannot be manually set",
233                    key
234                ));
235            }
236        }
237
238        // Add auto-generated labels with sanitized values
239        if hierarchy_names.len() > 1 {
240            let namespace = &hierarchy_names[1];
241            if !namespace.is_empty() {
242                let valid_namespace = sanitize_prometheus_label(namespace)?;
243                if !valid_namespace.is_empty() {
244                    updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
245                }
246            }
247        }
248        if hierarchy_names.len() > 2 {
249            let component = &hierarchy_names[2];
250            if !component.is_empty() {
251                let valid_component = sanitize_prometheus_label(component)?;
252                if !valid_component.is_empty() {
253                    updated_labels.push((labels::COMPONENT.to_string(), valid_component));
254                }
255            }
256        }
257        if hierarchy_names.len() > 3 {
258            let endpoint = &hierarchy_names[3];
259            if !endpoint.is_empty() {
260                let valid_endpoint = sanitize_prometheus_label(endpoint)?;
261                if !valid_endpoint.is_empty() {
262                    updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
263                }
264            }
265        }
266    }
267
268    // Add user labels
269    updated_labels.extend(
270        labels
271            .iter()
272            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273    );
274    // Note: stored labels functionality has been removed
275
276    // Handle different metric types
277    let prometheus_metric = if std::any::TypeId::of::<T>()
278        == std::any::TypeId::of::<prometheus::CounterVec>()
279    {
280        // Special handling for CounterVec with label names
281        // const_labels parameter is required for CounterVec
282        if buckets.is_some() {
283            return Err(anyhow::anyhow!(
284                "buckets parameter is not valid for CounterVec"
285            ));
286        }
287        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288        for (key, value) in &updated_labels {
289            opts = opts.const_label(key.clone(), value.clone());
290        }
291        let label_names = const_labels
292            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293        T::with_opts_and_label_names(opts, label_names)?
294    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295        // Special handling for GaugeVec with label names
296        // const_labels parameter is required for GaugeVec
297        if buckets.is_some() {
298            return Err(anyhow::anyhow!(
299                "buckets parameter is not valid for GaugeVec"
300            ));
301        }
302        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303        for (key, value) in &updated_labels {
304            opts = opts.const_label(key.clone(), value.clone());
305        }
306        let label_names = const_labels
307            .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308        T::with_opts_and_label_names(opts, label_names)?
309    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310        // Special handling for Histogram with custom buckets
311        // buckets parameter is valid for Histogram, const_labels is not used
312        if const_labels.is_some() {
313            return Err(anyhow::anyhow!(
314                "const_labels parameter is not valid for Histogram"
315            ));
316        }
317        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318        for (key, value) in &updated_labels {
319            opts = opts.const_label(key.clone(), value.clone());
320        }
321        T::with_histogram_opts_and_buckets(opts, buckets)?
322    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323        // Special handling for IntCounterVec with label names
324        // const_labels parameter is required for IntCounterVec
325        if buckets.is_some() {
326            return Err(anyhow::anyhow!(
327                "buckets parameter is not valid for IntCounterVec"
328            ));
329        }
330        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331        for (key, value) in &updated_labels {
332            opts = opts.const_label(key.clone(), value.clone());
333        }
334        let label_names = const_labels
335            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336        T::with_opts_and_label_names(opts, label_names)?
337    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338        // Special handling for IntGaugeVec with label names
339        // const_labels parameter is required for IntGaugeVec
340        if buckets.is_some() {
341            return Err(anyhow::anyhow!(
342                "buckets parameter is not valid for IntGaugeVec"
343            ));
344        }
345        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346        for (key, value) in &updated_labels {
347            opts = opts.const_label(key.clone(), value.clone());
348        }
349        let label_names = const_labels
350            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351        T::with_opts_and_label_names(opts, label_names)?
352    } else {
353        // Standard handling for Counter, IntCounter, Gauge, IntGauge
354        // buckets and const_labels parameters are not valid for these types
355        if buckets.is_some() {
356            return Err(anyhow::anyhow!(
357                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358            ));
359        }
360        if const_labels.is_some() {
361            return Err(anyhow::anyhow!(
362                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363            ));
364        }
365        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366        for (key, value) in &updated_labels {
367            opts = opts.const_label(key.clone(), value.clone());
368        }
369        T::with_opts(opts)?
370    };
371
372    // Register the metric at all hierarchy levels (parents + self)
373    // First register at all parent levels
374    for parent in parent_hierarchies {
375        let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
376        parent.get_metrics_registry().add_metric(collector)?;
377    }
378
379    // Then register at this level
380    let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
381    hierarchy.get_metrics_registry().add_metric(collector)?;
382
383    Ok(prometheus_metric)
384}
385
386/// Wrapper struct that provides access to metrics functionality
387/// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint
388pub struct Metrics<H: MetricsHierarchy> {
389    hierarchy: H,
390}
391
392impl<H: MetricsHierarchy> Metrics<H> {
393    pub fn new(hierarchy: H) -> Self {
394        Self { hierarchy }
395    }
396
397    // TODO: Add support for additional Prometheus metric types:
398    // - Counter: ✅ IMPLEMENTED - create_counter()
399    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
400    // - Gauge: ✅ IMPLEMENTED - create_gauge()
401    // - GaugeVec: ✅ IMPLEMENTED - create_gaugevec()
402    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
403    // - Histogram: ✅ IMPLEMENTED - create_histogram()
404    // - HistogramVec with custom buckets: create_histogram_with_buckets()
405    // - Info: create_info() - for info metrics with labels
406    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
407    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
408    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
409    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
410    // - Stateset: create_stateset() - for state-based metrics
411    // - Summary: create_summary() - for quantiles and sum/count metrics
412    // - SummaryVec: create_summary_vec() - for labeled summaries
413    // - Untyped: create_untyped() - for untyped metrics
414    //
415    // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
416    // Keep them synchronized when adding new metric types
417
418    /// Create a Counter metric
419    pub fn create_counter(
420        &self,
421        name: &str,
422        description: &str,
423        labels: &[(&str, &str)],
424    ) -> anyhow::Result<prometheus::Counter> {
425        create_metric(&self.hierarchy, name, description, labels, None, None)
426    }
427
428    /// Create a CounterVec metric with label names (for dynamic labels)
429    pub fn create_countervec(
430        &self,
431        name: &str,
432        description: &str,
433        const_labels: &[&str],
434        const_label_values: &[(&str, &str)],
435    ) -> anyhow::Result<prometheus::CounterVec> {
436        create_metric(
437            &self.hierarchy,
438            name,
439            description,
440            const_label_values,
441            None,
442            Some(const_labels),
443        )
444    }
445
446    /// Create a Gauge metric
447    pub fn create_gauge(
448        &self,
449        name: &str,
450        description: &str,
451        labels: &[(&str, &str)],
452    ) -> anyhow::Result<prometheus::Gauge> {
453        create_metric(&self.hierarchy, name, description, labels, None, None)
454    }
455
456    /// Create a GaugeVec metric with label names (for dynamic labels)
457    pub fn create_gaugevec(
458        &self,
459        name: &str,
460        description: &str,
461        const_labels: &[&str],
462        const_label_values: &[(&str, &str)],
463    ) -> anyhow::Result<prometheus::GaugeVec> {
464        create_metric(
465            &self.hierarchy,
466            name,
467            description,
468            const_label_values,
469            None,
470            Some(const_labels),
471        )
472    }
473
474    /// Create a Histogram metric with custom buckets
475    pub fn create_histogram(
476        &self,
477        name: &str,
478        description: &str,
479        labels: &[(&str, &str)],
480        buckets: Option<Vec<f64>>,
481    ) -> anyhow::Result<prometheus::Histogram> {
482        create_metric(&self.hierarchy, name, description, labels, buckets, None)
483    }
484
485    /// Create an IntCounter metric
486    pub fn create_intcounter(
487        &self,
488        name: &str,
489        description: &str,
490        labels: &[(&str, &str)],
491    ) -> anyhow::Result<prometheus::IntCounter> {
492        create_metric(&self.hierarchy, name, description, labels, None, None)
493    }
494
495    /// Create an IntCounterVec metric with label names (for dynamic labels)
496    pub fn create_intcountervec(
497        &self,
498        name: &str,
499        description: &str,
500        const_labels: &[&str],
501        const_label_values: &[(&str, &str)],
502    ) -> anyhow::Result<prometheus::IntCounterVec> {
503        create_metric(
504            &self.hierarchy,
505            name,
506            description,
507            const_label_values,
508            None,
509            Some(const_labels),
510        )
511    }
512
513    /// Create an IntGauge metric
514    pub fn create_intgauge(
515        &self,
516        name: &str,
517        description: &str,
518        labels: &[(&str, &str)],
519    ) -> anyhow::Result<prometheus::IntGauge> {
520        create_metric(&self.hierarchy, name, description, labels, None, None)
521    }
522
523    /// Create an IntGaugeVec metric with label names (for dynamic labels)
524    pub fn create_intgaugevec(
525        &self,
526        name: &str,
527        description: &str,
528        const_labels: &[&str],
529        const_label_values: &[(&str, &str)],
530    ) -> anyhow::Result<prometheus::IntGaugeVec> {
531        create_metric(
532            &self.hierarchy,
533            name,
534            description,
535            const_label_values,
536            None,
537            Some(const_labels),
538        )
539    }
540
541    /// Get metrics in Prometheus text format
542    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
543        // Execute callbacks first to ensure any new metrics are added to the registry
544        let callback_results = self
545            .hierarchy
546            .get_metrics_registry()
547            .execute_update_callbacks();
548
549        // Log any callback errors but continue
550        for result in callback_results {
551            if let Err(e) = result {
552                tracing::error!("Error executing metrics callback: {}", e);
553            }
554        }
555
556        // Get the Prometheus registry for this hierarchy
557        let prometheus_registry = self
558            .hierarchy
559            .get_metrics_registry()
560            .get_prometheus_registry();
561
562        // Encode metrics from the registry
563        let metric_families = prometheus_registry.gather();
564        let encoder = prometheus::TextEncoder::new();
565        let mut buffer = Vec::new();
566        encoder.encode(&metric_families, &mut buffer)?;
567        let mut result = String::from_utf8(buffer)?;
568
569        // Execute and append exposition text callback results
570        let expfmt = self
571            .hierarchy
572            .get_metrics_registry()
573            .execute_expfmt_callbacks();
574        if !expfmt.is_empty() {
575            if !result.ends_with('\n') {
576                result.push('\n');
577            }
578            result.push_str(&expfmt);
579        }
580
581        Ok(result)
582    }
583}
584
585/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
586/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
587/// generating output in Prometheus text format.
588use crate::traits::DistributedRuntimeProvider;
589
590pub trait MetricsHierarchy: Send + Sync {
591    // ========================================================================
592    // Required methods - must be implemented by all types
593    // ========================================================================
594
595    /// Get the name of this hierarchy (without any hierarchy prefix)
596    fn basename(&self) -> String;
597
598    /// Get the parent hierarchies as actual objects (not strings)
599    /// Returns a vector of hierarchy references, ordered from root to immediate parent.
600    /// For example, an Endpoint would return [DRT, Namespace, Component].
601    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
602
603    /// Get a reference to this hierarchy's metrics registry
604    fn get_metrics_registry(&self) -> &MetricsRegistry;
605
606    // ========================================================================
607    // Provided methods - have default implementations
608    // ========================================================================
609
610    /// Access the metrics interface for this hierarchy
611    /// This is a provided method that works for any type implementing MetricsHierarchy
612    fn metrics(&self) -> Metrics<&Self>
613    where
614        Self: Sized,
615    {
616        Metrics::new(self)
617    }
618}
619
620// Blanket implementation for references to types that implement MetricsHierarchy
621impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
622    fn basename(&self) -> String {
623        (**self).basename()
624    }
625
626    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
627        (**self).parent_hierarchies()
628    }
629
630    fn get_metrics_registry(&self) -> &MetricsRegistry {
631        (**self).get_metrics_registry()
632    }
633}
634
635/// Type alias for runtime callback functions to reduce complexity
636///
637/// This type represents an Arc-wrapped callback function that can be:
638/// - Shared efficiently across multiple threads and contexts
639/// - Cloned without duplicating the underlying closure
640/// - Used in generic contexts requiring 'static lifetime
641///
642/// The Arc wrapper is included in the type to make sharing explicit.
643pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
644
645/// Type alias for exposition text callback functions that return Prometheus text
646pub type PrometheusExpositionFormatCallback =
647    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
648
649/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy.
650///
651/// All fields are Arc-wrapped, so cloning shares state. This ensures metrics registered
652/// on cloned instances (e.g., cloned Client/Endpoint) are visible to the original.
653#[derive(Clone)]
654pub struct MetricsRegistry {
655    /// The Prometheus registry for this hierarchy.
656    /// Arc-wrapped so clones share the same registry (metrics registered on clones are visible everywhere).
657    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
658
659    /// Update callbacks invoked before metrics are scraped.
660    /// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
661    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
662
663    /// Callbacks that return Prometheus exposition text appended to metrics output.
664    /// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT).
665    pub prometheus_expfmt_callbacks:
666        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
667}
668
669impl std::fmt::Debug for MetricsRegistry {
670    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
671        f.debug_struct("MetricsRegistry")
672            .field("prometheus_registry", &"<RwLock<Registry>>")
673            .field(
674                "prometheus_update_callbacks",
675                &format!(
676                    "<RwLock<Vec<Callback>>> with {} callbacks",
677                    self.prometheus_update_callbacks.read().unwrap().len()
678                ),
679            )
680            .field(
681                "prometheus_expfmt_callbacks",
682                &format!(
683                    "<RwLock<Vec<Callback>>> with {} callbacks",
684                    self.prometheus_expfmt_callbacks.read().unwrap().len()
685                ),
686            )
687            .finish()
688    }
689}
690
691impl MetricsRegistry {
692    /// Create a new metrics registry with an empty Prometheus registry and callback lists
693    pub fn new() -> Self {
694        Self {
695            prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
696            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
697            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
698        }
699    }
700
701    /// Add a callback function that receives a reference to any MetricsHierarchy
702    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
703        self.prometheus_update_callbacks
704            .write()
705            .unwrap()
706            .push(callback);
707    }
708
709    /// Add an exposition text callback that returns Prometheus text
710    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
711        self.prometheus_expfmt_callbacks
712            .write()
713            .unwrap()
714            .push(callback);
715    }
716
717    /// Execute all update callbacks and return their results
718    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
719        self.prometheus_update_callbacks
720            .read()
721            .unwrap()
722            .iter()
723            .map(|callback| callback())
724            .collect()
725    }
726
727    /// Execute all exposition text callbacks and return their concatenated text
728    pub fn execute_expfmt_callbacks(&self) -> String {
729        let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
730        let mut result = String::new();
731        for callback in callbacks.iter() {
732            match callback() {
733                Ok(text) => {
734                    if !text.is_empty() {
735                        if !result.is_empty() && !result.ends_with('\n') {
736                            result.push('\n');
737                        }
738                        result.push_str(&text);
739                    }
740                }
741                Err(e) => {
742                    tracing::error!("Error executing exposition text callback: {}", e);
743                }
744            }
745        }
746        result
747    }
748
749    /// Add a Prometheus metric collector to this registry
750    pub fn add_metric(
751        &self,
752        collector: Box<dyn prometheus::core::Collector>,
753    ) -> anyhow::Result<()> {
754        self.prometheus_registry
755            .write()
756            .unwrap()
757            .register(collector)
758            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
759    }
760
761    /// Get a read guard to the Prometheus registry for scraping
762    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
763        self.prometheus_registry.read().unwrap()
764    }
765
766    /// Returns true if a metric with the given name already exists in the Prometheus registry
767    pub fn has_metric_named(&self, metric_name: &str) -> bool {
768        self.prometheus_registry
769            .read()
770            .unwrap()
771            .gather()
772            .iter()
773            .any(|mf| mf.name() == metric_name)
774    }
775}
776
777impl Default for MetricsRegistry {
778    fn default() -> Self {
779        Self::new()
780    }
781}
782
#[cfg(test)]
mod test_helpers {
    use super::prometheus_names::name_prefix;
    use super::*;

    /// Label name -> label value, as produced by [`parse_prometheus_metric`].
    type Labels = std::collections::HashMap<String, String>;

    /// Base function to filter Prometheus output lines based on a predicate.
    /// Returns the lines for which `predicate` returns true, as owned `String`s.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(str::to_string)
            .collect()
    }

    /// Extracts all component metrics (excluding help text and type definitions).
    /// Returns only the actual metric lines with values.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        // Build the `<component prefix>_` pattern once instead of once per line.
        let component_prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| {
            line.starts_with(&component_prefix)
                && !line.starts_with('#')
                && !line.trim().is_empty()
        })
    }

    /// Parses one sample line of the Prometheus text exposition format and returns
    /// `(metric name, labels, value)`.
    /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
    ///
    /// Quoted label values are read as strings, so values containing spaces, commas, `=` or `}`
    /// are handled. The exposition escapes `\\`, `\"` and `\n` are undone. Unquoted label values
    /// are also accepted and read up to the next `,` or `}`. Anything after the value (such as a
    /// timestamp) is ignored.
    ///
    /// Returns `None` for blank lines, `#` lines (HELP/TYPE), lines without a value, malformed
    /// label blocks, and values that do not parse as `f64`.
    ///
    /// # Example
    /// ```ignore
    /// let line = "http_requests_total{method=\"GET\",path=\"/a b\"} 1234";
    /// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
    /// assert_eq!(name, "http_requests_total");
    /// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
    /// assert_eq!(labels.get("path"), Some(&"/a b".to_string()));
    /// assert_eq!(value, 1234.0);
    /// ```
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        if line.trim().is_empty() || line.starts_with('#') {
            return None;
        }

        let line = line.trim();
        // The metric name ends at the label block or at the whitespace before the value;
        // a line with neither (name only) has no value and is rejected here.
        let name_end = line.find(|c: char| c == '{' || c.is_whitespace())?;
        let (name, after_name) = line.split_at(name_end);

        let (labels, rest) = match after_name.strip_prefix('{') {
            Some(label_block) => parse_label_block(label_block)?,
            None => (Labels::new(), after_name),
        };

        let value: f64 = rest.split_whitespace().next()?.parse().ok()?;
        Some((name.to_string(), labels, value))
    }

    /// Parses the inside of a `{...}` label block. `input` starts just after the opening `{`.
    /// On success, returns the labels and the text that follows the closing `}`.
    fn parse_label_block(input: &str) -> Option<(Labels, &str)> {
        let mut labels = Labels::new();
        let mut rest = input;
        loop {
            // Skip separators; this also tolerates a trailing comma before `}`.
            rest = rest.trim_start_matches(|c: char| c == ',' || c.is_whitespace());
            if let Some(after_block) = rest.strip_prefix('}') {
                return Some((labels, after_block));
            }

            let (key, after_eq) = rest.split_once('=')?;
            let after_eq = after_eq.trim_start();
            let (value, remaining) = match after_eq.strip_prefix('"') {
                Some(quoted) => parse_quoted_label_value(quoted)?,
                None => {
                    // Lenient fallback for hand-written input with unquoted values.
                    let end = after_eq.find(|c: char| c == ',' || c == '}')?;
                    (after_eq[..end].trim().to_string(), &after_eq[end..])
                }
            };
            labels.insert(key.trim().to_string(), value);
            rest = remaining;
        }
    }

    /// Reads a quoted label value whose opening `"` has already been consumed, undoing the
    /// exposition escapes. Returns the value and the text after the closing `"`, or `None` if
    /// the string is unterminated.
    fn parse_quoted_label_value(input: &str) -> Option<(String, &str)> {
        let mut value = String::new();
        let mut chars = input.char_indices();
        while let Some((idx, ch)) = chars.next() {
            match ch {
                '"' => return Some((value, &input[idx + 1..])),
                '\\' => match chars.next()?.1 {
                    'n' => value.push('\n'),
                    escaped => value.push(escaped),
                },
                other => value.push(other),
            }
        }
        None
    }
}
858
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    #[test]
    fn test_build_component_metric_name_with_prefix() {
        // Every base name must come back with the dynamo_component prefix prepended.
        let cases = [
            ("requests", "dynamo_component_requests"),
            ("counter", "dynamo_component_counter"),
        ];
        for (base_name, expected) in cases {
            assert_eq!(build_component_metric_name(base_name), expected);
        }
    }

    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // Builds an owned label map from borrowed (key, value) pairs.
        fn label_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
            pairs
                .iter()
                .map(|&(key, val)| (key.to_string(), val.to_string()))
                .collect()
        }

        // Lines that must parse: (input, expected name, expected labels, expected value).
        let accepted = [
            (
                "http_requests_total{method=\"GET\",status=\"200\"} 1234",
                "http_requests_total",
                label_map(&[("method", "GET"), ("status", "200")]),
                1234.0,
            ),
            ("cpu_usage 98.5", "cpu_usage", label_map(&[]), 98.5),
            (
                "response_time{service=\"api\"} 0.123",
                "response_time",
                label_map(&[("service", "api")]),
                0.123,
            ),
        ];
        for (line, want_name, want_labels, want_value) in accepted {
            let (name, labels, value) = parse_prometheus_metric(line)
                .unwrap_or_else(|| panic!("expected {:?} to parse", line));
            assert_eq!(name, want_name);
            assert_eq!(labels, want_labels);
            assert_eq!(value, want_value);
        }

        // Lines that must be rejected.
        for rejected in [
            "",                          // empty line
            "# HELP metric description", // help text
            "# TYPE metric counter",     // type definition
            "metric_name",               // no value
        ] {
            assert!(
                parse_prometheus_metric(rejected).is_none(),
                "{:?} should not parse",
                rejected
            );
        }

        println!("✓ Prometheus metric parsing works correctly!");
    }

    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: callbacks run on every execution and are shared (Arc-wrapped) between a
        // registry and its clones.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));
            const PER_PASS: usize = 1 + 10 + 100;

            for step in [1, 10, 100] {
                let total = Arc::clone(&total);
                registry.add_update_callback(Arc::new(move || {
                    total.fetch_add(step, Ordering::SeqCst);
                    Ok(())
                }));
            }

            // Registering callbacks does not run them.
            assert_eq!(total.load(Ordering::SeqCst), 0);

            let outcomes = registry.execute_update_callbacks();
            assert_eq!(outcomes.len(), 3);
            assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
            assert_eq!(total.load(Ordering::SeqCst), PER_PASS);

            // Callbacks are reusable across executions.
            assert_eq!(registry.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 2 * PER_PASS);

            // A clone shares the same callbacks...
            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 3 * PER_PASS);

            // ...and the original still has them as well.
            let _ = registry.execute_update_callbacks();
            assert_eq!(total.load(Ordering::SeqCst), 4 * PER_PASS);
        }

        // Scenario 2: a failing callback reports its error while the others still run.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            let first = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                first.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));

            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));

            let third = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                third.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            let outcomes = registry.execute_update_callbacks();
            let succeeded: Vec<bool> = outcomes.iter().map(|outcome| outcome.is_ok()).collect();
            assert_eq!(succeeded, [true, false, true]);
            assert_eq!(
                outcomes[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );
            // Both healthy callbacks ran despite the failure between them.
            assert_eq!(total.load(Ordering::SeqCst), 11);

            // A second pass fails the same way and runs the healthy callbacks again.
            let outcomes = registry.execute_update_callbacks();
            assert!(outcomes[1].is_err());
            assert_eq!(total.load(Ordering::SeqCst), 22);
        }

        // Scenario 3: a fresh registry has no callbacks to run.
        {
            let registry = MetricsRegistry::new();
            assert!(registry.execute_update_callbacks().is_empty());
        }
    }
}
1021
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use prometheus::core::Collector;

    /// Collects the basenames of a node's `parent_hierarchies()`, in the order returned
    /// (root first), as owned strings so they can be compared against a literal list.
    macro_rules! ancestor_names {
        ($node:expr) => {
            $node
                .parent_hierarchies()
                .iter()
                .map(|h| h.basename().to_string())
                .collect::<Vec<String>>()
        };
    }

    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";

        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // The DRT is the root: empty basename, no ancestors.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(ancestor_names!(drt), Vec::<String>::new());

        // Every other level lists its full ancestry, root first; this also covers the
        // depth of each level and its direct-parent relationship.
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(ancestor_names!(namespace), [DRT_NAME]);

        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(ancestor_names!(component), [DRT_NAME, NAMESPACE_NAME]);

        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
        assert_eq!(
            ancestor_names!(endpoint),
            [DRT_NAME, NAMESPACE_NAME, COMPONENT_NAME]
        );

        // An invalid namespace name is sanitized (to "_123") rather than rejected: validation
        // was meant to be enabled (see the TODO in component.rs) but is not, so the
        // MetricsRegistry sanitizes invalid characters instead.
        let invalid_namespace = drt.namespace("@@123").unwrap();
        let counter = invalid_namespace
            .metrics()
            .create_counter("test_counter", "A test counter", &[])
            .expect("invalid namespace names are sanitized, not rejected");
        let descriptors = counter.desc();
        let namespace_label = descriptors[0]
            .const_label_pairs
            .iter()
            .find(|pair| pair.name() == "dynamo_namespace")
            .expect("Should have dynamo_namespace label");
        assert_eq!(namespace_label.value(), "_123");

        // A valid namespace works as-is.
        let valid_namespace = drt.namespace("ns567").unwrap();
        let created = valid_namespace
            .metrics()
            .create_counter("test_counter", "A test counter", &[]);
        assert!(created.is_ok());
    }

    #[tokio::test]
    async fn test_recursive_namespace() {
        let drt = create_test_drt_async().await;

        // Chain ns1 -> ns2 -> ns3, then attach a component to the deepest namespace.
        let ns1 = drt.namespace("ns1").unwrap();
        let ns2 = ns1.namespace("ns2").unwrap();
        let ns3 = ns2.namespace("ns3").unwrap();
        let component = ns3.component("test-component").unwrap();

        // Each level's ancestry starts at the (empty-named) DRT and grows by one per level.
        assert_eq!(ns1.basename(), "ns1");
        assert_eq!(ancestor_names!(ns1), [""]);

        assert_eq!(ns2.basename(), "ns2");
        assert_eq!(ancestor_names!(ns2), ["", "ns1"]);

        assert_eq!(ns3.basename(), "ns3");
        assert_eq!(ancestor_names!(ns3), ["", "ns1", "ns2"]);

        assert_eq!(component.basename(), "test-component");
        assert_eq!(ancestor_names!(component), ["", "ns1", "ns2", "ns3"]);

        println!("✓ Chained namespace test passed - all prefixes correct");
    }
}
1166
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;

    /// Asserts that a scraped exposition string equals the expected text exactly. On failure
    /// it prints both sides (actual first, then expected) under a `scope` banner. The panic is
    /// reported at the caller's line thanks to `#[track_caller]`.
    #[track_caller]
    fn assert_expfmt_eq(scope: &str, actual: &str, expected: &str) {
        assert_eq!(
            actual, expected,
            "\n=== {} COMPARISON FAILED ===\n\
             Actual:\n{}\n\
             Expected:\n{}\n\
             ==============================",
            scope, actual, expected
        );
    }

    #[tokio::test]
    async fn test_prometheusfactory_using_metrics_registry_trait() {
        // Setup real DRT and registry using the test-friendly constructor
        let drt = create_test_drt_async().await;

        // Use a simple constant namespace name
        let namespace_name = "ns345";

        let namespace = drt.namespace(namespace_name).unwrap();
        let component = namespace.component("comp345").unwrap();
        let endpoint = component.endpoint("ep345");

        // Test Counter creation
        let counter = endpoint
            .metrics()
            .create_counter("testcounter", "A test counter", &[])
            .unwrap();
        counter.inc_by(123.456789);
        let epsilon = 0.01;
        assert!((counter.get() - 123.456789).abs() < epsilon);

        let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
        println!("Endpoint output:");
        println!("{}", endpoint_output_raw);

        let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#;

        assert_expfmt_eq("ENDPOINT", &endpoint_output_raw, expected_endpoint_output);

        // Test Gauge creation
        let gauge = component
            .metrics()
            .create_gauge("testgauge", "A test gauge", &[])
            .unwrap();
        gauge.set(50000.0);
        assert_eq!(gauge.get(), 50000.0);

        // Component scope: the endpoint's counter (a child) plus the component's own gauge
        let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
        println!("Component output:");
        println!("{}", component_output_raw);

        let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#;

        assert_expfmt_eq("COMPONENT", &component_output_raw, expected_component_output);

        let intcounter = namespace
            .metrics()
            .create_intcounter("testintcounter", "A test int counter", &[])
            .unwrap();
        intcounter.inc_by(12345);
        assert_eq!(intcounter.get(), 12345);

        // Namespace scope: the descendants' counter and gauge plus the namespace's int counter
        let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
        println!("Namespace output:");
        println!("{}", namespace_output_raw);

        let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#;

        assert_expfmt_eq("NAMESPACE", &namespace_output_raw, expected_namespace_output);

        // Test IntGauge creation
        let intgauge = namespace
            .metrics()
            .create_intgauge("testintgauge", "A test int gauge", &[])
            .unwrap();
        intgauge.set(42);
        assert_eq!(intgauge.get(), 42);

        // Test IntGaugeVec creation
        let intgaugevec = namespace
            .metrics()
            .create_intgaugevec(
                "testintgaugevec",
                "A test int gauge vector",
                &["instance", "status"],
                &[("service", "api")],
            )
            .unwrap();
        intgaugevec
            .with_label_values(&["server1", "active"])
            .set(10);
        intgaugevec
            .with_label_values(&["server2", "inactive"])
            .set(0);

        // Test CounterVec creation
        let countervec = endpoint
            .metrics()
            .create_countervec(
                "testcountervec",
                "A test counter vector",
                &["method", "status"],
                &[("service", "api")],
            )
            .unwrap();
        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);

        // Test Histogram creation
        let histogram = component
            .metrics()
            .create_histogram("testhistogram", "A test histogram", &[], None)
            .unwrap();
        histogram.observe(1.0);
        histogram.observe(2.5);
        histogram.observe(4.0);

        // DRT scope: every metric above, plus the runtime's own uptime gauge
        let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
        println!("DRT output:");
        println!("{}", drt_output_raw);

        let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#;

        assert_expfmt_eq("DRT", &drt_output_raw, expected_drt_output);

        println!("✓ All Prometheus format outputs verified successfully!");
    }

    #[test]
    fn test_refactored_filter_functions() {
        // Test data with component metrics
        let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_errors_total 5"#;

        // Test extract_metrics (only actual metric lines, excluding help/type)
        let metrics_only = super::test_helpers::extract_metrics(test_input);
        assert_eq!(metrics_only.len(), 4); // 4 actual metric lines (excluding help/type)
        assert!(
            metrics_only
                .iter()
                .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
        );

        println!("✓ All refactored filter functions work correctly!");
    }
}