Skip to main content

dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod frontend_perf;
10pub mod prometheus_names;
11pub mod request_plane;
12pub mod tokio_perf;
13pub mod transport_metrics;
14pub mod work_handler_perf;
15
16use parking_lot::Mutex;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use crate::component::ComponentBuilder;
21use anyhow;
22use once_cell::sync::Lazy;
23use regex::Regex;
24use std::any::Any;
25use std::collections::HashMap;
26
27// Import commonly used items to avoid verbose prefixes
28use prometheus_names::{
29    build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
30    sanitize_prometheus_name, work_handler,
31};
32
33// Pipeline imports for endpoint creation
34use crate::pipeline::{
35    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
36    network::Ingress,
37};
38use crate::protocols::annotated::Annotated;
39use crate::stream;
40use crate::stream::StreamExt;
41
42// Prometheus imports
43use prometheus::Encoder;
44
45/// Validate that a label slice has no duplicate keys.
46/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
47fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
48    let mut seen_keys = std::collections::HashSet::new();
49    for (key, _) in labels {
50        if !seen_keys.insert(*key) {
51            return Err(anyhow::anyhow!(
52                "Duplicate label key '{}' found in labels",
53                key
54            ));
55        }
56    }
57    Ok(())
58}
59
60/// ==============================
61/// Prometheus section
62/// ==============================
63/// Trait that defines common behavior for Prometheus metric types
64pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
65    /// Create a new metric with the given options
66    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
67    where
68        Self: Sized;
69
70    /// Create a new metric with histogram options and custom buckets
71    /// This is a default implementation that will panic for non-histogram metrics
72    fn with_histogram_opts_and_buckets(
73        _opts: prometheus::HistogramOpts,
74        _buckets: Option<Vec<f64>>,
75    ) -> Result<Self, prometheus::Error>
76    where
77        Self: Sized,
78    {
79        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
80    }
81
82    /// Create a new metric with counter options and label names (for CounterVec)
83    /// This is a default implementation that will panic for non-countervec metrics
84    fn with_opts_and_label_names(
85        _opts: prometheus::Opts,
86        _label_names: &[&str],
87    ) -> Result<Self, prometheus::Error>
88    where
89        Self: Sized,
90    {
91        panic!("with_opts_and_label_names is not implemented for this metric type");
92    }
93}
94
95// Implement the trait for Counter, IntCounter, and Gauge
96impl PrometheusMetric for prometheus::Counter {
97    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
98        prometheus::Counter::with_opts(opts)
99    }
100}
101
102impl PrometheusMetric for prometheus::IntCounter {
103    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
104        prometheus::IntCounter::with_opts(opts)
105    }
106}
107
108impl PrometheusMetric for prometheus::Gauge {
109    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
110        prometheus::Gauge::with_opts(opts)
111    }
112}
113
114impl PrometheusMetric for prometheus::IntGauge {
115    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
116        prometheus::IntGauge::with_opts(opts)
117    }
118}
119
120impl PrometheusMetric for prometheus::GaugeVec {
121    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
122        Err(prometheus::Error::Msg(
123            "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
124        ))
125    }
126
127    fn with_opts_and_label_names(
128        opts: prometheus::Opts,
129        label_names: &[&str],
130    ) -> Result<Self, prometheus::Error> {
131        prometheus::GaugeVec::new(opts, label_names)
132    }
133}
134
135impl PrometheusMetric for prometheus::IntGaugeVec {
136    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
137        Err(prometheus::Error::Msg(
138            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
139        ))
140    }
141
142    fn with_opts_and_label_names(
143        opts: prometheus::Opts,
144        label_names: &[&str],
145    ) -> Result<Self, prometheus::Error> {
146        prometheus::IntGaugeVec::new(opts, label_names)
147    }
148}
149
150impl PrometheusMetric for prometheus::IntCounterVec {
151    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
152        Err(prometheus::Error::Msg(
153            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
154        ))
155    }
156
157    fn with_opts_and_label_names(
158        opts: prometheus::Opts,
159        label_names: &[&str],
160    ) -> Result<Self, prometheus::Error> {
161        prometheus::IntCounterVec::new(opts, label_names)
162    }
163}
164
165// Implement the trait for Histogram
166impl PrometheusMetric for prometheus::Histogram {
167    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
168        // Convert Opts to HistogramOpts
169        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
170        prometheus::Histogram::with_opts(histogram_opts)
171    }
172
173    fn with_histogram_opts_and_buckets(
174        mut opts: prometheus::HistogramOpts,
175        buckets: Option<Vec<f64>>,
176    ) -> Result<Self, prometheus::Error> {
177        if let Some(custom_buckets) = buckets {
178            opts = opts.buckets(custom_buckets);
179        }
180        prometheus::Histogram::with_opts(opts)
181    }
182}
183
184// Implement the trait for CounterVec
185impl PrometheusMetric for prometheus::CounterVec {
186    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
187        // This will panic - CounterVec needs label names
188        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
189    }
190
191    fn with_opts_and_label_names(
192        opts: prometheus::Opts,
193        label_names: &[&str],
194    ) -> Result<Self, prometheus::Error> {
195        prometheus::CounterVec::new(opts, label_names)
196    }
197}
198
199/// ==============================
200/// Metrics section
201/// ==============================
202/// Public helper function to create metrics - accessible for Python bindings
203pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
204    hierarchy: &H,
205    metric_name: &str,
206    metric_desc: &str,
207    labels: &[(&str, &str)],
208    buckets: Option<Vec<f64>>,
209    const_labels: Option<&[&str]>,
210) -> anyhow::Result<T> {
211    // Validate that user-provided labels don't have duplicate keys
212    validate_no_duplicate_label_keys(labels)?;
213    // Note: stored labels functionality has been removed
214
215    let basename = hierarchy.basename();
216    let parent_hierarchies = hierarchy.parent_hierarchies();
217
218    // Build hierarchy path as vector of strings: parent names + [basename]
219    let mut hierarchy_names: Vec<String> =
220        parent_hierarchies.iter().map(|p| p.basename()).collect();
221    hierarchy_names.push(basename.clone());
222
223    let metric_name = build_component_metric_name(metric_name);
224
225    // Build updated_labels: auto-labels first, then `labels` + stored labels
226    let mut updated_labels: Vec<(String, String)> = Vec::new();
227
228    // Auto-label injection: Always add dynamo_namespace, dynamo_component, dynamo_endpoint labels
229    // based on the hierarchy. Label constants defined in prometheus_names.rs labels module.
230    //
231    // Python counterpart: components/src/dynamo/common/utils/prometheus.py register_engine_metrics_callback()
232
233    // Validate that user-provided labels don't conflict with auto-generated labels
234    for (key, _) in labels {
235        if *key == labels::NAMESPACE
236            || *key == labels::COMPONENT
237            || *key == labels::ENDPOINT
238            || *key == labels::WORKER_ID
239        {
240            return Err(anyhow::anyhow!(
241                "Label '{}' is automatically added by auto-label injection and cannot be manually set",
242                key
243            ));
244        }
245    }
246
247    // Also validate that vector label names (const_labels) don't collide with auto-injected
248    // const labels. A variable label named "worker_id" would conflict with the auto-injected
249    // worker_id const label, causing a prometheus registration error or ambiguous output.
250    if let Some(label_names) = const_labels {
251        for name in label_names.iter() {
252            if *name == labels::NAMESPACE
253                || *name == labels::COMPONENT
254                || *name == labels::ENDPOINT
255                || *name == labels::WORKER_ID
256            {
257                return Err(anyhow::anyhow!(
258                    "Variable label name '{}' conflicts with auto-injected const label and cannot be used",
259                    name
260                ));
261            }
262        }
263    }
264
265    // Add auto-generated labels with sanitized values
266    // Hierarchy: [drt, namespace, component, endpoint]
267    if hierarchy_names.len() > 1 {
268        let namespace = &hierarchy_names[1];
269        if !namespace.is_empty() {
270            let valid_namespace = sanitize_prometheus_label(namespace)?;
271            if !valid_namespace.is_empty() {
272                updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
273            }
274        }
275    }
276    if hierarchy_names.len() > 2 {
277        let component = &hierarchy_names[2];
278        if !component.is_empty() {
279            let valid_component = sanitize_prometheus_label(component)?;
280            if !valid_component.is_empty() {
281                updated_labels.push((labels::COMPONENT.to_string(), valid_component));
282            }
283        }
284    }
285    if hierarchy_names.len() > 3 {
286        let endpoint = &hierarchy_names[3];
287        if !endpoint.is_empty() {
288            let valid_endpoint = sanitize_prometheus_label(endpoint)?;
289            if !valid_endpoint.is_empty() {
290                updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
291            }
292        }
293    }
294
295    // Auto-inject worker_id label from the hierarchy's connection_id (discovery instance ID).
296    // This provides a stable per-worker identity label so metrics from different workers
297    // serving the same endpoint can be distinguished without relying on Kubernetes labels.
298    if let Some(conn_id) = hierarchy.connection_id() {
299        updated_labels.push((labels::WORKER_ID.to_string(), format!("{:x}", conn_id)));
300    }
301
302    // Add user labels
303    updated_labels.extend(
304        labels
305            .iter()
306            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
307    );
308    // Note: stored labels functionality has been removed
309
310    // Handle different metric types
311    let prometheus_metric = if std::any::TypeId::of::<T>()
312        == std::any::TypeId::of::<prometheus::CounterVec>()
313    {
314        // Special handling for CounterVec with label names
315        // const_labels parameter is required for CounterVec
316        if buckets.is_some() {
317            return Err(anyhow::anyhow!(
318                "buckets parameter is not valid for CounterVec"
319            ));
320        }
321        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
322        for (key, value) in &updated_labels {
323            opts = opts.const_label(key.clone(), value.clone());
324        }
325        let label_names = const_labels
326            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
327        T::with_opts_and_label_names(opts, label_names)?
328    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
329        // Special handling for GaugeVec with label names
330        // const_labels parameter is required for GaugeVec
331        if buckets.is_some() {
332            return Err(anyhow::anyhow!(
333                "buckets parameter is not valid for GaugeVec"
334            ));
335        }
336        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
337        for (key, value) in &updated_labels {
338            opts = opts.const_label(key.clone(), value.clone());
339        }
340        let label_names = const_labels
341            .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
342        T::with_opts_and_label_names(opts, label_names)?
343    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
344        // Special handling for Histogram with custom buckets
345        // buckets parameter is valid for Histogram, const_labels is not used
346        if const_labels.is_some() {
347            return Err(anyhow::anyhow!(
348                "const_labels parameter is not valid for Histogram"
349            ));
350        }
351        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
352        for (key, value) in &updated_labels {
353            opts = opts.const_label(key.clone(), value.clone());
354        }
355        T::with_histogram_opts_and_buckets(opts, buckets)?
356    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
357        // Special handling for IntCounterVec with label names
358        // const_labels parameter is required for IntCounterVec
359        if buckets.is_some() {
360            return Err(anyhow::anyhow!(
361                "buckets parameter is not valid for IntCounterVec"
362            ));
363        }
364        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
365        for (key, value) in &updated_labels {
366            opts = opts.const_label(key.clone(), value.clone());
367        }
368        let label_names = const_labels
369            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
370        T::with_opts_and_label_names(opts, label_names)?
371    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
372        // Special handling for IntGaugeVec with label names
373        // const_labels parameter is required for IntGaugeVec
374        if buckets.is_some() {
375            return Err(anyhow::anyhow!(
376                "buckets parameter is not valid for IntGaugeVec"
377            ));
378        }
379        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
380        for (key, value) in &updated_labels {
381            opts = opts.const_label(key.clone(), value.clone());
382        }
383        let label_names = const_labels
384            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
385        T::with_opts_and_label_names(opts, label_names)?
386    } else {
387        // Standard handling for Counter, IntCounter, Gauge, IntGauge
388        // buckets and const_labels parameters are not valid for these types
389        if buckets.is_some() {
390            return Err(anyhow::anyhow!(
391                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
392            ));
393        }
394        if const_labels.is_some() {
395            return Err(anyhow::anyhow!(
396                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
397            ));
398        }
399        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
400        for (key, value) in &updated_labels {
401            opts = opts.const_label(key.clone(), value.clone());
402        }
403        T::with_opts(opts)?
404    };
405
406    let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
407    hierarchy.get_metrics_registry().add_metric(collector)?;
408
409    Ok(prometheus_metric)
410}
411
412/// Wrapper struct that provides access to metrics functionality
413/// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint
414pub struct Metrics<H: MetricsHierarchy> {
415    hierarchy: H,
416}
417
418impl<H: MetricsHierarchy> Metrics<H> {
419    pub fn new(hierarchy: H) -> Self {
420        Self { hierarchy }
421    }
422
423    // TODO: Add support for additional Prometheus metric types:
424    // - Counter: ✅ IMPLEMENTED - create_counter()
425    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
426    // - Gauge: ✅ IMPLEMENTED - create_gauge()
427    // - GaugeVec: ✅ IMPLEMENTED - create_gaugevec()
428    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
429    // - Histogram: ✅ IMPLEMENTED - create_histogram()
430    // - HistogramVec with custom buckets: create_histogram_with_buckets()
431    // - Info: create_info() - for info metrics with labels
432    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
433    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
434    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
435    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
436    // - Stateset: create_stateset() - for state-based metrics
437    // - Summary: create_summary() - for quantiles and sum/count metrics
438    // - SummaryVec: create_summary_vec() - for labeled summaries
439    // - Untyped: create_untyped() - for untyped metrics
440    //
441    // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
442    // Keep them synchronized when adding new metric types
443
444    /// Create a Counter metric
445    pub fn create_counter(
446        &self,
447        name: &str,
448        description: &str,
449        labels: &[(&str, &str)],
450    ) -> anyhow::Result<prometheus::Counter> {
451        create_metric(&self.hierarchy, name, description, labels, None, None)
452    }
453
454    /// Create a CounterVec metric with label names (for dynamic labels)
455    pub fn create_countervec(
456        &self,
457        name: &str,
458        description: &str,
459        const_labels: &[&str],
460        const_label_values: &[(&str, &str)],
461    ) -> anyhow::Result<prometheus::CounterVec> {
462        create_metric(
463            &self.hierarchy,
464            name,
465            description,
466            const_label_values,
467            None,
468            Some(const_labels),
469        )
470    }
471
472    /// Create a Gauge metric
473    pub fn create_gauge(
474        &self,
475        name: &str,
476        description: &str,
477        labels: &[(&str, &str)],
478    ) -> anyhow::Result<prometheus::Gauge> {
479        create_metric(&self.hierarchy, name, description, labels, None, None)
480    }
481
482    /// Create a GaugeVec metric with label names (for dynamic labels)
483    pub fn create_gaugevec(
484        &self,
485        name: &str,
486        description: &str,
487        const_labels: &[&str],
488        const_label_values: &[(&str, &str)],
489    ) -> anyhow::Result<prometheus::GaugeVec> {
490        create_metric(
491            &self.hierarchy,
492            name,
493            description,
494            const_label_values,
495            None,
496            Some(const_labels),
497        )
498    }
499
500    /// Create a Histogram metric with custom buckets
501    pub fn create_histogram(
502        &self,
503        name: &str,
504        description: &str,
505        labels: &[(&str, &str)],
506        buckets: Option<Vec<f64>>,
507    ) -> anyhow::Result<prometheus::Histogram> {
508        create_metric(&self.hierarchy, name, description, labels, buckets, None)
509    }
510
511    /// Create an IntCounter metric
512    pub fn create_intcounter(
513        &self,
514        name: &str,
515        description: &str,
516        labels: &[(&str, &str)],
517    ) -> anyhow::Result<prometheus::IntCounter> {
518        create_metric(&self.hierarchy, name, description, labels, None, None)
519    }
520
521    /// Create an IntCounterVec metric with label names (for dynamic labels)
522    pub fn create_intcountervec(
523        &self,
524        name: &str,
525        description: &str,
526        const_labels: &[&str],
527        const_label_values: &[(&str, &str)],
528    ) -> anyhow::Result<prometheus::IntCounterVec> {
529        create_metric(
530            &self.hierarchy,
531            name,
532            description,
533            const_label_values,
534            None,
535            Some(const_labels),
536        )
537    }
538
539    /// Create an IntGauge metric
540    pub fn create_intgauge(
541        &self,
542        name: &str,
543        description: &str,
544        labels: &[(&str, &str)],
545    ) -> anyhow::Result<prometheus::IntGauge> {
546        create_metric(&self.hierarchy, name, description, labels, None, None)
547    }
548
549    /// Create an IntGaugeVec metric with label names (for dynamic labels)
550    pub fn create_intgaugevec(
551        &self,
552        name: &str,
553        description: &str,
554        const_labels: &[&str],
555        const_label_values: &[(&str, &str)],
556    ) -> anyhow::Result<prometheus::IntGaugeVec> {
557        create_metric(
558            &self.hierarchy,
559            name,
560            description,
561            const_label_values,
562            None,
563            Some(const_labels),
564        )
565    }
566
567    /// Get metrics in Prometheus text format
568    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
569        self.hierarchy
570            .get_metrics_registry()
571            .prometheus_expfmt_combined()
572    }
573}
574
575/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
576/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
577/// generating output in Prometheus text format.
578use crate::traits::DistributedRuntimeProvider;
579
580pub trait MetricsHierarchy: Send + Sync {
581    // ========================================================================
582    // Required methods - must be implemented by all types
583    // ========================================================================
584
585    /// Get the name of this hierarchy (without any hierarchy prefix)
586    fn basename(&self) -> String;
587
588    /// Get the parent hierarchies as actual objects (not strings)
589    /// Returns a vector of hierarchy references, ordered from root to immediate parent.
590    /// For example, an Endpoint would return [DRT, Namespace, Component].
591    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
592
593    /// Get a reference to this hierarchy's metrics registry
594    fn get_metrics_registry(&self) -> &MetricsRegistry;
595
596    // ========================================================================
597    // Provided methods - have default implementations
598    // ========================================================================
599
600    /// Get the connection ID (discovery instance ID) for this hierarchy level.
601    ///
602    /// Returns `Some(id)` when the hierarchy has access to the DistributedRuntime
603    /// (e.g. Namespace, Component, Endpoint). Used by `create_metric()` to auto-inject
604    /// the `worker_id` label. Returns `None` by default.
605    fn connection_id(&self) -> Option<u64> {
606        None
607    }
608
609    /// Access the metrics interface for this hierarchy
610    /// This is a provided method that works for any type implementing MetricsHierarchy
611    fn metrics(&self) -> Metrics<&Self>
612    where
613        Self: Sized,
614    {
615        Metrics::new(self)
616    }
617}
618
619// Blanket implementation for references to types that implement MetricsHierarchy
620impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
621    fn basename(&self) -> String {
622        (**self).basename()
623    }
624
625    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
626        (**self).parent_hierarchies()
627    }
628
629    fn get_metrics_registry(&self) -> &MetricsRegistry {
630        (**self).get_metrics_registry()
631    }
632
633    fn connection_id(&self) -> Option<u64> {
634        (**self).connection_id()
635    }
636}
637
638/// Type alias for runtime callback functions to reduce complexity
639///
640/// This type represents an Arc-wrapped callback function that can be:
641/// - Shared efficiently across multiple threads and contexts
642/// - Cloned without duplicating the underlying closure
643/// - Used in generic contexts requiring 'static lifetime
644///
645/// The Arc wrapper is included in the type to make sharing explicit.
646pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
647
648/// Type alias for exposition text callback functions that return Prometheus text
649pub type PrometheusExpositionFormatCallback =
650    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
651
/// Per-hierarchy-level container for a Prometheus registry, links to child registries,
/// and the callbacks used when metrics are scraped.
///
/// All fields are Arc-wrapped, so cloning shares state. This ensures metrics registered
/// on cloned instances (e.g., cloned Client/Endpoint) are visible to the original.
#[derive(Clone)]
pub struct MetricsRegistry {
    /// The Prometheus registry for this hierarchy level.
    /// Arc-wrapped so clones share the same registry (metrics registered on clones are visible everywhere).
    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,

    /// Child registries included when emitting combined `/metrics` output.
    ///
    /// Why this exists:
    /// - Previously, `create_metric()` registered every collector into *all* parent registries
    ///   (Endpoint → Component → Namespace → DRT) so scraping the root registry included everything.
    /// - That fan-out caused Prometheus collisions when different endpoints tried to register the
    ///   same metric name with different const-labels (descriptor mismatch).
    ///
    /// We now register metrics only into the local hierarchy registry to avoid collisions.
    /// `child_registries` rebuilds "what to scrape" as a tree of registries so `/metrics` can:
    /// - traverse registries recursively,
    /// - merge metric families into one exposition payload,
    /// - warn/drop exact duplicate series, while allowing same metric name with different labels.
    ///
    /// `add_child_registry` skips a child whose `prometheus_registry` Arc is already present,
    /// and the combined-scrape traversal skips registries it has already visited, so repeated
    /// registrations (or cycles) do not duplicate output.
    child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,

    /// Update callbacks invoked before metrics are scraped; a failing callback is logged
    /// by `prometheus_expfmt_combined` rather than aborting the scrape.
    /// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,

    /// Callbacks that return Prometheus exposition text appended to metrics output.
    /// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT).
    pub prometheus_expfmt_callbacks:
        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
686
687impl std::fmt::Debug for MetricsRegistry {
688    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689        f.debug_struct("MetricsRegistry")
690            .field("prometheus_registry", &"<RwLock<Registry>>")
691            .field(
692                "prometheus_update_callbacks",
693                &format!(
694                    "<RwLock<Vec<Callback>>> with {} callbacks",
695                    self.prometheus_update_callbacks.read().unwrap().len()
696                ),
697            )
698            .field(
699                "prometheus_expfmt_callbacks",
700                &format!(
701                    "<RwLock<Vec<Callback>>> with {} callbacks",
702                    self.prometheus_expfmt_callbacks.read().unwrap().len()
703                ),
704            )
705            .finish()
706    }
707}
708
709impl MetricsRegistry {
710    /// Create a new metrics registry with an empty Prometheus registry and callback lists
711    pub fn new() -> Self {
712        Self {
713            prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
714            child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
715            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
716            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
717        }
718    }
719
720    /// Add a child registry to be included in combined /metrics output.
721    ///
722    /// Dedup is by underlying Prometheus registry pointer, so repeated registration via clones is safe.
723    pub fn add_child_registry(&self, child: &MetricsRegistry) {
724        let child_ptr = Arc::as_ptr(&child.prometheus_registry);
725        let mut guard = self.child_registries.write().unwrap();
726        if guard
727            .iter()
728            .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
729        {
730            return;
731        }
732        guard.push(child.clone());
733    }
734
735    fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
736        // Traverse child registries recursively so `prometheus_expfmt()` on any hierarchy
737        // (DRT/namespace/component/endpoint) includes metrics from its descendants.
738        //
739        // Dedup by underlying Prometheus registry pointer so multiple paths (e.g. also registering
740        // directly on the root) won't duplicate output.
741        fn visit(
742            registry: &MetricsRegistry,
743            out: &mut Vec<MetricsRegistry>,
744            seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
745        ) {
746            let ptr = Arc::as_ptr(&registry.prometheus_registry);
747            if !seen.insert(ptr) {
748                return;
749            }
750
751            out.push(registry.clone());
752
753            let children: Vec<MetricsRegistry> = registry
754                .child_registries
755                .read()
756                .unwrap()
757                .iter()
758                .cloned()
759                .collect();
760            for child in children {
761                visit(&child, out, seen);
762            }
763        }
764
765        let mut out = Vec::new();
766        let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
767        visit(self, &mut out, &mut seen);
768        out
769    }
770
771    /// Combine metrics across this registry and all registered children into one Prometheus exposition output.
772    ///
773    /// - Families are merged by name; HELP and TYPE must match.
774    /// - Multiple series for the same name are allowed if labels differ.
775    /// - Exact duplicate series (same name + identical label pairs) are warned and dropped.
776    pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
777        let registries = self.registries_for_combined_scrape();
778
779        // Run per-registry update callbacks first.
780        for registry in &registries {
781            for result in registry.execute_update_callbacks() {
782                if let Err(e) = result {
783                    tracing::error!("Error executing metrics callback: {e}");
784                }
785            }
786        }
787
788        // Merge metric families.
789        let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
790        let mut seen_series: HashSet<String> = HashSet::new();
791
792        for (registry_idx, registry) in registries.iter().enumerate() {
793            let families = registry.get_prometheus_registry().gather();
794            for mut family in families {
795                let name = family.name().to_string();
796
797                let entry = by_name.entry(name.clone()).or_insert_with(|| {
798                    let mut out = prometheus::proto::MetricFamily::new();
799                    out.set_name(name.clone());
800                    out.set_help(family.help().to_string());
801                    out.set_field_type(family.get_field_type());
802                    out
803                });
804
805                if entry.help() != family.help()
806                    || entry.get_field_type() != family.get_field_type()
807                {
808                    return Err(anyhow::anyhow!(
809                        "Metric family '{}' has inconsistent help/type across registries (idx={})",
810                        name,
811                        registry_idx
812                    ));
813                }
814
815                let mut metrics = family.take_metric();
816                for metric in metrics.drain(..) {
817                    let mut labels: Vec<(String, String)> = metric
818                        .get_label()
819                        .iter()
820                        .map(|lp| (lp.name().to_string(), lp.value().to_string()))
821                        .collect();
822                    labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
823
824                    let key = format!(
825                        "{}|{}",
826                        name,
827                        labels
828                            .iter()
829                            .map(|(k, v)| format!("{}={}", k, v))
830                            .collect::<Vec<_>>()
831                            .join(",")
832                    );
833
834                    if !seen_series.insert(key) {
835                        tracing::warn!(
836                            metric_name = %name,
837                            labels = ?labels,
838                            registry_idx,
839                            "Duplicate Prometheus series while merging registries; dropping later sample"
840                        );
841                        continue;
842                    }
843
844                    entry.mut_metric().push(metric);
845                }
846            }
847        }
848
849        let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
850        merged.sort_by(|a, b| a.name().cmp(b.name()));
851
852        let encoder = prometheus::TextEncoder::new();
853        let mut buffer = Vec::new();
854        encoder.encode(&merged, &mut buffer)?;
855        let mut result = String::from_utf8(buffer)?;
856
857        // Append expfmt callbacks deterministically in registry order.
858        let mut expfmt = String::new();
859        for registry in registries {
860            let text = registry.execute_expfmt_callbacks();
861            if !text.is_empty() {
862                if !expfmt.is_empty() && !expfmt.ends_with('\n') {
863                    expfmt.push('\n');
864                }
865                expfmt.push_str(&text);
866            }
867        }
868
869        if !expfmt.is_empty() {
870            if !result.ends_with('\n') {
871                result.push('\n');
872            }
873            result.push_str(&expfmt);
874        }
875
876        Ok(result)
877    }
878
879    /// Add a callback function that receives a reference to any MetricsHierarchy
880    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
881        self.prometheus_update_callbacks
882            .write()
883            .unwrap()
884            .push(callback);
885    }
886
887    /// Add an exposition text callback that returns Prometheus text
888    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
889        self.prometheus_expfmt_callbacks
890            .write()
891            .unwrap()
892            .push(callback);
893    }
894
895    /// Execute all update callbacks and return their results
896    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
897        self.prometheus_update_callbacks
898            .read()
899            .unwrap()
900            .iter()
901            .map(|callback| callback())
902            .collect()
903    }
904
905    /// Execute all exposition text callbacks and return their concatenated text
906    pub fn execute_expfmt_callbacks(&self) -> String {
907        let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
908        let mut result = String::new();
909        for callback in callbacks.iter() {
910            match callback() {
911                Ok(text) => {
912                    if !text.is_empty() {
913                        if !result.is_empty() && !result.ends_with('\n') {
914                            result.push('\n');
915                        }
916                        result.push_str(&text);
917                    }
918                }
919                Err(e) => {
920                    tracing::error!("Error executing exposition text callback: {e}");
921                }
922            }
923        }
924        result
925    }
926
927    /// Add a Prometheus metric collector to this registry
928    pub fn add_metric(
929        &self,
930        collector: Box<dyn prometheus::core::Collector>,
931    ) -> anyhow::Result<()> {
932        self.prometheus_registry
933            .write()
934            .unwrap()
935            .register(collector)
936            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
937    }
938
939    /// Add a Prometheus metric collector, logging a warning on failure instead of returning an error.
940    pub fn add_metric_or_warn(&self, collector: Box<dyn prometheus::core::Collector>, name: &str) {
941        if let Err(e) = self.add_metric(collector) {
942            tracing::warn!(error = %e, metric = name, "Failed to register metric");
943        }
944    }
945
946    /// Get a read guard to the Prometheus registry for scraping
947    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
948        self.prometheus_registry.read().unwrap()
949    }
950
951    /// Returns true if a metric with the given name already exists in the Prometheus registry
952    pub fn has_metric_named(&self, metric_name: &str) -> bool {
953        self.prometheus_registry
954            .read()
955            .unwrap()
956            .gather()
957            .iter()
958            .any(|mf| mf.name() == metric_name)
959    }
960}
961
962impl Default for MetricsRegistry {
963    fn default() -> Self {
964        Self::new()
965    }
966}
967
#[cfg(test)]
mod test_helpers {
    use super::prometheus_names::name_prefix;
    use super::*;

    /// Base function to filter Prometheus output lines based on a predicate.
    /// Returns the lines that match the predicate, converted to `String`.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(str::to_string)
            .collect()
    }

    /// Extracts all component metrics (excluding help text and type definitions).
    /// Returns only the actual metric lines with values.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        // Build the prefix once rather than once per input line.
        let prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| {
            line.starts_with(&prefix) && !line.starts_with('#') && !line.trim().is_empty()
        })
    }

    /// Parses a Prometheus metric line and extracts the name, labels, and value.
    /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
    ///
    /// Returns `None` for blank lines, comment lines (`# HELP`, `# TYPE`, ...), lines without a
    /// value, unterminated quoted label values, or values that do not parse as `f64`.
    ///
    /// Label values are read quote-aware, so they may contain spaces, commas, `=` or `}`. The
    /// escapes `\\`, `\"` and `\n` are decoded. Anything after the value (e.g. a timestamp) is
    /// ignored.
    ///
    /// # Example
    /// ```ignore
    /// let line = "http_requests_total{method=\"GET\"} 1234";
    /// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
    /// assert_eq!(name, "http_requests_total");
    /// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
    /// assert_eq!(value, 1234.0);
    /// ```
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        if line.trim().is_empty() || line.starts_with('#') {
            return None;
        }

        let line = line.trim();
        // The metric name runs up to the label block or the first whitespace.
        let name_end = line
            .find(|c: char| c == '{' || c.is_whitespace())
            .unwrap_or(line.len());
        let name = &line[..name_end];
        let rest = &line[name_end..];

        let (labels, rest) = match rest.strip_prefix('{') {
            Some(label_block) => parse_label_block(label_block)?,
            None => (std::collections::HashMap::new(), rest),
        };

        let value: f64 = rest.split_whitespace().next()?.parse().ok()?;
        Some((name.to_string(), labels, value))
    }

    /// Parses the inside of a `{...}` label block. `input` starts just after the `{`.
    /// Returns the labels and the text following the closing `}`.
    fn parse_label_block(
        input: &str,
    ) -> Option<(std::collections::HashMap<String, String>, &str)> {
        let mut labels = std::collections::HashMap::new();
        let mut rest = input.trim_start();
        loop {
            if let Some(after) = rest.strip_prefix('}') {
                return Some((labels, after));
            }
            // Each iteration consumes at least the '=' so the loop always makes progress.
            let eq = rest.find('=')?;
            let key = rest[..eq].trim();
            let (value, after_value) = parse_label_value(rest[eq + 1..].trim_start())?;
            labels.insert(key.to_string(), value);
            let after_value = after_value.trim_start();
            rest = after_value
                .strip_prefix(',')
                .unwrap_or(after_value)
                .trim_start();
        }
    }

    /// Parses one label value at the start of `input` and returns the decoded value plus the
    /// remaining text.
    ///
    /// Accepts a double-quoted string (decoding `\\`, `\"`, `\n`) or, leniently, a bare token
    /// ending at `,` or `}`.
    fn parse_label_value(input: &str) -> Option<(String, &str)> {
        let quoted = match input.strip_prefix('"') {
            Some(quoted) => quoted,
            None => {
                let end = input
                    .find(|c: char| c == ',' || c == '}')
                    .unwrap_or(input.len());
                return Some((input[..end].trim().to_string(), &input[end..]));
            }
        };

        let mut value = String::new();
        let mut chars = quoted.char_indices();
        while let Some((idx, ch)) = chars.next() {
            match ch {
                '"' => return Some((value, &quoted[idx + 1..])),
                '\\' => match chars.next()? {
                    (_, 'n') => value.push('\n'),
                    (_, escaped) => value.push(escaped),
                },
                other => value.push(other),
            }
        }
        // Unterminated quoted value.
        None
    }

    /// Injects a `worker_id` label into Prometheus metric data lines.
    ///
    /// Prometheus places const labels (like worker_id) before special labels (like histogram
    /// `le`). For histogram bucket lines, worker_id is therefore inserted before `,le=`, or
    /// directly after `{` when `le` is the only label. For all other metric lines, it is
    /// inserted before the closing `}`. Comment lines and lines without labels are left
    /// unchanged.
    pub fn inject_worker_id(expected: &str, wid: &str) -> String {
        let wid_label = format!(",worker_id=\"{}\"", wid);
        expected
            .lines()
            .map(|line| {
                if line.starts_with('#') || line.trim().is_empty() || !line.contains('{') {
                    line.to_string()
                } else if let Some(le_pos) = line.find(",le=") {
                    // Histogram bucket lines: worker_id is a const label, `le` is special,
                    // so worker_id sorts before `le` in Prometheus output.
                    let mut s = line.to_string();
                    s.insert_str(le_pos, &wid_label);
                    s
                } else if let Some(brace_pos) = line.find("{le=") {
                    // Bucket line whose only label is `le`: worker_id becomes the first label.
                    let mut s = line.to_string();
                    s.insert_str(brace_pos + 1, &format!("worker_id=\"{}\",", wid));
                    s
                } else {
                    line.replacen('}', &format!("{}}}", wid_label), 1)
                }
            })
            .collect::<Vec<_>>()
            .join("\n")
    }
}
1069
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    #[test]
    fn test_build_component_metric_name_with_prefix() {
        // Every component metric name must carry the `dynamo_component_` prefix.
        let cases = [
            ("requests", "dynamo_component_requests"),
            ("counter", "dynamo_component_counter"),
        ];
        for (suffix, expected) in cases {
            assert_eq!(build_component_metric_name(suffix), expected);
        }
    }

    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // Builds an owned label map from borrowed key/value pairs.
        fn label_map(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
            pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect()
        }

        // (input line, expected name, expected labels, expected value)
        let valid_cases: [(&str, &str, &[(&str, &str)], f64); 3] = [
            // Several labels.
            (
                "http_requests_total{method=\"GET\",status=\"200\"} 1234",
                "http_requests_total",
                &[("method", "GET"), ("status", "200")],
                1234.0,
            ),
            // No labels at all.
            ("cpu_usage 98.5", "cpu_usage", &[], 98.5),
            // Fractional value.
            (
                "response_time{service=\"api\"} 0.123",
                "response_time",
                &[("service", "api")],
                0.123,
            ),
        ];

        for (line, expected_name, expected_pairs, expected_value) in valid_cases {
            let parsed = parse_prometheus_metric(line);
            assert!(parsed.is_some());

            let (name, labels, value) = parsed.unwrap();
            assert_eq!(name, expected_name);
            let expected_labels: HashMap<String, String> = label_map(expected_pairs);
            assert_eq!(labels, expected_labels);
            assert_eq!(value, expected_value);
        }

        // Blank line, HELP text, TYPE definition, and a name with no value.
        for invalid in [
            "",
            "# HELP metric description",
            "# TYPE metric counter",
            "metric_name",
        ] {
            assert!(parse_prometheus_metric(invalid).is_none());
        }

        println!("✓ Prometheus metric parsing works correctly!");
    }

    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: callbacks run on every execution and are shared by clones.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            for step in [1, 10, 100] {
                let shared_total = Arc::clone(&total);
                registry.add_update_callback(Arc::new(move || {
                    shared_total.fetch_add(step, Ordering::SeqCst);
                    Ok(())
                }));
            }

            // Nothing runs until the callbacks are executed.
            assert_eq!(total.load(Ordering::SeqCst), 0);

            // Each full pass adds 1 + 10 + 100 = 111.
            let first_pass = registry.execute_update_callbacks();
            assert_eq!(first_pass.len(), 3);
            assert!(first_pass.iter().all(Result::is_ok));
            assert_eq!(total.load(Ordering::SeqCst), 111);

            // Callbacks are reusable.
            let second_pass = registry.execute_update_callbacks();
            assert_eq!(second_pass.len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 222);

            // A clone shares the same Arc-wrapped callbacks.
            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 333);

            // The original keeps working after cloning.
            registry.execute_update_callbacks();
            assert_eq!(total.load(Ordering::SeqCst), 444);
        }

        // Scenario 2: a failing callback does not prevent the others from running.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            let first_total = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                first_total.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));
            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
            let last_total = Arc::clone(&total);
            registry.add_update_callback(Arc::new(move || {
                last_total.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            let outcomes = registry.execute_update_callbacks();
            assert_eq!(outcomes.len(), 3);
            assert!(outcomes[0].is_ok());
            assert!(outcomes[1].is_err());
            assert!(outcomes[2].is_ok());
            assert_eq!(
                outcomes[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );
            // Both successful callbacks ran: 1 + 10.
            assert_eq!(total.load(Ordering::SeqCst), 11);

            // The failure is reproduced on every execution.
            let repeat = registry.execute_update_callbacks();
            assert!(repeat[1].is_err());
            assert_eq!(total.load(Ordering::SeqCst), 22);
        }

        // Scenario 3: no callbacks registered.
        {
            let registry = MetricsRegistry::new();
            assert_eq!(registry.execute_update_callbacks().len(), 0);
        }
    }
}
1232
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use prometheus::core::Collector;

    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";
        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // DRT: its hierarchy is just its (empty) basename.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(drt.parent_hierarchies().len(), 0);

        // Namespace: its only parent is the DRT.
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);

        // Component: parents are DRT, then namespace.
        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);

        // Endpoint: parents are DRT, namespace, then component.
        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);
        assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);

        // Relationships
        assert!(
            namespace
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == drt.basename())
        );
        assert!(
            component
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == namespace.basename())
        );
        assert!(
            endpoint
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == component.basename())
        );

        // Depth
        assert_eq!(drt.parent_hierarchies().len(), 0);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);

        // Invalid namespace behavior - sanitizes to "_123" and succeeds
        // @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't turn it on,
        // so invalid characters are sanitized in MetricsRegistry rather than rejected.
        let invalid_namespace = drt.namespace("@@123").unwrap();
        // `expect` surfaces the underlying error if creation ever fails, instead of a bare
        // `assertion failed: result.is_ok()`.
        let counter = invalid_namespace
            .metrics()
            .create_counter("test_counter", "A test counter", &[])
            .expect("counter creation should succeed for a sanitized namespace");

        // Verify the namespace was sanitized to "_123" in the label.
        let desc = counter.desc();
        let namespace_label = desc
            .first()
            .expect("counter should expose a descriptor")
            .const_label_pairs
            .iter()
            .find(|l| l.name() == "dynamo_namespace")
            .expect("Should have dynamo_namespace label");
        assert_eq!(namespace_label.value(), "_123");

        // Valid namespace works
        let valid_namespace = drt.namespace("ns567").unwrap();
        assert!(
            valid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[])
                .is_ok()
        );
    }

    #[tokio::test]
    async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
        // Sanity test: if an expfmt callback is registered only on the endpoint registry,
        // scraping from the root (DRT) should still include it exactly once via the
        // child-registry traversal.
        let drt = create_test_drt_async().await;
        let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
        let component = namespace.component("comp_expfmt_ep_only").unwrap();
        let endpoint = component.endpoint("ep_expfmt_ep_only");

        let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
        let callback: PrometheusExpositionFormatCallback =
            Arc::new(move || Ok(metric_line.to_string()));

        endpoint
            .get_metrics_registry()
            .add_expfmt_callback(callback);

        let output = drt.metrics().prometheus_expfmt().unwrap();
        // Compare against the line without its terminator, computed once.
        let expected_line = metric_line.trim_end_matches('\n');
        let occurrences = output
            .lines()
            .filter(|line| *line == expected_line)
            .count();

        assert_eq!(
            occurrences, 1,
            "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
            occurrences, output
        );
    }

    #[tokio::test]
    async fn test_recursive_namespace() {
        // Create a distributed runtime for testing
        let drt = create_test_drt_async().await;

        // Create a deeply chained namespace: ns1.ns2.ns3
        let ns1 = drt.namespace("ns1").unwrap();
        let ns2 = ns1.namespace("ns2").unwrap();
        let ns3 = ns2.namespace("ns3").unwrap();

        // Create a component in the deepest namespace
        let component = ns3.component("test-component").unwrap();

        // ns1: only the (empty-named) DRT above it.
        assert_eq!(ns1.basename(), "ns1");
        assert_eq!(ns1.parent_hierarchies().len(), 1);
        assert_eq!(ns1.parent_hierarchies()[0].basename(), "");

        // ns2: DRT, ns1.
        assert_eq!(ns2.basename(), "ns2");
        assert_eq!(ns2.parent_hierarchies().len(), 2);
        assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");

        // ns3: DRT, ns1, ns2.
        assert_eq!(ns3.basename(), "ns3");
        assert_eq!(ns3.parent_hierarchies().len(), 3);
        assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");

        // component: DRT, ns1, ns2, ns3.
        assert_eq!(component.basename(), "test-component");
        assert_eq!(component.parent_hierarchies().len(), 4);
        assert_eq!(component.parent_hierarchies()[0].basename(), "");
        assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
        assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");

        println!("✓ Chained namespace test passed - all prefixes correct");
    }
}
1408
1409#[cfg(feature = "integration")]
1410#[cfg(test)]
1411mod test_metricsregistry_prometheus_fmt_outputs {
1412    use super::prometheus_names::name_prefix;
1413    use super::*;
1414    use crate::distributed::distributed_test_utils::create_test_drt_async;
1415    use prometheus::Counter;
1416    use std::sync::Arc;
1417
1418    #[tokio::test]
1419    async fn test_prometheusfactory_using_metrics_registry_trait() {
1420        // Setup real DRT and registry using the test-friendly constructor
1421        let drt = create_test_drt_async().await;
1422
1423        // Use a simple constant namespace name
1424        let namespace_name = "ns345";
1425
1426        let namespace = drt.namespace(namespace_name).unwrap();
1427        let component = namespace.component("comp345").unwrap();
1428        let endpoint = component.endpoint("ep345");
1429
1430        // Test Counter creation
1431        let counter = endpoint
1432            .metrics()
1433            .create_counter("testcounter", "A test counter", &[])
1434            .unwrap();
1435        counter.inc_by(123.456789);
1436        let epsilon = 0.01;
1437        assert!((counter.get() - 123.456789).abs() < epsilon);
1438
1439        let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1440        println!("Endpoint output:");
1441        println!("{}", endpoint_output_raw);
1442
1443        // worker_id is runtime-generated (etcd lease ID), so we grab it from the DRT
1444        // and inject it into expected strings via the inject_worker_id helper.
1445        let wid = format!("{:x}", drt.connection_id());
1446        use super::test_helpers::inject_worker_id;
1447
1448        let expected_endpoint_output = inject_worker_id(
1449            r#"# HELP dynamo_component_testcounter A test counter
1450# TYPE dynamo_component_testcounter counter
1451dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#,
1452            &wid,
1453        );
1454
1455        assert_eq!(
1456            endpoint_output_raw.trim_end_matches('\n'),
1457            expected_endpoint_output.trim_end_matches('\n'),
1458            "\n=== ENDPOINT COMPARISON FAILED ===\n\
1459             Actual:\n{}\n\
1460             Expected:\n{}\n\
1461             ==============================",
1462            endpoint_output_raw,
1463            expected_endpoint_output
1464        );
1465
1466        // Test Gauge creation
1467        let gauge = component
1468            .metrics()
1469            .create_gauge("testgauge", "A test gauge", &[])
1470            .unwrap();
1471        gauge.set(50000.0);
1472        assert_eq!(gauge.get(), 50000.0);
1473
1474        // Test Prometheus format output for Component (gauge + histogram)
1475        let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1476        println!("Component output:");
1477        println!("{}", component_output_raw);
1478
1479        let expected_component_output = inject_worker_id(
1480            r#"# HELP dynamo_component_testcounter A test counter
1481# TYPE dynamo_component_testcounter counter
1482dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1483# HELP dynamo_component_testgauge A test gauge
1484# TYPE dynamo_component_testgauge gauge
1485dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#,
1486            &wid,
1487        );
1488
1489        assert_eq!(
1490            component_output_raw.trim_end_matches('\n'),
1491            expected_component_output.trim_end_matches('\n'),
1492            "\n=== COMPONENT COMPARISON FAILED ===\n\
1493             Actual:\n{}\n\
1494             Expected:\n{}\n\
1495             ==============================",
1496            component_output_raw,
1497            expected_component_output
1498        );
1499
1500        let intcounter = namespace
1501            .metrics()
1502            .create_intcounter("testintcounter", "A test int counter", &[])
1503            .unwrap();
1504        intcounter.inc_by(12345);
1505        assert_eq!(intcounter.get(), 12345);
1506
1507        // Test Prometheus format output for Namespace (int_counter + gauge + histogram)
1508        let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1509        println!("Namespace output:");
1510        println!("{}", namespace_output_raw);
1511
1512        let expected_namespace_output = inject_worker_id(
1513            r#"# HELP dynamo_component_testcounter A test counter
1514# TYPE dynamo_component_testcounter counter
1515dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1516# HELP dynamo_component_testgauge A test gauge
1517# TYPE dynamo_component_testgauge gauge
1518dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1519# HELP dynamo_component_testintcounter A test int counter
1520# TYPE dynamo_component_testintcounter counter
1521dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#,
1522            &wid,
1523        );
1524
1525        assert_eq!(
1526            namespace_output_raw.trim_end_matches('\n'),
1527            expected_namespace_output.trim_end_matches('\n'),
1528            "\n=== NAMESPACE COMPARISON FAILED ===\n\
1529             Actual:\n{}\n\
1530             Expected:\n{}\n\
1531             ==============================",
1532            namespace_output_raw,
1533            expected_namespace_output
1534        );
1535
1536        // Test IntGauge creation
1537        let intgauge = namespace
1538            .metrics()
1539            .create_intgauge("testintgauge", "A test int gauge", &[])
1540            .unwrap();
1541        intgauge.set(42);
1542        assert_eq!(intgauge.get(), 42);
1543
1544        // Test IntGaugeVec creation
1545        let intgaugevec = namespace
1546            .metrics()
1547            .create_intgaugevec(
1548                "testintgaugevec",
1549                "A test int gauge vector",
1550                &["instance", "status"],
1551                &[("service", "api")],
1552            )
1553            .unwrap();
1554        intgaugevec
1555            .with_label_values(&["server1", "active"])
1556            .set(10);
1557        intgaugevec
1558            .with_label_values(&["server2", "inactive"])
1559            .set(0);
1560
1561        // Test CounterVec creation
1562        let countervec = endpoint
1563            .metrics()
1564            .create_countervec(
1565                "testcountervec",
1566                "A test counter vector",
1567                &["method", "status"],
1568                &[("service", "api")],
1569            )
1570            .unwrap();
1571        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1572        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1573
1574        // Test Histogram creation
1575        let histogram = component
1576            .metrics()
1577            .create_histogram("testhistogram", "A test histogram", &[], None)
1578            .unwrap();
1579        histogram.observe(1.0);
1580        histogram.observe(2.5);
1581        histogram.observe(4.0);
1582
1583        // Test Prometheus format output for DRT (all metrics combined)
1584        let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1585        println!("DRT output:");
1586        println!("{}", drt_output_raw);
1587
1588        // The uptime_seconds value is dynamic (depends on elapsed wall-clock time),
1589        // so we check all other lines exactly and validate uptime separately.
1590        let expected_drt_output_without_uptime = inject_worker_id(
1591            r#"# HELP dynamo_component_testcounter A test counter
1592# TYPE dynamo_component_testcounter counter
1593dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1594# HELP dynamo_component_testcountervec A test counter vector
1595# TYPE dynamo_component_testcountervec counter
1596dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1597dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1598# HELP dynamo_component_testgauge A test gauge
1599# TYPE dynamo_component_testgauge gauge
1600dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1601# HELP dynamo_component_testhistogram A test histogram
1602# TYPE dynamo_component_testhistogram histogram
1603dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1604dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1605dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1606dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1607dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1608dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1609dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1610dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1611dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1612dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1613dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1614dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1615dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1616dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1617# HELP dynamo_component_testintcounter A test int counter
1618# TYPE dynamo_component_testintcounter counter
1619dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1620# HELP dynamo_component_testintgauge A test int gauge
1621# TYPE dynamo_component_testintgauge gauge
1622dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1623# HELP dynamo_component_testintgaugevec A test int gauge vector
1624# TYPE dynamo_component_testintgaugevec gauge
1625dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1626dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#,
1627            &wid,
1628        );
1629
1630        // Split actual output into non-uptime lines and validate the uptime value line.
1631        // The uptime metric now carries a worker_id label, so we match on the metric name
1632        // prefix and extract the value as the last whitespace-delimited token.
1633        let mut non_uptime_lines = Vec::new();
1634        let mut saw_uptime_value = false;
1635        for line in drt_output_raw.trim_end_matches('\n').lines() {
1636            if line.starts_with("dynamo_component_uptime_seconds") && !line.starts_with('#') {
1637                let val_str = line.split_whitespace().last().unwrap();
1638                val_str.parse::<f64>().expect("uptime should be a float");
1639                saw_uptime_value = true;
1640            } else if line.starts_with("# HELP dynamo_component_uptime_seconds")
1641                || line.starts_with("# TYPE dynamo_component_uptime_seconds")
1642            {
1643                // Skip HELP/TYPE lines for uptime (we just verify it exists via the value)
1644            } else {
1645                non_uptime_lines.push(line);
1646            }
1647        }
1648        assert!(
1649            saw_uptime_value,
1650            "uptime_seconds metric should be present in initial scrape"
1651        );
1652
1653        let actual_without_uptime = non_uptime_lines.join("\n");
1654        assert_eq!(
1655            actual_without_uptime,
1656            expected_drt_output_without_uptime.trim_end_matches('\n'),
1657            "\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
1658             Expected:\n{}\n\
1659             Actual:\n{}\n\
1660             ==============================",
1661            expected_drt_output_without_uptime,
1662            actual_without_uptime
1663        );
1664
1665        // Wait briefly so the uptime gauge is clearly positive on the next scrape.
1666        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1667        let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
1668        let uptime_line = drt_output_after
1669            .lines()
1670            .find(|l| l.starts_with("dynamo_component_uptime_seconds") && !l.starts_with('#'))
1671            .expect("uptime_seconds metric should be present after sleep");
1672        let uptime_after: f64 = uptime_line
1673            .split_whitespace()
1674            .last()
1675            .unwrap()
1676            .parse()
1677            .expect("uptime should be a float");
1678        assert!(
1679            uptime_after > 0.0,
1680            "uptime_seconds should be > 0 after 10ms sleep, got {}",
1681            uptime_after
1682        );
1683
1684        println!("✓ All Prometheus format outputs verified successfully!");
1685    }
1686
1687    #[test]
1688    fn test_refactored_filter_functions() {
1689        // Test data with component metrics
1690        let test_input = r#"# HELP dynamo_component_requests Total requests
1691# TYPE dynamo_component_requests counter
1692dynamo_component_requests 42
1693# HELP dynamo_component_latency Response latency
1694# TYPE dynamo_component_latency histogram
1695dynamo_component_latency_bucket{le="0.1"} 10
1696dynamo_component_latency_bucket{le="0.5"} 25
1697dynamo_component_errors_total 5"#;
1698
1699        // Test extract_metrics (only actual metric lines, excluding help/type)
1700        let metrics_only = super::test_helpers::extract_metrics(test_input);
1701        assert_eq!(metrics_only.len(), 4); // 4 actual metric lines (excluding help/type)
1702        assert!(
1703            metrics_only
1704                .iter()
1705                .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1706        );
1707
1708        println!("✓ All refactored filter functions work correctly!");
1709    }
1710
1711    #[tokio::test]
1712    async fn test_same_metric_name_different_endpoints() {
1713        // Test that the same metric name can exist in different endpoints without collision.
1714        // This validates the multi-registry approach: each endpoint has its own registry,
1715        // and metrics are merged at scrape time with distinct labels.
1716        let drt = create_test_drt_async().await;
1717        let namespace = drt.namespace("ns_test").unwrap();
1718        let component = namespace.component("comp_test").unwrap();
1719
1720        // Create two endpoints with the same metric name
1721        let ep1 = component.endpoint("ep1");
1722        let ep2 = component.endpoint("ep2");
1723
1724        let counter1 = ep1
1725            .metrics()
1726            .create_counter("requests_total", "Total requests", &[])
1727            .unwrap();
1728        counter1.inc_by(100.0);
1729
1730        let counter2 = ep2
1731            .metrics()
1732            .create_counter("requests_total", "Total requests", &[])
1733            .unwrap();
1734        counter2.inc_by(200.0);
1735
1736        // Get merged Prometheus output from component level
1737        let output = component.metrics().prometheus_expfmt().unwrap();
1738
1739        let wid = format!("{:x}", drt.connection_id());
1740        use super::test_helpers::inject_worker_id;
1741
1742        let expected_output = inject_worker_id(
1743            r#"# HELP dynamo_component_requests_total Total requests
1744# TYPE dynamo_component_requests_total counter
1745dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
1746dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#,
1747            &wid,
1748        );
1749
1750        assert_eq!(
1751            output.trim_end_matches('\n'),
1752            expected_output.trim_end_matches('\n'),
1753            "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
1754             Actual:\n{}\n\
1755             Expected:\n{}\n\
1756             ==============================",
1757            output,
1758            expected_output
1759        );
1760
1761        println!("✓ Multi-registry prevents Prometheus collisions!");
1762    }
1763
1764    #[tokio::test]
1765    async fn test_duplicate_series_warning() {
1766        // Test that duplicate series (same metric name + same labels) are detected and deduplicated.
1767        // This should log a warning and keep only one of the duplicate series.
1768        let drt = create_test_drt_async().await;
1769        let namespace = drt.namespace("ns_dup").unwrap();
1770        let component = namespace.component("comp_dup").unwrap();
1771
1772        // Create two endpoints with counters that will have identical labels when scraped
1773        let ep1 = component.endpoint("ep_same");
1774        let ep2 = component.endpoint("ep_same"); // Same endpoint name = duplicate labels
1775
1776        let counter1 = ep1
1777            .metrics()
1778            .create_counter("dup_metric", "Duplicate metric test", &[])
1779            .unwrap();
1780        counter1.inc_by(50.0);
1781
1782        let counter2 = ep2
1783            .metrics()
1784            .create_counter("dup_metric", "Duplicate metric test", &[])
1785            .unwrap();
1786        counter2.inc_by(75.0);
1787
1788        // Get merged output - duplicates should be deduplicated
1789        let output = component.metrics().prometheus_expfmt().unwrap();
1790
1791        let wid = format!("{:x}", drt.connection_id());
1792        use super::test_helpers::inject_worker_id;
1793
1794        let expected_output = inject_worker_id(
1795            r#"# HELP dynamo_component_dup_metric Duplicate metric test
1796# TYPE dynamo_component_dup_metric counter
1797dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#,
1798            &wid,
1799        );
1800
1801        assert_eq!(
1802            output.trim_end_matches('\n'),
1803            expected_output.trim_end_matches('\n'),
1804            "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
1805             Actual:\n{}\n\
1806             Expected:\n{}\n\
1807             ==============================",
1808            output,
1809            expected_output
1810        );
1811
1812        println!("✓ Duplicate series detection and deduplication works!");
1813    }
1814}