1pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22use prometheus_names::{
24 build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
25 sanitize_prometheus_name, work_handler,
26};
27
28use crate::pipeline::{
30 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31 network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
37pub const USE_AUTO_LABELS: bool = true;
40
41use prometheus::Encoder;
43
44fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47 let mut seen_keys = std::collections::HashSet::new();
48 for (key, _) in labels {
49 if !seen_keys.insert(*key) {
50 return Err(anyhow::anyhow!(
51 "Duplicate label key '{}' found in labels",
52 key
53 ));
54 }
55 }
56 Ok(())
57}
58
59pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
64 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
66 where
67 Self: Sized;
68
69 fn with_histogram_opts_and_buckets(
72 _opts: prometheus::HistogramOpts,
73 _buckets: Option<Vec<f64>>,
74 ) -> Result<Self, prometheus::Error>
75 where
76 Self: Sized,
77 {
78 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
79 }
80
81 fn with_opts_and_label_names(
84 _opts: prometheus::Opts,
85 _label_names: &[&str],
86 ) -> Result<Self, prometheus::Error>
87 where
88 Self: Sized,
89 {
90 panic!("with_opts_and_label_names is not implemented for this metric type");
91 }
92}
93
94impl PrometheusMetric for prometheus::Counter {
96 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
97 prometheus::Counter::with_opts(opts)
98 }
99}
100
101impl PrometheusMetric for prometheus::IntCounter {
102 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
103 prometheus::IntCounter::with_opts(opts)
104 }
105}
106
107impl PrometheusMetric for prometheus::Gauge {
108 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
109 prometheus::Gauge::with_opts(opts)
110 }
111}
112
113impl PrometheusMetric for prometheus::IntGauge {
114 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
115 prometheus::IntGauge::with_opts(opts)
116 }
117}
118
119impl PrometheusMetric for prometheus::GaugeVec {
120 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
121 Err(prometheus::Error::Msg(
122 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
123 ))
124 }
125
126 fn with_opts_and_label_names(
127 opts: prometheus::Opts,
128 label_names: &[&str],
129 ) -> Result<Self, prometheus::Error> {
130 prometheus::GaugeVec::new(opts, label_names)
131 }
132}
133
134impl PrometheusMetric for prometheus::IntGaugeVec {
135 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
136 Err(prometheus::Error::Msg(
137 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
138 ))
139 }
140
141 fn with_opts_and_label_names(
142 opts: prometheus::Opts,
143 label_names: &[&str],
144 ) -> Result<Self, prometheus::Error> {
145 prometheus::IntGaugeVec::new(opts, label_names)
146 }
147}
148
149impl PrometheusMetric for prometheus::IntCounterVec {
150 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
151 Err(prometheus::Error::Msg(
152 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
153 ))
154 }
155
156 fn with_opts_and_label_names(
157 opts: prometheus::Opts,
158 label_names: &[&str],
159 ) -> Result<Self, prometheus::Error> {
160 prometheus::IntCounterVec::new(opts, label_names)
161 }
162}
163
164impl PrometheusMetric for prometheus::Histogram {
166 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
167 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
169 prometheus::Histogram::with_opts(histogram_opts)
170 }
171
172 fn with_histogram_opts_and_buckets(
173 mut opts: prometheus::HistogramOpts,
174 buckets: Option<Vec<f64>>,
175 ) -> Result<Self, prometheus::Error> {
176 if let Some(custom_buckets) = buckets {
177 opts = opts.buckets(custom_buckets);
178 }
179 prometheus::Histogram::with_opts(opts)
180 }
181}
182
183impl PrometheusMetric for prometheus::CounterVec {
185 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
186 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
188 }
189
190 fn with_opts_and_label_names(
191 opts: prometheus::Opts,
192 label_names: &[&str],
193 ) -> Result<Self, prometheus::Error> {
194 prometheus::CounterVec::new(opts, label_names)
195 }
196}
197
198pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
203 hierarchy: &H,
204 metric_name: &str,
205 metric_desc: &str,
206 labels: &[(&str, &str)],
207 buckets: Option<Vec<f64>>,
208 const_labels: Option<&[&str]>,
209) -> anyhow::Result<T> {
210 validate_no_duplicate_label_keys(labels)?;
212 let basename = hierarchy.basename();
215 let parent_hierarchies = hierarchy.parent_hierarchies();
216
217 let mut hierarchy_names: Vec<String> =
219 parent_hierarchies.iter().map(|p| p.basename()).collect();
220 hierarchy_names.push(basename.clone());
221
222 let metric_name = build_component_metric_name(metric_name);
223
224 let mut updated_labels: Vec<(String, String)> = Vec::new();
226
227 if USE_AUTO_LABELS {
228 for (key, _) in labels {
230 if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231 return Err(anyhow::anyhow!(
232 "Label '{}' is automatically added by auto_label feature and cannot be manually set",
233 key
234 ));
235 }
236 }
237
238 if hierarchy_names.len() > 1 {
240 let namespace = &hierarchy_names[1];
241 if !namespace.is_empty() {
242 let valid_namespace = sanitize_prometheus_label(namespace)?;
243 if !valid_namespace.is_empty() {
244 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
245 }
246 }
247 }
248 if hierarchy_names.len() > 2 {
249 let component = &hierarchy_names[2];
250 if !component.is_empty() {
251 let valid_component = sanitize_prometheus_label(component)?;
252 if !valid_component.is_empty() {
253 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
254 }
255 }
256 }
257 if hierarchy_names.len() > 3 {
258 let endpoint = &hierarchy_names[3];
259 if !endpoint.is_empty() {
260 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
261 if !valid_endpoint.is_empty() {
262 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
263 }
264 }
265 }
266 }
267
268 updated_labels.extend(
270 labels
271 .iter()
272 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273 );
274 let prometheus_metric = if std::any::TypeId::of::<T>()
278 == std::any::TypeId::of::<prometheus::CounterVec>()
279 {
280 if buckets.is_some() {
283 return Err(anyhow::anyhow!(
284 "buckets parameter is not valid for CounterVec"
285 ));
286 }
287 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288 for (key, value) in &updated_labels {
289 opts = opts.const_label(key.clone(), value.clone());
290 }
291 let label_names = const_labels
292 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293 T::with_opts_and_label_names(opts, label_names)?
294 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295 if buckets.is_some() {
298 return Err(anyhow::anyhow!(
299 "buckets parameter is not valid for GaugeVec"
300 ));
301 }
302 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303 for (key, value) in &updated_labels {
304 opts = opts.const_label(key.clone(), value.clone());
305 }
306 let label_names = const_labels
307 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308 T::with_opts_and_label_names(opts, label_names)?
309 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310 if const_labels.is_some() {
313 return Err(anyhow::anyhow!(
314 "const_labels parameter is not valid for Histogram"
315 ));
316 }
317 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318 for (key, value) in &updated_labels {
319 opts = opts.const_label(key.clone(), value.clone());
320 }
321 T::with_histogram_opts_and_buckets(opts, buckets)?
322 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323 if buckets.is_some() {
326 return Err(anyhow::anyhow!(
327 "buckets parameter is not valid for IntCounterVec"
328 ));
329 }
330 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331 for (key, value) in &updated_labels {
332 opts = opts.const_label(key.clone(), value.clone());
333 }
334 let label_names = const_labels
335 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336 T::with_opts_and_label_names(opts, label_names)?
337 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338 if buckets.is_some() {
341 return Err(anyhow::anyhow!(
342 "buckets parameter is not valid for IntGaugeVec"
343 ));
344 }
345 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346 for (key, value) in &updated_labels {
347 opts = opts.const_label(key.clone(), value.clone());
348 }
349 let label_names = const_labels
350 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351 T::with_opts_and_label_names(opts, label_names)?
352 } else {
353 if buckets.is_some() {
356 return Err(anyhow::anyhow!(
357 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358 ));
359 }
360 if const_labels.is_some() {
361 return Err(anyhow::anyhow!(
362 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363 ));
364 }
365 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366 for (key, value) in &updated_labels {
367 opts = opts.const_label(key.clone(), value.clone());
368 }
369 T::with_opts(opts)?
370 };
371
372 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
373 hierarchy.get_metrics_registry().add_metric(collector)?;
374
375 Ok(prometheus_metric)
376}
377
378pub struct Metrics<H: MetricsHierarchy> {
381 hierarchy: H,
382}
383
384impl<H: MetricsHierarchy> Metrics<H> {
385 pub fn new(hierarchy: H) -> Self {
386 Self { hierarchy }
387 }
388
389 pub fn create_counter(
412 &self,
413 name: &str,
414 description: &str,
415 labels: &[(&str, &str)],
416 ) -> anyhow::Result<prometheus::Counter> {
417 create_metric(&self.hierarchy, name, description, labels, None, None)
418 }
419
420 pub fn create_countervec(
422 &self,
423 name: &str,
424 description: &str,
425 const_labels: &[&str],
426 const_label_values: &[(&str, &str)],
427 ) -> anyhow::Result<prometheus::CounterVec> {
428 create_metric(
429 &self.hierarchy,
430 name,
431 description,
432 const_label_values,
433 None,
434 Some(const_labels),
435 )
436 }
437
438 pub fn create_gauge(
440 &self,
441 name: &str,
442 description: &str,
443 labels: &[(&str, &str)],
444 ) -> anyhow::Result<prometheus::Gauge> {
445 create_metric(&self.hierarchy, name, description, labels, None, None)
446 }
447
448 pub fn create_gaugevec(
450 &self,
451 name: &str,
452 description: &str,
453 const_labels: &[&str],
454 const_label_values: &[(&str, &str)],
455 ) -> anyhow::Result<prometheus::GaugeVec> {
456 create_metric(
457 &self.hierarchy,
458 name,
459 description,
460 const_label_values,
461 None,
462 Some(const_labels),
463 )
464 }
465
466 pub fn create_histogram(
468 &self,
469 name: &str,
470 description: &str,
471 labels: &[(&str, &str)],
472 buckets: Option<Vec<f64>>,
473 ) -> anyhow::Result<prometheus::Histogram> {
474 create_metric(&self.hierarchy, name, description, labels, buckets, None)
475 }
476
477 pub fn create_intcounter(
479 &self,
480 name: &str,
481 description: &str,
482 labels: &[(&str, &str)],
483 ) -> anyhow::Result<prometheus::IntCounter> {
484 create_metric(&self.hierarchy, name, description, labels, None, None)
485 }
486
487 pub fn create_intcountervec(
489 &self,
490 name: &str,
491 description: &str,
492 const_labels: &[&str],
493 const_label_values: &[(&str, &str)],
494 ) -> anyhow::Result<prometheus::IntCounterVec> {
495 create_metric(
496 &self.hierarchy,
497 name,
498 description,
499 const_label_values,
500 None,
501 Some(const_labels),
502 )
503 }
504
505 pub fn create_intgauge(
507 &self,
508 name: &str,
509 description: &str,
510 labels: &[(&str, &str)],
511 ) -> anyhow::Result<prometheus::IntGauge> {
512 create_metric(&self.hierarchy, name, description, labels, None, None)
513 }
514
515 pub fn create_intgaugevec(
517 &self,
518 name: &str,
519 description: &str,
520 const_labels: &[&str],
521 const_label_values: &[(&str, &str)],
522 ) -> anyhow::Result<prometheus::IntGaugeVec> {
523 create_metric(
524 &self.hierarchy,
525 name,
526 description,
527 const_label_values,
528 None,
529 Some(const_labels),
530 )
531 }
532
533 pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
535 self.hierarchy
536 .get_metrics_registry()
537 .prometheus_expfmt_combined()
538 }
539}
540
541use crate::traits::DistributedRuntimeProvider;
545
546pub trait MetricsHierarchy: Send + Sync {
547 fn basename(&self) -> String;
553
554 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
558
559 fn get_metrics_registry(&self) -> &MetricsRegistry;
561
562 fn metrics(&self) -> Metrics<&Self>
569 where
570 Self: Sized,
571 {
572 Metrics::new(self)
573 }
574}
575
576impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
578 fn basename(&self) -> String {
579 (**self).basename()
580 }
581
582 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
583 (**self).parent_hierarchies()
584 }
585
586 fn get_metrics_registry(&self) -> &MetricsRegistry {
587 (**self).get_metrics_registry()
588 }
589}
590
591pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
600
601pub type PrometheusExpositionFormatCallback =
603 Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
604
605#[derive(Clone)]
610pub struct MetricsRegistry {
611 pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
614
615 child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,
629
630 pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
633
634 pub prometheus_expfmt_callbacks:
637 Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
638}
639
640impl std::fmt::Debug for MetricsRegistry {
641 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642 f.debug_struct("MetricsRegistry")
643 .field("prometheus_registry", &"<RwLock<Registry>>")
644 .field(
645 "prometheus_update_callbacks",
646 &format!(
647 "<RwLock<Vec<Callback>>> with {} callbacks",
648 self.prometheus_update_callbacks.read().unwrap().len()
649 ),
650 )
651 .field(
652 "prometheus_expfmt_callbacks",
653 &format!(
654 "<RwLock<Vec<Callback>>> with {} callbacks",
655 self.prometheus_expfmt_callbacks.read().unwrap().len()
656 ),
657 )
658 .finish()
659 }
660}
661
662impl MetricsRegistry {
663 pub fn new() -> Self {
665 Self {
666 prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
667 child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
668 prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
669 prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
670 }
671 }
672
673 pub fn add_child_registry(&self, child: &MetricsRegistry) {
677 let child_ptr = Arc::as_ptr(&child.prometheus_registry);
678 let mut guard = self.child_registries.write().unwrap();
679 if guard
680 .iter()
681 .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
682 {
683 return;
684 }
685 guard.push(child.clone());
686 }
687
688 fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
689 fn visit(
695 registry: &MetricsRegistry,
696 out: &mut Vec<MetricsRegistry>,
697 seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
698 ) {
699 let ptr = Arc::as_ptr(®istry.prometheus_registry);
700 if !seen.insert(ptr) {
701 return;
702 }
703
704 out.push(registry.clone());
705
706 let children: Vec<MetricsRegistry> = registry
707 .child_registries
708 .read()
709 .unwrap()
710 .iter()
711 .cloned()
712 .collect();
713 for child in children {
714 visit(&child, out, seen);
715 }
716 }
717
718 let mut out = Vec::new();
719 let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
720 visit(self, &mut out, &mut seen);
721 out
722 }
723
724 pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
730 let registries = self.registries_for_combined_scrape();
731
732 for registry in ®istries {
734 for result in registry.execute_update_callbacks() {
735 if let Err(e) = result {
736 tracing::error!("Error executing metrics callback: {}", e);
737 }
738 }
739 }
740
741 let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
743 let mut seen_series: HashSet<String> = HashSet::new();
744
745 for (registry_idx, registry) in registries.iter().enumerate() {
746 let families = registry.get_prometheus_registry().gather();
747 for mut family in families {
748 let name = family.name().to_string();
749
750 let entry = by_name.entry(name.clone()).or_insert_with(|| {
751 let mut out = prometheus::proto::MetricFamily::new();
752 out.set_name(name.clone());
753 out.set_help(family.help().to_string());
754 out.set_field_type(family.get_field_type());
755 out
756 });
757
758 if entry.help() != family.help()
759 || entry.get_field_type() != family.get_field_type()
760 {
761 return Err(anyhow::anyhow!(
762 "Metric family '{}' has inconsistent help/type across registries (idx={})",
763 name,
764 registry_idx
765 ));
766 }
767
768 let mut metrics = family.take_metric();
769 for metric in metrics.drain(..) {
770 let mut labels: Vec<(String, String)> = metric
771 .get_label()
772 .iter()
773 .map(|lp| (lp.name().to_string(), lp.value().to_string()))
774 .collect();
775 labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
776
777 let key = format!(
778 "{}|{}",
779 name,
780 labels
781 .iter()
782 .map(|(k, v)| format!("{}={}", k, v))
783 .collect::<Vec<_>>()
784 .join(",")
785 );
786
787 if !seen_series.insert(key) {
788 tracing::warn!(
789 metric_name = %name,
790 labels = ?labels,
791 registry_idx,
792 "Duplicate Prometheus series while merging registries; dropping later sample"
793 );
794 continue;
795 }
796
797 entry.mut_metric().push(metric);
798 }
799 }
800 }
801
802 let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
803 merged.sort_by(|a, b| a.name().cmp(b.name()));
804
805 let encoder = prometheus::TextEncoder::new();
806 let mut buffer = Vec::new();
807 encoder.encode(&merged, &mut buffer)?;
808 let mut result = String::from_utf8(buffer)?;
809
810 let mut expfmt = String::new();
812 for registry in registries {
813 let text = registry.execute_expfmt_callbacks();
814 if !text.is_empty() {
815 if !expfmt.is_empty() && !expfmt.ends_with('\n') {
816 expfmt.push('\n');
817 }
818 expfmt.push_str(&text);
819 }
820 }
821
822 if !expfmt.is_empty() {
823 if !result.ends_with('\n') {
824 result.push('\n');
825 }
826 result.push_str(&expfmt);
827 }
828
829 Ok(result)
830 }
831
832 pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
834 self.prometheus_update_callbacks
835 .write()
836 .unwrap()
837 .push(callback);
838 }
839
840 pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
842 self.prometheus_expfmt_callbacks
843 .write()
844 .unwrap()
845 .push(callback);
846 }
847
848 pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
850 self.prometheus_update_callbacks
851 .read()
852 .unwrap()
853 .iter()
854 .map(|callback| callback())
855 .collect()
856 }
857
858 pub fn execute_expfmt_callbacks(&self) -> String {
860 let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
861 let mut result = String::new();
862 for callback in callbacks.iter() {
863 match callback() {
864 Ok(text) => {
865 if !text.is_empty() {
866 if !result.is_empty() && !result.ends_with('\n') {
867 result.push('\n');
868 }
869 result.push_str(&text);
870 }
871 }
872 Err(e) => {
873 tracing::error!("Error executing exposition text callback: {}", e);
874 }
875 }
876 }
877 result
878 }
879
880 pub fn add_metric(
882 &self,
883 collector: Box<dyn prometheus::core::Collector>,
884 ) -> anyhow::Result<()> {
885 self.prometheus_registry
886 .write()
887 .unwrap()
888 .register(collector)
889 .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
890 }
891
892 pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
894 self.prometheus_registry.read().unwrap()
895 }
896
897 pub fn has_metric_named(&self, metric_name: &str) -> bool {
899 self.prometheus_registry
900 .read()
901 .unwrap()
902 .gather()
903 .iter()
904 .any(|mf| mf.name() == metric_name)
905 }
906}
907
908impl Default for MetricsRegistry {
909 fn default() -> Self {
910 Self::new()
911 }
912}
913
914#[cfg(test)]
915mod test_helpers {
916 use super::prometheus_names::name_prefix;
917 use super::*;
918
919 fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
922 where
923 F: FnMut(&str) -> bool,
924 {
925 input
926 .lines()
927 .filter(|line| predicate(line))
928 .map(|line| line.to_string())
929 .collect::<Vec<_>>()
930 }
931
932 pub fn extract_metrics(input: &str) -> Vec<String> {
935 filter_prometheus_lines(input, |line| {
936 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
937 && !line.starts_with("#")
938 && !line.trim().is_empty()
939 })
940 }
941
942 pub fn parse_prometheus_metric(
954 line: &str,
955 ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
956 if line.trim().is_empty() || line.starts_with('#') {
957 return None;
958 }
959
960 let parts: Vec<&str> = line.split_whitespace().collect();
961 if parts.len() < 2 {
962 return None;
963 }
964
965 let metric_part = parts[0];
966 let value: f64 = parts[1].parse().ok()?;
967
968 let (name, labels) = if metric_part.contains('{') {
969 let brace_start = metric_part.find('{').unwrap();
970 let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
971 let name = &metric_part[..brace_start];
972 let labels_str = &metric_part[brace_start + 1..brace_end];
973
974 let mut labels = std::collections::HashMap::new();
975 for pair in labels_str.split(',') {
976 if let Some((k, v)) = pair.split_once('=') {
977 let v = v.trim_matches('"');
978 labels.insert(k.trim().to_string(), v.to_string());
979 }
980 }
981 (name.to_string(), labels)
982 } else {
983 (metric_part.to_string(), std::collections::HashMap::new())
984 };
985
986 Some((name, labels, value))
987 }
988}
989
990#[cfg(test)]
991mod test_metricsregistry_units {
992 use super::*;
993
994 #[test]
995 fn test_build_component_metric_name_with_prefix() {
996 let result = build_component_metric_name("requests");
998 assert_eq!(result, "dynamo_component_requests");
999
1000 let result = build_component_metric_name("counter");
1001 assert_eq!(result, "dynamo_component_counter");
1002 }
1003
1004 #[test]
1005 fn test_parse_prometheus_metric() {
1006 use super::test_helpers::parse_prometheus_metric;
1007 use std::collections::HashMap;
1008
1009 let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
1011 let parsed = parse_prometheus_metric(line);
1012 assert!(parsed.is_some());
1013
1014 let (name, labels, value) = parsed.unwrap();
1015 assert_eq!(name, "http_requests_total");
1016
1017 let mut expected_labels = HashMap::new();
1018 expected_labels.insert("method".to_string(), "GET".to_string());
1019 expected_labels.insert("status".to_string(), "200".to_string());
1020 assert_eq!(labels, expected_labels);
1021
1022 assert_eq!(value, 1234.0);
1023
1024 let line = "cpu_usage 98.5";
1026 let parsed = parse_prometheus_metric(line);
1027 assert!(parsed.is_some());
1028
1029 let (name, labels, value) = parsed.unwrap();
1030 assert_eq!(name, "cpu_usage");
1031 assert!(labels.is_empty());
1032 assert_eq!(value, 98.5);
1033
1034 let line = "response_time{service=\"api\"} 0.123";
1036 let parsed = parse_prometheus_metric(line);
1037 assert!(parsed.is_some());
1038
1039 let (name, labels, value) = parsed.unwrap();
1040 assert_eq!(name, "response_time");
1041
1042 let mut expected_labels = HashMap::new();
1043 expected_labels.insert("service".to_string(), "api".to_string());
1044 assert_eq!(labels, expected_labels);
1045
1046 assert_eq!(value, 0.123);
1047
1048 assert!(parse_prometheus_metric("").is_none()); assert!(parse_prometheus_metric("# HELP metric description").is_none()); assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); assert!(parse_prometheus_metric("metric_name").is_none()); println!("✓ Prometheus metric parsing works correctly!");
1055 }
1056
1057 #[test]
1058 fn test_metrics_registry_entry_callbacks() {
1059 use crate::MetricsRegistry;
1060 use std::sync::atomic::{AtomicUsize, Ordering};
1061
1062 {
1064 let registry = MetricsRegistry::new();
1065 let counter = Arc::new(AtomicUsize::new(0));
1066
1067 for increment in [1, 10, 100] {
1069 let counter_clone = counter.clone();
1070 registry.add_update_callback(Arc::new(move || {
1071 counter_clone.fetch_add(increment, Ordering::SeqCst);
1072 Ok(())
1073 }));
1074 }
1075
1076 assert_eq!(counter.load(Ordering::SeqCst), 0);
1078
1079 let results = registry.execute_update_callbacks();
1081 assert_eq!(results.len(), 3);
1082 assert!(results.iter().all(|r| r.is_ok()));
1083 assert_eq!(counter.load(Ordering::SeqCst), 111); let results = registry.execute_update_callbacks();
1087 assert_eq!(results.len(), 3);
1088 assert_eq!(counter.load(Ordering::SeqCst), 222); let cloned = registry.clone();
1092 assert_eq!(cloned.execute_update_callbacks().len(), 3);
1093 assert_eq!(counter.load(Ordering::SeqCst), 333); registry.execute_update_callbacks();
1097 assert_eq!(counter.load(Ordering::SeqCst), 444); }
1099
1100 {
1102 let registry = MetricsRegistry::new();
1103 let counter = Arc::new(AtomicUsize::new(0));
1104
1105 let counter_clone = counter.clone();
1107 registry.add_update_callback(Arc::new(move || {
1108 counter_clone.fetch_add(1, Ordering::SeqCst);
1109 Ok(())
1110 }));
1111
1112 registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
1114
1115 let counter_clone = counter.clone();
1117 registry.add_update_callback(Arc::new(move || {
1118 counter_clone.fetch_add(10, Ordering::SeqCst);
1119 Ok(())
1120 }));
1121
1122 let results = registry.execute_update_callbacks();
1124 assert_eq!(results.len(), 3);
1125 assert!(results[0].is_ok());
1126 assert!(results[1].is_err());
1127 assert!(results[2].is_ok());
1128
1129 assert_eq!(
1131 results[1].as_ref().unwrap_err().to_string(),
1132 "Simulated error"
1133 );
1134
1135 assert_eq!(counter.load(Ordering::SeqCst), 11); let results = registry.execute_update_callbacks();
1140 assert!(results[1].is_err());
1141 assert_eq!(counter.load(Ordering::SeqCst), 22); }
1143
1144 {
1146 let registry = MetricsRegistry::new();
1147 let results = registry.execute_update_callbacks();
1148 assert_eq!(results.len(), 0);
1149 }
1150 }
1151}
1152
1153#[cfg(feature = "integration")]
1154#[cfg(test)]
1155mod test_metricsregistry_prefixes {
1156 use super::*;
1157 use crate::distributed::distributed_test_utils::create_test_drt_async;
1158 use prometheus::core::Collector;
1159
1160 #[tokio::test]
1161 async fn test_hierarchical_prefixes_and_parent_hierarchies() {
1162 let drt = create_test_drt_async().await;
1163
1164 const DRT_NAME: &str = "";
1165 const NAMESPACE_NAME: &str = "ns901";
1166 const COMPONENT_NAME: &str = "comp901";
1167 const ENDPOINT_NAME: &str = "ep901";
1168 let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
1169 let component = namespace.component(COMPONENT_NAME).unwrap();
1170 let endpoint = component.endpoint(ENDPOINT_NAME);
1171
1172 assert_eq!(drt.basename(), DRT_NAME);
1174 assert_eq!(drt.parent_hierarchies().len(), 0);
1175 assert_eq!(namespace.basename(), NAMESPACE_NAME);
1179 assert_eq!(namespace.parent_hierarchies().len(), 1);
1180 assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
1181 assert_eq!(component.basename(), COMPONENT_NAME);
1185 assert_eq!(component.parent_hierarchies().len(), 2);
1186 assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
1187 assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1188 assert_eq!(endpoint.basename(), ENDPOINT_NAME);
1192 assert_eq!(endpoint.parent_hierarchies().len(), 3);
1193 assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
1194 assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1195 assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
1196 assert!(
1200 namespace
1201 .parent_hierarchies()
1202 .iter()
1203 .any(|h| h.basename() == drt.basename())
1204 );
1205 assert!(
1206 component
1207 .parent_hierarchies()
1208 .iter()
1209 .any(|h| h.basename() == namespace.basename())
1210 );
1211 assert!(
1212 endpoint
1213 .parent_hierarchies()
1214 .iter()
1215 .any(|h| h.basename() == component.basename())
1216 );
1217
1218 assert_eq!(drt.parent_hierarchies().len(), 0);
1220 assert_eq!(namespace.parent_hierarchies().len(), 1);
1221 assert_eq!(component.parent_hierarchies().len(), 2);
1222 assert_eq!(endpoint.parent_hierarchies().len(), 3);
1223
1224 let invalid_namespace = drt.namespace("@@123").unwrap();
1228 let result =
1229 invalid_namespace
1230 .metrics()
1231 .create_counter("test_counter", "A test counter", &[]);
1232 assert!(result.is_ok());
1233 if let Ok(counter) = &result {
1234 let desc = counter.desc();
1236 let namespace_label = desc[0]
1237 .const_label_pairs
1238 .iter()
1239 .find(|l| l.name() == "dynamo_namespace")
1240 .expect("Should have dynamo_namespace label");
1241 assert_eq!(namespace_label.value(), "_123");
1242 }
1243
1244 let valid_namespace = drt.namespace("ns567").unwrap();
1246 assert!(
1247 valid_namespace
1248 .metrics()
1249 .create_counter("test_counter", "A test counter", &[])
1250 .is_ok()
1251 );
1252 }
1253
1254 #[tokio::test]
1255 async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
1256 let drt = create_test_drt_async().await;
1260 let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
1261 let component = namespace.component("comp_expfmt_ep_only").unwrap();
1262 let endpoint = component.endpoint("ep_expfmt_ep_only");
1263
1264 let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
1265 let callback: PrometheusExpositionFormatCallback =
1266 Arc::new(move || Ok(metric_line.to_string()));
1267
1268 endpoint
1269 .get_metrics_registry()
1270 .add_expfmt_callback(callback);
1271
1272 let output = drt.metrics().prometheus_expfmt().unwrap();
1273 let occurrences = output
1274 .lines()
1275 .filter(|line| line == &metric_line.trim_end_matches('\n'))
1276 .count();
1277
1278 assert_eq!(
1279 occurrences, 1,
1280 "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
1281 occurrences, output
1282 );
1283 }
1284
1285 #[tokio::test]
1286 async fn test_recursive_namespace() {
1287 let drt = create_test_drt_async().await;
1289
1290 let ns1 = drt.namespace("ns1").unwrap();
1292 let ns2 = ns1.namespace("ns2").unwrap();
1293 let ns3 = ns2.namespace("ns3").unwrap();
1294
1295 let component = ns3.component("test-component").unwrap();
1297
1298 assert_eq!(ns1.basename(), "ns1");
1300 assert_eq!(ns1.parent_hierarchies().len(), 1);
1301 assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
1302 assert_eq!(ns2.basename(), "ns2");
1305 assert_eq!(ns2.parent_hierarchies().len(), 2);
1306 assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
1307 assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
1308 assert_eq!(ns3.basename(), "ns3");
1311 assert_eq!(ns3.parent_hierarchies().len(), 3);
1312 assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
1313 assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
1314 assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
1315 assert_eq!(component.basename(), "test-component");
1318 assert_eq!(component.parent_hierarchies().len(), 4);
1319 assert_eq!(component.parent_hierarchies()[0].basename(), "");
1320 assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
1321 assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
1322 assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
1323 println!("✓ Chained namespace test passed - all prefixes correct");
1326 }
1327}
1328
1329#[cfg(feature = "integration")]
1330#[cfg(test)]
1331mod test_metricsregistry_prometheus_fmt_outputs {
1332 use super::prometheus_names::name_prefix;
1333 use super::*;
1334 use crate::distributed::distributed_test_utils::create_test_drt_async;
1335 use prometheus::Counter;
1336 use std::sync::Arc;
1337
1338 #[tokio::test]
1339 async fn test_prometheusfactory_using_metrics_registry_trait() {
1340 let drt = create_test_drt_async().await;
1342
1343 let namespace_name = "ns345";
1345
1346 let namespace = drt.namespace(namespace_name).unwrap();
1347 let component = namespace.component("comp345").unwrap();
1348 let endpoint = component.endpoint("ep345");
1349
1350 let counter = endpoint
1352 .metrics()
1353 .create_counter("testcounter", "A test counter", &[])
1354 .unwrap();
1355 counter.inc_by(123.456789);
1356 let epsilon = 0.01;
1357 assert!((counter.get() - 123.456789).abs() < epsilon);
1358
1359 let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1360 println!("Endpoint output:");
1361 println!("{}", endpoint_output_raw);
1362
1363 let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
1364# TYPE dynamo_component_testcounter counter
1365dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
1366
1367 assert_eq!(
1368 endpoint_output_raw.trim_end_matches('\n'),
1369 expected_endpoint_output.trim_end_matches('\n'),
1370 "\n=== ENDPOINT COMPARISON FAILED ===\n\
1371 Actual:\n{}\n\
1372 Expected:\n{}\n\
1373 ==============================",
1374 endpoint_output_raw,
1375 expected_endpoint_output
1376 );
1377
1378 let gauge = component
1380 .metrics()
1381 .create_gauge("testgauge", "A test gauge", &[])
1382 .unwrap();
1383 gauge.set(50000.0);
1384 assert_eq!(gauge.get(), 50000.0);
1385
1386 let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1388 println!("Component output:");
1389 println!("{}", component_output_raw);
1390
1391 let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1392# TYPE dynamo_component_testcounter counter
1393dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1394# HELP dynamo_component_testgauge A test gauge
1395# TYPE dynamo_component_testgauge gauge
1396dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1397
1398 assert_eq!(
1399 component_output_raw.trim_end_matches('\n'),
1400 expected_component_output.trim_end_matches('\n'),
1401 "\n=== COMPONENT COMPARISON FAILED ===\n\
1402 Actual:\n{}\n\
1403 Expected:\n{}\n\
1404 ==============================",
1405 component_output_raw,
1406 expected_component_output
1407 );
1408
1409 let intcounter = namespace
1410 .metrics()
1411 .create_intcounter("testintcounter", "A test int counter", &[])
1412 .unwrap();
1413 intcounter.inc_by(12345);
1414 assert_eq!(intcounter.get(), 12345);
1415
1416 let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1418 println!("Namespace output:");
1419 println!("{}", namespace_output_raw);
1420
1421 let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1422# TYPE dynamo_component_testcounter counter
1423dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1424# HELP dynamo_component_testgauge A test gauge
1425# TYPE dynamo_component_testgauge gauge
1426dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1427# HELP dynamo_component_testintcounter A test int counter
1428# TYPE dynamo_component_testintcounter counter
1429dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1430
1431 assert_eq!(
1432 namespace_output_raw.trim_end_matches('\n'),
1433 expected_namespace_output.trim_end_matches('\n'),
1434 "\n=== NAMESPACE COMPARISON FAILED ===\n\
1435 Actual:\n{}\n\
1436 Expected:\n{}\n\
1437 ==============================",
1438 namespace_output_raw,
1439 expected_namespace_output
1440 );
1441
1442 let intgauge = namespace
1444 .metrics()
1445 .create_intgauge("testintgauge", "A test int gauge", &[])
1446 .unwrap();
1447 intgauge.set(42);
1448 assert_eq!(intgauge.get(), 42);
1449
1450 let intgaugevec = namespace
1452 .metrics()
1453 .create_intgaugevec(
1454 "testintgaugevec",
1455 "A test int gauge vector",
1456 &["instance", "status"],
1457 &[("service", "api")],
1458 )
1459 .unwrap();
1460 intgaugevec
1461 .with_label_values(&["server1", "active"])
1462 .set(10);
1463 intgaugevec
1464 .with_label_values(&["server2", "inactive"])
1465 .set(0);
1466
1467 let countervec = endpoint
1469 .metrics()
1470 .create_countervec(
1471 "testcountervec",
1472 "A test counter vector",
1473 &["method", "status"],
1474 &[("service", "api")],
1475 )
1476 .unwrap();
1477 countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1478 countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1479
1480 let histogram = component
1482 .metrics()
1483 .create_histogram("testhistogram", "A test histogram", &[], None)
1484 .unwrap();
1485 histogram.observe(1.0);
1486 histogram.observe(2.5);
1487 histogram.observe(4.0);
1488
1489 let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1491 println!("DRT output:");
1492 println!("{}", drt_output_raw);
1493
1494 let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
1495# TYPE dynamo_component_testcounter counter
1496dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1497# HELP dynamo_component_testcountervec A test counter vector
1498# TYPE dynamo_component_testcountervec counter
1499dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1500dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1501# HELP dynamo_component_testgauge A test gauge
1502# TYPE dynamo_component_testgauge gauge
1503dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1504# HELP dynamo_component_testhistogram A test histogram
1505# TYPE dynamo_component_testhistogram histogram
1506dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1507dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1508dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1509dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1510dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1511dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1512dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1513dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1514dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1515dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1516dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1517dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1518dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1519dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1520# HELP dynamo_component_testintcounter A test int counter
1521# TYPE dynamo_component_testintcounter counter
1522dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1523# HELP dynamo_component_testintgauge A test int gauge
1524# TYPE dynamo_component_testintgauge gauge
1525dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1526# HELP dynamo_component_testintgaugevec A test int gauge vector
1527# TYPE dynamo_component_testintgaugevec gauge
1528dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1529dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
1530# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
1531# TYPE dynamo_component_uptime_seconds gauge
1532dynamo_component_uptime_seconds 0"#.to_string();
1533
1534 assert_eq!(
1535 drt_output_raw.trim_end_matches('\n'),
1536 expected_drt_output.trim_end_matches('\n'),
1537 "\n=== DRT COMPARISON FAILED ===\n\
1538 Expected:\n{}\n\
1539 Actual (filtered):\n{}\n\
1540 ==============================",
1541 expected_drt_output,
1542 drt_output_raw
1543 );
1544
1545 println!("✓ All Prometheus format outputs verified successfully!");
1546 }
1547
1548 #[test]
1549 fn test_refactored_filter_functions() {
1550 let test_input = r#"# HELP dynamo_component_requests Total requests
1552# TYPE dynamo_component_requests counter
1553dynamo_component_requests 42
1554# HELP dynamo_component_latency Response latency
1555# TYPE dynamo_component_latency histogram
1556dynamo_component_latency_bucket{le="0.1"} 10
1557dynamo_component_latency_bucket{le="0.5"} 25
1558dynamo_component_errors_total 5"#;
1559
1560 let metrics_only = super::test_helpers::extract_metrics(test_input);
1562 assert_eq!(metrics_only.len(), 4); assert!(
1564 metrics_only
1565 .iter()
1566 .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1567 );
1568
1569 println!("✓ All refactored filter functions work correctly!");
1570 }
1571
1572 #[tokio::test]
1573 async fn test_same_metric_name_different_endpoints() {
1574 let drt = create_test_drt_async().await;
1578 let namespace = drt.namespace("ns_test").unwrap();
1579 let component = namespace.component("comp_test").unwrap();
1580
1581 let ep1 = component.endpoint("ep1");
1583 let ep2 = component.endpoint("ep2");
1584
1585 let counter1 = ep1
1586 .metrics()
1587 .create_counter("requests_total", "Total requests", &[])
1588 .unwrap();
1589 counter1.inc_by(100.0);
1590
1591 let counter2 = ep2
1592 .metrics()
1593 .create_counter("requests_total", "Total requests", &[])
1594 .unwrap();
1595 counter2.inc_by(200.0);
1596
1597 let output = component.metrics().prometheus_expfmt().unwrap();
1599
1600 let expected_output = r#"# HELP dynamo_component_requests_total Total requests
1601# TYPE dynamo_component_requests_total counter
1602dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
1603dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#;
1604
1605 assert_eq!(
1606 output.trim_end_matches('\n'),
1607 expected_output.trim_end_matches('\n'),
1608 "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
1609 Actual:\n{}\n\
1610 Expected:\n{}\n\
1611 ==============================",
1612 output,
1613 expected_output
1614 );
1615
1616 println!("✓ Multi-registry prevents Prometheus collisions!");
1617 }
1618
1619 #[tokio::test]
1620 async fn test_duplicate_series_warning() {
1621 let drt = create_test_drt_async().await;
1624 let namespace = drt.namespace("ns_dup").unwrap();
1625 let component = namespace.component("comp_dup").unwrap();
1626
1627 let ep1 = component.endpoint("ep_same");
1629 let ep2 = component.endpoint("ep_same"); let counter1 = ep1
1632 .metrics()
1633 .create_counter("dup_metric", "Duplicate metric test", &[])
1634 .unwrap();
1635 counter1.inc_by(50.0);
1636
1637 let counter2 = ep2
1638 .metrics()
1639 .create_counter("dup_metric", "Duplicate metric test", &[])
1640 .unwrap();
1641 counter2.inc_by(75.0);
1642
1643 let output = component.metrics().prometheus_expfmt().unwrap();
1645
1646 let expected_output = r#"# HELP dynamo_component_dup_metric Duplicate metric test
1647# TYPE dynamo_component_dup_metric counter
1648dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#;
1649
1650 assert_eq!(
1651 output.trim_end_matches('\n'),
1652 expected_output.trim_end_matches('\n'),
1653 "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
1654 Actual:\n{}\n\
1655 Expected:\n{}\n\
1656 ==============================",
1657 output,
1658 expected_output
1659 );
1660
1661 println!("✓ Duplicate series detection and deduplication works!");
1662 }
1663}