dynamo_runtime/
metrics.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Metrics registry trait and implementation for Prometheus metrics
5//!
6//! This module provides a trait-based interface for creating and managing Prometheus metrics
7//! with automatic label injection and hierarchical naming support.
8
9pub mod prometheus_names;
10
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22// Import commonly used items to avoid verbose prefixes
23use prometheus_names::{
24    COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
25    nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
26};
27
28// Pipeline imports for endpoint creation
29use crate::pipeline::{
30    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31    network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
/// If set to true, then metrics will be labeled with the namespace, component, and endpoint labels.
/// These labels are prefixed with "dynamo_" to avoid collisions with Kubernetes and other monitoring system labels.
/// When enabled, callers may not set those label keys themselves; metric creation rejects them.
pub const USE_AUTO_LABELS: bool = true;
40
41// Prometheus imports
42use prometheus::Encoder;
43
44/// Validate that a label slice has no duplicate keys.
45/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key.
46fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47    let mut seen_keys = std::collections::HashSet::new();
48    for (key, _) in labels {
49        if !seen_keys.insert(*key) {
50            return Err(anyhow::anyhow!(
51                "Duplicate label key '{}' found in labels",
52                key
53            ));
54        }
55    }
56    Ok(())
57}
58
59/// Trait that defines common behavior for Prometheus metric types
60pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
61    /// Create a new metric with the given options
62    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
63    where
64        Self: Sized;
65
66    /// Create a new metric with histogram options and custom buckets
67    /// This is a default implementation that will panic for non-histogram metrics
68    fn with_histogram_opts_and_buckets(
69        _opts: prometheus::HistogramOpts,
70        _buckets: Option<Vec<f64>>,
71    ) -> Result<Self, prometheus::Error>
72    where
73        Self: Sized,
74    {
75        panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
76    }
77
78    /// Create a new metric with counter options and label names (for CounterVec)
79    /// This is a default implementation that will panic for non-countervec metrics
80    fn with_opts_and_label_names(
81        _opts: prometheus::Opts,
82        _label_names: &[&str],
83    ) -> Result<Self, prometheus::Error>
84    where
85        Self: Sized,
86    {
87        panic!("with_opts_and_label_names is not implemented for this metric type");
88    }
89}
90
91// Implement the trait for Counter, IntCounter, and Gauge
92impl PrometheusMetric for prometheus::Counter {
93    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
94        prometheus::Counter::with_opts(opts)
95    }
96}
97
98impl PrometheusMetric for prometheus::IntCounter {
99    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
100        prometheus::IntCounter::with_opts(opts)
101    }
102}
103
104impl PrometheusMetric for prometheus::Gauge {
105    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
106        prometheus::Gauge::with_opts(opts)
107    }
108}
109
110impl PrometheusMetric for prometheus::IntGauge {
111    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
112        prometheus::IntGauge::with_opts(opts)
113    }
114}
115
116impl PrometheusMetric for prometheus::GaugeVec {
117    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
118        Err(prometheus::Error::Msg(
119            "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
120        ))
121    }
122
123    fn with_opts_and_label_names(
124        opts: prometheus::Opts,
125        label_names: &[&str],
126    ) -> Result<Self, prometheus::Error> {
127        prometheus::GaugeVec::new(opts, label_names)
128    }
129}
130
131impl PrometheusMetric for prometheus::IntGaugeVec {
132    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
133        Err(prometheus::Error::Msg(
134            "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
135        ))
136    }
137
138    fn with_opts_and_label_names(
139        opts: prometheus::Opts,
140        label_names: &[&str],
141    ) -> Result<Self, prometheus::Error> {
142        prometheus::IntGaugeVec::new(opts, label_names)
143    }
144}
145
146impl PrometheusMetric for prometheus::IntCounterVec {
147    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
148        Err(prometheus::Error::Msg(
149            "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
150        ))
151    }
152
153    fn with_opts_and_label_names(
154        opts: prometheus::Opts,
155        label_names: &[&str],
156    ) -> Result<Self, prometheus::Error> {
157        prometheus::IntCounterVec::new(opts, label_names)
158    }
159}
160
161// Implement the trait for Histogram
162impl PrometheusMetric for prometheus::Histogram {
163    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
164        // Convert Opts to HistogramOpts
165        let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
166        prometheus::Histogram::with_opts(histogram_opts)
167    }
168
169    fn with_histogram_opts_and_buckets(
170        mut opts: prometheus::HistogramOpts,
171        buckets: Option<Vec<f64>>,
172    ) -> Result<Self, prometheus::Error> {
173        if let Some(custom_buckets) = buckets {
174            opts = opts.buckets(custom_buckets);
175        }
176        prometheus::Histogram::with_opts(opts)
177    }
178}
179
180// Implement the trait for CounterVec
181impl PrometheusMetric for prometheus::CounterVec {
182    fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
183        // This will panic - CounterVec needs label names
184        panic!("CounterVec requires label names, use with_opts_and_label_names instead");
185    }
186
187    fn with_opts_and_label_names(
188        opts: prometheus::Opts,
189        label_names: &[&str],
190    ) -> Result<Self, prometheus::Error> {
191        prometheus::CounterVec::new(opts, label_names)
192    }
193}
194
195/// Private helper function to create metrics - not accessible to trait implementors
196fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
197    registry: &R,
198    metric_name: &str,
199    metric_desc: &str,
200    labels: &[(&str, &str)],
201    buckets: Option<Vec<f64>>,
202    const_labels: Option<&[&str]>,
203) -> anyhow::Result<T> {
204    // Validate that user-provided labels don't have duplicate keys
205    validate_no_duplicate_label_keys(labels)?;
206    // Note: stored labels functionality has been removed
207
208    let basename = registry.basename();
209    let parent_hierarchy = registry.parent_hierarchy();
210
211    // Build hierarchy: parent_hierarchy + [basename]
212    let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
213
214    let metric_name = build_component_metric_name(metric_name);
215
216    // Build updated_labels: auto-labels first, then `labels` + stored labels
217    let mut updated_labels: Vec<(String, String)> = Vec::new();
218
219    if USE_AUTO_LABELS {
220        // Validate that user-provided labels don't conflict with auto-generated labels
221        for (key, _) in labels {
222            if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
223                return Err(anyhow::anyhow!(
224                    "Label '{}' is automatically added by auto_label feature and cannot be manually set",
225                    key
226                ));
227            }
228        }
229
230        // Add auto-generated labels with sanitized values
231        if hierarchy.len() > 1 {
232            let namespace = &hierarchy[1];
233            if !namespace.is_empty() {
234                let valid_namespace = sanitize_prometheus_label(namespace)?;
235                if !valid_namespace.is_empty() {
236                    updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
237                }
238            }
239        }
240        if hierarchy.len() > 2 {
241            let component = &hierarchy[2];
242            if !component.is_empty() {
243                let valid_component = sanitize_prometheus_label(component)?;
244                if !valid_component.is_empty() {
245                    updated_labels.push((labels::COMPONENT.to_string(), valid_component));
246                }
247            }
248        }
249        if hierarchy.len() > 3 {
250            let endpoint = &hierarchy[3];
251            if !endpoint.is_empty() {
252                let valid_endpoint = sanitize_prometheus_label(endpoint)?;
253                if !valid_endpoint.is_empty() {
254                    updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
255                }
256            }
257        }
258    }
259
260    // Add user labels
261    updated_labels.extend(
262        labels
263            .iter()
264            .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
265    );
266    // Note: stored labels functionality has been removed
267
268    // Handle different metric types
269    let prometheus_metric = if std::any::TypeId::of::<T>()
270        == std::any::TypeId::of::<prometheus::CounterVec>()
271    {
272        // Special handling for CounterVec with label names
273        // const_labels parameter is required for CounterVec
274        if buckets.is_some() {
275            return Err(anyhow::anyhow!(
276                "buckets parameter is not valid for CounterVec"
277            ));
278        }
279        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
280        for (key, value) in &updated_labels {
281            opts = opts.const_label(key.clone(), value.clone());
282        }
283        let label_names = const_labels
284            .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
285        T::with_opts_and_label_names(opts, label_names)?
286    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
287        // Special handling for GaugeVec with label names
288        // const_labels parameter is required for GaugeVec
289        if buckets.is_some() {
290            return Err(anyhow::anyhow!(
291                "buckets parameter is not valid for GaugeVec"
292            ));
293        }
294        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
295        for (key, value) in &updated_labels {
296            opts = opts.const_label(key.clone(), value.clone());
297        }
298        let label_names = const_labels
299            .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
300        T::with_opts_and_label_names(opts, label_names)?
301    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
302        // Special handling for Histogram with custom buckets
303        // buckets parameter is valid for Histogram, const_labels is not used
304        if const_labels.is_some() {
305            return Err(anyhow::anyhow!(
306                "const_labels parameter is not valid for Histogram"
307            ));
308        }
309        let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
310        for (key, value) in &updated_labels {
311            opts = opts.const_label(key.clone(), value.clone());
312        }
313        T::with_histogram_opts_and_buckets(opts, buckets)?
314    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
315        // Special handling for IntCounterVec with label names
316        // const_labels parameter is required for IntCounterVec
317        if buckets.is_some() {
318            return Err(anyhow::anyhow!(
319                "buckets parameter is not valid for IntCounterVec"
320            ));
321        }
322        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
323        for (key, value) in &updated_labels {
324            opts = opts.const_label(key.clone(), value.clone());
325        }
326        let label_names = const_labels
327            .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
328        T::with_opts_and_label_names(opts, label_names)?
329    } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
330        // Special handling for IntGaugeVec with label names
331        // const_labels parameter is required for IntGaugeVec
332        if buckets.is_some() {
333            return Err(anyhow::anyhow!(
334                "buckets parameter is not valid for IntGaugeVec"
335            ));
336        }
337        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
338        for (key, value) in &updated_labels {
339            opts = opts.const_label(key.clone(), value.clone());
340        }
341        let label_names = const_labels
342            .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
343        T::with_opts_and_label_names(opts, label_names)?
344    } else {
345        // Standard handling for Counter, IntCounter, Gauge, IntGauge
346        // buckets and const_labels parameters are not valid for these types
347        if buckets.is_some() {
348            return Err(anyhow::anyhow!(
349                "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
350            ));
351        }
352        if const_labels.is_some() {
353            return Err(anyhow::anyhow!(
354                "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
355            ));
356        }
357        let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
358        for (key, value) in &updated_labels {
359            opts = opts.const_label(key.clone(), value.clone());
360        }
361        T::with_opts(opts)?
362    };
363
364    // Iterate over the DRT's registry and register this metric across all hierarchical levels.
365    // The accumulated hierarchy is structured as: ["", "testnamespace", "testnamespace_testcomponent", "testnamespace_testcomponent_testendpoint"]
366    // This accumulation is essential to differentiate between the names of children and grandchildren.
367    // Build accumulated hierarchy and register metrics in a single loop
368    // current_prefix accumulates the hierarchical path as we iterate through hierarchy
369    // For example, if hierarchy = ["", "testnamespace", "testcomponent"], then:
370    // - Iteration 1: current_prefix = "" (empty string from DRT)
371    // - Iteration 2: current_prefix = "testnamespace"
372    // - Iteration 3: current_prefix = "testnamespace_testcomponent"
373    let mut current_hierarchy = String::new();
374    for name in &hierarchy {
375        if !current_hierarchy.is_empty() && !name.is_empty() {
376            current_hierarchy.push('_');
377        }
378        current_hierarchy.push_str(name);
379
380        // Register metric at this hierarchical level using the new helper function
381        let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
382        registry
383            .drt()
384            .add_prometheus_metric(&current_hierarchy, collector)?;
385    }
386
387    Ok(prometheus_metric)
388}
389
390/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
391/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
392/// generating output in Prometheus text format.
393use crate::traits::DistributedRuntimeProvider;
394
395pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
396    // Get the name of this registry (without any hierarchy prefix)
397    fn basename(&self) -> String;
398
399    /// Retrieve the complete hierarchy and basename for this registry. Currently, the hierarchy for drt is an empty string,
400    /// so we must account for the leading underscore. The existing code remains unchanged to accommodate any future
401    /// scenarios where drt's prefix might be assigned a value.
402    fn hierarchy(&self) -> String {
403        [self.parent_hierarchy(), vec![self.basename()]]
404            .concat()
405            .join("_")
406            .trim_start_matches('_')
407            .to_string()
408    }
409
410    // Get the parent hierarchy for this registry (just the base names, NOT the flattened hierarchy key)
411    fn parent_hierarchy(&self) -> Vec<String>;
412
413    // TODO: Add support for additional Prometheus metric types:
414    // - Counter: ✅ IMPLEMENTED - create_counter()
415    // - CounterVec: ✅ IMPLEMENTED - create_countervec()
416    // - Gauge: ✅ IMPLEMENTED - create_gauge()
417    // - GaugeHistogram: create_gauge_histogram() - for gauge histograms
418    // - Histogram: ✅ IMPLEMENTED - create_histogram()
419    // - HistogramVec with custom buckets: create_histogram_with_buckets()
420    // - Info: create_info() - for info metrics with labels
421    // - IntCounter: ✅ IMPLEMENTED - create_intcounter()
422    // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec()
423    // - IntGauge: ✅ IMPLEMENTED - create_intgauge()
424    // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec()
425    // - Stateset: create_stateset() - for state-based metrics
426    // - Summary: create_summary() - for quantiles and sum/count metrics
427    // - SummaryVec: create_summary_vec() - for labeled summaries
428    // - Untyped: create_untyped() - for untyped metrics
429    //
430    // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
431    // Keep them synchronized when adding new metric types
432
433    /// Create a Counter metric
434    fn create_counter(
435        &self,
436        name: &str,
437        description: &str,
438        labels: &[(&str, &str)],
439    ) -> anyhow::Result<prometheus::Counter> {
440        create_metric(self, name, description, labels, None, None)
441    }
442
443    /// Create a CounterVec metric with label names (for dynamic labels)
444    fn create_countervec(
445        &self,
446        name: &str,
447        description: &str,
448        const_labels: &[&str],
449        const_label_values: &[(&str, &str)],
450    ) -> anyhow::Result<prometheus::CounterVec> {
451        create_metric(
452            self,
453            name,
454            description,
455            const_label_values,
456            None,
457            Some(const_labels),
458        )
459    }
460
461    /// Create a Gauge metric
462    fn create_gauge(
463        &self,
464        name: &str,
465        description: &str,
466        labels: &[(&str, &str)],
467    ) -> anyhow::Result<prometheus::Gauge> {
468        create_metric(self, name, description, labels, None, None)
469    }
470
471    /// Create a GaugeVec metric with label names (for dynamic labels)
472    fn create_gaugevec(
473        &self,
474        name: &str,
475        description: &str,
476        const_labels: &[&str],
477        const_label_values: &[(&str, &str)],
478    ) -> anyhow::Result<prometheus::GaugeVec> {
479        create_metric(
480            self,
481            name,
482            description,
483            const_label_values,
484            None,
485            Some(const_labels),
486        )
487    }
488
489    /// Create a Histogram metric with custom buckets
490    fn create_histogram(
491        &self,
492        name: &str,
493        description: &str,
494        labels: &[(&str, &str)],
495        buckets: Option<Vec<f64>>,
496    ) -> anyhow::Result<prometheus::Histogram> {
497        create_metric(self, name, description, labels, buckets, None)
498    }
499
500    /// Create an IntCounter metric
501    fn create_intcounter(
502        &self,
503        name: &str,
504        description: &str,
505        labels: &[(&str, &str)],
506    ) -> anyhow::Result<prometheus::IntCounter> {
507        create_metric(self, name, description, labels, None, None)
508    }
509
510    /// Create an IntCounterVec metric with label names (for dynamic labels)
511    fn create_intcountervec(
512        &self,
513        name: &str,
514        description: &str,
515        const_labels: &[&str],
516        const_label_values: &[(&str, &str)],
517    ) -> anyhow::Result<prometheus::IntCounterVec> {
518        create_metric(
519            self,
520            name,
521            description,
522            const_label_values,
523            None,
524            Some(const_labels),
525        )
526    }
527
528    /// Create an IntGauge metric
529    fn create_intgauge(
530        &self,
531        name: &str,
532        description: &str,
533        labels: &[(&str, &str)],
534    ) -> anyhow::Result<prometheus::IntGauge> {
535        create_metric(self, name, description, labels, None, None)
536    }
537
538    /// Create an IntGaugeVec metric with label names (for dynamic labels)
539    fn create_intgaugevec(
540        &self,
541        name: &str,
542        description: &str,
543        const_labels: &[&str],
544        const_label_values: &[(&str, &str)],
545    ) -> anyhow::Result<prometheus::IntGaugeVec> {
546        create_metric(
547            self,
548            name,
549            description,
550            const_label_values,
551            None,
552            Some(const_labels),
553        )
554    }
555
556    /// Get metrics in Prometheus text format
557    fn prometheus_expfmt(&self) -> anyhow::Result<String> {
558        // Execute callbacks first to ensure any new metrics are added to the registry
559        let callback_results = self
560            .drt()
561            .execute_prometheus_update_callbacks(&self.hierarchy());
562
563        // Log any callback errors but continue
564        for result in callback_results {
565            if let Err(e) = result {
566                tracing::error!("Error executing metrics callback: {}", e);
567            }
568        }
569
570        // Get the Prometheus registry for this hierarchy and execute exposition text callbacks
571        let (prometheus_registry, expfmt) = {
572            let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap();
573            let entry = registry_entry.entry(self.hierarchy()).or_default();
574            let registry = entry.prometheus_registry.clone();
575            let text = entry.execute_prometheus_expfmt_callbacks();
576            (registry, text)
577        };
578
579        // Encode metrics from the registry
580        let metric_families = prometheus_registry.gather();
581        let encoder = prometheus::TextEncoder::new();
582        let mut buffer = Vec::new();
583        encoder.encode(&metric_families, &mut buffer)?;
584        let mut result = String::from_utf8(buffer)?;
585
586        // Append exposition text callback results if any
587        if !expfmt.is_empty() {
588            if !result.ends_with('\n') {
589                result.push('\n');
590            }
591            result.push_str(&expfmt);
592        }
593
594        Ok(result)
595    }
596}
597
598#[cfg(test)]
599mod test_helpers {
600    use super::prometheus_names::name_prefix;
601    use super::prometheus_names::{nats_client, nats_service};
602    use super::*;
603
604    /// Base function to filter Prometheus output lines based on a predicate.
605    /// Returns lines that match the predicate, converted to String.
606    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
607    where
608        F: FnMut(&str) -> bool,
609    {
610        input
611            .lines()
612            .filter(|line| predicate(line))
613            .map(|line| line.to_string())
614            .collect::<Vec<_>>()
615    }
616
617    /// Filters out all NATS metrics from Prometheus output for test comparisons.
618    pub fn remove_nats_lines(input: &str) -> Vec<String> {
619        filter_prometheus_lines(input, |line| {
620            !line.contains(&format!(
621                "{}_{}",
622                name_prefix::COMPONENT,
623                nats_client::PREFIX
624            )) && !line.contains(&format!(
625                "{}_{}",
626                name_prefix::COMPONENT,
627                nats_service::PREFIX
628            )) && !line.trim().is_empty()
629        })
630    }
631
632    /// Filters to only include NATS metrics from Prometheus output for test comparisons.
633    pub fn extract_nats_lines(input: &str) -> Vec<String> {
634        filter_prometheus_lines(input, |line| {
635            line.contains(&format!(
636                "{}_{}",
637                name_prefix::COMPONENT,
638                nats_client::PREFIX
639            )) || line.contains(&format!(
640                "{}_{}",
641                name_prefix::COMPONENT,
642                nats_service::PREFIX
643            ))
644        })
645    }
646
647    /// Extracts all component metrics (excluding help text and type definitions).
648    /// Returns only the actual metric lines with values.
649    pub fn extract_metrics(input: &str) -> Vec<String> {
650        filter_prometheus_lines(input, |line| {
651            line.starts_with(&format!("{}_", name_prefix::COMPONENT))
652                && !line.starts_with("#")
653                && !line.trim().is_empty()
654        })
655    }
656
657    /// Parses a Prometheus metric line and extracts the name, labels, and value.
658    /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state.
659    ///
660    /// # Example
661    /// ```
662    /// let line = "http_requests_total{method=\"GET\"} 1234";
663    /// let (name, labels, value) = parse_prometheus_metric(line).unwrap();
664    /// assert_eq!(name, "http_requests_total");
665    /// assert_eq!(labels.get("method"), Some(&"GET".to_string()));
666    /// assert_eq!(value, 1234.0);
667    /// ```
668    pub fn parse_prometheus_metric(
669        line: &str,
670    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
671        if line.trim().is_empty() || line.starts_with('#') {
672            return None;
673        }
674
675        let parts: Vec<&str> = line.split_whitespace().collect();
676        if parts.len() < 2 {
677            return None;
678        }
679
680        let metric_part = parts[0];
681        let value: f64 = parts[1].parse().ok()?;
682
683        let (name, labels) = if metric_part.contains('{') {
684            let brace_start = metric_part.find('{').unwrap();
685            let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
686            let name = &metric_part[..brace_start];
687            let labels_str = &metric_part[brace_start + 1..brace_end];
688
689            let mut labels = std::collections::HashMap::new();
690            for pair in labels_str.split(',') {
691                if let Some((k, v)) = pair.split_once('=') {
692                    let v = v.trim_matches('"');
693                    labels.insert(k.trim().to_string(), v.to_string());
694                }
695            }
696            (name.to_string(), labels)
697        } else {
698            (metric_part.to_string(), std::collections::HashMap::new())
699        };
700
701        Some((name, labels, value))
702    }
703}
704
705#[cfg(test)]
706mod test_metricsregistry_units {
707    use super::*;
708
709    #[test]
710    fn test_build_component_metric_name_with_prefix() {
711        // Test that build_component_metric_name correctly prepends the dynamo_component prefix
712        let result = build_component_metric_name("requests");
713        assert_eq!(result, "dynamo_component_requests");
714
715        let result = build_component_metric_name("counter");
716        assert_eq!(result, "dynamo_component_counter");
717    }
718
719    #[test]
720    fn test_parse_prometheus_metric() {
721        use super::test_helpers::parse_prometheus_metric;
722        use std::collections::HashMap;
723
724        // Test parsing a metric with labels
725        let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
726        let parsed = parse_prometheus_metric(line);
727        assert!(parsed.is_some());
728
729        let (name, labels, value) = parsed.unwrap();
730        assert_eq!(name, "http_requests_total");
731
732        let mut expected_labels = HashMap::new();
733        expected_labels.insert("method".to_string(), "GET".to_string());
734        expected_labels.insert("status".to_string(), "200".to_string());
735        assert_eq!(labels, expected_labels);
736
737        assert_eq!(value, 1234.0);
738
739        // Test parsing a metric without labels
740        let line = "cpu_usage 98.5";
741        let parsed = parse_prometheus_metric(line);
742        assert!(parsed.is_some());
743
744        let (name, labels, value) = parsed.unwrap();
745        assert_eq!(name, "cpu_usage");
746        assert!(labels.is_empty());
747        assert_eq!(value, 98.5);
748
749        // Test parsing a metric with float value
750        let line = "response_time{service=\"api\"} 0.123";
751        let parsed = parse_prometheus_metric(line);
752        assert!(parsed.is_some());
753
754        let (name, labels, value) = parsed.unwrap();
755        assert_eq!(name, "response_time");
756
757        let mut expected_labels = HashMap::new();
758        expected_labels.insert("service".to_string(), "api".to_string());
759        assert_eq!(labels, expected_labels);
760
761        assert_eq!(value, 0.123);
762
763        // Test parsing invalid lines
764        assert!(parse_prometheus_metric("").is_none()); // Empty line
765        assert!(parse_prometheus_metric("# HELP metric description").is_none()); // Help text
766        assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); // Type definition
767        assert!(parse_prometheus_metric("metric_name").is_none()); // No value
768
769        println!("✓ Prometheus metric parsing works correctly!");
770    }
771
772    #[test]
773    fn test_metrics_registry_entry_callbacks() {
774        use crate::MetricsRegistryEntry;
775        use std::sync::atomic::{AtomicUsize, Ordering};
776
777        // Test 1: Basic callback execution with counter increments
778        {
779            let mut entry = MetricsRegistryEntry::new();
780            let counter = Arc::new(AtomicUsize::new(0));
781
782            // Add callbacks with different increment values
783            for increment in [1, 10, 100] {
784                let counter_clone = counter.clone();
785                entry.add_prometheus_update_callback(Arc::new(move || {
786                    counter_clone.fetch_add(increment, Ordering::SeqCst);
787                    Ok(())
788                }));
789            }
790
791            // Verify counter starts at 0
792            assert_eq!(counter.load(Ordering::SeqCst), 0);
793
794            // First execution
795            let results = entry.execute_prometheus_update_callbacks();
796            assert_eq!(results.len(), 3);
797            assert!(results.iter().all(|r| r.is_ok()));
798            assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100
799
800            // Second execution - callbacks should be reusable
801            let results = entry.execute_prometheus_update_callbacks();
802            assert_eq!(results.len(), 3);
803            assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111
804
805            // Test cloning - cloned entry should have no callbacks
806            let cloned = entry.clone();
807            assert_eq!(cloned.execute_prometheus_update_callbacks().len(), 0);
808            assert_eq!(counter.load(Ordering::SeqCst), 222); // No change
809
810            // Original still has callbacks
811            entry.execute_prometheus_update_callbacks();
812            assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111
813        }
814
815        // Test 2: Mixed success and error callbacks
816        {
817            let mut entry = MetricsRegistryEntry::new();
818            let counter = Arc::new(AtomicUsize::new(0));
819
820            // Successful callback
821            let counter_clone = counter.clone();
822            entry.add_prometheus_update_callback(Arc::new(move || {
823                counter_clone.fetch_add(1, Ordering::SeqCst);
824                Ok(())
825            }));
826
827            // Error callback
828            entry.add_prometheus_update_callback(Arc::new(|| {
829                Err(anyhow::anyhow!("Simulated error"))
830            }));
831
832            // Another successful callback
833            let counter_clone = counter.clone();
834            entry.add_prometheus_update_callback(Arc::new(move || {
835                counter_clone.fetch_add(10, Ordering::SeqCst);
836                Ok(())
837            }));
838
839            // Execute and verify mixed results
840            let results = entry.execute_prometheus_update_callbacks();
841            assert_eq!(results.len(), 3);
842            assert!(results[0].is_ok());
843            assert!(results[1].is_err());
844            assert!(results[2].is_ok());
845
846            // Verify error message
847            assert_eq!(
848                results[1].as_ref().unwrap_err().to_string(),
849                "Simulated error"
850            );
851
852            // Verify successful callbacks still executed
853            assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10
854
855            // Execute again - errors should be consistent
856            let results = entry.execute_prometheus_update_callbacks();
857            assert!(results[1].is_err());
858            assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11
859        }
860
861        // Test 3: Empty registry
862        {
863            let entry = MetricsRegistryEntry::new();
864            let results = entry.execute_prometheus_update_callbacks();
865            assert_eq!(results.len(), 0);
866        }
867    }
868}
869
870#[cfg(feature = "integration")]
871#[cfg(test)]
872mod test_metricsregistry_prefixes {
873    use super::*;
874    use crate::distributed::distributed_test_utils::create_test_drt_async;
875    use prometheus::core::Collector;
876
877    #[tokio::test]
878    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
879        let drt = create_test_drt_async().await;
880
881        const DRT_NAME: &str = "";
882        const NAMESPACE_NAME: &str = "ns901";
883        const COMPONENT_NAME: &str = "comp901";
884        const ENDPOINT_NAME: &str = "ep901";
885        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
886        let component = namespace.component(COMPONENT_NAME).unwrap();
887        let endpoint = component.endpoint(ENDPOINT_NAME);
888
889        // DRT
890        assert_eq!(drt.basename(), DRT_NAME);
891        assert_eq!(drt.parent_hierarchy(), Vec::<String>::new());
892        assert_eq!(drt.hierarchy(), DRT_NAME);
893
894        // Namespace
895        assert_eq!(namespace.basename(), NAMESPACE_NAME);
896        assert_eq!(namespace.parent_hierarchy(), vec!["".to_string()]);
897        assert_eq!(namespace.hierarchy(), NAMESPACE_NAME);
898
899        // Component
900        assert_eq!(component.basename(), COMPONENT_NAME);
901        assert_eq!(
902            component.parent_hierarchy(),
903            vec!["".to_string(), NAMESPACE_NAME.to_string()]
904        );
905        assert_eq!(
906            component.hierarchy(),
907            format!("{}_{}", NAMESPACE_NAME, COMPONENT_NAME)
908        );
909
910        // Endpoint
911        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
912        assert_eq!(
913            endpoint.parent_hierarchy(),
914            vec![
915                "".to_string(),
916                NAMESPACE_NAME.to_string(),
917                COMPONENT_NAME.to_string(),
918            ]
919        );
920        assert_eq!(
921            endpoint.hierarchy(),
922            format!("{}_{}_{}", NAMESPACE_NAME, COMPONENT_NAME, ENDPOINT_NAME)
923        );
924
925        // Relationships
926        assert!(namespace.parent_hierarchy().contains(&drt.basename()));
927        assert!(component.parent_hierarchy().contains(&namespace.basename()));
928        assert!(endpoint.parent_hierarchy().contains(&component.basename()));
929
930        // Depth
931        assert_eq!(drt.parent_hierarchy().len(), 0);
932        assert_eq!(namespace.parent_hierarchy().len(), 1);
933        assert_eq!(component.parent_hierarchy().len(), 2);
934        assert_eq!(endpoint.parent_hierarchy().len(), 3);
935
936        // Invalid namespace behavior - sanitizes to "_123" and succeeds
937        // @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't turn it on,
938        // so invalid characters are sanitized in MetricsRegistry rather than rejected.
939        let invalid_namespace = drt.namespace("@@123").unwrap();
940        let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
941        assert!(result.is_ok());
942        if let Ok(counter) = &result {
943            // Verify the namespace was sanitized to "_123" in the label
944            let desc = counter.desc();
945            let namespace_label = desc[0]
946                .const_label_pairs
947                .iter()
948                .find(|l| l.name() == "dynamo_namespace")
949                .expect("Should have dynamo_namespace label");
950            assert_eq!(namespace_label.value(), "_123");
951        }
952
953        // Valid namespace works
954        let valid_namespace = drt.namespace("ns567").unwrap();
955        assert!(
956            valid_namespace
957                .create_counter("test_counter", "A test counter", &[])
958                .is_ok()
959        );
960    }
961
962    #[tokio::test]
963    async fn test_recursive_namespace() {
964        // Create a distributed runtime for testing
965        let drt = create_test_drt_async().await;
966
967        // Create a deeply chained namespace: ns1.ns2.ns3
968        let ns1 = drt.namespace("ns1").unwrap();
969        let ns2 = ns1.namespace("ns2").unwrap();
970        let ns3 = ns2.namespace("ns3").unwrap();
971
972        // Create a component in the deepest namespace
973        let component = ns3.component("test-component").unwrap();
974
975        // Verify the hierarchy structure
976        assert_eq!(ns1.basename(), "ns1");
977        assert_eq!(ns1.parent_hierarchy(), vec!("".to_string()));
978        assert_eq!(ns1.hierarchy(), "ns1");
979
980        assert_eq!(ns2.basename(), "ns2");
981        assert_eq!(
982            ns2.parent_hierarchy(),
983            vec!["".to_string(), "ns1".to_string()]
984        );
985        assert_eq!(ns2.hierarchy(), "ns1_ns2");
986
987        assert_eq!(ns3.basename(), "ns3");
988        assert_eq!(
989            ns3.parent_hierarchy(),
990            vec!["".to_string(), "ns1".to_string(), "ns2".to_string()]
991        );
992        assert_eq!(ns3.hierarchy(), "ns1_ns2_ns3");
993
994        assert_eq!(component.basename(), "test-component");
995        assert_eq!(
996            component.parent_hierarchy(),
997            vec![
998                "".to_string(),
999                "ns1".to_string(),
1000                "ns2".to_string(),
1001                "ns3".to_string()
1002            ]
1003        );
1004        assert_eq!(component.hierarchy(), "ns1_ns2_ns3_test-component");
1005
1006        println!("✓ Chained namespace test passed - all prefixes correct");
1007    }
1008}
1009
1010#[cfg(feature = "integration")]
1011#[cfg(test)]
1012mod test_metricsregistry_prometheus_fmt_outputs {
1013    use super::prometheus_names::name_prefix;
1014    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1015    use super::prometheus_names::{nats_client, nats_service};
1016    use super::*;
1017    use crate::distributed::distributed_test_utils::create_test_drt_async;
1018    use prometheus::Counter;
1019    use std::sync::Arc;
1020
1021    #[tokio::test]
1022    async fn test_prometheusfactory_using_metrics_registry_trait() {
1023        // Setup real DRT and registry using the test-friendly constructor
1024        let drt = create_test_drt_async().await;
1025
1026        // Use a simple constant namespace name
1027        let namespace_name = "ns345";
1028
1029        let namespace = drt.namespace(namespace_name).unwrap();
1030        let component = namespace.component("comp345").unwrap();
1031        let endpoint = component.endpoint("ep345");
1032
1033        // Test Counter creation
1034        let counter = endpoint
1035            .create_counter("testcounter", "A test counter", &[])
1036            .unwrap();
1037        counter.inc_by(123.456789);
1038        let epsilon = 0.01;
1039        assert!((counter.get() - 123.456789).abs() < epsilon);
1040
1041        let endpoint_output_raw = endpoint.prometheus_expfmt().unwrap();
1042        println!("Endpoint output:");
1043        println!("{}", endpoint_output_raw);
1044
1045        // Filter out NATS service metrics for test comparison
1046        let endpoint_output =
1047            super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
1048
1049        let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
1050# TYPE dynamo_component_testcounter counter
1051dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
1052
1053        assert_eq!(
1054            endpoint_output, expected_endpoint_output,
1055            "\n=== ENDPOINT COMPARISON FAILED ===\n\
1056             Expected:\n{}\n\
1057             Actual:\n{}\n\
1058             ==============================",
1059            expected_endpoint_output, endpoint_output
1060        );
1061
1062        // Test Gauge creation
1063        let gauge = component
1064            .create_gauge("testgauge", "A test gauge", &[])
1065            .unwrap();
1066        gauge.set(50000.0);
1067        assert_eq!(gauge.get(), 50000.0);
1068
1069        // Test Prometheus format output for Component (gauge + histogram)
1070        let component_output_raw = component.prometheus_expfmt().unwrap();
1071        println!("Component output:");
1072        println!("{}", component_output_raw);
1073
1074        // Filter out NATS service metrics for test comparison
1075        let component_output =
1076            super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
1077
1078        let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1079# TYPE dynamo_component_testcounter counter
1080dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1081# HELP dynamo_component_testgauge A test gauge
1082# TYPE dynamo_component_testgauge gauge
1083dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1084
1085        assert_eq!(
1086            component_output, expected_component_output,
1087            "\n=== COMPONENT COMPARISON FAILED ===\n\
1088             Expected:\n{}\n\
1089             Actual:\n{}\n\
1090             ==============================",
1091            expected_component_output, component_output
1092        );
1093
1094        let intcounter = namespace
1095            .create_intcounter("testintcounter", "A test int counter", &[])
1096            .unwrap();
1097        intcounter.inc_by(12345);
1098        assert_eq!(intcounter.get(), 12345);
1099
1100        // Test Prometheus format output for Namespace (int_counter + gauge + histogram)
1101        let namespace_output_raw = namespace.prometheus_expfmt().unwrap();
1102        println!("Namespace output:");
1103        println!("{}", namespace_output_raw);
1104
1105        // Filter out NATS service metrics for test comparison
1106        let namespace_output =
1107            super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
1108
1109        let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1110# TYPE dynamo_component_testcounter counter
1111dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1112# HELP dynamo_component_testgauge A test gauge
1113# TYPE dynamo_component_testgauge gauge
1114dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1115# HELP dynamo_component_testintcounter A test int counter
1116# TYPE dynamo_component_testintcounter counter
1117dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1118
1119        assert_eq!(
1120            namespace_output, expected_namespace_output,
1121            "\n=== NAMESPACE COMPARISON FAILED ===\n\
1122             Expected:\n{}\n\
1123             Actual:\n{}\n\
1124             ==============================",
1125            expected_namespace_output, namespace_output
1126        );
1127
1128        // Test IntGauge creation
1129        let intgauge = namespace
1130            .create_intgauge("testintgauge", "A test int gauge", &[])
1131            .unwrap();
1132        intgauge.set(42);
1133        assert_eq!(intgauge.get(), 42);
1134
1135        // Test IntGaugeVec creation
1136        let intgaugevec = namespace
1137            .create_intgaugevec(
1138                "testintgaugevec",
1139                "A test int gauge vector",
1140                &["instance", "status"],
1141                &[("service", "api")],
1142            )
1143            .unwrap();
1144        intgaugevec
1145            .with_label_values(&["server1", "active"])
1146            .set(10);
1147        intgaugevec
1148            .with_label_values(&["server2", "inactive"])
1149            .set(0);
1150
1151        // Test CounterVec creation
1152        let countervec = endpoint
1153            .create_countervec(
1154                "testcountervec",
1155                "A test counter vector",
1156                &["method", "status"],
1157                &[("service", "api")],
1158            )
1159            .unwrap();
1160        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1161        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1162
1163        // Test Histogram creation
1164        let histogram = component
1165            .create_histogram("testhistogram", "A test histogram", &[], None)
1166            .unwrap();
1167        histogram.observe(1.0);
1168        histogram.observe(2.5);
1169        histogram.observe(4.0);
1170
1171        // Test Prometheus format output for DRT (all metrics combined)
1172        let drt_output_raw = drt.prometheus_expfmt().unwrap();
1173        println!("DRT output:");
1174        println!("{}", drt_output_raw);
1175
1176        // Filter out all NATS metrics for comparison
1177        let filtered_drt_output =
1178            super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
1179
1180        let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
1181# TYPE dynamo_component_testcounter counter
1182dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1183# HELP dynamo_component_testcountervec A test counter vector
1184# TYPE dynamo_component_testcountervec counter
1185dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1186dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1187# HELP dynamo_component_testgauge A test gauge
1188# TYPE dynamo_component_testgauge gauge
1189dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1190# HELP dynamo_component_testhistogram A test histogram
1191# TYPE dynamo_component_testhistogram histogram
1192dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1193dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1194dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1195dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1196dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1197dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1198dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1199dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1200dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1201dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1202dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1203dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1204dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1205dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1206# HELP dynamo_component_testintcounter A test int counter
1207# TYPE dynamo_component_testintcounter counter
1208dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1209# HELP dynamo_component_testintgauge A test int gauge
1210# TYPE dynamo_component_testintgauge gauge
1211dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1212# HELP dynamo_component_testintgaugevec A test int gauge vector
1213# TYPE dynamo_component_testintgaugevec gauge
1214dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1215dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
1216# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
1217# TYPE dynamo_component_uptime_seconds gauge
1218dynamo_component_uptime_seconds 0"#.to_string();
1219
1220        assert_eq!(
1221            filtered_drt_output, expected_drt_output,
1222            "\n=== DRT COMPARISON FAILED ===\n\
1223             Expected:\n{}\n\
1224             Actual (filtered):\n{}\n\
1225             ==============================",
1226            expected_drt_output, filtered_drt_output
1227        );
1228
1229        println!("✓ All Prometheus format outputs verified successfully!");
1230    }
1231
1232    #[test]
1233    fn test_refactored_filter_functions() {
1234        // Test data with mixed content
1235        let test_input = r#"# HELP dynamo_component_requests Total requests
1236# TYPE dynamo_component_requests counter
1237dynamo_component_requests 42
1238# HELP dynamo_component_nats_client_connection_state Connection state
1239# TYPE dynamo_component_nats_client_connection_state gauge
1240dynamo_component_nats_client_connection_state 1
1241# HELP dynamo_component_latency Response latency
1242# TYPE dynamo_component_latency histogram
1243dynamo_component_latency_bucket{le="0.1"} 10
1244dynamo_component_latency_bucket{le="0.5"} 25
1245dynamo_component_nats_service_requests_total 100
1246dynamo_component_nats_service_errors_total 5"#;
1247
1248        // Test remove_nats_lines (excludes NATS lines but keeps help/type)
1249        let filtered_out = super::test_helpers::remove_nats_lines(test_input);
1250        assert_eq!(filtered_out.len(), 7); // 7 non-NATS lines
1251        assert!(!filtered_out.iter().any(|line| line.contains("nats")));
1252
1253        // Test extract_nats_lines (includes all NATS lines including help/type)
1254        let filtered_only = super::test_helpers::extract_nats_lines(test_input);
1255        assert_eq!(filtered_only.len(), 5); // 5 NATS lines
1256        assert!(filtered_only.iter().all(|line| line.contains("nats")));
1257
1258        // Test extract_metrics (only actual metric lines, excluding help/type)
1259        let metrics_only = super::test_helpers::extract_metrics(test_input);
1260        assert_eq!(metrics_only.len(), 6); // 6 actual metric lines (excluding help/type)
1261        assert!(
1262            metrics_only
1263                .iter()
1264                .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1265        );
1266
1267        println!("✓ All refactored filter functions work correctly!");
1268    }
1269}
1270
1271#[cfg(feature = "integration")]
1272#[cfg(test)]
1273mod test_metricsregistry_nats {
1274    use super::prometheus_names::name_prefix;
1275    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1276    use super::prometheus_names::{nats_client, nats_service};
1277    use super::*;
1278    use crate::distributed::distributed_test_utils::create_test_drt_async;
1279    use crate::pipeline::PushRouter;
1280    use crate::{DistributedRuntime, Runtime};
1281    use tokio::time::{Duration, sleep};
1282    #[tokio::test]
1283    async fn test_drt_nats_metrics() {
1284        // Setup real DRT and registry using the test-friendly constructor
1285        let drt = create_test_drt_async().await;
1286
1287        // Get DRT output which should include NATS client metrics
1288        let drt_output = drt.prometheus_expfmt().unwrap();
1289        println!("DRT output with NATS metrics:");
1290        println!("{}", drt_output);
1291
1292        // Additional checks for NATS client metrics (without checking specific values)
1293        let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
1294
1295        // Check that NATS client metrics are present
1296        assert!(
1297            !drt_nats_metrics.is_empty(),
1298            "NATS client metrics should be present"
1299        );
1300
1301        // Check for specific NATS client metric names (without values)
1302        // Extract only the metric lines from the already-filtered NATS metrics
1303        let drt_nats_metric_lines =
1304            super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
1305        let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
1306            .iter()
1307            .map(|line| {
1308                let without_labels = line.split('{').next().unwrap_or(line);
1309                // Remove the value part (everything after the last space)
1310                without_labels.split(' ').next().unwrap_or(without_labels)
1311            })
1312            .collect();
1313
1314        let expect_drt_nats_metrics_sorted = {
1315            let mut temp = DRT_NATS_METRICS
1316                .iter()
1317                .map(|metric| build_component_metric_name(metric))
1318                .collect::<Vec<_>>();
1319            temp.sort();
1320            temp
1321        };
1322
1323        // Print both lists for comparison
1324        println!(
1325            "actual_drt_nats_metrics_sorted: {:?}",
1326            actual_drt_nats_metrics_sorted
1327        );
1328        println!(
1329            "expect_drt_nats_metrics_sorted: {:?}",
1330            expect_drt_nats_metrics_sorted
1331        );
1332
1333        // Compare the sorted lists
1334        assert_eq!(
1335            actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
1336            "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1337        );
1338
1339        println!("✓ DistributedRuntime NATS metrics integration test passed!");
1340    }
1341
1342    #[tokio::test]
1343    async fn test_nats_metric_names() {
1344        // This test only tests the existence of the NATS metrics. It does not check
1345        // the values of the metrics.
1346
1347        // Setup real DRT and registry using the test-friendly constructor
1348        let drt = create_test_drt_async().await;
1349
1350        // Create a namespace and components from the DRT
1351        let namespace = drt.namespace("ns789").unwrap();
1352        let components = namespace.component("comp789").unwrap();
1353
1354        // Create a service to trigger metrics callback registration
1355        let _service = components.service_builder().create().await.unwrap();
1356
1357        // Get components output which should include NATS client metrics
1358        // Additional checks for NATS client metrics (without checking specific values)
1359        let component_nats_metrics =
1360            super::test_helpers::extract_nats_lines(&components.prometheus_expfmt().unwrap());
1361        println!(
1362            "Component NATS metrics count: {}",
1363            component_nats_metrics.len()
1364        );
1365
1366        // Check that NATS client metrics are present
1367        assert!(
1368            !component_nats_metrics.is_empty(),
1369            "NATS client metrics should be present"
1370        );
1371
1372        // Check for specific NATS client metric names (without values)
1373        let component_metrics =
1374            super::test_helpers::extract_metrics(&components.prometheus_expfmt().unwrap());
1375        let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
1376            .iter()
1377            .map(|line| {
1378                let without_labels = line.split('{').next().unwrap_or(line);
1379                // Remove the value part (everything after the last space)
1380                without_labels.split(' ').next().unwrap_or(without_labels)
1381            })
1382            .collect();
1383
1384        let expect_component_nats_metrics_sorted = {
1385            let mut temp = COMPONENT_NATS_METRICS
1386                .iter()
1387                .map(|metric| build_component_metric_name(metric))
1388                .collect::<Vec<_>>();
1389            temp.sort();
1390            temp
1391        };
1392
1393        // Print both lists for comparison
1394        println!(
1395            "actual_component_nats_metrics_sorted: {:?}",
1396            actual_component_nats_metrics_sorted
1397        );
1398        println!(
1399            "expect_component_nats_metrics_sorted: {:?}",
1400            expect_component_nats_metrics_sorted
1401        );
1402
1403        // Compare the sorted lists
1404        assert_eq!(
1405            actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
1406            "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1407        );
1408
1409        // Get both DRT and component output and filter for NATS metrics only
1410        let drt_output = drt.prometheus_expfmt().unwrap();
1411        let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
1412        let drt_and_component_nats_metrics =
1413            super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
1414        println!(
1415            "DRT and component NATS metrics count: {}",
1416            drt_and_component_nats_metrics.len()
1417        );
1418
1419        // Check that the NATS metrics are present in the component output
1420        assert_eq!(
1421            drt_and_component_nats_metrics.len(),
1422            DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
1423            "DRT at this point should have both the DRT and component NATS metrics"
1424        );
1425
1426        // Check that the NATS metrics are present in the component output
1427        println!("✓ Component NATS metrics integration test passed!");
1428    }
1429
1430    /// Tests NATS metrics values before and after endpoint activity with large message processing.
1431    /// Creates endpoint, sends test messages + 10k byte message, validates metrics (NATS + work handler)
1432    /// at initial state and post-activity state. Ensures byte thresholds, message counts, and processing
1433    /// times are within expected ranges. Tests end-to-end client-server communication and metrics collection.
1434    #[tokio::test]
1435    async fn test_nats_metrics_values() -> anyhow::Result<()> {
1436        struct MessageHandler {}
1437        impl MessageHandler {
1438            fn new() -> std::sync::Arc<Self> {
1439                std::sync::Arc::new(Self {})
1440            }
1441        }
1442
1443        #[async_trait]
1444        impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
1445            async fn generate(
1446                &self,
1447                input: SingleIn<String>,
1448            ) -> Result<ManyOut<Annotated<String>>, Error> {
1449                let (data, ctx) = input.into_parts();
1450                let response = data.to_string();
1451                let stream = stream::iter(vec![Annotated::from_data(response)]);
1452                Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
1453            }
1454        }
1455
1456        println!("\n=== Initializing DistributedRuntime ===");
1457        let runtime = Runtime::from_current()?;
1458        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
1459        let namespace = drt.namespace("ns123").unwrap();
1460        let component = namespace.component("comp123").unwrap();
1461        let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
1462
1463        let _backend_handle = tokio::spawn(async move {
1464            let service = component.service_builder().create().await.unwrap();
1465            let endpoint = service.endpoint("echo").endpoint_builder().handler(ingress);
1466            endpoint.start().await.unwrap();
1467        });
1468
1469        sleep(Duration::from_millis(500)).await;
1470        println!("✓ Launched endpoint service in background successfully");
1471
1472        let drt_output = drt.prometheus_expfmt().unwrap();
1473        let parsed_metrics: Vec<_> = drt_output
1474            .lines()
1475            .filter_map(super::test_helpers::parse_prometheus_metric)
1476            .collect();
1477
1478        println!("=== Initial DRT metrics output ===");
1479        println!("{}", drt_output);
1480
1481        println!("\n=== Checking Initial Metric Values ===");
1482
1483        let initial_expected_metric_values = [
1484            // DRT NATS metrics (ordered to match DRT_NATS_METRICS)
1485            (
1486                build_component_metric_name(nats_client::CONNECTION_STATE),
1487                1.0,
1488                1.0,
1489            ), // Should be connected
1490            (
1491                build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1492                1.0,
1493                1.0,
1494            ), // Should have 1 connection
1495            (
1496                build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1497                800.0,
1498                4000.0,
1499            ), // Wide range around observed value of 1888
1500            (
1501                build_component_metric_name(nats_client::IN_MESSAGES),
1502                0.0,
1503                5.0,
1504            ), // Wide range around 2
1505            (
1506                build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1507                1500.0,
1508                5000.0,
1509            ), // Wide range around observed value of 2752
1510            (
1511                build_component_metric_name(nats_client::OUT_MESSAGES),
1512                0.0,
1513                5.0,
1514            ), // Wide range around 2
1515            // Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
1516            (
1517                build_component_metric_name(nats_service::PROCESSING_MS_AVG),
1518                0.0,
1519                0.0,
1520            ), // No processing yet
1521            (
1522                build_component_metric_name(nats_service::ERRORS_TOTAL),
1523                0.0,
1524                0.0,
1525            ), // No errors yet
1526            (
1527                build_component_metric_name(nats_service::REQUESTS_TOTAL),
1528                0.0,
1529                0.0,
1530            ), // No requests yet
1531            (
1532                build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
1533                0.0,
1534                0.0,
1535            ), // No processing yet
1536            (
1537                build_component_metric_name(nats_service::ACTIVE_SERVICES),
1538                0.0,
1539                2.0,
1540            ), // Service may not be fully active yet
1541            (
1542                build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1543                0.0,
1544                2.0,
1545            ), // Endpoint may not be fully active yet
1546        ];
1547
1548        for (metric_name, min_value, max_value) in &initial_expected_metric_values {
1549            let actual_value = parsed_metrics
1550                .iter()
1551                .find(|(name, _, _)| name == metric_name)
1552                .map(|(_, _, value)| *value)
1553                .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
1554
1555            assert!(
1556                actual_value >= *min_value && actual_value <= *max_value,
1557                "Initial metric {} should be between {} and {}, but got {}",
1558                metric_name,
1559                min_value,
1560                max_value,
1561                actual_value
1562            );
1563        }
1564
1565        println!("\n=== Client Runtime to hit the endpoint ===");
1566        let client_runtime = Runtime::from_current()?;
1567        let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
1568        let namespace = client_distributed.namespace("ns123")?;
1569        let component = namespace.component("comp123")?;
1570        let client = component.endpoint("echo").client().await?;
1571
1572        client.wait_for_instances().await?;
1573        println!("✓ Connected to endpoint, waiting for instances...");
1574
1575        let router =
1576            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
1577                .await?;
1578
1579        for i in 0..10 {
1580            let msg = i.to_string().repeat(2000); // 2k bytes message
1581            let mut stream = router.random(msg.clone().into()).await?;
1582            while let Some(resp) = stream.next().await {
1583                // Check if response matches the original message
1584                if let Some(data) = &resp.data {
1585                    let is_same = data == &msg;
1586                    println!(
1587                        "Response {}: {} bytes, matches original: {}",
1588                        i,
1589                        data.len(),
1590                        is_same
1591                    );
1592                }
1593            }
1594        }
1595        println!("✓ Sent messages and received responses successfully");
1596
1597        println!("\n=== Waiting 500ms for metrics to update ===");
1598        sleep(Duration::from_millis(500)).await;
1599        println!("✓ Wait complete, getting final metrics...");
1600
1601        let final_drt_output = drt.prometheus_expfmt().unwrap();
1602        println!("\n=== Final Prometheus DRT output ===");
1603        println!("{}", final_drt_output);
1604
1605        let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
1606        println!("\n=== Filtered NATS metrics from final DRT output ===");
1607        for line in &final_drt_nats_output {
1608            println!("{}", line);
1609        }
1610
1611        let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
1612            .iter()
1613            .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
1614            .collect();
1615
1616        let post_expected_metric_values = [
1617            // DRT NATS metrics
1618            (
1619                build_component_metric_name(nats_client::CONNECTION_STATE),
1620                1.0,
1621                1.0,
1622            ), // Connected
1623            (
1624                build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1625                1.0,
1626                1.0,
1627            ), // 1 connection
1628            (
1629                build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1630                20000.0,
1631                32000.0,
1632            ), // Wide range around 26117
1633            (
1634                build_component_metric_name(nats_client::IN_MESSAGES),
1635                8.0,
1636                20.0,
1637            ), // Wide range around 16
1638            (
1639                build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1640                2500.0,
1641                8000.0,
1642            ), // Wide range around 5524
1643            (
1644                build_component_metric_name(nats_client::OUT_MESSAGES),
1645                8.0,
1646                20.0,
1647            ), // Wide range around 16
1648            // Component NATS metrics
1649            (
1650                build_component_metric_name(nats_service::PROCESSING_MS_AVG),
1651                0.0,
1652                1.0,
1653            ), // Low processing time
1654            (
1655                build_component_metric_name(nats_service::ERRORS_TOTAL),
1656                0.0,
1657                0.0,
1658            ), // No errors
1659            (
1660                build_component_metric_name(nats_service::REQUESTS_TOTAL),
1661                0.0,
1662                10.0,
1663            ), // NATS service stats requests (may differ from work handler count)
1664            (
1665                build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
1666                0.0,
1667                5.0,
1668            ), // Low total processing time
1669            (
1670                build_component_metric_name(nats_service::ACTIVE_SERVICES),
1671                0.0,
1672                2.0,
1673            ), // Service may not be fully active
1674            (
1675                build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1676                0.0,
1677                2.0,
1678            ), // Endpoint may not be fully active
1679            // Work handler metrics
1680            (
1681                build_component_metric_name(work_handler::REQUESTS_TOTAL),
1682                10.0,
1683                10.0,
1684            ), // 10 messages
1685            (
1686                build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
1687                21000.0,
1688                26000.0,
1689            ), // ~75-125% of 23520
1690            (
1691                build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
1692                18000.0,
1693                23000.0,
1694            ), // ~75-125% of 20660
1695            (
1696                build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
1697                0.0,
1698                1.0,
1699            ), // 0 or very low
1700            // Histograms have _{count,sum} suffixes
1701            (
1702                format!(
1703                    "{}_count",
1704                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1705                ),
1706                10.0,
1707                10.0,
1708            ), // 10 messages
1709            (
1710                format!(
1711                    "{}_sum",
1712                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1713                ),
1714                0.0001,
1715                1.0,
1716            ), // Processing time sum (wide range)
1717        ];
1718
1719        println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
1720        for (metric_name, min_value, max_value) in &post_expected_metric_values {
1721            let actual_value = final_parsed_metrics
1722                .iter()
1723                .find(|(name, _, _)| name == metric_name)
1724                .map(|(_, _, value)| *value)
1725                .unwrap_or_else(|| {
1726                    panic!(
1727                        "Could not find expected post-activity metric: {}",
1728                        metric_name
1729                    )
1730                });
1731
1732            assert!(
1733                actual_value >= *min_value && actual_value <= *max_value,
1734                "Post-activity metric {} should be between {} and {}, but got {}",
1735                metric_name,
1736                min_value,
1737                max_value,
1738                actual_value
1739            );
1740            println!(
1741                "✓ {}: {} (range: {} to {})",
1742                metric_name, actual_value, min_value, max_value
1743            );
1744        }
1745
1746        println!("✓ All NATS and component metrics parsed successfully!");
1747        println!("✓ Byte metrics verified to be >= 100 bytes!");
1748        println!("✓ Post-activity metrics verified with higher thresholds!");
1749        println!("✓ Work handler metrics reflect increased activity!");
1750
1751        Ok(())
1752    }
1753}