1pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22use prometheus_names::{
24 build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
25 sanitize_prometheus_name, work_handler,
26};
27
28use crate::pipeline::{
30 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31 network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
37use prometheus::Encoder;
39
40fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
43 let mut seen_keys = std::collections::HashSet::new();
44 for (key, _) in labels {
45 if !seen_keys.insert(*key) {
46 return Err(anyhow::anyhow!(
47 "Duplicate label key '{}' found in labels",
48 key
49 ));
50 }
51 }
52 Ok(())
53}
54
55pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
60 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
62 where
63 Self: Sized;
64
65 fn with_histogram_opts_and_buckets(
68 _opts: prometheus::HistogramOpts,
69 _buckets: Option<Vec<f64>>,
70 ) -> Result<Self, prometheus::Error>
71 where
72 Self: Sized,
73 {
74 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
75 }
76
77 fn with_opts_and_label_names(
80 _opts: prometheus::Opts,
81 _label_names: &[&str],
82 ) -> Result<Self, prometheus::Error>
83 where
84 Self: Sized,
85 {
86 panic!("with_opts_and_label_names is not implemented for this metric type");
87 }
88}
89
90impl PrometheusMetric for prometheus::Counter {
92 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
93 prometheus::Counter::with_opts(opts)
94 }
95}
96
97impl PrometheusMetric for prometheus::IntCounter {
98 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
99 prometheus::IntCounter::with_opts(opts)
100 }
101}
102
103impl PrometheusMetric for prometheus::Gauge {
104 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
105 prometheus::Gauge::with_opts(opts)
106 }
107}
108
109impl PrometheusMetric for prometheus::IntGauge {
110 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
111 prometheus::IntGauge::with_opts(opts)
112 }
113}
114
115impl PrometheusMetric for prometheus::GaugeVec {
116 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
117 Err(prometheus::Error::Msg(
118 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
119 ))
120 }
121
122 fn with_opts_and_label_names(
123 opts: prometheus::Opts,
124 label_names: &[&str],
125 ) -> Result<Self, prometheus::Error> {
126 prometheus::GaugeVec::new(opts, label_names)
127 }
128}
129
130impl PrometheusMetric for prometheus::IntGaugeVec {
131 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
132 Err(prometheus::Error::Msg(
133 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
134 ))
135 }
136
137 fn with_opts_and_label_names(
138 opts: prometheus::Opts,
139 label_names: &[&str],
140 ) -> Result<Self, prometheus::Error> {
141 prometheus::IntGaugeVec::new(opts, label_names)
142 }
143}
144
145impl PrometheusMetric for prometheus::IntCounterVec {
146 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
147 Err(prometheus::Error::Msg(
148 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
149 ))
150 }
151
152 fn with_opts_and_label_names(
153 opts: prometheus::Opts,
154 label_names: &[&str],
155 ) -> Result<Self, prometheus::Error> {
156 prometheus::IntCounterVec::new(opts, label_names)
157 }
158}
159
160impl PrometheusMetric for prometheus::Histogram {
162 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
163 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
165 prometheus::Histogram::with_opts(histogram_opts)
166 }
167
168 fn with_histogram_opts_and_buckets(
169 mut opts: prometheus::HistogramOpts,
170 buckets: Option<Vec<f64>>,
171 ) -> Result<Self, prometheus::Error> {
172 if let Some(custom_buckets) = buckets {
173 opts = opts.buckets(custom_buckets);
174 }
175 prometheus::Histogram::with_opts(opts)
176 }
177}
178
179impl PrometheusMetric for prometheus::CounterVec {
181 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
182 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
184 }
185
186 fn with_opts_and_label_names(
187 opts: prometheus::Opts,
188 label_names: &[&str],
189 ) -> Result<Self, prometheus::Error> {
190 prometheus::CounterVec::new(opts, label_names)
191 }
192}
193
194pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
199 hierarchy: &H,
200 metric_name: &str,
201 metric_desc: &str,
202 labels: &[(&str, &str)],
203 buckets: Option<Vec<f64>>,
204 const_labels: Option<&[&str]>,
205) -> anyhow::Result<T> {
206 validate_no_duplicate_label_keys(labels)?;
208 let basename = hierarchy.basename();
211 let parent_hierarchies = hierarchy.parent_hierarchies();
212
213 let mut hierarchy_names: Vec<String> =
215 parent_hierarchies.iter().map(|p| p.basename()).collect();
216 hierarchy_names.push(basename.clone());
217
218 let metric_name = build_component_metric_name(metric_name);
219
220 let mut updated_labels: Vec<(String, String)> = Vec::new();
222
223 for (key, _) in labels {
230 if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231 return Err(anyhow::anyhow!(
232 "Label '{}' is automatically added by auto-label injection and cannot be manually set",
233 key
234 ));
235 }
236 }
237
238 if hierarchy_names.len() > 1 {
241 let namespace = &hierarchy_names[1];
242 if !namespace.is_empty() {
243 let valid_namespace = sanitize_prometheus_label(namespace)?;
244 if !valid_namespace.is_empty() {
245 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
246 }
247 }
248 }
249 if hierarchy_names.len() > 2 {
250 let component = &hierarchy_names[2];
251 if !component.is_empty() {
252 let valid_component = sanitize_prometheus_label(component)?;
253 if !valid_component.is_empty() {
254 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
255 }
256 }
257 }
258 if hierarchy_names.len() > 3 {
259 let endpoint = &hierarchy_names[3];
260 if !endpoint.is_empty() {
261 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
262 if !valid_endpoint.is_empty() {
263 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
264 }
265 }
266 }
267
268 updated_labels.extend(
270 labels
271 .iter()
272 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273 );
274 let prometheus_metric = if std::any::TypeId::of::<T>()
278 == std::any::TypeId::of::<prometheus::CounterVec>()
279 {
280 if buckets.is_some() {
283 return Err(anyhow::anyhow!(
284 "buckets parameter is not valid for CounterVec"
285 ));
286 }
287 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288 for (key, value) in &updated_labels {
289 opts = opts.const_label(key.clone(), value.clone());
290 }
291 let label_names = const_labels
292 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293 T::with_opts_and_label_names(opts, label_names)?
294 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295 if buckets.is_some() {
298 return Err(anyhow::anyhow!(
299 "buckets parameter is not valid for GaugeVec"
300 ));
301 }
302 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303 for (key, value) in &updated_labels {
304 opts = opts.const_label(key.clone(), value.clone());
305 }
306 let label_names = const_labels
307 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308 T::with_opts_and_label_names(opts, label_names)?
309 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310 if const_labels.is_some() {
313 return Err(anyhow::anyhow!(
314 "const_labels parameter is not valid for Histogram"
315 ));
316 }
317 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318 for (key, value) in &updated_labels {
319 opts = opts.const_label(key.clone(), value.clone());
320 }
321 T::with_histogram_opts_and_buckets(opts, buckets)?
322 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323 if buckets.is_some() {
326 return Err(anyhow::anyhow!(
327 "buckets parameter is not valid for IntCounterVec"
328 ));
329 }
330 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331 for (key, value) in &updated_labels {
332 opts = opts.const_label(key.clone(), value.clone());
333 }
334 let label_names = const_labels
335 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336 T::with_opts_and_label_names(opts, label_names)?
337 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338 if buckets.is_some() {
341 return Err(anyhow::anyhow!(
342 "buckets parameter is not valid for IntGaugeVec"
343 ));
344 }
345 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346 for (key, value) in &updated_labels {
347 opts = opts.const_label(key.clone(), value.clone());
348 }
349 let label_names = const_labels
350 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351 T::with_opts_and_label_names(opts, label_names)?
352 } else {
353 if buckets.is_some() {
356 return Err(anyhow::anyhow!(
357 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358 ));
359 }
360 if const_labels.is_some() {
361 return Err(anyhow::anyhow!(
362 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363 ));
364 }
365 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366 for (key, value) in &updated_labels {
367 opts = opts.const_label(key.clone(), value.clone());
368 }
369 T::with_opts(opts)?
370 };
371
372 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
373 hierarchy.get_metrics_registry().add_metric(collector)?;
374
375 Ok(prometheus_metric)
376}
377
/// Metric factory bound to one node of a metrics hierarchy.
///
/// Every metric created through this handle is built and registered against
/// the wrapped hierarchy node.
pub struct Metrics<H: MetricsHierarchy> {
    // Hierarchy node used for label injection and registration.
    hierarchy: H,
}
383
384impl<H: MetricsHierarchy> Metrics<H> {
385 pub fn new(hierarchy: H) -> Self {
386 Self { hierarchy }
387 }
388
389 pub fn create_counter(
412 &self,
413 name: &str,
414 description: &str,
415 labels: &[(&str, &str)],
416 ) -> anyhow::Result<prometheus::Counter> {
417 create_metric(&self.hierarchy, name, description, labels, None, None)
418 }
419
420 pub fn create_countervec(
422 &self,
423 name: &str,
424 description: &str,
425 const_labels: &[&str],
426 const_label_values: &[(&str, &str)],
427 ) -> anyhow::Result<prometheus::CounterVec> {
428 create_metric(
429 &self.hierarchy,
430 name,
431 description,
432 const_label_values,
433 None,
434 Some(const_labels),
435 )
436 }
437
438 pub fn create_gauge(
440 &self,
441 name: &str,
442 description: &str,
443 labels: &[(&str, &str)],
444 ) -> anyhow::Result<prometheus::Gauge> {
445 create_metric(&self.hierarchy, name, description, labels, None, None)
446 }
447
448 pub fn create_gaugevec(
450 &self,
451 name: &str,
452 description: &str,
453 const_labels: &[&str],
454 const_label_values: &[(&str, &str)],
455 ) -> anyhow::Result<prometheus::GaugeVec> {
456 create_metric(
457 &self.hierarchy,
458 name,
459 description,
460 const_label_values,
461 None,
462 Some(const_labels),
463 )
464 }
465
466 pub fn create_histogram(
468 &self,
469 name: &str,
470 description: &str,
471 labels: &[(&str, &str)],
472 buckets: Option<Vec<f64>>,
473 ) -> anyhow::Result<prometheus::Histogram> {
474 create_metric(&self.hierarchy, name, description, labels, buckets, None)
475 }
476
477 pub fn create_intcounter(
479 &self,
480 name: &str,
481 description: &str,
482 labels: &[(&str, &str)],
483 ) -> anyhow::Result<prometheus::IntCounter> {
484 create_metric(&self.hierarchy, name, description, labels, None, None)
485 }
486
487 pub fn create_intcountervec(
489 &self,
490 name: &str,
491 description: &str,
492 const_labels: &[&str],
493 const_label_values: &[(&str, &str)],
494 ) -> anyhow::Result<prometheus::IntCounterVec> {
495 create_metric(
496 &self.hierarchy,
497 name,
498 description,
499 const_label_values,
500 None,
501 Some(const_labels),
502 )
503 }
504
505 pub fn create_intgauge(
507 &self,
508 name: &str,
509 description: &str,
510 labels: &[(&str, &str)],
511 ) -> anyhow::Result<prometheus::IntGauge> {
512 create_metric(&self.hierarchy, name, description, labels, None, None)
513 }
514
515 pub fn create_intgaugevec(
517 &self,
518 name: &str,
519 description: &str,
520 const_labels: &[&str],
521 const_label_values: &[(&str, &str)],
522 ) -> anyhow::Result<prometheus::IntGaugeVec> {
523 create_metric(
524 &self.hierarchy,
525 name,
526 description,
527 const_label_values,
528 None,
529 Some(const_labels),
530 )
531 }
532
533 pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
535 self.hierarchy
536 .get_metrics_registry()
537 .prometheus_expfmt_combined()
538 }
539}
540
541use crate::traits::DistributedRuntimeProvider;
545
/// A node in the metrics hierarchy.
///
/// Implementors expose their own name, their chain of ancestors and the
/// registry their metrics are stored in. `create_metric` uses these to
/// derive automatic labels and to register new metrics.
pub trait MetricsHierarchy: Send + Sync {
    /// Name of this node.
    fn basename(&self) -> String;

    /// Ancestors of this node. `create_metric` places them before this
    /// node's own name when building the name chain for auto-labels.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;

    /// Registry that metrics created on this node are registered with.
    fn get_metrics_registry(&self) -> &MetricsRegistry;

    /// Returns a metric factory borrowing this node.
    fn metrics(&self) -> Metrics<&Self>
    where
        Self: Sized,
    {
        Metrics::new(self)
    }
}
575
576impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
578 fn basename(&self) -> String {
579 (**self).basename()
580 }
581
582 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
583 (**self).parent_hierarchies()
584 }
585
586 fn get_metrics_registry(&self) -> &MetricsRegistry {
587 (**self).get_metrics_registry()
588 }
589}
590
/// Shared, thread-safe callback stored in a `MetricsRegistry` and invoked by
/// `execute_update_callbacks`; returns `Err` to report a failure.
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
600
/// Shared, thread-safe callback that produces text which
/// `execute_expfmt_callbacks` appends to the exposition output.
pub type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
604
/// Prometheus registry plus the callbacks and child registries used when
/// producing a combined scrape.
///
/// Every field is behind an `Arc`, so cloning yields another handle to the
/// same underlying state.
#[derive(Clone)]
pub struct MetricsRegistry {
    /// The Prometheus registry that metrics are registered with.
    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,

    // Registries whose metrics and callbacks are merged into this
    // registry's combined scrape output.
    child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,

    /// Callbacks run by `execute_update_callbacks`.
    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,

    /// Callbacks run by `execute_expfmt_callbacks`; their text is appended
    /// to the exposition output.
    pub prometheus_expfmt_callbacks:
        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
639
640impl std::fmt::Debug for MetricsRegistry {
641 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642 f.debug_struct("MetricsRegistry")
643 .field("prometheus_registry", &"<RwLock<Registry>>")
644 .field(
645 "prometheus_update_callbacks",
646 &format!(
647 "<RwLock<Vec<Callback>>> with {} callbacks",
648 self.prometheus_update_callbacks.read().unwrap().len()
649 ),
650 )
651 .field(
652 "prometheus_expfmt_callbacks",
653 &format!(
654 "<RwLock<Vec<Callback>>> with {} callbacks",
655 self.prometheus_expfmt_callbacks.read().unwrap().len()
656 ),
657 )
658 .finish()
659 }
660}
661
662impl MetricsRegistry {
663 pub fn new() -> Self {
665 Self {
666 prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
667 child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
668 prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
669 prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
670 }
671 }
672
673 pub fn add_child_registry(&self, child: &MetricsRegistry) {
677 let child_ptr = Arc::as_ptr(&child.prometheus_registry);
678 let mut guard = self.child_registries.write().unwrap();
679 if guard
680 .iter()
681 .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
682 {
683 return;
684 }
685 guard.push(child.clone());
686 }
687
688 fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
689 fn visit(
695 registry: &MetricsRegistry,
696 out: &mut Vec<MetricsRegistry>,
697 seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
698 ) {
699 let ptr = Arc::as_ptr(®istry.prometheus_registry);
700 if !seen.insert(ptr) {
701 return;
702 }
703
704 out.push(registry.clone());
705
706 let children: Vec<MetricsRegistry> = registry
707 .child_registries
708 .read()
709 .unwrap()
710 .iter()
711 .cloned()
712 .collect();
713 for child in children {
714 visit(&child, out, seen);
715 }
716 }
717
718 let mut out = Vec::new();
719 let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
720 visit(self, &mut out, &mut seen);
721 out
722 }
723
724 pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
730 let registries = self.registries_for_combined_scrape();
731
732 for registry in ®istries {
734 for result in registry.execute_update_callbacks() {
735 if let Err(e) = result {
736 tracing::error!("Error executing metrics callback: {}", e);
737 }
738 }
739 }
740
741 let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
743 let mut seen_series: HashSet<String> = HashSet::new();
744
745 for (registry_idx, registry) in registries.iter().enumerate() {
746 let families = registry.get_prometheus_registry().gather();
747 for mut family in families {
748 let name = family.name().to_string();
749
750 let entry = by_name.entry(name.clone()).or_insert_with(|| {
751 let mut out = prometheus::proto::MetricFamily::new();
752 out.set_name(name.clone());
753 out.set_help(family.help().to_string());
754 out.set_field_type(family.get_field_type());
755 out
756 });
757
758 if entry.help() != family.help()
759 || entry.get_field_type() != family.get_field_type()
760 {
761 return Err(anyhow::anyhow!(
762 "Metric family '{}' has inconsistent help/type across registries (idx={})",
763 name,
764 registry_idx
765 ));
766 }
767
768 let mut metrics = family.take_metric();
769 for metric in metrics.drain(..) {
770 let mut labels: Vec<(String, String)> = metric
771 .get_label()
772 .iter()
773 .map(|lp| (lp.name().to_string(), lp.value().to_string()))
774 .collect();
775 labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
776
777 let key = format!(
778 "{}|{}",
779 name,
780 labels
781 .iter()
782 .map(|(k, v)| format!("{}={}", k, v))
783 .collect::<Vec<_>>()
784 .join(",")
785 );
786
787 if !seen_series.insert(key) {
788 tracing::warn!(
789 metric_name = %name,
790 labels = ?labels,
791 registry_idx,
792 "Duplicate Prometheus series while merging registries; dropping later sample"
793 );
794 continue;
795 }
796
797 entry.mut_metric().push(metric);
798 }
799 }
800 }
801
802 let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
803 merged.sort_by(|a, b| a.name().cmp(b.name()));
804
805 let encoder = prometheus::TextEncoder::new();
806 let mut buffer = Vec::new();
807 encoder.encode(&merged, &mut buffer)?;
808 let mut result = String::from_utf8(buffer)?;
809
810 let mut expfmt = String::new();
812 for registry in registries {
813 let text = registry.execute_expfmt_callbacks();
814 if !text.is_empty() {
815 if !expfmt.is_empty() && !expfmt.ends_with('\n') {
816 expfmt.push('\n');
817 }
818 expfmt.push_str(&text);
819 }
820 }
821
822 if !expfmt.is_empty() {
823 if !result.ends_with('\n') {
824 result.push('\n');
825 }
826 result.push_str(&expfmt);
827 }
828
829 Ok(result)
830 }
831
832 pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
834 self.prometheus_update_callbacks
835 .write()
836 .unwrap()
837 .push(callback);
838 }
839
840 pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
842 self.prometheus_expfmt_callbacks
843 .write()
844 .unwrap()
845 .push(callback);
846 }
847
848 pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
850 self.prometheus_update_callbacks
851 .read()
852 .unwrap()
853 .iter()
854 .map(|callback| callback())
855 .collect()
856 }
857
858 pub fn execute_expfmt_callbacks(&self) -> String {
860 let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
861 let mut result = String::new();
862 for callback in callbacks.iter() {
863 match callback() {
864 Ok(text) => {
865 if !text.is_empty() {
866 if !result.is_empty() && !result.ends_with('\n') {
867 result.push('\n');
868 }
869 result.push_str(&text);
870 }
871 }
872 Err(e) => {
873 tracing::error!("Error executing exposition text callback: {}", e);
874 }
875 }
876 }
877 result
878 }
879
880 pub fn add_metric(
882 &self,
883 collector: Box<dyn prometheus::core::Collector>,
884 ) -> anyhow::Result<()> {
885 self.prometheus_registry
886 .write()
887 .unwrap()
888 .register(collector)
889 .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
890 }
891
892 pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
894 self.prometheus_registry.read().unwrap()
895 }
896
897 pub fn has_metric_named(&self, metric_name: &str) -> bool {
899 self.prometheus_registry
900 .read()
901 .unwrap()
902 .gather()
903 .iter()
904 .any(|mf| mf.name() == metric_name)
905 }
906}
907
908impl Default for MetricsRegistry {
909 fn default() -> Self {
910 Self::new()
911 }
912}
913
914#[cfg(test)]
915mod test_helpers {
916 use super::prometheus_names::name_prefix;
917 use super::*;
918
/// Returns, in order, an owned copy of every line of `input` for which
/// `predicate` returns true.
fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
where
    F: FnMut(&str) -> bool,
{
    let mut kept = Vec::new();
    for line in input.lines() {
        if predicate(line) {
            kept.push(line.to_owned());
        }
    }
    kept
}
931
932 pub fn extract_metrics(input: &str) -> Vec<String> {
935 filter_prometheus_lines(input, |line| {
936 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
937 && !line.starts_with("#")
938 && !line.trim().is_empty()
939 })
940 }
941
/// Parses one sample line of Prometheus text exposition output into
/// `(metric name, labels, value)`.
///
/// Returns `None` for blank lines, lines starting with `#`, lines with fewer
/// than two whitespace-separated tokens, and lines whose second token is not
/// a valid `f64`.
///
/// This is a deliberately simple parser for test assertions. The line is
/// split on whitespace and the label list on `,`, so label values that
/// contain spaces or commas are not supported.
pub fn parse_prometheus_metric(
    line: &str,
) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
    if line.trim().is_empty() || line.starts_with('#') {
        return None;
    }

    let mut tokens = line.split_whitespace();
    let metric_part = tokens.next()?;
    let value: f64 = tokens.next()?.parse().ok()?;

    let mut labels = std::collections::HashMap::new();
    let name = match metric_part.split_once('{') {
        Some((name, after_open)) => {
            // Look for the closing brace only after the opening one. A
            // stray '}' earlier in the token must not produce an inverted
            // slice range. A missing closing brace is tolerated.
            let labels_str = match after_open.rfind('}') {
                Some(close) => &after_open[..close],
                None => after_open,
            };
            for pair in labels_str.split(',') {
                if let Some((k, v)) = pair.split_once('=') {
                    labels.insert(k.trim().to_string(), v.trim_matches('"').to_string());
                }
            }
            name
        }
        None => metric_part,
    };

    Some((name.to_string(), labels, value))
}
988}
989
// Unit tests that need no distributed runtime: metric naming, the
// exposition-line parser helper, and registry update callbacks.
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    /// The component prefix is prepended to bare metric names.
    #[test]
    fn test_build_component_metric_name_with_prefix() {
        let result = build_component_metric_name("requests");
        assert_eq!(result, "dynamo_component_requests");

        let result = build_component_metric_name("counter");
        assert_eq!(result, "dynamo_component_counter");
    }

    /// Exercises the parser helper on labelled, unlabelled and invalid lines.
    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // Sample with two labels and an integer value.
        let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
        let parsed = parse_prometheus_metric(line);
        assert!(parsed.is_some());

        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "http_requests_total");

        let mut expected_labels = HashMap::new();
        expected_labels.insert("method".to_string(), "GET".to_string());
        expected_labels.insert("status".to_string(), "200".to_string());
        assert_eq!(labels, expected_labels);

        assert_eq!(value, 1234.0);

        // Sample without labels.
        let line = "cpu_usage 98.5";
        let parsed = parse_prometheus_metric(line);
        assert!(parsed.is_some());

        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "cpu_usage");
        assert!(labels.is_empty());
        assert_eq!(value, 98.5);

        // Sample with a single label and a fractional value.
        let line = "response_time{service=\"api\"} 0.123";
        let parsed = parse_prometheus_metric(line);
        assert!(parsed.is_some());

        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "response_time");

        let mut expected_labels = HashMap::new();
        expected_labels.insert("service".to_string(), "api".to_string());
        assert_eq!(labels, expected_labels);

        assert_eq!(value, 0.123);

        // Blank lines, comment lines and lines without a value are rejected.
        assert!(parse_prometheus_metric("").is_none());
        assert!(parse_prometheus_metric("# HELP metric description").is_none());
        assert!(parse_prometheus_metric("# TYPE metric counter").is_none());
        assert!(parse_prometheus_metric("metric_name").is_none());
        println!("✓ Prometheus metric parsing works correctly!");
    }

    /// Update callbacks: every callback runs on each execution, clones share
    /// the callback list, and a failing callback does not stop the others.
    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: three succeeding callbacks adding 1, 10 and 100.
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            for increment in [1, 10, 100] {
                let counter_clone = counter.clone();
                registry.add_update_callback(Arc::new(move || {
                    counter_clone.fetch_add(increment, Ordering::SeqCst);
                    Ok(())
                }));
            }

            // Registering a callback does not run it.
            assert_eq!(counter.load(Ordering::SeqCst), 0);

            // Each execution adds 1 + 10 + 100 = 111.
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(|r| r.is_ok()));
            assert_eq!(counter.load(Ordering::SeqCst), 111);
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 222);
            // A clone sees and runs the same callbacks.
            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 333);
            registry.execute_update_callbacks();
            assert_eq!(counter.load(Ordering::SeqCst), 444);
        }

        // Scenario 2: a failing callback between two succeeding ones.
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));

            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            // Results come back in registration order.
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results[0].is_ok());
            assert!(results[1].is_err());
            assert!(results[2].is_ok());

            assert_eq!(
                results[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );

            // Both succeeding callbacks ran despite the failure: 1 + 10.
            assert_eq!(counter.load(Ordering::SeqCst), 11);
            let results = registry.execute_update_callbacks();
            assert!(results[1].is_err());
            assert_eq!(counter.load(Ordering::SeqCst), 22);
        }

        // Scenario 3: a registry without callbacks yields no results.
        {
            let registry = MetricsRegistry::new();
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 0);
        }
    }
}
1152
// Integration tests (feature "integration") that build a test distributed
// runtime and check hierarchy names, ancestor chains, label sanitizing and
// exposition callbacks.
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use prometheus::core::Collector;

    /// Checks `basename()` and `parent_hierarchies()` at every level of
    /// runtime -> namespace -> component -> endpoint, and that a namespace
    /// name with invalid characters still yields a usable label.
    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";
        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // Runtime root: empty name, no ancestors.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(drt.parent_hierarchies().len(), 0);
        // Namespace: one ancestor (the runtime).
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
        // Component: runtime, then namespace.
        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        // Endpoint: runtime, namespace, then component.
        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);
        assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
        // Each level lists its direct parent among its ancestors.
        assert!(
            namespace
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == drt.basename())
        );
        assert!(
            component
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == namespace.basename())
        );
        assert!(
            endpoint
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == component.basename())
        );

        assert_eq!(drt.parent_hierarchies().len(), 0);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);

        // A namespace name with invalid label characters is accepted; the
        // injected label value is expected to be "_123".
        let invalid_namespace = drt.namespace("@@123").unwrap();
        let result =
            invalid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[]);
        assert!(result.is_ok());
        if let Ok(counter) = &result {
            let desc = counter.desc();
            let namespace_label = desc[0]
                .const_label_pairs
                .iter()
                .find(|l| l.name() == "dynamo_namespace")
                .expect("Should have dynamo_namespace label");
            assert_eq!(namespace_label.value(), "_123");
        }

        // A plain valid namespace name works as well.
        let valid_namespace = drt.namespace("ns567").unwrap();
        assert!(
            valid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[])
                .is_ok()
        );
    }

    /// An exposition callback registered only on an endpoint's registry must
    /// appear exactly once in the runtime-level scrape output.
    #[tokio::test]
    async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
        let drt = create_test_drt_async().await;
        let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
        let component = namespace.component("comp_expfmt_ep_only").unwrap();
        let endpoint = component.endpoint("ep_expfmt_ep_only");

        let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
        let callback: PrometheusExpositionFormatCallback =
            Arc::new(move || Ok(metric_line.to_string()));

        endpoint
            .get_metrics_registry()
            .add_expfmt_callback(callback);

        // Scrape from the runtime root and count exact matches of the line.
        let output = drt.metrics().prometheus_expfmt().unwrap();
        let occurrences = output
            .lines()
            .filter(|line| line == &metric_line.trim_end_matches('\n'))
            .count();

        assert_eq!(
            occurrences, 1,
            "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
            occurrences, output
        );
    }

    /// Namespaces nested three deep: each level's ancestor chain grows by
    /// one and lists ancestors from the runtime root downwards.
    #[tokio::test]
    async fn test_recursive_namespace() {
        let drt = create_test_drt_async().await;

        let ns1 = drt.namespace("ns1").unwrap();
        let ns2 = ns1.namespace("ns2").unwrap();
        let ns3 = ns2.namespace("ns3").unwrap();

        let component = ns3.component("test-component").unwrap();

        assert_eq!(ns1.basename(), "ns1");
        assert_eq!(ns1.parent_hierarchies().len(), 1);
        assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns2.basename(), "ns2");
        assert_eq!(ns2.parent_hierarchies().len(), 2);
        assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(ns3.basename(), "ns3");
        assert_eq!(ns3.parent_hierarchies().len(), 3);
        assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
        assert_eq!(component.basename(), "test-component");
        assert_eq!(component.parent_hierarchies().len(), 4);
        assert_eq!(component.parent_hierarchies()[0].basename(), "");
        assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
        assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
        println!("✓ Chained namespace test passed - all prefixes correct");
    }
}
1328
1329#[cfg(feature = "integration")]
1330#[cfg(test)]
1331mod test_metricsregistry_prometheus_fmt_outputs {
1332 use super::prometheus_names::name_prefix;
1333 use super::*;
1334 use crate::distributed::distributed_test_utils::create_test_drt_async;
1335 use prometheus::Counter;
1336 use std::sync::Arc;
1337
1338 #[tokio::test]
1339 async fn test_prometheusfactory_using_metrics_registry_trait() {
1340 let drt = create_test_drt_async().await;
1342
1343 let namespace_name = "ns345";
1345
1346 let namespace = drt.namespace(namespace_name).unwrap();
1347 let component = namespace.component("comp345").unwrap();
1348 let endpoint = component.endpoint("ep345");
1349
1350 let counter = endpoint
1352 .metrics()
1353 .create_counter("testcounter", "A test counter", &[])
1354 .unwrap();
1355 counter.inc_by(123.456789);
1356 let epsilon = 0.01;
1357 assert!((counter.get() - 123.456789).abs() < epsilon);
1358
1359 let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1360 println!("Endpoint output:");
1361 println!("{}", endpoint_output_raw);
1362
1363 let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
1364# TYPE dynamo_component_testcounter counter
1365dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
1366
1367 assert_eq!(
1368 endpoint_output_raw.trim_end_matches('\n'),
1369 expected_endpoint_output.trim_end_matches('\n'),
1370 "\n=== ENDPOINT COMPARISON FAILED ===\n\
1371 Actual:\n{}\n\
1372 Expected:\n{}\n\
1373 ==============================",
1374 endpoint_output_raw,
1375 expected_endpoint_output
1376 );
1377
1378 let gauge = component
1380 .metrics()
1381 .create_gauge("testgauge", "A test gauge", &[])
1382 .unwrap();
1383 gauge.set(50000.0);
1384 assert_eq!(gauge.get(), 50000.0);
1385
1386 let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1388 println!("Component output:");
1389 println!("{}", component_output_raw);
1390
1391 let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1392# TYPE dynamo_component_testcounter counter
1393dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1394# HELP dynamo_component_testgauge A test gauge
1395# TYPE dynamo_component_testgauge gauge
1396dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1397
1398 assert_eq!(
1399 component_output_raw.trim_end_matches('\n'),
1400 expected_component_output.trim_end_matches('\n'),
1401 "\n=== COMPONENT COMPARISON FAILED ===\n\
1402 Actual:\n{}\n\
1403 Expected:\n{}\n\
1404 ==============================",
1405 component_output_raw,
1406 expected_component_output
1407 );
1408
1409 let intcounter = namespace
1410 .metrics()
1411 .create_intcounter("testintcounter", "A test int counter", &[])
1412 .unwrap();
1413 intcounter.inc_by(12345);
1414 assert_eq!(intcounter.get(), 12345);
1415
1416 let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1418 println!("Namespace output:");
1419 println!("{}", namespace_output_raw);
1420
1421 let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1422# TYPE dynamo_component_testcounter counter
1423dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1424# HELP dynamo_component_testgauge A test gauge
1425# TYPE dynamo_component_testgauge gauge
1426dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1427# HELP dynamo_component_testintcounter A test int counter
1428# TYPE dynamo_component_testintcounter counter
1429dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1430
1431 assert_eq!(
1432 namespace_output_raw.trim_end_matches('\n'),
1433 expected_namespace_output.trim_end_matches('\n'),
1434 "\n=== NAMESPACE COMPARISON FAILED ===\n\
1435 Actual:\n{}\n\
1436 Expected:\n{}\n\
1437 ==============================",
1438 namespace_output_raw,
1439 expected_namespace_output
1440 );
1441
1442 let intgauge = namespace
1444 .metrics()
1445 .create_intgauge("testintgauge", "A test int gauge", &[])
1446 .unwrap();
1447 intgauge.set(42);
1448 assert_eq!(intgauge.get(), 42);
1449
1450 let intgaugevec = namespace
1452 .metrics()
1453 .create_intgaugevec(
1454 "testintgaugevec",
1455 "A test int gauge vector",
1456 &["instance", "status"],
1457 &[("service", "api")],
1458 )
1459 .unwrap();
1460 intgaugevec
1461 .with_label_values(&["server1", "active"])
1462 .set(10);
1463 intgaugevec
1464 .with_label_values(&["server2", "inactive"])
1465 .set(0);
1466
1467 let countervec = endpoint
1469 .metrics()
1470 .create_countervec(
1471 "testcountervec",
1472 "A test counter vector",
1473 &["method", "status"],
1474 &[("service", "api")],
1475 )
1476 .unwrap();
1477 countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1478 countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1479
1480 let histogram = component
1482 .metrics()
1483 .create_histogram("testhistogram", "A test histogram", &[], None)
1484 .unwrap();
1485 histogram.observe(1.0);
1486 histogram.observe(2.5);
1487 histogram.observe(4.0);
1488
1489 let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1491 println!("DRT output:");
1492 println!("{}", drt_output_raw);
1493
1494 let expected_drt_output_without_uptime = r#"# HELP dynamo_component_testcounter A test counter
1497# TYPE dynamo_component_testcounter counter
1498dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1499# HELP dynamo_component_testcountervec A test counter vector
1500# TYPE dynamo_component_testcountervec counter
1501dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1502dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1503# HELP dynamo_component_testgauge A test gauge
1504# TYPE dynamo_component_testgauge gauge
1505dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1506# HELP dynamo_component_testhistogram A test histogram
1507# TYPE dynamo_component_testhistogram histogram
1508dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1509dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1510dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1511dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1512dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1513dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1514dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1515dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1516dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1517dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1518dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1519dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1520dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1521dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1522# HELP dynamo_component_testintcounter A test int counter
1523# TYPE dynamo_component_testintcounter counter
1524dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1525# HELP dynamo_component_testintgauge A test int gauge
1526# TYPE dynamo_component_testintgauge gauge
1527dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1528# HELP dynamo_component_testintgaugevec A test int gauge vector
1529# TYPE dynamo_component_testintgaugevec gauge
1530dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1531dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#;
1532
1533 let mut non_uptime_lines = Vec::new();
1535 let mut uptime_value: Option<f64> = None;
1536 for line in drt_output_raw.trim_end_matches('\n').lines() {
1537 if line.starts_with("dynamo_component_uptime_seconds ") {
1538 let val_str = line
1539 .strip_prefix("dynamo_component_uptime_seconds ")
1540 .unwrap();
1541 uptime_value = Some(val_str.parse::<f64>().expect("uptime should be a float"));
1542 } else if line.starts_with("# HELP dynamo_component_uptime_seconds")
1543 || line.starts_with("# TYPE dynamo_component_uptime_seconds")
1544 {
1545 } else {
1547 non_uptime_lines.push(line);
1548 }
1549 }
1550
1551 let actual_without_uptime = non_uptime_lines.join("\n");
1552 assert_eq!(
1553 actual_without_uptime,
1554 expected_drt_output_without_uptime.trim_end_matches('\n'),
1555 "\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
1556 Expected:\n{}\n\
1557 Actual:\n{}\n\
1558 ==============================",
1559 expected_drt_output_without_uptime,
1560 actual_without_uptime
1561 );
1562
1563 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1565 let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
1566 let uptime_after: f64 = drt_output_after
1567 .lines()
1568 .find(|l| l.starts_with("dynamo_component_uptime_seconds "))
1569 .expect("uptime_seconds metric should be present after sleep")
1570 .strip_prefix("dynamo_component_uptime_seconds ")
1571 .unwrap()
1572 .parse()
1573 .expect("uptime should be a float");
1574 assert!(
1575 uptime_after > 0.0,
1576 "uptime_seconds should be > 0 after 10ms sleep, got {}",
1577 uptime_after
1578 );
1579
1580 println!("✓ All Prometheus format outputs verified successfully!");
1581 }
1582
1583 #[test]
1584 fn test_refactored_filter_functions() {
1585 let test_input = r#"# HELP dynamo_component_requests Total requests
1587# TYPE dynamo_component_requests counter
1588dynamo_component_requests 42
1589# HELP dynamo_component_latency Response latency
1590# TYPE dynamo_component_latency histogram
1591dynamo_component_latency_bucket{le="0.1"} 10
1592dynamo_component_latency_bucket{le="0.5"} 25
1593dynamo_component_errors_total 5"#;
1594
1595 let metrics_only = super::test_helpers::extract_metrics(test_input);
1597 assert_eq!(metrics_only.len(), 4); assert!(
1599 metrics_only
1600 .iter()
1601 .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1602 );
1603
1604 println!("✓ All refactored filter functions work correctly!");
1605 }
1606
1607 #[tokio::test]
1608 async fn test_same_metric_name_different_endpoints() {
1609 let drt = create_test_drt_async().await;
1613 let namespace = drt.namespace("ns_test").unwrap();
1614 let component = namespace.component("comp_test").unwrap();
1615
1616 let ep1 = component.endpoint("ep1");
1618 let ep2 = component.endpoint("ep2");
1619
1620 let counter1 = ep1
1621 .metrics()
1622 .create_counter("requests_total", "Total requests", &[])
1623 .unwrap();
1624 counter1.inc_by(100.0);
1625
1626 let counter2 = ep2
1627 .metrics()
1628 .create_counter("requests_total", "Total requests", &[])
1629 .unwrap();
1630 counter2.inc_by(200.0);
1631
1632 let output = component.metrics().prometheus_expfmt().unwrap();
1634
1635 let expected_output = r#"# HELP dynamo_component_requests_total Total requests
1636# TYPE dynamo_component_requests_total counter
1637dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
1638dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#;
1639
1640 assert_eq!(
1641 output.trim_end_matches('\n'),
1642 expected_output.trim_end_matches('\n'),
1643 "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
1644 Actual:\n{}\n\
1645 Expected:\n{}\n\
1646 ==============================",
1647 output,
1648 expected_output
1649 );
1650
1651 println!("✓ Multi-registry prevents Prometheus collisions!");
1652 }
1653
1654 #[tokio::test]
1655 async fn test_duplicate_series_warning() {
1656 let drt = create_test_drt_async().await;
1659 let namespace = drt.namespace("ns_dup").unwrap();
1660 let component = namespace.component("comp_dup").unwrap();
1661
1662 let ep1 = component.endpoint("ep_same");
1664 let ep2 = component.endpoint("ep_same"); let counter1 = ep1
1667 .metrics()
1668 .create_counter("dup_metric", "Duplicate metric test", &[])
1669 .unwrap();
1670 counter1.inc_by(50.0);
1671
1672 let counter2 = ep2
1673 .metrics()
1674 .create_counter("dup_metric", "Duplicate metric test", &[])
1675 .unwrap();
1676 counter2.inc_by(75.0);
1677
1678 let output = component.metrics().prometheus_expfmt().unwrap();
1680
1681 let expected_output = r#"# HELP dynamo_component_dup_metric Duplicate metric test
1682# TYPE dynamo_component_dup_metric counter
1683dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#;
1684
1685 assert_eq!(
1686 output.trim_end_matches('\n'),
1687 expected_output.trim_end_matches('\n'),
1688 "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
1689 Actual:\n{}\n\
1690 Expected:\n{}\n\
1691 ==============================",
1692 output,
1693 expected_output
1694 );
1695
1696 println!("✓ Duplicate series detection and deduplication works!");
1697 }
1698}