Skip to main content

dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22// Import commonly used items to avoid verbose prefixes
23use prometheus_names::{
24    build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
25    sanitize_prometheus_name, work_handler,
26};
27
28// Pipeline imports for endpoint creation
29use crate::pipeline::{
30    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31    network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
/// When `true`, metrics created through [`create_metric`] are automatically labeled with the
/// namespace, component, and endpoint they belong to.
///
/// These label keys are prefixed with `dynamo_` to avoid collisions with Kubernetes and other
/// monitoring-system labels; while this flag is set, callers may not supply them manually.
pub const USE_AUTO_LABELS: bool = true;
40
41// Prometheus imports
42use prometheus::Encoder;
43
44/// Validate that a label slice has no duplicate keys.
45/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
46fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47    let mut seen_keys = std::collections::HashSet::new();
48    for (key, _) in labels {
49        if !seen_keys.insert(*key) {
50            return Err(anyhow::anyhow!(
51                "Duplicate label key '{}' found in labels",
52                key
53            ));
54        }
55    }
56    Ok(())
57}
58
59/// ==============================
60/// Prometheus section
61/// ==============================
62/// Trait that defines common behavior for Prometheus metric types
63pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
64    /// Create a new metric with the given options
65    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
66    where
67        Self: Sized;
68
69    /// Create a new metric with histogram options and custom buckets
70    /// This is a default implementation that will panic for non-histogram metrics
71    fn with_histogram_opts_and_buckets(
72        _opts: prometheus::HistogramOpts,
73        _buckets: Option<Vec<f64>>,
74    ) -> Result<Self, prometheus::Error>
75    where
76        Self: Sized,
77    {
78        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
79    }
80
81    /// Create a new metric with counter options and label names (for CounterVec)
82    /// This is a default implementation that will panic for non-countervec metrics
83    fn with_opts_and_label_names(
84        _opts: prometheus::Opts,
85        _label_names: &[&str],
86    ) -> Result<Self, prometheus::Error>
87    where
88        Self: Sized,
89    {
90        panic!("with_opts_and_label_names is not implemented for this metric type");
91    }
92}
93
94// Implement the trait for Counter, IntCounter, and Gauge
95impl PrometheusMetric for prometheus::Counter {
96    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
97        prometheus::Counter::with_opts(opts)
98    }
99}
100
101impl PrometheusMetric for prometheus::IntCounter {
102    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
103        prometheus::IntCounter::with_opts(opts)
104    }
105}
106
107impl PrometheusMetric for prometheus::Gauge {
108    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
109        prometheus::Gauge::with_opts(opts)
110    }
111}
112
113impl PrometheusMetric for prometheus::IntGauge {
114    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
115        prometheus::IntGauge::with_opts(opts)
116    }
117}
118
119impl PrometheusMetric for prometheus::GaugeVec {
120    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
121        Err(prometheus::Error::Msg(
122            "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
123        ))
124    }
125
126    fn with_opts_and_label_names(
127        opts: prometheus::Opts,
128        label_names: &[&str],
129    ) -> Result<Self, prometheus::Error> {
130        prometheus::GaugeVec::new(opts, label_names)
131    }
132}
133
134impl PrometheusMetric for prometheus::IntGaugeVec {
135    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
136        Err(prometheus::Error::Msg(
137            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
138        ))
139    }
140
141    fn with_opts_and_label_names(
142        opts: prometheus::Opts,
143        label_names: &[&str],
144    ) -> Result<Self, prometheus::Error> {
145        prometheus::IntGaugeVec::new(opts, label_names)
146    }
147}
148
149impl PrometheusMetric for prometheus::IntCounterVec {
150    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
151        Err(prometheus::Error::Msg(
152            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
153        ))
154    }
155
156    fn with_opts_and_label_names(
157        opts: prometheus::Opts,
158        label_names: &[&str],
159    ) -> Result<Self, prometheus::Error> {
160        prometheus::IntCounterVec::new(opts, label_names)
161    }
162}
163
164// Implement the trait for Histogram
165impl PrometheusMetric for prometheus::Histogram {
166    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
167        // Convert Opts to HistogramOpts
168        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
169        prometheus::Histogram::with_opts(histogram_opts)
170    }
171
172    fn with_histogram_opts_and_buckets(
173        mut opts: prometheus::HistogramOpts,
174        buckets: Option<Vec<f64>>,
175    ) -> Result<Self, prometheus::Error> {
176        if let Some(custom_buckets) = buckets {
177            opts = opts.buckets(custom_buckets);
178        }
179        prometheus::Histogram::with_opts(opts)
180    }
181}
182
183// Implement the trait for CounterVec
184impl PrometheusMetric for prometheus::CounterVec {
185    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
186        // This will panic - CounterVec needs label names
187        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
188    }
189
190    fn with_opts_and_label_names(
191        opts: prometheus::Opts,
192        label_names: &[&str],
193    ) -> Result<Self, prometheus::Error> {
194        prometheus::CounterVec::new(opts, label_names)
195    }
196}
197
198/// ==============================
199/// Metrics section
200/// ==============================
201/// Public helper function to create metrics - accessible for Python bindings
202pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
203    hierarchy: &H,
204    metric_name: &str,
205    metric_desc: &str,
206    labels: &[(&str, &str)],
207    buckets: Option<Vec<f64>>,
208    const_labels: Option<&[&str]>,
209) -> anyhow::Result<T> {
210    // Validate that user-provided labels don't have duplicate keys
211    validate_no_duplicate_label_keys(labels)?;
212    // Note: stored labels functionality has been removed
213
214    let basename = hierarchy.basename();
215    let parent_hierarchies = hierarchy.parent_hierarchies();
216
217    // Build hierarchy path as vector of strings: parent names + [basename]
218    let mut hierarchy_names: Vec<String> =
219        parent_hierarchies.iter().map(|p| p.basename()).collect();
220    hierarchy_names.push(basename.clone());
221
222    let metric_name = build_component_metric_name(metric_name);
223
224    // Build updated_labels: auto-labels first, then `labels` + stored labels
225    let mut updated_labels: Vec<(String, String)> = Vec::new();
226
227    if USE_AUTO_LABELS {
228        // Validate that user-provided labels don't conflict with auto-generated labels
229        for (key, _) in labels {
230            if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231                return Err(anyhow::anyhow!(
232                    "Label '{}' is automatically added by auto_label feature and cannot be manually set",
233                    key
234                ));
235            }
236        }
237
238        // Add auto-generated labels with sanitized values
239        if hierarchy_names.len() > 1 {
240            let namespace = &hierarchy_names[1];
241            if !namespace.is_empty() {
242                let valid_namespace = sanitize_prometheus_label(namespace)?;
243                if !valid_namespace.is_empty() {
244                    updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
245                }
246            }
247        }
248        if hierarchy_names.len() > 2 {
249            let component = &hierarchy_names[2];
250            if !component.is_empty() {
251                let valid_component = sanitize_prometheus_label(component)?;
252                if !valid_component.is_empty() {
253                    updated_labels.push((labels::COMPONENT.to_string(), valid_component));
254                }
255            }
256        }
257        if hierarchy_names.len() > 3 {
258            let endpoint = &hierarchy_names[3];
259            if !endpoint.is_empty() {
260                let valid_endpoint = sanitize_prometheus_label(endpoint)?;
261                if !valid_endpoint.is_empty() {
262                    updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
263                }
264            }
265        }
266    }
267
268    // Add user labels
269    updated_labels.extend(
270        labels
271            .iter()
272            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273    );
274    // Note: stored labels functionality has been removed
275
276    // Handle different metric types
277    let prometheus_metric = if std::any::TypeId::of::<T>()
278        == std::any::TypeId::of::<prometheus::CounterVec>()
279    {
280        // Special handling for CounterVec with label names
281        // const_labels parameter is required for CounterVec
282        if buckets.is_some() {
283            return Err(anyhow::anyhow!(
284                "buckets parameter is not valid for CounterVec"
285            ));
286        }
287        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288        for (key, value) in &updated_labels {
289            opts = opts.const_label(key.clone(), value.clone());
290        }
291        let label_names = const_labels
292            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293        T::with_opts_and_label_names(opts, label_names)?
294    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295        // Special handling for GaugeVec with label names
296        // const_labels parameter is required for GaugeVec
297        if buckets.is_some() {
298            return Err(anyhow::anyhow!(
299                "buckets parameter is not valid for GaugeVec"
300            ));
301        }
302        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303        for (key, value) in &updated_labels {
304            opts = opts.const_label(key.clone(), value.clone());
305        }
306        let label_names = const_labels
307            .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308        T::with_opts_and_label_names(opts, label_names)?
309    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310        // Special handling for Histogram with custom buckets
311        // buckets parameter is valid for Histogram, const_labels is not used
312        if const_labels.is_some() {
313            return Err(anyhow::anyhow!(
314                "const_labels parameter is not valid for Histogram"
315            ));
316        }
317        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318        for (key, value) in &updated_labels {
319            opts = opts.const_label(key.clone(), value.clone());
320        }
321        T::with_histogram_opts_and_buckets(opts, buckets)?
322    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323        // Special handling for IntCounterVec with label names
324        // const_labels parameter is required for IntCounterVec
325        if buckets.is_some() {
326            return Err(anyhow::anyhow!(
327                "buckets parameter is not valid for IntCounterVec"
328            ));
329        }
330        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331        for (key, value) in &updated_labels {
332            opts = opts.const_label(key.clone(), value.clone());
333        }
334        let label_names = const_labels
335            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336        T::with_opts_and_label_names(opts, label_names)?
337    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338        // Special handling for IntGaugeVec with label names
339        // const_labels parameter is required for IntGaugeVec
340        if buckets.is_some() {
341            return Err(anyhow::anyhow!(
342                "buckets parameter is not valid for IntGaugeVec"
343            ));
344        }
345        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346        for (key, value) in &updated_labels {
347            opts = opts.const_label(key.clone(), value.clone());
348        }
349        let label_names = const_labels
350            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351        T::with_opts_and_label_names(opts, label_names)?
352    } else {
353        // Standard handling for Counter, IntCounter, Gauge, IntGauge
354        // buckets and const_labels parameters are not valid for these types
355        if buckets.is_some() {
356            return Err(anyhow::anyhow!(
357                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358            ));
359        }
360        if const_labels.is_some() {
361            return Err(anyhow::anyhow!(
362                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363            ));
364        }
365        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366        for (key, value) in &updated_labels {
367            opts = opts.const_label(key.clone(), value.clone());
368        }
369        T::with_opts(opts)?
370    };
371
372    let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
373    hierarchy.get_metrics_registry().add_metric(collector)?;
374
375    Ok(prometheus_metric)
376}
377
378/// Wrapper struct that provides access to metrics functionality
379/// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint
380pub struct Metrics<H: MetricsHierarchy> {
381    hierarchy: H,
382}
383
384impl<H: MetricsHierarchy> Metrics<H> {
385    pub fn new(hierarchy: H) -> Self {
386        Self { hierarchy }
387    }
388
389    // TODO: Add support for additional Prometheus metric types:
390    // - Counter: ✅ IMPLEMENTED - create_counter()
391    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
392    // - Gauge: ✅ IMPLEMENTED - create_gauge()
393    // - GaugeVec: ✅ IMPLEMENTED - create_gaugevec()
394    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
395    // - Histogram: ✅ IMPLEMENTED - create_histogram()
396    // - HistogramVec with custom buckets: create_histogram_with_buckets()
397    // - Info: create_info() - for info metrics with labels
398    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
399    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
400    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
401    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
402    // - Stateset: create_stateset() - for state-based metrics
403    // - Summary: create_summary() - for quantiles and sum/count metrics
404    // - SummaryVec: create_summary_vec() - for labeled summaries
405    // - Untyped: create_untyped() - for untyped metrics
406    //
407    // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
408    // Keep them synchronized when adding new metric types
409
410    /// Create a Counter metric
411    pub fn create_counter(
412        &self,
413        name: &str,
414        description: &str,
415        labels: &[(&str, &str)],
416    ) -> anyhow::Result<prometheus::Counter> {
417        create_metric(&self.hierarchy, name, description, labels, None, None)
418    }
419
420    /// Create a CounterVec metric with label names (for dynamic labels)
421    pub fn create_countervec(
422        &self,
423        name: &str,
424        description: &str,
425        const_labels: &[&str],
426        const_label_values: &[(&str, &str)],
427    ) -> anyhow::Result<prometheus::CounterVec> {
428        create_metric(
429            &self.hierarchy,
430            name,
431            description,
432            const_label_values,
433            None,
434            Some(const_labels),
435        )
436    }
437
438    /// Create a Gauge metric
439    pub fn create_gauge(
440        &self,
441        name: &str,
442        description: &str,
443        labels: &[(&str, &str)],
444    ) -> anyhow::Result<prometheus::Gauge> {
445        create_metric(&self.hierarchy, name, description, labels, None, None)
446    }
447
448    /// Create a GaugeVec metric with label names (for dynamic labels)
449    pub fn create_gaugevec(
450        &self,
451        name: &str,
452        description: &str,
453        const_labels: &[&str],
454        const_label_values: &[(&str, &str)],
455    ) -> anyhow::Result<prometheus::GaugeVec> {
456        create_metric(
457            &self.hierarchy,
458            name,
459            description,
460            const_label_values,
461            None,
462            Some(const_labels),
463        )
464    }
465
466    /// Create a Histogram metric with custom buckets
467    pub fn create_histogram(
468        &self,
469        name: &str,
470        description: &str,
471        labels: &[(&str, &str)],
472        buckets: Option<Vec<f64>>,
473    ) -> anyhow::Result<prometheus::Histogram> {
474        create_metric(&self.hierarchy, name, description, labels, buckets, None)
475    }
476
477    /// Create an IntCounter metric
478    pub fn create_intcounter(
479        &self,
480        name: &str,
481        description: &str,
482        labels: &[(&str, &str)],
483    ) -> anyhow::Result<prometheus::IntCounter> {
484        create_metric(&self.hierarchy, name, description, labels, None, None)
485    }
486
487    /// Create an IntCounterVec metric with label names (for dynamic labels)
488    pub fn create_intcountervec(
489        &self,
490        name: &str,
491        description: &str,
492        const_labels: &[&str],
493        const_label_values: &[(&str, &str)],
494    ) -> anyhow::Result<prometheus::IntCounterVec> {
495        create_metric(
496            &self.hierarchy,
497            name,
498            description,
499            const_label_values,
500            None,
501            Some(const_labels),
502        )
503    }
504
505    /// Create an IntGauge metric
506    pub fn create_intgauge(
507        &self,
508        name: &str,
509        description: &str,
510        labels: &[(&str, &str)],
511    ) -> anyhow::Result<prometheus::IntGauge> {
512        create_metric(&self.hierarchy, name, description, labels, None, None)
513    }
514
515    /// Create an IntGaugeVec metric with label names (for dynamic labels)
516    pub fn create_intgaugevec(
517        &self,
518        name: &str,
519        description: &str,
520        const_labels: &[&str],
521        const_label_values: &[(&str, &str)],
522    ) -> anyhow::Result<prometheus::IntGaugeVec> {
523        create_metric(
524            &self.hierarchy,
525            name,
526            description,
527            const_label_values,
528            None,
529            Some(const_labels),
530        )
531    }
532
533    /// Get metrics in Prometheus text format
534    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
535        self.hierarchy
536            .get_metrics_registry()
537            .prometheus_expfmt_combined()
538    }
539}
540
541/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
542/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
543/// generating output in Prometheus text format.
544use crate::traits::DistributedRuntimeProvider;
545
546pub trait MetricsHierarchy: Send + Sync {
547    // ========================================================================
548    // Required methods - must be implemented by all types
549    // ========================================================================
550
551    /// Get the name of this hierarchy (without any hierarchy prefix)
552    fn basename(&self) -> String;
553
554    /// Get the parent hierarchies as actual objects (not strings)
555    /// Returns a vector of hierarchy references, ordered from root to immediate parent.
556    /// For example, an Endpoint would return [DRT, Namespace, Component].
557    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
558
559    /// Get a reference to this hierarchy's metrics registry
560    fn get_metrics_registry(&self) -> &MetricsRegistry;
561
562    // ========================================================================
563    // Provided methods - have default implementations
564    // ========================================================================
565
566    /// Access the metrics interface for this hierarchy
567    /// This is a provided method that works for any type implementing MetricsHierarchy
568    fn metrics(&self) -> Metrics<&Self>
569    where
570        Self: Sized,
571    {
572        Metrics::new(self)
573    }
574}
575
576// Blanket implementation for references to types that implement MetricsHierarchy
577impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
578    fn basename(&self) -> String {
579        (**self).basename()
580    }
581
582    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
583        (**self).parent_hierarchies()
584    }
585
586    fn get_metrics_registry(&self) -> &MetricsRegistry {
587        (**self).get_metrics_registry()
588    }
589}
590
591/// Type alias for runtime callback functions to reduce complexity
592///
593/// This type represents an Arc-wrapped callback function that can be:
594/// - Shared efficiently across multiple threads and contexts
595/// - Cloned without duplicating the underlying closure
596/// - Used in generic contexts requiring 'static lifetime
597///
598/// The Arc wrapper is included in the type to make sharing explicit.
599pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
600
601/// Type alias for exposition text callback functions that return Prometheus text
602pub type PrometheusExpositionFormatCallback =
603    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
604
605/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy.
606///
607/// All fields are Arc-wrapped, so cloning shares state. This ensures metrics registered
608/// on cloned instances (e.g., cloned Client/Endpoint) are visible to the original.
609#[derive(Clone)]
610pub struct MetricsRegistry {
611    /// The Prometheus registry for this hierarchy.
612    /// Arc-wrapped so clones share the same registry (metrics registered on clones are visible everywhere).
613    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
614
615    /// Child registries included when emitting combined `/metrics` output.
616    ///
617    /// Why this exists:
618    /// - Previously, `create_metric()` registered every collector into *all* parent registries
619    ///   (Endpoint → Component → Namespace → DRT) so scraping the root registry included everything.
620    /// - That fan-out caused Prometheus collisions when different endpoints tried to register the
621    ///   same metric name with different const-labels (descriptor mismatch).
622    ///
623    /// We now register metrics only into the local hierarchy registry to avoid collisions.
624    /// `child_registries` rebuilds “what to scrape” as a tree of registries so `/metrics` can:
625    /// - traverse registries recursively,
626    /// - merge metric families into one exposition payload,
627    /// - warn/drop exact duplicate series, while allowing same metric name with different labels.
628    child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,
629
630    /// Update callbacks invoked before metrics are scraped.
631    /// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
632    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
633
634    /// Callbacks that return Prometheus exposition text appended to metrics output.
635    /// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT).
636    pub prometheus_expfmt_callbacks:
637        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
638}
639
640impl std::fmt::Debug for MetricsRegistry {
641    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642        f.debug_struct("MetricsRegistry")
643            .field("prometheus_registry", &"<RwLock<Registry>>")
644            .field(
645                "prometheus_update_callbacks",
646                &format!(
647                    "<RwLock<Vec<Callback>>> with {} callbacks",
648                    self.prometheus_update_callbacks.read().unwrap().len()
649                ),
650            )
651            .field(
652                "prometheus_expfmt_callbacks",
653                &format!(
654                    "<RwLock<Vec<Callback>>> with {} callbacks",
655                    self.prometheus_expfmt_callbacks.read().unwrap().len()
656                ),
657            )
658            .finish()
659    }
660}
661
662impl MetricsRegistry {
663    /// Create a new metrics registry with an empty Prometheus registry and callback lists
664    pub fn new() -> Self {
665        Self {
666            prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
667            child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
668            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
669            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
670        }
671    }
672
673    /// Add a child registry to be included in combined /metrics output.
674    ///
675    /// Dedup is by underlying Prometheus registry pointer, so repeated registration via clones is safe.
676    pub fn add_child_registry(&self, child: &MetricsRegistry) {
677        let child_ptr = Arc::as_ptr(&child.prometheus_registry);
678        let mut guard = self.child_registries.write().unwrap();
679        if guard
680            .iter()
681            .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
682        {
683            return;
684        }
685        guard.push(child.clone());
686    }
687
688    fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
689        // Traverse child registries recursively so `prometheus_expfmt()` on any hierarchy
690        // (DRT/namespace/component/endpoint) includes metrics from its descendants.
691        //
692        // Dedup by underlying Prometheus registry pointer so multiple paths (e.g. also registering
693        // directly on the root) won't duplicate output.
694        fn visit(
695            registry: &MetricsRegistry,
696            out: &mut Vec<MetricsRegistry>,
697            seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
698        ) {
699            let ptr = Arc::as_ptr(&registry.prometheus_registry);
700            if !seen.insert(ptr) {
701                return;
702            }
703
704            out.push(registry.clone());
705
706            let children: Vec<MetricsRegistry> = registry
707                .child_registries
708                .read()
709                .unwrap()
710                .iter()
711                .cloned()
712                .collect();
713            for child in children {
714                visit(&child, out, seen);
715            }
716        }
717
718        let mut out = Vec::new();
719        let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
720        visit(self, &mut out, &mut seen);
721        out
722    }
723
724    /// Combine metrics across this registry and all registered children into one Prometheus exposition output.
725    ///
726    /// - Families are merged by name; HELP and TYPE must match.
727    /// - Multiple series for the same name are allowed if labels differ.
728    /// - Exact duplicate series (same name + identical label pairs) are warned and dropped.
729    pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
730        let registries = self.registries_for_combined_scrape();
731
732        // Run per-registry update callbacks first.
733        for registry in &registries {
734            for result in registry.execute_update_callbacks() {
735                if let Err(e) = result {
736                    tracing::error!("Error executing metrics callback: {}", e);
737                }
738            }
739        }
740
741        // Merge metric families.
742        let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
743        let mut seen_series: HashSet<String> = HashSet::new();
744
745        for (registry_idx, registry) in registries.iter().enumerate() {
746            let families = registry.get_prometheus_registry().gather();
747            for mut family in families {
748                let name = family.name().to_string();
749
750                let entry = by_name.entry(name.clone()).or_insert_with(|| {
751                    let mut out = prometheus::proto::MetricFamily::new();
752                    out.set_name(name.clone());
753                    out.set_help(family.help().to_string());
754                    out.set_field_type(family.get_field_type());
755                    out
756                });
757
758                if entry.help() != family.help()
759                    || entry.get_field_type() != family.get_field_type()
760                {
761                    return Err(anyhow::anyhow!(
762                        "Metric family '{}' has inconsistent help/type across registries (idx={})",
763                        name,
764                        registry_idx
765                    ));
766                }
767
768                let mut metrics = family.take_metric();
769                for metric in metrics.drain(..) {
770                    let mut labels: Vec<(String, String)> = metric
771                        .get_label()
772                        .iter()
773                        .map(|lp| (lp.name().to_string(), lp.value().to_string()))
774                        .collect();
775                    labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
776
777                    let key = format!(
778                        "{}|{}",
779                        name,
780                        labels
781                            .iter()
782                            .map(|(k, v)| format!("{}={}", k, v))
783                            .collect::<Vec<_>>()
784                            .join(",")
785                    );
786
787                    if !seen_series.insert(key) {
788                        tracing::warn!(
789                            metric_name = %name,
790                            labels = ?labels,
791                            registry_idx,
792                            "Duplicate Prometheus series while merging registries; dropping later sample"
793                        );
794                        continue;
795                    }
796
797                    entry.mut_metric().push(metric);
798                }
799            }
800        }
801
802        let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
803        merged.sort_by(|a, b| a.name().cmp(b.name()));
804
805        let encoder = prometheus::TextEncoder::new();
806        let mut buffer = Vec::new();
807        encoder.encode(&merged, &mut buffer)?;
808        let mut result = String::from_utf8(buffer)?;
809
810        // Append expfmt callbacks deterministically in registry order.
811        let mut expfmt = String::new();
812        for registry in registries {
813            let text = registry.execute_expfmt_callbacks();
814            if !text.is_empty() {
815                if !expfmt.is_empty() && !expfmt.ends_with('\n') {
816                    expfmt.push('\n');
817                }
818                expfmt.push_str(&text);
819            }
820        }
821
822        if !expfmt.is_empty() {
823            if !result.ends_with('\n') {
824                result.push('\n');
825            }
826            result.push_str(&expfmt);
827        }
828
829        Ok(result)
830    }
831
832    /// Add a callback function that receives a reference to any MetricsHierarchy
833    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
834        self.prometheus_update_callbacks
835            .write()
836            .unwrap()
837            .push(callback);
838    }
839
840    /// Add an exposition text callback that returns Prometheus text
841    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
842        self.prometheus_expfmt_callbacks
843            .write()
844            .unwrap()
845            .push(callback);
846    }
847
848    /// Execute all update callbacks and return their results
849    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
850        self.prometheus_update_callbacks
851            .read()
852            .unwrap()
853            .iter()
854            .map(|callback| callback())
855            .collect()
856    }
857
858    /// Execute all exposition text callbacks and return their concatenated text
859    pub fn execute_expfmt_callbacks(&self) -> String {
860        let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
861        let mut result = String::new();
862        for callback in callbacks.iter() {
863            match callback() {
864                Ok(text) => {
865                    if !text.is_empty() {
866                        if !result.is_empty() && !result.ends_with('\n') {
867                            result.push('\n');
868                        }
869                        result.push_str(&text);
870                    }
871                }
872                Err(e) => {
873                    tracing::error!("Error executing exposition text callback: {}", e);
874                }
875            }
876        }
877        result
878    }
879
880    /// Add a Prometheus metric collector to this registry
881    pub fn add_metric(
882        &self,
883        collector: Box<dyn prometheus::core::Collector>,
884    ) -> anyhow::Result<()> {
885        self.prometheus_registry
886            .write()
887            .unwrap()
888            .register(collector)
889            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
890    }
891
892    /// Get a read guard to the Prometheus registry for scraping
893    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
894        self.prometheus_registry.read().unwrap()
895    }
896
897    /// Returns true if a metric with the given name already exists in the Prometheus registry
898    pub fn has_metric_named(&self, metric_name: &str) -> bool {
899        self.prometheus_registry
900            .read()
901            .unwrap()
902            .gather()
903            .iter()
904            .any(|mf| mf.name() == metric_name)
905    }
906}
907
908impl Default for MetricsRegistry {
909    fn default() -> Self {
910        Self::new()
911    }
912}
913
#[cfg(test)]
mod test_helpers {
    use super::prometheus_names::name_prefix;
    use super::*;

    /// Collect the lines of `input` for which `predicate` returns true, as owned strings.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(str::to_string)
            .collect()
    }

    /// Extracts all component metric sample lines (excluding HELP/TYPE comments and blanks).
    /// A line is kept when it starts with `"<name_prefix::COMPONENT>_"`.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        // Build the prefix once rather than once per line.
        let prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| {
            line.starts_with(prefix.as_str()) && !line.starts_with('#') && !line.trim().is_empty()
        })
    }

    /// Parses a Prometheus text-format sample line into `(name, labels, value)`.
    /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
    ///
    /// Label values may be quoted (honouring the `\\`, `\"` and `\n` escapes) and may contain
    /// commas, `=` or spaces. Any timestamp after the value is ignored. Returns `None` for
    /// blank lines, `#` comment lines, lines without a parseable value, and malformed label
    /// sets (missing `=`, unterminated quote, or no closing `}`).
    ///
    /// # Example
    /// ```ignore
    /// let line = "http_requests_total{method=\"GET\"} 1234";
    /// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
    /// assert_eq!(name, "http_requests_total");
    /// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
    /// assert_eq!(value, 1234.0);
    /// ```
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            return None;
        }

        // The metric name ends at the label set's `{` or at the first whitespace.
        let name_end = line
            .find(|c: char| c == '{' || c.is_whitespace())
            .unwrap_or(line.len());
        let name = &line[..name_end];
        let rest = &line[name_end..];

        let (labels, rest) = match rest.strip_prefix('{') {
            Some(label_body) => parse_label_set(label_body)?,
            None => (std::collections::HashMap::new(), rest),
        };

        // The first token after the name/labels is the value; an optional timestamp may follow.
        let value: f64 = rest.split_whitespace().next()?.parse().ok()?;
        Some((name.to_string(), labels, value))
    }

    /// Parses the inside of a `{...}` label set, starting just after the opening `{`.
    /// Returns the labels plus the remainder of the input after the closing `}`, or `None`
    /// if the label set is malformed.
    fn parse_label_set(input: &str) -> Option<(std::collections::HashMap<String, String>, &str)> {
        let mut labels = std::collections::HashMap::new();
        let mut rest = input;
        loop {
            rest = rest.trim_start();
            if let Some(after) = rest.strip_prefix('}') {
                return Some((labels, after));
            }

            // The label name runs up to `=`; hitting `,` or `}` first means a malformed pair.
            let key_end = rest.find(|c: char| c == '=' || c == ',' || c == '}')?;
            if !rest[key_end..].starts_with('=') {
                return None;
            }
            let key = rest[..key_end].trim().to_string();
            rest = rest[key_end + 1..].trim_start();

            let value = if let Some(quoted) = rest.strip_prefix('"') {
                // Quoted value: decode the exposition-format escapes.
                let mut value = String::new();
                let mut chars = quoted.char_indices();
                let close = loop {
                    let (idx, ch) = chars.next()?;
                    match ch {
                        '"' => break idx,
                        '\\' => match chars.next()?.1 {
                            'n' => value.push('\n'),
                            escaped => value.push(escaped),
                        },
                        other => value.push(other),
                    }
                };
                rest = &quoted[close + 1..];
                value
            } else {
                // Unquoted value: not valid exposition format, but accepted for leniency.
                let end = rest.find(|c: char| c == ',' || c == '}')?;
                let value = rest[..end].trim().to_string();
                rest = &rest[end..];
                value
            };
            labels.insert(key, value);

            rest = rest.trim_start();
            rest = rest.strip_prefix(',').unwrap_or(rest);
        }
    }
}
989
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    #[test]
    fn test_build_component_metric_name_with_prefix() {
        // build_component_metric_name must prepend the "dynamo_component_" prefix.
        for (input, expected) in [
            ("requests", "dynamo_component_requests"),
            ("counter", "dynamo_component_counter"),
        ] {
            assert_eq!(build_component_metric_name(input), expected);
        }
    }

    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // Owned label map built from borrowed pairs, for comparisons below.
        fn label_map(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
            pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect()
        }

        // Labeled sample with an integer value.
        let (name, labels, value) =
            parse_prometheus_metric("http_requests_total{method=\"GET\",status=\"200\"} 1234")
                .expect("labeled metric should parse");
        assert_eq!(name, "http_requests_total");
        let expected: HashMap<String, String> = label_map(&[("method", "GET"), ("status", "200")]);
        assert_eq!(labels, expected);
        assert_eq!(value, 1234.0);

        // Unlabeled sample with a float value.
        let (name, labels, value) =
            parse_prometheus_metric("cpu_usage 98.5").expect("unlabeled metric should parse");
        assert_eq!(name, "cpu_usage");
        assert!(labels.is_empty());
        assert_eq!(value, 98.5);

        // Labeled sample with a fractional value.
        let (name, labels, value) = parse_prometheus_metric("response_time{service=\"api\"} 0.123")
            .expect("fractional metric should parse");
        assert_eq!(name, "response_time");
        assert_eq!(labels, label_map(&[("service", "api")]));
        assert_eq!(value, 0.123);

        // Non-sample lines: empty, HELP, TYPE, and a name with no value.
        for invalid in [
            "",
            "# HELP metric description",
            "# TYPE metric counter",
            "metric_name",
        ] {
            assert!(
                parse_prometheus_metric(invalid).is_none(),
                "expected None for {:?}",
                invalid
            );
        }

        println!("✓ Prometheus metric parsing works correctly!");
    }

    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: callbacks accumulate into a shared counter, are reusable across runs,
        // and are shared (Arc-wrapped) with cloned registries.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            for step in [1, 10, 100] {
                let total = Arc::clone(&total);
                registry.add_update_callback(Arc::new(move || {
                    total.fetch_add(step, Ordering::SeqCst);
                    Ok(())
                }));
            }
            assert_eq!(total.load(Ordering::SeqCst), 0);

            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(Result::is_ok));
            assert_eq!(total.load(Ordering::SeqCst), 111); // 1 + 10 + 100

            assert_eq!(registry.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 222);

            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 333);

            registry.execute_update_callbacks();
            assert_eq!(total.load(Ordering::SeqCst), 444);
        }

        // Scenario 2: a failing callback is reported in place without stopping the others.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            let first = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                first.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));
            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
            let third = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                third.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            let results = registry.execute_update_callbacks();
            let outcomes: Vec<bool> = results.iter().map(Result::is_ok).collect();
            assert_eq!(outcomes, vec![true, false, true]);

            let failure = results[1].as_ref().unwrap_err();
            assert_eq!(failure.to_string(), "Simulated error");
            assert_eq!(total.load(Ordering::SeqCst), 11); // 1 + 10

            let rerun = registry.execute_update_callbacks();
            assert!(rerun[1].is_err());
            assert_eq!(total.load(Ordering::SeqCst), 22); // 11 + 11
        }

        // Scenario 3: a registry without callbacks yields no results.
        {
            let registry = MetricsRegistry::new();
            assert!(registry.execute_update_callbacks().is_empty());
        }
    }
}
1152
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use prometheus::core::Collector;

    /// Basenames of a hierarchy node's `parent_hierarchies()`, in returned order, as owned
    /// strings so they compare directly against string-literal vectors.
    macro_rules! parent_names {
        ($node:expr) => {
            $node
                .parent_hierarchies()
                .iter()
                .map(|parent| parent.basename().to_string())
                .collect::<Vec<String>>()
        };
    }

    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";
        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // Each node reports its own basename; the DRT's is the empty string.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(endpoint.basename(), ENDPOINT_NAME);

        // Ancestors are listed outermost first.
        assert!(parent_names!(drt).is_empty());
        assert_eq!(parent_names!(namespace), vec![DRT_NAME]);
        assert_eq!(parent_names!(component), vec![DRT_NAME, NAMESPACE_NAME]);
        assert_eq!(
            parent_names!(endpoint),
            vec![DRT_NAME, NAMESPACE_NAME, COMPONENT_NAME]
        );

        // Each node's immediate parent appears among its ancestors.
        for (ancestors, parent_name) in [
            (parent_names!(namespace), drt.basename().to_string()),
            (parent_names!(component), namespace.basename().to_string()),
            (parent_names!(endpoint), component.basename().to_string()),
        ] {
            assert!(ancestors.contains(&parent_name));
        }

        // Depth grows by exactly one per level.
        let depths = [
            drt.parent_hierarchies().len(),
            namespace.parent_hierarchies().len(),
            component.parent_hierarchies().len(),
            endpoint.parent_hierarchies().len(),
        ];
        assert_eq!(depths, [0, 1, 2, 3]);

        // Invalid namespace characters are sanitized (to "_123") rather than rejected.
        // @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't
        // turn it on, so invalid characters are sanitized in MetricsRegistry instead.
        let invalid_namespace = drt.namespace("@@123").unwrap();
        let sanitized_counter = invalid_namespace
            .metrics()
            .create_counter("test_counter", "A test counter", &[])
            .expect("sanitized namespace should still allow metric creation");
        let descs = sanitized_counter.desc();
        let namespace_label = descs[0]
            .const_label_pairs
            .iter()
            .find(|pair| pair.name() == "dynamo_namespace")
            .expect("Should have dynamo_namespace label");
        assert_eq!(namespace_label.value(), "_123");

        // A valid namespace works as-is.
        let valid_namespace = drt.namespace("ns567").unwrap();
        let created = valid_namespace
            .metrics()
            .create_counter("test_counter", "A test counter", &[]);
        assert!(created.is_ok());
    }

    #[tokio::test]
    async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
        // A callback attached only to the endpoint registry must surface exactly once when
        // scraping from the root (DRT) via child-registry traversal.
        let drt = create_test_drt_async().await;
        let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
        let component = namespace.component("comp_expfmt_ep_only").unwrap();
        let endpoint = component.endpoint("ep_expfmt_ep_only");

        let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
        let expected_line = metric_line.trim_end_matches('\n');
        let callback: PrometheusExpositionFormatCallback =
            Arc::new(move || Ok(metric_line.to_string()));
        endpoint.get_metrics_registry().add_expfmt_callback(callback);

        let output = drt.metrics().prometheus_expfmt().unwrap();
        let occurrences = output
            .lines()
            .filter(|line| *line == expected_line)
            .count();

        assert_eq!(
            occurrences, 1,
            "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
            occurrences, output
        );
    }

    #[tokio::test]
    async fn test_recursive_namespace() {
        let drt = create_test_drt_async().await;

        // Chained namespaces ns1 -> ns2 -> ns3, with a component in the deepest one.
        let ns1 = drt.namespace("ns1").unwrap();
        let ns2 = ns1.namespace("ns2").unwrap();
        let ns3 = ns2.namespace("ns3").unwrap();
        let component = ns3.component("test-component").unwrap();

        assert_eq!(ns1.basename(), "ns1");
        assert_eq!(parent_names!(ns1), vec![""]);

        assert_eq!(ns2.basename(), "ns2");
        assert_eq!(parent_names!(ns2), vec!["", "ns1"]);

        assert_eq!(ns3.basename(), "ns3");
        assert_eq!(parent_names!(ns3), vec!["", "ns1", "ns2"]);

        assert_eq!(component.basename(), "test-component");
        assert_eq!(parent_names!(component), vec!["", "ns1", "ns2", "ns3"]);

        println!("✓ Chained namespace test passed - all prefixes correct");
    }
}
1328
1329#[cfg(feature = "integration")]
1330#[cfg(test)]
1331mod test_metricsregistry_prometheus_fmt_outputs {
1332    use super::prometheus_names::name_prefix;
1333    use super::*;
1334    use crate::distributed::distributed_test_utils::create_test_drt_async;
1335    use prometheus::Counter;
1336    use std::sync::Arc;
1337
    /// End-to-end check of metric creation at each hierarchy level (endpoint, component,
    /// namespace) and of the exact Prometheus text each level's registry exposes.
    ///
    /// The expected strings encode three properties:
    /// - A scrape at one level includes metrics registered on its descendants (e.g. the
    ///   component scrape contains the endpoint's counter).
    /// - Each series carries the auto-injected `dynamo_*` labels for the level it was
    ///   created at and every level above it.
    /// - Families are listed sorted by name.
    #[tokio::test]
    async fn test_prometheusfactory_using_metrics_registry_trait() {
        // Setup real DRT and registry using the test-friendly constructor
        let drt = create_test_drt_async().await;

        // Use a simple constant namespace name
        let namespace_name = "ns345";

        let namespace = drt.namespace(namespace_name).unwrap();
        let component = namespace.component("comp345").unwrap();
        let endpoint = component.endpoint("ep345");

        // Test Counter creation (on the endpoint, so it gets namespace/component/endpoint labels)
        let counter = endpoint
            .metrics()
            .create_counter("testcounter", "A test counter", &[])
            .unwrap();
        counter.inc_by(123.456789);
        // Float counter: compare with a tolerance rather than exact equality.
        let epsilon = 0.01;
        assert!((counter.get() - 123.456789).abs() < epsilon);

        let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
        println!("Endpoint output:");
        println!("{}", endpoint_output_raw);

        let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();

        // Trailing newlines are trimmed on both sides so only the content is compared.
        assert_eq!(
            endpoint_output_raw.trim_end_matches('\n'),
            expected_endpoint_output.trim_end_matches('\n'),
            "\n=== ENDPOINT COMPARISON FAILED ===\n\
             Actual:\n{}\n\
             Expected:\n{}\n\
             ==============================",
            endpoint_output_raw,
            expected_endpoint_output
        );

        // Test Gauge creation (on the component: namespace/component labels only)
        let gauge = component
            .metrics()
            .create_gauge("testgauge", "A test gauge", &[])
            .unwrap();
        gauge.set(50000.0);
        assert_eq!(gauge.get(), 50000.0);

        // Test Prometheus format output for Component: the endpoint's counter plus the
        // component's gauge (no histogram exists yet at this point).
        let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
        println!("Component output:");
        println!("{}", component_output_raw);

        let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();

        assert_eq!(
            component_output_raw.trim_end_matches('\n'),
            expected_component_output.trim_end_matches('\n'),
            "\n=== COMPONENT COMPARISON FAILED ===\n\
             Actual:\n{}\n\
             Expected:\n{}\n\
             ==============================",
            component_output_raw,
            expected_component_output
        );

        // IntCounter on the namespace: only the dynamo_namespace label is injected.
        let intcounter = namespace
            .metrics()
            .create_intcounter("testintcounter", "A test int counter", &[])
            .unwrap();
        intcounter.inc_by(12345);
        assert_eq!(intcounter.get(), 12345);

        // Test Prometheus format output for Namespace: endpoint counter + component gauge +
        // namespace int counter.
        let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
        println!("Namespace output:");
        println!("{}", namespace_output_raw);

        let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();

        assert_eq!(
            namespace_output_raw.trim_end_matches('\n'),
            expected_namespace_output.trim_end_matches('\n'),
            "\n=== NAMESPACE COMPARISON FAILED ===\n\
             Actual:\n{}\n\
             Expected:\n{}\n\
             ==============================",
            namespace_output_raw,
            expected_namespace_output
        );

        // Test IntGauge creation
        let intgauge = namespace
            .metrics()
            .create_intgauge("testintgauge", "A test int gauge", &[])
            .unwrap();
        intgauge.set(42);
        assert_eq!(intgauge.get(), 42);

        // Test IntGaugeVec creation: variable labels (instance, status) plus a constant
        // label (service="api"), as the expected output below shows.
        let intgaugevec = namespace
            .metrics()
            .create_intgaugevec(
                "testintgaugevec",
                "A test int gauge vector",
                &["instance", "status"],
                &[("service", "api")],
            )
            .unwrap();
        intgaugevec
            .with_label_values(&["server1", "active"])
            .set(10);
        intgaugevec
            .with_label_values(&["server2", "inactive"])
            .set(0);

        // Test CounterVec creation
        let countervec = endpoint
            .metrics()
            .create_countervec(
                "testcountervec",
                "A test counter vector",
                &["method", "status"],
                &[("service", "api")],
            )
            .unwrap();
        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);

        // Test Histogram creation. Passing `None` for buckets yields the bucket boundaries
        // listed in the expected DRT output (0.005 .. 10, plus +Inf).
        let histogram = component
            .metrics()
            .create_histogram("testhistogram", "A test histogram", &[], None)
            .unwrap();
        histogram.observe(1.0);
        histogram.observe(2.5);
        histogram.observe(4.0);

        // Test Prometheus format output for DRT (all metrics combined).
        // The expected text also contains `dynamo_component_uptime_seconds`, which this test
        // never creates — presumably registered by the DRT itself. NOTE(review): its value is
        // asserted as exactly 0; confirm this is not timing-sensitive.
        let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
        println!("DRT output:");
        println!("{}", drt_output_raw);

        let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#.to_string();

        // NOTE(review): the failure message below labels the output "Actual (filtered)", but
        // `drt_output_raw` is compared unfiltered.
        assert_eq!(
            drt_output_raw.trim_end_matches('\n'),
            expected_drt_output.trim_end_matches('\n'),
            "\n=== DRT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual (filtered):\n{}\n\
             ==============================",
            expected_drt_output,
            drt_output_raw
        );

        println!("✓ All Prometheus format outputs verified successfully!");
    }
1547
1548    #[test]
1549    fn test_refactored_filter_functions() {
1550        // Test data with component metrics
1551        let test_input = r#"# HELP dynamo_component_requests Total requests
1552# TYPE dynamo_component_requests counter
1553dynamo_component_requests 42
1554# HELP dynamo_component_latency Response latency
1555# TYPE dynamo_component_latency histogram
1556dynamo_component_latency_bucket{le="0.1"} 10
1557dynamo_component_latency_bucket{le="0.5"} 25
1558dynamo_component_errors_total 5"#;
1559
1560        // Test extract_metrics (only actual metric lines, excluding help/type)
1561        let metrics_only = super::test_helpers::extract_metrics(test_input);
1562        assert_eq!(metrics_only.len(), 4); // 4 actual metric lines (excluding help/type)
1563        assert!(
1564            metrics_only
1565                .iter()
1566                .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1567        );
1568
1569        println!("✓ All refactored filter functions work correctly!");
1570    }
1571
1572    #[tokio::test]
1573    async fn test_same_metric_name_different_endpoints() {
1574        // Test that the same metric name can exist in different endpoints without collision.
1575        // This validates the multi-registry approach: each endpoint has its own registry,
1576        // and metrics are merged at scrape time with distinct labels.
1577        let drt = create_test_drt_async().await;
1578        let namespace = drt.namespace("ns_test").unwrap();
1579        let component = namespace.component("comp_test").unwrap();
1580
1581        // Create two endpoints with the same metric name
1582        let ep1 = component.endpoint("ep1");
1583        let ep2 = component.endpoint("ep2");
1584
1585        let counter1 = ep1
1586            .metrics()
1587            .create_counter("requests_total", "Total requests", &[])
1588            .unwrap();
1589        counter1.inc_by(100.0);
1590
1591        let counter2 = ep2
1592            .metrics()
1593            .create_counter("requests_total", "Total requests", &[])
1594            .unwrap();
1595        counter2.inc_by(200.0);
1596
1597        // Get merged Prometheus output from component level
1598        let output = component.metrics().prometheus_expfmt().unwrap();
1599
1600        let expected_output = r#"# HELP dynamo_component_requests_total Total requests
1601# TYPE dynamo_component_requests_total counter
1602dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
1603dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#;
1604
1605        assert_eq!(
1606            output.trim_end_matches('\n'),
1607            expected_output.trim_end_matches('\n'),
1608            "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
1609             Actual:\n{}\n\
1610             Expected:\n{}\n\
1611             ==============================",
1612            output,
1613            expected_output
1614        );
1615
1616        println!("✓ Multi-registry prevents Prometheus collisions!");
1617    }
1618
1619    #[tokio::test]
1620    async fn test_duplicate_series_warning() {
1621        // Test that duplicate series (same metric name + same labels) are detected and deduplicated.
1622        // This should log a warning and keep only one of the duplicate series.
1623        let drt = create_test_drt_async().await;
1624        let namespace = drt.namespace("ns_dup").unwrap();
1625        let component = namespace.component("comp_dup").unwrap();
1626
1627        // Create two endpoints with counters that will have identical labels when scraped
1628        let ep1 = component.endpoint("ep_same");
1629        let ep2 = component.endpoint("ep_same"); // Same endpoint name = duplicate labels
1630
1631        let counter1 = ep1
1632            .metrics()
1633            .create_counter("dup_metric", "Duplicate metric test", &[])
1634            .unwrap();
1635        counter1.inc_by(50.0);
1636
1637        let counter2 = ep2
1638            .metrics()
1639            .create_counter("dup_metric", "Duplicate metric test", &[])
1640            .unwrap();
1641        counter2.inc_by(75.0);
1642
1643        // Get merged output - duplicates should be deduplicated
1644        let output = component.metrics().prometheus_expfmt().unwrap();
1645
1646        let expected_output = r#"# HELP dynamo_component_dup_metric Duplicate metric test
1647# TYPE dynamo_component_dup_metric counter
1648dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#;
1649
1650        assert_eq!(
1651            output.trim_end_matches('\n'),
1652            expected_output.trim_end_matches('\n'),
1653            "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
1654             Actual:\n{}\n\
1655             Expected:\n{}\n\
1656             ==============================",
1657            output,
1658            expected_output
1659        );
1660
1661        println!("✓ Duplicate series detection and deduplication works!");
1662    }
1663}