Skip to main content

dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod frontend_perf;
10pub mod prometheus_names;
11pub mod request_plane;
12pub mod tokio_perf;
13pub mod transport_metrics;
14pub mod work_handler_perf;
15pub mod work_handler_pool;
16
17use parking_lot::Mutex;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use crate::component::ComponentBuilder;
22use anyhow;
23use once_cell::sync::Lazy;
24use regex::Regex;
25use std::any::Any;
26use std::collections::HashMap;
27
28// Import commonly used items to avoid verbose prefixes
29use prometheus_names::{
30    build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
31    sanitize_prometheus_name, work_handler,
32};
33
34// Pipeline imports for endpoint creation
35use crate::pipeline::{
36    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
37    network::Ingress,
38};
39use crate::protocols::annotated::Annotated;
40use crate::stream;
41use crate::stream::StreamExt;
42
43// Prometheus imports
44use prometheus::Encoder;
45
46/// Validate that a label slice has no duplicate keys.
47/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
48fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
49    let mut seen_keys = std::collections::HashSet::new();
50    for (key, _) in labels {
51        if !seen_keys.insert(*key) {
52            return Err(anyhow::anyhow!(
53                "Duplicate label key '{}' found in labels",
54                key
55            ));
56        }
57    }
58    Ok(())
59}
60
61/// ==============================
62/// Prometheus section
63/// ==============================
64/// Trait that defines common behavior for Prometheus metric types
65pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
66    /// Create a new metric with the given options
67    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
68    where
69        Self: Sized;
70
71    /// Create a new metric with histogram options and custom buckets
72    /// This is a default implementation that will panic for non-histogram metrics
73    fn with_histogram_opts_and_buckets(
74        _opts: prometheus::HistogramOpts,
75        _buckets: Option<Vec<f64>>,
76    ) -> Result<Self, prometheus::Error>
77    where
78        Self: Sized,
79    {
80        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
81    }
82
83    /// Create a new metric with counter options and label names (for CounterVec)
84    /// This is a default implementation that will panic for non-countervec metrics
85    fn with_opts_and_label_names(
86        _opts: prometheus::Opts,
87        _label_names: &[&str],
88    ) -> Result<Self, prometheus::Error>
89    where
90        Self: Sized,
91    {
92        panic!("with_opts_and_label_names is not implemented for this metric type");
93    }
94}
95
96// Implement the trait for Counter, IntCounter, and Gauge
97impl PrometheusMetric for prometheus::Counter {
98    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
99        prometheus::Counter::with_opts(opts)
100    }
101}
102
103impl PrometheusMetric for prometheus::IntCounter {
104    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
105        prometheus::IntCounter::with_opts(opts)
106    }
107}
108
109impl PrometheusMetric for prometheus::Gauge {
110    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
111        prometheus::Gauge::with_opts(opts)
112    }
113}
114
115impl PrometheusMetric for prometheus::IntGauge {
116    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
117        prometheus::IntGauge::with_opts(opts)
118    }
119}
120
121impl PrometheusMetric for prometheus::GaugeVec {
122    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
123        Err(prometheus::Error::Msg(
124            "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
125        ))
126    }
127
128    fn with_opts_and_label_names(
129        opts: prometheus::Opts,
130        label_names: &[&str],
131    ) -> Result<Self, prometheus::Error> {
132        prometheus::GaugeVec::new(opts, label_names)
133    }
134}
135
136impl PrometheusMetric for prometheus::IntGaugeVec {
137    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
138        Err(prometheus::Error::Msg(
139            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
140        ))
141    }
142
143    fn with_opts_and_label_names(
144        opts: prometheus::Opts,
145        label_names: &[&str],
146    ) -> Result<Self, prometheus::Error> {
147        prometheus::IntGaugeVec::new(opts, label_names)
148    }
149}
150
151impl PrometheusMetric for prometheus::IntCounterVec {
152    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
153        Err(prometheus::Error::Msg(
154            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
155        ))
156    }
157
158    fn with_opts_and_label_names(
159        opts: prometheus::Opts,
160        label_names: &[&str],
161    ) -> Result<Self, prometheus::Error> {
162        prometheus::IntCounterVec::new(opts, label_names)
163    }
164}
165
166// Implement the trait for Histogram
167impl PrometheusMetric for prometheus::Histogram {
168    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
169        // Convert Opts to HistogramOpts
170        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
171        prometheus::Histogram::with_opts(histogram_opts)
172    }
173
174    fn with_histogram_opts_and_buckets(
175        mut opts: prometheus::HistogramOpts,
176        buckets: Option<Vec<f64>>,
177    ) -> Result<Self, prometheus::Error> {
178        if let Some(custom_buckets) = buckets {
179            opts = opts.buckets(custom_buckets);
180        }
181        prometheus::Histogram::with_opts(opts)
182    }
183}
184
185// Implement the trait for CounterVec
186impl PrometheusMetric for prometheus::CounterVec {
187    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
188        // This will panic - CounterVec needs label names
189        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
190    }
191
192    fn with_opts_and_label_names(
193        opts: prometheus::Opts,
194        label_names: &[&str],
195    ) -> Result<Self, prometheus::Error> {
196        prometheus::CounterVec::new(opts, label_names)
197    }
198}
199
200/// ==============================
201/// Metrics section
202/// ==============================
203/// Public helper function to create metrics - accessible for Python bindings
204pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
205    hierarchy: &H,
206    metric_name: &str,
207    metric_desc: &str,
208    labels: &[(&str, &str)],
209    buckets: Option<Vec<f64>>,
210    const_labels: Option<&[&str]>,
211) -> anyhow::Result<T> {
212    // Validate that user-provided labels don't have duplicate keys
213    validate_no_duplicate_label_keys(labels)?;
214    // Note: stored labels functionality has been removed
215
216    let basename = hierarchy.basename();
217    let parent_hierarchies = hierarchy.parent_hierarchies();
218
219    // Build hierarchy path as vector of strings: parent names + [basename]
220    let mut hierarchy_names: Vec<String> =
221        parent_hierarchies.iter().map(|p| p.basename()).collect();
222    hierarchy_names.push(basename.clone());
223
224    let metric_name = build_component_metric_name(metric_name);
225
226    // Build updated_labels: auto-labels first, then `labels` + stored labels
227    let mut updated_labels: Vec<(String, String)> = Vec::new();
228
229    // Auto-label injection: Always add dynamo_namespace, dynamo_component, dynamo_endpoint labels
230    // based on the hierarchy. Label constants defined in prometheus_names.rs labels module.
231    //
232    // Python counterpart: components/src/dynamo/common/utils/prometheus.py register_engine_metrics_callback()
233
234    // Validate that user-provided labels don't conflict with auto-generated labels
235    for (key, _) in labels {
236        if *key == labels::NAMESPACE
237            || *key == labels::COMPONENT
238            || *key == labels::ENDPOINT
239            || *key == labels::WORKER_ID
240        {
241            return Err(anyhow::anyhow!(
242                "Label '{}' is automatically added by auto-label injection and cannot be manually set",
243                key
244            ));
245        }
246    }
247
248    // Also validate that vector label names (const_labels) don't collide with auto-injected
249    // const labels. A variable label named "worker_id" would conflict with the auto-injected
250    // worker_id const label, causing a prometheus registration error or ambiguous output.
251    if let Some(label_names) = const_labels {
252        for name in label_names.iter() {
253            if *name == labels::NAMESPACE
254                || *name == labels::COMPONENT
255                || *name == labels::ENDPOINT
256                || *name == labels::WORKER_ID
257            {
258                return Err(anyhow::anyhow!(
259                    "Variable label name '{}' conflicts with auto-injected const label and cannot be used",
260                    name
261                ));
262            }
263        }
264    }
265
266    // Add auto-generated labels with sanitized values
267    // Hierarchy: [drt, namespace, component, endpoint]
268    if hierarchy_names.len() > 1 {
269        let namespace = &hierarchy_names[1];
270        if !namespace.is_empty() {
271            let valid_namespace = sanitize_prometheus_label(namespace)?;
272            if !valid_namespace.is_empty() {
273                updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
274            }
275        }
276    }
277    if hierarchy_names.len() > 2 {
278        let component = &hierarchy_names[2];
279        if !component.is_empty() {
280            let valid_component = sanitize_prometheus_label(component)?;
281            if !valid_component.is_empty() {
282                updated_labels.push((labels::COMPONENT.to_string(), valid_component));
283            }
284        }
285    }
286    if hierarchy_names.len() > 3 {
287        let endpoint = &hierarchy_names[3];
288        if !endpoint.is_empty() {
289            let valid_endpoint = sanitize_prometheus_label(endpoint)?;
290            if !valid_endpoint.is_empty() {
291                updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
292            }
293        }
294    }
295
296    // Auto-inject worker_id label from the hierarchy's connection_id (discovery instance ID).
297    // This provides a stable per-worker identity label so metrics from different workers
298    // serving the same endpoint can be distinguished without relying on Kubernetes labels.
299    if let Some(conn_id) = hierarchy.connection_id() {
300        updated_labels.push((labels::WORKER_ID.to_string(), format!("{:x}", conn_id)));
301    }
302
303    // Add user labels
304    updated_labels.extend(
305        labels
306            .iter()
307            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
308    );
309    // Note: stored labels functionality has been removed
310
311    // Handle different metric types
312    let prometheus_metric = if std::any::TypeId::of::<T>()
313        == std::any::TypeId::of::<prometheus::CounterVec>()
314    {
315        // Special handling for CounterVec with label names
316        // const_labels parameter is required for CounterVec
317        if buckets.is_some() {
318            return Err(anyhow::anyhow!(
319                "buckets parameter is not valid for CounterVec"
320            ));
321        }
322        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
323        for (key, value) in &updated_labels {
324            opts = opts.const_label(key.clone(), value.clone());
325        }
326        let label_names = const_labels
327            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
328        T::with_opts_and_label_names(opts, label_names)?
329    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
330        // Special handling for GaugeVec with label names
331        // const_labels parameter is required for GaugeVec
332        if buckets.is_some() {
333            return Err(anyhow::anyhow!(
334                "buckets parameter is not valid for GaugeVec"
335            ));
336        }
337        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
338        for (key, value) in &updated_labels {
339            opts = opts.const_label(key.clone(), value.clone());
340        }
341        let label_names = const_labels
342            .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
343        T::with_opts_and_label_names(opts, label_names)?
344    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
345        // Special handling for Histogram with custom buckets
346        // buckets parameter is valid for Histogram, const_labels is not used
347        if const_labels.is_some() {
348            return Err(anyhow::anyhow!(
349                "const_labels parameter is not valid for Histogram"
350            ));
351        }
352        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
353        for (key, value) in &updated_labels {
354            opts = opts.const_label(key.clone(), value.clone());
355        }
356        T::with_histogram_opts_and_buckets(opts, buckets)?
357    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
358        // Special handling for IntCounterVec with label names
359        // const_labels parameter is required for IntCounterVec
360        if buckets.is_some() {
361            return Err(anyhow::anyhow!(
362                "buckets parameter is not valid for IntCounterVec"
363            ));
364        }
365        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366        for (key, value) in &updated_labels {
367            opts = opts.const_label(key.clone(), value.clone());
368        }
369        let label_names = const_labels
370            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
371        T::with_opts_and_label_names(opts, label_names)?
372    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
373        // Special handling for IntGaugeVec with label names
374        // const_labels parameter is required for IntGaugeVec
375        if buckets.is_some() {
376            return Err(anyhow::anyhow!(
377                "buckets parameter is not valid for IntGaugeVec"
378            ));
379        }
380        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
381        for (key, value) in &updated_labels {
382            opts = opts.const_label(key.clone(), value.clone());
383        }
384        let label_names = const_labels
385            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
386        T::with_opts_and_label_names(opts, label_names)?
387    } else {
388        // Standard handling for Counter, IntCounter, Gauge, IntGauge
389        // buckets and const_labels parameters are not valid for these types
390        if buckets.is_some() {
391            return Err(anyhow::anyhow!(
392                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
393            ));
394        }
395        if const_labels.is_some() {
396            return Err(anyhow::anyhow!(
397                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
398            ));
399        }
400        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
401        for (key, value) in &updated_labels {
402            opts = opts.const_label(key.clone(), value.clone());
403        }
404        T::with_opts(opts)?
405    };
406
407    let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
408    hierarchy.get_metrics_registry().add_metric(collector)?;
409
410    Ok(prometheus_metric)
411}
412
413/// Wrapper struct that provides access to metrics functionality
414/// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint
415pub struct Metrics<H: MetricsHierarchy> {
416    hierarchy: H,
417}
418
419impl<H: MetricsHierarchy> Metrics<H> {
420    pub fn new(hierarchy: H) -> Self {
421        Self { hierarchy }
422    }
423
424    // TODO: Add support for additional Prometheus metric types:
425    // - Counter: ✅ IMPLEMENTED - create_counter()
426    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
427    // - Gauge: ✅ IMPLEMENTED - create_gauge()
428    // - GaugeVec: ✅ IMPLEMENTED - create_gaugevec()
429    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
430    // - Histogram: ✅ IMPLEMENTED - create_histogram()
431    // - HistogramVec with custom buckets: create_histogram_with_buckets()
432    // - Info: create_info() - for info metrics with labels
433    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
434    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
435    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
436    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
437    // - Stateset: create_stateset() - for state-based metrics
438    // - Summary: create_summary() - for quantiles and sum/count metrics
439    // - SummaryVec: create_summary_vec() - for labeled summaries
440    // - Untyped: create_untyped() - for untyped metrics
441    //
442    // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
443    // Keep them synchronized when adding new metric types
444
445    /// Create a Counter metric
446    pub fn create_counter(
447        &self,
448        name: &str,
449        description: &str,
450        labels: &[(&str, &str)],
451    ) -> anyhow::Result<prometheus::Counter> {
452        create_metric(&self.hierarchy, name, description, labels, None, None)
453    }
454
455    /// Create a CounterVec metric with label names (for dynamic labels)
456    pub fn create_countervec(
457        &self,
458        name: &str,
459        description: &str,
460        const_labels: &[&str],
461        const_label_values: &[(&str, &str)],
462    ) -> anyhow::Result<prometheus::CounterVec> {
463        create_metric(
464            &self.hierarchy,
465            name,
466            description,
467            const_label_values,
468            None,
469            Some(const_labels),
470        )
471    }
472
473    /// Create a Gauge metric
474    pub fn create_gauge(
475        &self,
476        name: &str,
477        description: &str,
478        labels: &[(&str, &str)],
479    ) -> anyhow::Result<prometheus::Gauge> {
480        create_metric(&self.hierarchy, name, description, labels, None, None)
481    }
482
483    /// Create a GaugeVec metric with label names (for dynamic labels)
484    pub fn create_gaugevec(
485        &self,
486        name: &str,
487        description: &str,
488        const_labels: &[&str],
489        const_label_values: &[(&str, &str)],
490    ) -> anyhow::Result<prometheus::GaugeVec> {
491        create_metric(
492            &self.hierarchy,
493            name,
494            description,
495            const_label_values,
496            None,
497            Some(const_labels),
498        )
499    }
500
501    /// Create a Histogram metric with custom buckets
502    pub fn create_histogram(
503        &self,
504        name: &str,
505        description: &str,
506        labels: &[(&str, &str)],
507        buckets: Option<Vec<f64>>,
508    ) -> anyhow::Result<prometheus::Histogram> {
509        create_metric(&self.hierarchy, name, description, labels, buckets, None)
510    }
511
512    /// Create an IntCounter metric
513    pub fn create_intcounter(
514        &self,
515        name: &str,
516        description: &str,
517        labels: &[(&str, &str)],
518    ) -> anyhow::Result<prometheus::IntCounter> {
519        create_metric(&self.hierarchy, name, description, labels, None, None)
520    }
521
522    /// Create an IntCounterVec metric with label names (for dynamic labels)
523    pub fn create_intcountervec(
524        &self,
525        name: &str,
526        description: &str,
527        const_labels: &[&str],
528        const_label_values: &[(&str, &str)],
529    ) -> anyhow::Result<prometheus::IntCounterVec> {
530        create_metric(
531            &self.hierarchy,
532            name,
533            description,
534            const_label_values,
535            None,
536            Some(const_labels),
537        )
538    }
539
540    /// Create an IntGauge metric
541    pub fn create_intgauge(
542        &self,
543        name: &str,
544        description: &str,
545        labels: &[(&str, &str)],
546    ) -> anyhow::Result<prometheus::IntGauge> {
547        create_metric(&self.hierarchy, name, description, labels, None, None)
548    }
549
550    /// Create an IntGaugeVec metric with label names (for dynamic labels)
551    pub fn create_intgaugevec(
552        &self,
553        name: &str,
554        description: &str,
555        const_labels: &[&str],
556        const_label_values: &[(&str, &str)],
557    ) -> anyhow::Result<prometheus::IntGaugeVec> {
558        create_metric(
559            &self.hierarchy,
560            name,
561            description,
562            const_label_values,
563            None,
564            Some(const_labels),
565        )
566    }
567
568    /// Get metrics in Prometheus text format
569    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
570        self.hierarchy
571            .get_metrics_registry()
572            .prometheus_expfmt_combined()
573    }
574}
575
576/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
577/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
578/// generating output in Prometheus text format.
579use crate::traits::DistributedRuntimeProvider;
580
581pub trait MetricsHierarchy: Send + Sync {
582    // ========================================================================
583    // Required methods - must be implemented by all types
584    // ========================================================================
585
586    /// Get the name of this hierarchy (without any hierarchy prefix)
587    fn basename(&self) -> String;
588
589    /// Get the parent hierarchies as actual objects (not strings)
590    /// Returns a vector of hierarchy references, ordered from root to immediate parent.
591    /// For example, an Endpoint would return [DRT, Namespace, Component].
592    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
593
594    /// Get a reference to this hierarchy's metrics registry
595    fn get_metrics_registry(&self) -> &MetricsRegistry;
596
597    // ========================================================================
598    // Provided methods - have default implementations
599    // ========================================================================
600
601    /// Get the connection ID (discovery instance ID) for this hierarchy level.
602    ///
603    /// Returns `Some(id)` when the hierarchy has access to the DistributedRuntime
604    /// (e.g. Namespace, Component, Endpoint). Used by `create_metric()` to auto-inject
605    /// the `worker_id` label. Returns `None` by default.
606    fn connection_id(&self) -> Option<u64> {
607        None
608    }
609
610    /// Access the metrics interface for this hierarchy
611    /// This is a provided method that works for any type implementing MetricsHierarchy
612    fn metrics(&self) -> Metrics<&Self>
613    where
614        Self: Sized,
615    {
616        Metrics::new(self)
617    }
618}
619
620// Blanket implementation for references to types that implement MetricsHierarchy
621impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
622    fn basename(&self) -> String {
623        (**self).basename()
624    }
625
626    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
627        (**self).parent_hierarchies()
628    }
629
630    fn get_metrics_registry(&self) -> &MetricsRegistry {
631        (**self).get_metrics_registry()
632    }
633
634    fn connection_id(&self) -> Option<u64> {
635        (**self).connection_id()
636    }
637}
638
639/// Type alias for runtime callback functions to reduce complexity
640///
641/// This type represents an Arc-wrapped callback function that can be:
642/// - Shared efficiently across multiple threads and contexts
643/// - Cloned without duplicating the underlying closure
644/// - Used in generic contexts requiring 'static lifetime
645///
646/// The Arc wrapper is included in the type to make sharing explicit.
647pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
648
649/// Type alias for exposition text callback functions that return Prometheus text
650pub type PrometheusExpositionFormatCallback =
651    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
652
653/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy.
654///
655/// All fields are Arc-wrapped, so cloning shares state. This ensures metrics registered
656/// on cloned instances (e.g., cloned Client/Endpoint) are visible to the original.
657#[derive(Clone)]
658pub struct MetricsRegistry {
659    /// The Prometheus registry for this hierarchy.
660    /// Arc-wrapped so clones share the same registry (metrics registered on clones are visible everywhere).
661    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
662
663    /// Child registries included when emitting combined `/metrics` output.
664    ///
665    /// Why this exists:
666    /// - Previously, `create_metric()` registered every collector into *all* parent registries
667    ///   (Endpoint → Component → Namespace → DRT) so scraping the root registry included everything.
668    /// - That fan-out caused Prometheus collisions when different endpoints tried to register the
669    ///   same metric name with different const-labels (descriptor mismatch).
670    ///
671    /// We now register metrics only into the local hierarchy registry to avoid collisions.
672    /// `child_registries` rebuilds “what to scrape” as a tree of registries so `/metrics` can:
673    /// - traverse registries recursively,
674    /// - merge metric families into one exposition payload,
675    /// - warn/drop exact duplicate series, while allowing same metric name with different labels.
676    child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,
677
678    /// Update callbacks invoked before metrics are scraped.
679    /// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
680    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
681
682    /// Callbacks that return Prometheus exposition text appended to metrics output.
683    /// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT).
684    pub prometheus_expfmt_callbacks:
685        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
686}
687
688impl std::fmt::Debug for MetricsRegistry {
689    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690        f.debug_struct("MetricsRegistry")
691            .field("prometheus_registry", &"<RwLock<Registry>>")
692            .field(
693                "prometheus_update_callbacks",
694                &format!(
695                    "<RwLock<Vec<Callback>>> with {} callbacks",
696                    self.prometheus_update_callbacks.read().unwrap().len()
697                ),
698            )
699            .field(
700                "prometheus_expfmt_callbacks",
701                &format!(
702                    "<RwLock<Vec<Callback>>> with {} callbacks",
703                    self.prometheus_expfmt_callbacks.read().unwrap().len()
704                ),
705            )
706            .finish()
707    }
708}
709
710impl MetricsRegistry {
711    /// Create a new metrics registry with an empty Prometheus registry and callback lists
712    pub fn new() -> Self {
713        Self {
714            prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
715            child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
716            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
717            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
718        }
719    }
720
721    /// Add a child registry to be included in combined /metrics output.
722    ///
723    /// Dedup is by underlying Prometheus registry pointer, so repeated registration via clones is safe.
724    pub fn add_child_registry(&self, child: &MetricsRegistry) {
725        let child_ptr = Arc::as_ptr(&child.prometheus_registry);
726        let mut guard = self.child_registries.write().unwrap();
727        if guard
728            .iter()
729            .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
730        {
731            return;
732        }
733        guard.push(child.clone());
734    }
735
736    fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
737        // Traverse child registries recursively so `prometheus_expfmt()` on any hierarchy
738        // (DRT/namespace/component/endpoint) includes metrics from its descendants.
739        //
740        // Dedup by underlying Prometheus registry pointer so multiple paths (e.g. also registering
741        // directly on the root) won't duplicate output.
742        fn visit(
743            registry: &MetricsRegistry,
744            out: &mut Vec<MetricsRegistry>,
745            seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
746        ) {
747            let ptr = Arc::as_ptr(&registry.prometheus_registry);
748            if !seen.insert(ptr) {
749                return;
750            }
751
752            out.push(registry.clone());
753
754            let children: Vec<MetricsRegistry> = registry
755                .child_registries
756                .read()
757                .unwrap()
758                .iter()
759                .cloned()
760                .collect();
761            for child in children {
762                visit(&child, out, seen);
763            }
764        }
765
766        let mut out = Vec::new();
767        let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
768        visit(self, &mut out, &mut seen);
769        out
770    }
771
772    /// Combine metrics across this registry and all registered children into one Prometheus exposition output.
773    ///
774    /// - Families are merged by name; HELP and TYPE must match.
775    /// - Multiple series for the same name are allowed if labels differ.
776    /// - Exact duplicate series (same name + identical label pairs) are warned and dropped.
777    pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
778        let registries = self.registries_for_combined_scrape();
779
780        // Run per-registry update callbacks first.
781        for registry in &registries {
782            for result in registry.execute_update_callbacks() {
783                if let Err(e) = result {
784                    tracing::error!("Error executing metrics callback: {e}");
785                }
786            }
787        }
788
789        // Merge metric families.
790        let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
791        let mut seen_series: HashSet<String> = HashSet::new();
792
793        for (registry_idx, registry) in registries.iter().enumerate() {
794            let families = registry.get_prometheus_registry().gather();
795            for mut family in families {
796                let name = family.name().to_string();
797
798                let entry = by_name.entry(name.clone()).or_insert_with(|| {
799                    let mut out = prometheus::proto::MetricFamily::new();
800                    out.set_name(name.clone());
801                    out.set_help(family.help().to_string());
802                    out.set_field_type(family.get_field_type());
803                    out
804                });
805
806                if entry.help() != family.help()
807                    || entry.get_field_type() != family.get_field_type()
808                {
809                    return Err(anyhow::anyhow!(
810                        "Metric family '{}' has inconsistent help/type across registries (idx={})",
811                        name,
812                        registry_idx
813                    ));
814                }
815
816                let mut metrics = family.take_metric();
817                for metric in metrics.drain(..) {
818                    let mut labels: Vec<(String, String)> = metric
819                        .get_label()
820                        .iter()
821                        .map(|lp| (lp.name().to_string(), lp.value().to_string()))
822                        .collect();
823                    labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
824
825                    let key = format!(
826                        "{}|{}",
827                        name,
828                        labels
829                            .iter()
830                            .map(|(k, v)| format!("{}={}", k, v))
831                            .collect::<Vec<_>>()
832                            .join(",")
833                    );
834
835                    if !seen_series.insert(key) {
836                        tracing::warn!(
837                            metric_name = %name,
838                            labels = ?labels,
839                            registry_idx,
840                            "Duplicate Prometheus series while merging registries; dropping later sample"
841                        );
842                        continue;
843                    }
844
845                    entry.mut_metric().push(metric);
846                }
847            }
848        }
849
850        let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
851        merged.sort_by(|a, b| a.name().cmp(b.name()));
852
853        let encoder = prometheus::TextEncoder::new();
854        let mut buffer = Vec::new();
855        encoder.encode(&merged, &mut buffer)?;
856        let mut result = String::from_utf8(buffer)?;
857
858        // Append expfmt callbacks deterministically in registry order.
859        let mut expfmt = String::new();
860        for registry in registries {
861            let text = registry.execute_expfmt_callbacks();
862            if !text.is_empty() {
863                if !expfmt.is_empty() && !expfmt.ends_with('\n') {
864                    expfmt.push('\n');
865                }
866                expfmt.push_str(&text);
867            }
868        }
869
870        if !expfmt.is_empty() {
871            if !result.ends_with('\n') {
872                result.push('\n');
873            }
874            result.push_str(&expfmt);
875        }
876
877        Ok(result)
878    }
879
880    /// Add a callback function that receives a reference to any MetricsHierarchy
881    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
882        self.prometheus_update_callbacks
883            .write()
884            .unwrap()
885            .push(callback);
886    }
887
888    /// Add an exposition text callback that returns Prometheus text
889    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
890        self.prometheus_expfmt_callbacks
891            .write()
892            .unwrap()
893            .push(callback);
894    }
895
896    /// Execute all update callbacks and return their results
897    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
898        self.prometheus_update_callbacks
899            .read()
900            .unwrap()
901            .iter()
902            .map(|callback| callback())
903            .collect()
904    }
905
906    /// Execute all exposition text callbacks and return their concatenated text
907    pub fn execute_expfmt_callbacks(&self) -> String {
908        let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
909        let mut result = String::new();
910        for callback in callbacks.iter() {
911            match callback() {
912                Ok(text) => {
913                    if !text.is_empty() {
914                        if !result.is_empty() && !result.ends_with('\n') {
915                            result.push('\n');
916                        }
917                        result.push_str(&text);
918                    }
919                }
920                Err(e) => {
921                    tracing::error!("Error executing exposition text callback: {e}");
922                }
923            }
924        }
925        result
926    }
927
928    /// Add a Prometheus metric collector to this registry
929    pub fn add_metric(
930        &self,
931        collector: Box<dyn prometheus::core::Collector>,
932    ) -> anyhow::Result<()> {
933        self.prometheus_registry
934            .write()
935            .unwrap()
936            .register(collector)
937            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
938    }
939
940    /// Add a Prometheus metric collector, logging a warning on failure instead of returning an error.
941    pub fn add_metric_or_warn(&self, collector: Box<dyn prometheus::core::Collector>, name: &str) {
942        if let Err(e) = self.add_metric(collector) {
943            tracing::warn!(error = %e, metric = name, "Failed to register metric");
944        }
945    }
946
947    /// Get a read guard to the Prometheus registry for scraping
948    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
949        self.prometheus_registry.read().unwrap()
950    }
951
952    /// Returns true if a metric with the given name already exists in the Prometheus registry
953    pub fn has_metric_named(&self, metric_name: &str) -> bool {
954        self.prometheus_registry
955            .read()
956            .unwrap()
957            .gather()
958            .iter()
959            .any(|mf| mf.name() == metric_name)
960    }
961}
962
963impl Default for MetricsRegistry {
964    fn default() -> Self {
965        Self::new()
966    }
967}
968
#[cfg(test)]
mod test_helpers {
    use super::prometheus_names::name_prefix;
    use super::*;

    /// Returns the lines of `input` for which `predicate` is true, as owned `String`s.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(str::to_string)
            .collect()
    }

    /// Extracts component metric sample lines, i.e. lines starting with `<COMPONENT prefix>_`.
    ///
    /// HELP/TYPE comment lines and blank lines can never start with that prefix, so they are
    /// excluded automatically.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        // Build the prefix once rather than once per line.
        let prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| line.starts_with(&prefix))
    }

    /// Parses a Prometheus text-format sample line into `(name, labels, value)`.
    /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
    ///
    /// Returns `None` for blank lines, `#` comment lines (HELP/TYPE), and lines without a
    /// parseable value. The label block is parsed quote-aware:
    /// - quoted values may contain spaces, commas, `=` and `}`;
    /// - the escapes `\\`, `\"` and `\n` are decoded;
    /// - bare (unquoted) values are accepted leniently.
    ///
    /// Anything after the value (e.g. a timestamp) is ignored.
    ///
    /// # Example
    /// ```ignore
    /// let line = "http_requests_total{method=\"GET\"} 1234";
    /// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
    /// assert_eq!(name, "http_requests_total");
    /// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
    /// assert_eq!(value, 1234.0);
    /// ```
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        if line.trim().is_empty() || line.starts_with('#') {
            return None;
        }
        let line = line.trim();

        let (name, labels, rest) = match line.find('{') {
            Some(open) => {
                let (labels, consumed) = parse_label_block(&line[open + 1..])?;
                (&line[..open], labels, &line[open + 1 + consumed..])
            }
            None => {
                // No labels: "<name> <value> [timestamp]".
                let (name, rest) = line.split_once(char::is_whitespace)?;
                (name, std::collections::HashMap::new(), rest)
            }
        };

        let value: f64 = rest.split_whitespace().next()?.parse().ok()?;
        Some((name.trim().to_string(), labels, value))
    }

    /// Parses the label block that follows an opening `{`, up to and including the matching `}`.
    ///
    /// Returns the labels and the number of bytes consumed (including the closing `}`).
    /// Returns `None` if the block is unterminated or a key has no `=`.
    fn parse_label_block(
        input: &str,
    ) -> Option<(std::collections::HashMap<String, String>, usize)> {
        let mut labels = std::collections::HashMap::new();
        let mut chars = input.char_indices().peekable();

        loop {
            // Skip separators between label pairs.
            while let Some(&(_, c)) = chars.peek() {
                if c == ',' || c.is_whitespace() {
                    chars.next();
                } else {
                    break;
                }
            }

            let (idx, first) = chars.next()?;
            if first == '}' {
                return Some((labels, idx + 1));
            }

            // Label name: everything up to '='.
            let mut key = String::new();
            key.push(first);
            loop {
                let (_, c) = chars.next()?;
                if c == '=' {
                    break;
                }
                key.push(c);
            }

            let mut value = String::new();
            if let Some(&(_, '"')) = chars.peek() {
                // Quoted value with Prometheus escapes.
                chars.next();
                loop {
                    let (_, c) = chars.next()?;
                    match c {
                        '"' => break,
                        '\\' => {
                            let (_, escaped) = chars.next()?;
                            value.push(if escaped == 'n' { '\n' } else { escaped });
                        }
                        other => value.push(other),
                    }
                }
            } else {
                // Lenient bare value: runs until ',' or '}'.
                while let Some(&(_, c)) = chars.peek() {
                    if c == ',' || c == '}' {
                        break;
                    }
                    value.push(c);
                    chars.next();
                }
            }

            labels.insert(key.trim().to_string(), value);
        }
    }

    /// Injects a `worker_id` label into Prometheus metric data lines.
    ///
    /// Prometheus places const labels (like `worker_id`) before the special histogram `le`
    /// label, so:
    /// - bucket lines with other labels get `,worker_id="…"` inserted before `,le=`;
    /// - bucket lines whose only label is `le` get `worker_id="…",` inserted right after `{`;
    /// - all other labelled lines get `,worker_id="…"` inserted before the first `}`.
    ///
    /// Comment lines, blank lines and lines without labels are left unchanged.
    pub fn inject_worker_id(expected: &str, wid: &str) -> String {
        let appended = format!(",worker_id=\"{}\"", wid);
        let leading = format!("worker_id=\"{}\",", wid);
        expected
            .lines()
            .map(|line| {
                if line.starts_with('#') || line.trim().is_empty() || !line.contains('{') {
                    line.to_string()
                } else if let Some(le_pos) = line.find(",le=") {
                    let mut s = line.to_string();
                    s.insert_str(le_pos, &appended);
                    s
                } else if let Some(brace_pos) = line.find("{le=") {
                    // `le` is the only other label, so worker_id comes first.
                    let mut s = line.to_string();
                    s.insert_str(brace_pos + 1, &leading);
                    s
                } else {
                    line.replacen("}", &format!("{}}}", appended), 1)
                }
            })
            .collect::<Vec<_>>()
            .join("\n")
    }
}
1070
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    #[test]
    fn test_build_component_metric_name_with_prefix() {
        // build_component_metric_name must prepend the `dynamo_component_` prefix.
        for (base, expected) in [
            ("requests", "dynamo_component_requests"),
            ("counter", "dynamo_component_counter"),
        ] {
            assert_eq!(build_component_metric_name(base), expected, "base name {base:?}");
        }
    }

    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        fn label_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
            pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect()
        }

        // Valid sample lines: (line, expected name, expected labels, expected value).
        let valid = [
            (
                "http_requests_total{method=\"GET\",status=\"200\"} 1234",
                "http_requests_total",
                label_map(&[("method", "GET"), ("status", "200")]),
                1234.0,
            ),
            ("cpu_usage 98.5", "cpu_usage", label_map(&[]), 98.5),
            (
                "response_time{service=\"api\"} 0.123",
                "response_time",
                label_map(&[("service", "api")]),
                0.123,
            ),
        ];
        for (line, expected_name, expected_labels, expected_value) in valid {
            let (name, labels, value) = parse_prometheus_metric(line)
                .unwrap_or_else(|| panic!("expected {line:?} to parse"));
            assert_eq!(name, expected_name, "name of {line:?}");
            assert_eq!(labels, expected_labels, "labels of {line:?}");
            assert_eq!(value, expected_value, "value of {line:?}");
        }

        // Blank lines, HELP/TYPE comments and lines without a value are rejected.
        for line in [
            "",
            "# HELP metric description",
            "# TYPE metric counter",
            "metric_name",
        ] {
            assert!(
                parse_prometheus_metric(line).is_none(),
                "{line:?} should not parse"
            );
        }
    }

    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Test 1: Basic callback execution with counter increments
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            // Add callbacks with different increment values
            for increment in [1, 10, 100] {
                let counter_clone = counter.clone();
                registry.add_update_callback(Arc::new(move || {
                    counter_clone.fetch_add(increment, Ordering::SeqCst);
                    Ok(())
                }));
            }

            // Registering must not run anything.
            assert_eq!(counter.load(Ordering::SeqCst), 0);

            // First execution
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(|r| r.is_ok()));
            assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100

            // Second execution - callbacks are reusable
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111

            // A cloned registry shares the same (Arc-wrapped) callbacks
            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111

            // The original still runs the shared callbacks
            registry.execute_update_callbacks();
            assert_eq!(counter.load(Ordering::SeqCst), 444); // 333 + 111
        }

        // Test 2: Mixed success and error callbacks
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));

            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            // Results are reported per callback, in registration order
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results[0].is_ok());
            assert!(results[1].is_err());
            assert!(results[2].is_ok());
            assert_eq!(
                results[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );

            // The failing callback does not stop the others
            assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10

            // Errors are reported consistently on re-execution
            let results = registry.execute_update_callbacks();
            assert!(results[1].is_err());
            assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11
        }

        // Test 3: Empty registry
        {
            let registry = MetricsRegistry::new();
            assert!(registry.execute_update_callbacks().is_empty());
        }
    }
}
1233
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use prometheus::core::Collector;

    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";
        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // DRT: root of the hierarchy, basename is the empty string, no parents.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(drt.parent_hierarchies().len(), 0);

        // Namespace: one parent (the DRT).
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);

        // Component: parents are DRT, namespace (root first).
        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);

        // Endpoint: parents are DRT, namespace, component (root first).
        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);
        assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);

        // Relationships: each level lists its direct parent among its parent hierarchies.
        assert!(
            namespace
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == drt.basename())
        );
        assert!(
            component
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == namespace.basename())
        );
        assert!(
            endpoint
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == component.basename())
        );

        // Invalid namespace behavior - sanitizes to "_123" and succeeds.
        // Namespace validation (see TODO comment in component.rs) is not enabled, so invalid
        // characters are sanitized in MetricsRegistry rather than rejected.
        let invalid_namespace = drt.namespace("@@123").unwrap();
        let counter = invalid_namespace
            .metrics()
            .create_counter("test_counter", "A test counter", &[])
            .expect("counter creation should succeed for a sanitized namespace");
        // The namespace must appear sanitized to "_123" in the const label.
        let desc = counter.desc();
        let namespace_label = desc[0]
            .const_label_pairs
            .iter()
            .find(|l| l.name() == "dynamo_namespace")
            .expect("Should have dynamo_namespace label");
        assert_eq!(namespace_label.value(), "_123");

        // Valid namespace works
        let valid_namespace = drt.namespace("ns567").unwrap();
        assert!(
            valid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[])
                .is_ok()
        );
    }

    #[tokio::test]
    async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
        // If an expfmt callback is registered only on the endpoint registry, scraping from
        // the root (DRT) must still include its output exactly once via child-registry traversal.
        let drt = create_test_drt_async().await;
        let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
        let component = namespace.component("comp_expfmt_ep_only").unwrap();
        let endpoint = component.endpoint("ep_expfmt_ep_only");

        let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
        let callback: PrometheusExpositionFormatCallback =
            Arc::new(move || Ok(metric_line.to_string()));

        endpoint
            .get_metrics_registry()
            .add_expfmt_callback(callback);

        let output = drt.metrics().prometheus_expfmt().unwrap();
        let expected_line = metric_line.trim_end_matches('\n');
        let occurrences = output
            .lines()
            .filter(|line| *line == expected_line)
            .count();

        assert_eq!(
            occurrences, 1,
            "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
            occurrences, output
        );
    }

    #[tokio::test]
    async fn test_recursive_namespace() {
        let drt = create_test_drt_async().await;

        // Deeply chained namespace: ns1.ns2.ns3, with a component in the deepest one.
        let ns1 = drt.namespace("ns1").unwrap();
        let ns2 = ns1.namespace("ns2").unwrap();
        let ns3 = ns2.namespace("ns3").unwrap();
        let component = ns3.component("test-component").unwrap();

        // Each level lists all of its ancestors, root (empty DRT basename) first.
        assert_eq!(ns1.basename(), "ns1");
        assert_eq!(ns1.parent_hierarchies().len(), 1);
        assert_eq!(ns1.parent_hierarchies()[0].basename(), "");

        assert_eq!(ns2.basename(), "ns2");
        assert_eq!(ns2.parent_hierarchies().len(), 2);
        assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");

        assert_eq!(ns3.basename(), "ns3");
        assert_eq!(ns3.parent_hierarchies().len(), 3);
        assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");

        assert_eq!(component.basename(), "test-component");
        assert_eq!(component.parent_hierarchies().len(), 4);
        assert_eq!(component.parent_hierarchies()[0].basename(), "");
        assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
        assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
    }
}
1409
1410#[cfg(feature = "integration")]
1411#[cfg(test)]
1412mod test_metricsregistry_prometheus_fmt_outputs {
1413    use super::prometheus_names::name_prefix;
1414    use super::*;
1415    use crate::distributed::distributed_test_utils::create_test_drt_async;
1416    use prometheus::Counter;
1417    use std::sync::Arc;
1418
1419    #[tokio::test]
1420    async fn test_prometheusfactory_using_metrics_registry_trait() {
1421        // Setup real DRT and registry using the test-friendly constructor
1422        let drt = create_test_drt_async().await;
1423
1424        // Use a simple constant namespace name
1425        let namespace_name = "ns345";
1426
1427        let namespace = drt.namespace(namespace_name).unwrap();
1428        let component = namespace.component("comp345").unwrap();
1429        let endpoint = component.endpoint("ep345");
1430
1431        // Test Counter creation
1432        let counter = endpoint
1433            .metrics()
1434            .create_counter("testcounter", "A test counter", &[])
1435            .unwrap();
1436        counter.inc_by(123.456789);
1437        let epsilon = 0.01;
1438        assert!((counter.get() - 123.456789).abs() < epsilon);
1439
1440        let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1441        println!("Endpoint output:");
1442        println!("{}", endpoint_output_raw);
1443
1444        // worker_id is runtime-generated (etcd lease ID), so we grab it from the DRT
1445        // and inject it into expected strings via the inject_worker_id helper.
1446        let wid = format!("{:x}", drt.connection_id());
1447        use super::test_helpers::inject_worker_id;
1448
1449        let expected_endpoint_output = inject_worker_id(
1450            r#"# HELP dynamo_component_testcounter A test counter
1451# TYPE dynamo_component_testcounter counter
1452dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#,
1453            &wid,
1454        );
1455
1456        assert_eq!(
1457            endpoint_output_raw.trim_end_matches('\n'),
1458            expected_endpoint_output.trim_end_matches('\n'),
1459            "\n=== ENDPOINT COMPARISON FAILED ===\n\
1460             Actual:\n{}\n\
1461             Expected:\n{}\n\
1462             ==============================",
1463            endpoint_output_raw,
1464            expected_endpoint_output
1465        );
1466
1467        // Test Gauge creation
1468        let gauge = component
1469            .metrics()
1470            .create_gauge("testgauge", "A test gauge", &[])
1471            .unwrap();
1472        gauge.set(50000.0);
1473        assert_eq!(gauge.get(), 50000.0);
1474
1475        // Test Prometheus format output for Component (gauge + histogram)
1476        let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1477        println!("Component output:");
1478        println!("{}", component_output_raw);
1479
1480        let expected_component_output = inject_worker_id(
1481            r#"# HELP dynamo_component_testcounter A test counter
1482# TYPE dynamo_component_testcounter counter
1483dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1484# HELP dynamo_component_testgauge A test gauge
1485# TYPE dynamo_component_testgauge gauge
1486dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#,
1487            &wid,
1488        );
1489
1490        assert_eq!(
1491            component_output_raw.trim_end_matches('\n'),
1492            expected_component_output.trim_end_matches('\n'),
1493            "\n=== COMPONENT COMPARISON FAILED ===\n\
1494             Actual:\n{}\n\
1495             Expected:\n{}\n\
1496             ==============================",
1497            component_output_raw,
1498            expected_component_output
1499        );
1500
1501        let intcounter = namespace
1502            .metrics()
1503            .create_intcounter("testintcounter", "A test int counter", &[])
1504            .unwrap();
1505        intcounter.inc_by(12345);
1506        assert_eq!(intcounter.get(), 12345);
1507
1508        // Test Prometheus format output for Namespace (int_counter + gauge + histogram)
1509        let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1510        println!("Namespace output:");
1511        println!("{}", namespace_output_raw);
1512
1513        let expected_namespace_output = inject_worker_id(
1514            r#"# HELP dynamo_component_testcounter A test counter
1515# TYPE dynamo_component_testcounter counter
1516dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1517# HELP dynamo_component_testgauge A test gauge
1518# TYPE dynamo_component_testgauge gauge
1519dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1520# HELP dynamo_component_testintcounter A test int counter
1521# TYPE dynamo_component_testintcounter counter
1522dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#,
1523            &wid,
1524        );
1525
1526        assert_eq!(
1527            namespace_output_raw.trim_end_matches('\n'),
1528            expected_namespace_output.trim_end_matches('\n'),
1529            "\n=== NAMESPACE COMPARISON FAILED ===\n\
1530             Actual:\n{}\n\
1531             Expected:\n{}\n\
1532             ==============================",
1533            namespace_output_raw,
1534            expected_namespace_output
1535        );
1536
1537        // Test IntGauge creation
1538        let intgauge = namespace
1539            .metrics()
1540            .create_intgauge("testintgauge", "A test int gauge", &[])
1541            .unwrap();
1542        intgauge.set(42);
1543        assert_eq!(intgauge.get(), 42);
1544
1545        // Test IntGaugeVec creation
1546        let intgaugevec = namespace
1547            .metrics()
1548            .create_intgaugevec(
1549                "testintgaugevec",
1550                "A test int gauge vector",
1551                &["instance", "status"],
1552                &[("service", "api")],
1553            )
1554            .unwrap();
1555        intgaugevec
1556            .with_label_values(&["server1", "active"])
1557            .set(10);
1558        intgaugevec
1559            .with_label_values(&["server2", "inactive"])
1560            .set(0);
1561
1562        // Test CounterVec creation
1563        let countervec = endpoint
1564            .metrics()
1565            .create_countervec(
1566                "testcountervec",
1567                "A test counter vector",
1568                &["method", "status"],
1569                &[("service", "api")],
1570            )
1571            .unwrap();
1572        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1573        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1574
1575        // Test Histogram creation
1576        let histogram = component
1577            .metrics()
1578            .create_histogram("testhistogram", "A test histogram", &[], None)
1579            .unwrap();
1580        histogram.observe(1.0);
1581        histogram.observe(2.5);
1582        histogram.observe(4.0);
1583
1584        // Test Prometheus format output for DRT (all metrics combined)
1585        let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1586        println!("DRT output:");
1587        println!("{}", drt_output_raw);
1588
1589        // The uptime_seconds value is dynamic (depends on elapsed wall-clock time),
1590        // so we check all other lines exactly and validate uptime separately.
1591        let expected_drt_output_without_uptime = inject_worker_id(
1592            r#"# HELP dynamo_component_testcounter A test counter
1593# TYPE dynamo_component_testcounter counter
1594dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1595# HELP dynamo_component_testcountervec A test counter vector
1596# TYPE dynamo_component_testcountervec counter
1597dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1598dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1599# HELP dynamo_component_testgauge A test gauge
1600# TYPE dynamo_component_testgauge gauge
1601dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1602# HELP dynamo_component_testhistogram A test histogram
1603# TYPE dynamo_component_testhistogram histogram
1604dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1605dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1606dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1607dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1608dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1609dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1610dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1611dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1612dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1613dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1614dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1615dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1616dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1617dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1618# HELP dynamo_component_testintcounter A test int counter
1619# TYPE dynamo_component_testintcounter counter
1620dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1621# HELP dynamo_component_testintgauge A test int gauge
1622# TYPE dynamo_component_testintgauge gauge
1623dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1624# HELP dynamo_component_testintgaugevec A test int gauge vector
1625# TYPE dynamo_component_testintgaugevec gauge
1626dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1627dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#,
1628            &wid,
1629        );
1630
1631        // Split actual output into non-uptime lines and validate the uptime value line.
1632        // The uptime metric now carries a worker_id label, so we match on the metric name
1633        // prefix and extract the value as the last whitespace-delimited token.
1634        let mut non_uptime_lines = Vec::new();
1635        let mut saw_uptime_value = false;
1636        for line in drt_output_raw.trim_end_matches('\n').lines() {
1637            if line.starts_with("dynamo_component_uptime_seconds") && !line.starts_with('#') {
1638                let val_str = line.split_whitespace().last().unwrap();
1639                val_str.parse::<f64>().expect("uptime should be a float");
1640                saw_uptime_value = true;
1641            } else if line.starts_with("# HELP dynamo_component_uptime_seconds")
1642                || line.starts_with("# TYPE dynamo_component_uptime_seconds")
1643            {
1644                // Skip HELP/TYPE lines for uptime (we just verify it exists via the value)
1645            } else {
1646                non_uptime_lines.push(line);
1647            }
1648        }
1649        assert!(
1650            saw_uptime_value,
1651            "uptime_seconds metric should be present in initial scrape"
1652        );
1653
1654        let actual_without_uptime = non_uptime_lines.join("\n");
1655        assert_eq!(
1656            actual_without_uptime,
1657            expected_drt_output_without_uptime.trim_end_matches('\n'),
1658            "\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
1659             Expected:\n{}\n\
1660             Actual:\n{}\n\
1661             ==============================",
1662            expected_drt_output_without_uptime,
1663            actual_without_uptime
1664        );
1665
1666        // Wait briefly so the uptime gauge is clearly positive on the next scrape.
1667        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1668        let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
1669        let uptime_line = drt_output_after
1670            .lines()
1671            .find(|l| l.starts_with("dynamo_component_uptime_seconds") && !l.starts_with('#'))
1672            .expect("uptime_seconds metric should be present after sleep");
1673        let uptime_after: f64 = uptime_line
1674            .split_whitespace()
1675            .last()
1676            .unwrap()
1677            .parse()
1678            .expect("uptime should be a float");
1679        assert!(
1680            uptime_after > 0.0,
1681            "uptime_seconds should be > 0 after 10ms sleep, got {}",
1682            uptime_after
1683        );
1684
1685        println!("✓ All Prometheus format outputs verified successfully!");
1686    }
1687
/// Checks `test_helpers::extract_metrics` on a small Prometheus exposition
/// snippet: it must return exactly the sample lines, in input order, and drop
/// every `# HELP` / `# TYPE` comment line.
///
/// Only `extract_metrics` is exercised here, despite the test's plural name.
#[test]
fn test_refactored_filter_functions() {
    // Exposition text mixing HELP/TYPE comments with four sample lines:
    // a plain counter, two histogram buckets, and a sample with no HELP/TYPE.
    let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_errors_total 5"#;

    let metrics_only = super::test_helpers::extract_metrics(test_input);

    // Pin the exact lines rather than only their count or prefix, so that
    // reordering, truncation, or mangled labels/values are also caught.
    // (Works whether the helper yields `String`s or `&str`s.)
    assert_eq!(
        metrics_only,
        vec![
            "dynamo_component_requests 42",
            r#"dynamo_component_latency_bucket{le="0.1"} 10"#,
            r#"dynamo_component_latency_bucket{le="0.5"} 25"#,
            "dynamo_component_errors_total 5",
        ],
        "extract_metrics should keep exactly the sample lines, in order"
    );

    // No comment line may survive the filter.
    assert!(
        metrics_only.iter().all(|line| !line.starts_with('#')),
        "extract_metrics must drop HELP/TYPE comment lines"
    );

    println!("✓ All refactored filter functions work correctly!");
}
1711
1712    #[tokio::test]
1713    async fn test_same_metric_name_different_endpoints() {
1714        // Test that the same metric name can exist in different endpoints without collision.
1715        // This validates the multi-registry approach: each endpoint has its own registry,
1716        // and metrics are merged at scrape time with distinct labels.
1717        let drt = create_test_drt_async().await;
1718        let namespace = drt.namespace("ns_test").unwrap();
1719        let component = namespace.component("comp_test").unwrap();
1720
1721        // Create two endpoints with the same metric name
1722        let ep1 = component.endpoint("ep1");
1723        let ep2 = component.endpoint("ep2");
1724
1725        let counter1 = ep1
1726            .metrics()
1727            .create_counter("requests_total", "Total requests", &[])
1728            .unwrap();
1729        counter1.inc_by(100.0);
1730
1731        let counter2 = ep2
1732            .metrics()
1733            .create_counter("requests_total", "Total requests", &[])
1734            .unwrap();
1735        counter2.inc_by(200.0);
1736
1737        // Get merged Prometheus output from component level
1738        let output = component.metrics().prometheus_expfmt().unwrap();
1739
1740        let wid = format!("{:x}", drt.connection_id());
1741        use super::test_helpers::inject_worker_id;
1742
1743        let expected_output = inject_worker_id(
1744            r#"# HELP dynamo_component_requests_total Total requests
1745# TYPE dynamo_component_requests_total counter
1746dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
1747dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#,
1748            &wid,
1749        );
1750
1751        assert_eq!(
1752            output.trim_end_matches('\n'),
1753            expected_output.trim_end_matches('\n'),
1754            "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
1755             Actual:\n{}\n\
1756             Expected:\n{}\n\
1757             ==============================",
1758            output,
1759            expected_output
1760        );
1761
1762        println!("✓ Multi-registry prevents Prometheus collisions!");
1763    }
1764
1765    #[tokio::test]
1766    async fn test_duplicate_series_warning() {
1767        // Test that duplicate series (same metric name + same labels) are detected and deduplicated.
1768        // This should log a warning and keep only one of the duplicate series.
1769        let drt = create_test_drt_async().await;
1770        let namespace = drt.namespace("ns_dup").unwrap();
1771        let component = namespace.component("comp_dup").unwrap();
1772
1773        // Create two endpoints with counters that will have identical labels when scraped
1774        let ep1 = component.endpoint("ep_same");
1775        let ep2 = component.endpoint("ep_same"); // Same endpoint name = duplicate labels
1776
1777        let counter1 = ep1
1778            .metrics()
1779            .create_counter("dup_metric", "Duplicate metric test", &[])
1780            .unwrap();
1781        counter1.inc_by(50.0);
1782
1783        let counter2 = ep2
1784            .metrics()
1785            .create_counter("dup_metric", "Duplicate metric test", &[])
1786            .unwrap();
1787        counter2.inc_by(75.0);
1788
1789        // Get merged output - duplicates should be deduplicated
1790        let output = component.metrics().prometheus_expfmt().unwrap();
1791
1792        let wid = format!("{:x}", drt.connection_id());
1793        use super::test_helpers::inject_worker_id;
1794
1795        let expected_output = inject_worker_id(
1796            r#"# HELP dynamo_component_dup_metric Duplicate metric test
1797# TYPE dynamo_component_dup_metric counter
1798dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#,
1799            &wid,
1800        );
1801
1802        assert_eq!(
1803            output.trim_end_matches('\n'),
1804            expected_output.trim_end_matches('\n'),
1805            "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
1806             Actual:\n{}\n\
1807             Expected:\n{}\n\
1808             ==============================",
1809            output,
1810            expected_output
1811        );
1812
1813        println!("✓ Duplicate series detection and deduplication works!");
1814    }
1815}