1pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22use prometheus_names::{
24 build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
25 sanitize_prometheus_name, work_handler,
26};
27
28use crate::pipeline::{
30 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31 network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
37pub const USE_AUTO_LABELS: bool = true;
40
41use prometheus::Encoder;
43
44fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47 let mut seen_keys = std::collections::HashSet::new();
48 for (key, _) in labels {
49 if !seen_keys.insert(*key) {
50 return Err(anyhow::anyhow!(
51 "Duplicate label key '{}' found in labels",
52 key
53 ));
54 }
55 }
56 Ok(())
57}
58
59pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
64 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
66 where
67 Self: Sized;
68
69 fn with_histogram_opts_and_buckets(
72 _opts: prometheus::HistogramOpts,
73 _buckets: Option<Vec<f64>>,
74 ) -> Result<Self, prometheus::Error>
75 where
76 Self: Sized,
77 {
78 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
79 }
80
81 fn with_opts_and_label_names(
84 _opts: prometheus::Opts,
85 _label_names: &[&str],
86 ) -> Result<Self, prometheus::Error>
87 where
88 Self: Sized,
89 {
90 panic!("with_opts_and_label_names is not implemented for this metric type");
91 }
92}
93
94impl PrometheusMetric for prometheus::Counter {
96 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
97 prometheus::Counter::with_opts(opts)
98 }
99}
100
101impl PrometheusMetric for prometheus::IntCounter {
102 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
103 prometheus::IntCounter::with_opts(opts)
104 }
105}
106
107impl PrometheusMetric for prometheus::Gauge {
108 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
109 prometheus::Gauge::with_opts(opts)
110 }
111}
112
113impl PrometheusMetric for prometheus::IntGauge {
114 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
115 prometheus::IntGauge::with_opts(opts)
116 }
117}
118
119impl PrometheusMetric for prometheus::GaugeVec {
120 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
121 Err(prometheus::Error::Msg(
122 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
123 ))
124 }
125
126 fn with_opts_and_label_names(
127 opts: prometheus::Opts,
128 label_names: &[&str],
129 ) -> Result<Self, prometheus::Error> {
130 prometheus::GaugeVec::new(opts, label_names)
131 }
132}
133
134impl PrometheusMetric for prometheus::IntGaugeVec {
135 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
136 Err(prometheus::Error::Msg(
137 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
138 ))
139 }
140
141 fn with_opts_and_label_names(
142 opts: prometheus::Opts,
143 label_names: &[&str],
144 ) -> Result<Self, prometheus::Error> {
145 prometheus::IntGaugeVec::new(opts, label_names)
146 }
147}
148
149impl PrometheusMetric for prometheus::IntCounterVec {
150 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
151 Err(prometheus::Error::Msg(
152 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
153 ))
154 }
155
156 fn with_opts_and_label_names(
157 opts: prometheus::Opts,
158 label_names: &[&str],
159 ) -> Result<Self, prometheus::Error> {
160 prometheus::IntCounterVec::new(opts, label_names)
161 }
162}
163
164impl PrometheusMetric for prometheus::Histogram {
166 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
167 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
169 prometheus::Histogram::with_opts(histogram_opts)
170 }
171
172 fn with_histogram_opts_and_buckets(
173 mut opts: prometheus::HistogramOpts,
174 buckets: Option<Vec<f64>>,
175 ) -> Result<Self, prometheus::Error> {
176 if let Some(custom_buckets) = buckets {
177 opts = opts.buckets(custom_buckets);
178 }
179 prometheus::Histogram::with_opts(opts)
180 }
181}
182
183impl PrometheusMetric for prometheus::CounterVec {
185 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
186 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
188 }
189
190 fn with_opts_and_label_names(
191 opts: prometheus::Opts,
192 label_names: &[&str],
193 ) -> Result<Self, prometheus::Error> {
194 prometheus::CounterVec::new(opts, label_names)
195 }
196}
197
198pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
203 hierarchy: &H,
204 metric_name: &str,
205 metric_desc: &str,
206 labels: &[(&str, &str)],
207 buckets: Option<Vec<f64>>,
208 const_labels: Option<&[&str]>,
209) -> anyhow::Result<T> {
210 validate_no_duplicate_label_keys(labels)?;
212 let basename = hierarchy.basename();
215 let parent_hierarchies = hierarchy.parent_hierarchies();
216
217 let mut hierarchy_names: Vec<String> =
219 parent_hierarchies.iter().map(|p| p.basename()).collect();
220 hierarchy_names.push(basename.clone());
221
222 let metric_name = build_component_metric_name(metric_name);
223
224 let mut updated_labels: Vec<(String, String)> = Vec::new();
226
227 if USE_AUTO_LABELS {
228 for (key, _) in labels {
230 if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231 return Err(anyhow::anyhow!(
232 "Label '{}' is automatically added by auto_label feature and cannot be manually set",
233 key
234 ));
235 }
236 }
237
238 if hierarchy_names.len() > 1 {
240 let namespace = &hierarchy_names[1];
241 if !namespace.is_empty() {
242 let valid_namespace = sanitize_prometheus_label(namespace)?;
243 if !valid_namespace.is_empty() {
244 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
245 }
246 }
247 }
248 if hierarchy_names.len() > 2 {
249 let component = &hierarchy_names[2];
250 if !component.is_empty() {
251 let valid_component = sanitize_prometheus_label(component)?;
252 if !valid_component.is_empty() {
253 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
254 }
255 }
256 }
257 if hierarchy_names.len() > 3 {
258 let endpoint = &hierarchy_names[3];
259 if !endpoint.is_empty() {
260 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
261 if !valid_endpoint.is_empty() {
262 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
263 }
264 }
265 }
266 }
267
268 updated_labels.extend(
270 labels
271 .iter()
272 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273 );
274 let prometheus_metric = if std::any::TypeId::of::<T>()
278 == std::any::TypeId::of::<prometheus::CounterVec>()
279 {
280 if buckets.is_some() {
283 return Err(anyhow::anyhow!(
284 "buckets parameter is not valid for CounterVec"
285 ));
286 }
287 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288 for (key, value) in &updated_labels {
289 opts = opts.const_label(key.clone(), value.clone());
290 }
291 let label_names = const_labels
292 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293 T::with_opts_and_label_names(opts, label_names)?
294 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295 if buckets.is_some() {
298 return Err(anyhow::anyhow!(
299 "buckets parameter is not valid for GaugeVec"
300 ));
301 }
302 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303 for (key, value) in &updated_labels {
304 opts = opts.const_label(key.clone(), value.clone());
305 }
306 let label_names = const_labels
307 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308 T::with_opts_and_label_names(opts, label_names)?
309 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310 if const_labels.is_some() {
313 return Err(anyhow::anyhow!(
314 "const_labels parameter is not valid for Histogram"
315 ));
316 }
317 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318 for (key, value) in &updated_labels {
319 opts = opts.const_label(key.clone(), value.clone());
320 }
321 T::with_histogram_opts_and_buckets(opts, buckets)?
322 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323 if buckets.is_some() {
326 return Err(anyhow::anyhow!(
327 "buckets parameter is not valid for IntCounterVec"
328 ));
329 }
330 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331 for (key, value) in &updated_labels {
332 opts = opts.const_label(key.clone(), value.clone());
333 }
334 let label_names = const_labels
335 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336 T::with_opts_and_label_names(opts, label_names)?
337 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338 if buckets.is_some() {
341 return Err(anyhow::anyhow!(
342 "buckets parameter is not valid for IntGaugeVec"
343 ));
344 }
345 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346 for (key, value) in &updated_labels {
347 opts = opts.const_label(key.clone(), value.clone());
348 }
349 let label_names = const_labels
350 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351 T::with_opts_and_label_names(opts, label_names)?
352 } else {
353 if buckets.is_some() {
356 return Err(anyhow::anyhow!(
357 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358 ));
359 }
360 if const_labels.is_some() {
361 return Err(anyhow::anyhow!(
362 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363 ));
364 }
365 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366 for (key, value) in &updated_labels {
367 opts = opts.const_label(key.clone(), value.clone());
368 }
369 T::with_opts(opts)?
370 };
371
372 for parent in parent_hierarchies {
375 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
376 parent.get_metrics_registry().add_metric(collector)?;
377 }
378
379 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
381 hierarchy.get_metrics_registry().add_metric(collector)?;
382
383 Ok(prometheus_metric)
384}
385
386pub struct Metrics<H: MetricsHierarchy> {
389 hierarchy: H,
390}
391
392impl<H: MetricsHierarchy> Metrics<H> {
393 pub fn new(hierarchy: H) -> Self {
394 Self { hierarchy }
395 }
396
397 pub fn create_counter(
420 &self,
421 name: &str,
422 description: &str,
423 labels: &[(&str, &str)],
424 ) -> anyhow::Result<prometheus::Counter> {
425 create_metric(&self.hierarchy, name, description, labels, None, None)
426 }
427
428 pub fn create_countervec(
430 &self,
431 name: &str,
432 description: &str,
433 const_labels: &[&str],
434 const_label_values: &[(&str, &str)],
435 ) -> anyhow::Result<prometheus::CounterVec> {
436 create_metric(
437 &self.hierarchy,
438 name,
439 description,
440 const_label_values,
441 None,
442 Some(const_labels),
443 )
444 }
445
446 pub fn create_gauge(
448 &self,
449 name: &str,
450 description: &str,
451 labels: &[(&str, &str)],
452 ) -> anyhow::Result<prometheus::Gauge> {
453 create_metric(&self.hierarchy, name, description, labels, None, None)
454 }
455
456 pub fn create_gaugevec(
458 &self,
459 name: &str,
460 description: &str,
461 const_labels: &[&str],
462 const_label_values: &[(&str, &str)],
463 ) -> anyhow::Result<prometheus::GaugeVec> {
464 create_metric(
465 &self.hierarchy,
466 name,
467 description,
468 const_label_values,
469 None,
470 Some(const_labels),
471 )
472 }
473
474 pub fn create_histogram(
476 &self,
477 name: &str,
478 description: &str,
479 labels: &[(&str, &str)],
480 buckets: Option<Vec<f64>>,
481 ) -> anyhow::Result<prometheus::Histogram> {
482 create_metric(&self.hierarchy, name, description, labels, buckets, None)
483 }
484
485 pub fn create_intcounter(
487 &self,
488 name: &str,
489 description: &str,
490 labels: &[(&str, &str)],
491 ) -> anyhow::Result<prometheus::IntCounter> {
492 create_metric(&self.hierarchy, name, description, labels, None, None)
493 }
494
495 pub fn create_intcountervec(
497 &self,
498 name: &str,
499 description: &str,
500 const_labels: &[&str],
501 const_label_values: &[(&str, &str)],
502 ) -> anyhow::Result<prometheus::IntCounterVec> {
503 create_metric(
504 &self.hierarchy,
505 name,
506 description,
507 const_label_values,
508 None,
509 Some(const_labels),
510 )
511 }
512
513 pub fn create_intgauge(
515 &self,
516 name: &str,
517 description: &str,
518 labels: &[(&str, &str)],
519 ) -> anyhow::Result<prometheus::IntGauge> {
520 create_metric(&self.hierarchy, name, description, labels, None, None)
521 }
522
523 pub fn create_intgaugevec(
525 &self,
526 name: &str,
527 description: &str,
528 const_labels: &[&str],
529 const_label_values: &[(&str, &str)],
530 ) -> anyhow::Result<prometheus::IntGaugeVec> {
531 create_metric(
532 &self.hierarchy,
533 name,
534 description,
535 const_label_values,
536 None,
537 Some(const_labels),
538 )
539 }
540
541 pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
543 let callback_results = self
545 .hierarchy
546 .get_metrics_registry()
547 .execute_update_callbacks();
548
549 for result in callback_results {
551 if let Err(e) = result {
552 tracing::error!("Error executing metrics callback: {}", e);
553 }
554 }
555
556 let prometheus_registry = self
558 .hierarchy
559 .get_metrics_registry()
560 .get_prometheus_registry();
561
562 let metric_families = prometheus_registry.gather();
564 let encoder = prometheus::TextEncoder::new();
565 let mut buffer = Vec::new();
566 encoder.encode(&metric_families, &mut buffer)?;
567 let mut result = String::from_utf8(buffer)?;
568
569 let expfmt = self
571 .hierarchy
572 .get_metrics_registry()
573 .execute_expfmt_callbacks();
574 if !expfmt.is_empty() {
575 if !result.ends_with('\n') {
576 result.push('\n');
577 }
578 result.push_str(&expfmt);
579 }
580
581 Ok(result)
582 }
583}
584
585use crate::traits::DistributedRuntimeProvider;
589
590pub trait MetricsHierarchy: Send + Sync {
591 fn basename(&self) -> String;
597
598 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
602
603 fn get_metrics_registry(&self) -> &MetricsRegistry;
605
606 fn metrics(&self) -> Metrics<&Self>
613 where
614 Self: Sized,
615 {
616 Metrics::new(self)
617 }
618}
619
620impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
622 fn basename(&self) -> String {
623 (**self).basename()
624 }
625
626 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
627 (**self).parent_hierarchies()
628 }
629
630 fn get_metrics_registry(&self) -> &MetricsRegistry {
631 (**self).get_metrics_registry()
632 }
633}
634
635pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
644
645pub type PrometheusExpositionFormatCallback =
647 Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
648
649#[derive(Clone)]
654pub struct MetricsRegistry {
655 pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
658
659 pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
662
663 pub prometheus_expfmt_callbacks:
666 Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
667}
668
669impl std::fmt::Debug for MetricsRegistry {
670 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
671 f.debug_struct("MetricsRegistry")
672 .field("prometheus_registry", &"<RwLock<Registry>>")
673 .field(
674 "prometheus_update_callbacks",
675 &format!(
676 "<RwLock<Vec<Callback>>> with {} callbacks",
677 self.prometheus_update_callbacks.read().unwrap().len()
678 ),
679 )
680 .field(
681 "prometheus_expfmt_callbacks",
682 &format!(
683 "<RwLock<Vec<Callback>>> with {} callbacks",
684 self.prometheus_expfmt_callbacks.read().unwrap().len()
685 ),
686 )
687 .finish()
688 }
689}
690
691impl MetricsRegistry {
692 pub fn new() -> Self {
694 Self {
695 prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
696 prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
697 prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
698 }
699 }
700
701 pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
703 self.prometheus_update_callbacks
704 .write()
705 .unwrap()
706 .push(callback);
707 }
708
709 pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
711 self.prometheus_expfmt_callbacks
712 .write()
713 .unwrap()
714 .push(callback);
715 }
716
717 pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
719 self.prometheus_update_callbacks
720 .read()
721 .unwrap()
722 .iter()
723 .map(|callback| callback())
724 .collect()
725 }
726
727 pub fn execute_expfmt_callbacks(&self) -> String {
729 let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
730 let mut result = String::new();
731 for callback in callbacks.iter() {
732 match callback() {
733 Ok(text) => {
734 if !text.is_empty() {
735 if !result.is_empty() && !result.ends_with('\n') {
736 result.push('\n');
737 }
738 result.push_str(&text);
739 }
740 }
741 Err(e) => {
742 tracing::error!("Error executing exposition text callback: {}", e);
743 }
744 }
745 }
746 result
747 }
748
749 pub fn add_metric(
751 &self,
752 collector: Box<dyn prometheus::core::Collector>,
753 ) -> anyhow::Result<()> {
754 self.prometheus_registry
755 .write()
756 .unwrap()
757 .register(collector)
758 .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
759 }
760
761 pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
763 self.prometheus_registry.read().unwrap()
764 }
765
766 pub fn has_metric_named(&self, metric_name: &str) -> bool {
768 self.prometheus_registry
769 .read()
770 .unwrap()
771 .gather()
772 .iter()
773 .any(|mf| mf.name() == metric_name)
774 }
775}
776
777impl Default for MetricsRegistry {
778 fn default() -> Self {
779 Self::new()
780 }
781}
782
783#[cfg(test)]
784mod test_helpers {
785 use super::prometheus_names::name_prefix;
786 use super::*;
787
788 fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
791 where
792 F: FnMut(&str) -> bool,
793 {
794 input
795 .lines()
796 .filter(|line| predicate(line))
797 .map(|line| line.to_string())
798 .collect::<Vec<_>>()
799 }
800
801 pub fn extract_metrics(input: &str) -> Vec<String> {
804 filter_prometheus_lines(input, |line| {
805 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
806 && !line.starts_with("#")
807 && !line.trim().is_empty()
808 })
809 }
810
811 pub fn parse_prometheus_metric(
823 line: &str,
824 ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
825 if line.trim().is_empty() || line.starts_with('#') {
826 return None;
827 }
828
829 let parts: Vec<&str> = line.split_whitespace().collect();
830 if parts.len() < 2 {
831 return None;
832 }
833
834 let metric_part = parts[0];
835 let value: f64 = parts[1].parse().ok()?;
836
837 let (name, labels) = if metric_part.contains('{') {
838 let brace_start = metric_part.find('{').unwrap();
839 let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
840 let name = &metric_part[..brace_start];
841 let labels_str = &metric_part[brace_start + 1..brace_end];
842
843 let mut labels = std::collections::HashMap::new();
844 for pair in labels_str.split(',') {
845 if let Some((k, v)) = pair.split_once('=') {
846 let v = v.trim_matches('"');
847 labels.insert(k.trim().to_string(), v.to_string());
848 }
849 }
850 (name.to_string(), labels)
851 } else {
852 (metric_part.to_string(), std::collections::HashMap::new())
853 };
854
855 Some((name, labels, value))
856 }
857}
858
859#[cfg(test)]
860mod test_metricsregistry_units {
861 use super::*;
862
863 #[test]
864 fn test_build_component_metric_name_with_prefix() {
865 let result = build_component_metric_name("requests");
867 assert_eq!(result, "dynamo_component_requests");
868
869 let result = build_component_metric_name("counter");
870 assert_eq!(result, "dynamo_component_counter");
871 }
872
873 #[test]
874 fn test_parse_prometheus_metric() {
875 use super::test_helpers::parse_prometheus_metric;
876 use std::collections::HashMap;
877
878 let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
880 let parsed = parse_prometheus_metric(line);
881 assert!(parsed.is_some());
882
883 let (name, labels, value) = parsed.unwrap();
884 assert_eq!(name, "http_requests_total");
885
886 let mut expected_labels = HashMap::new();
887 expected_labels.insert("method".to_string(), "GET".to_string());
888 expected_labels.insert("status".to_string(), "200".to_string());
889 assert_eq!(labels, expected_labels);
890
891 assert_eq!(value, 1234.0);
892
893 let line = "cpu_usage 98.5";
895 let parsed = parse_prometheus_metric(line);
896 assert!(parsed.is_some());
897
898 let (name, labels, value) = parsed.unwrap();
899 assert_eq!(name, "cpu_usage");
900 assert!(labels.is_empty());
901 assert_eq!(value, 98.5);
902
903 let line = "response_time{service=\"api\"} 0.123";
905 let parsed = parse_prometheus_metric(line);
906 assert!(parsed.is_some());
907
908 let (name, labels, value) = parsed.unwrap();
909 assert_eq!(name, "response_time");
910
911 let mut expected_labels = HashMap::new();
912 expected_labels.insert("service".to_string(), "api".to_string());
913 assert_eq!(labels, expected_labels);
914
915 assert_eq!(value, 0.123);
916
917 assert!(parse_prometheus_metric("").is_none()); assert!(parse_prometheus_metric("# HELP metric description").is_none()); assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); assert!(parse_prometheus_metric("metric_name").is_none()); println!("✓ Prometheus metric parsing works correctly!");
924 }
925
926 #[test]
927 fn test_metrics_registry_entry_callbacks() {
928 use crate::MetricsRegistry;
929 use std::sync::atomic::{AtomicUsize, Ordering};
930
931 {
933 let registry = MetricsRegistry::new();
934 let counter = Arc::new(AtomicUsize::new(0));
935
936 for increment in [1, 10, 100] {
938 let counter_clone = counter.clone();
939 registry.add_update_callback(Arc::new(move || {
940 counter_clone.fetch_add(increment, Ordering::SeqCst);
941 Ok(())
942 }));
943 }
944
945 assert_eq!(counter.load(Ordering::SeqCst), 0);
947
948 let results = registry.execute_update_callbacks();
950 assert_eq!(results.len(), 3);
951 assert!(results.iter().all(|r| r.is_ok()));
952 assert_eq!(counter.load(Ordering::SeqCst), 111); let results = registry.execute_update_callbacks();
956 assert_eq!(results.len(), 3);
957 assert_eq!(counter.load(Ordering::SeqCst), 222); let cloned = registry.clone();
961 assert_eq!(cloned.execute_update_callbacks().len(), 3);
962 assert_eq!(counter.load(Ordering::SeqCst), 333); registry.execute_update_callbacks();
966 assert_eq!(counter.load(Ordering::SeqCst), 444); }
968
969 {
971 let registry = MetricsRegistry::new();
972 let counter = Arc::new(AtomicUsize::new(0));
973
974 let counter_clone = counter.clone();
976 registry.add_update_callback(Arc::new(move || {
977 counter_clone.fetch_add(1, Ordering::SeqCst);
978 Ok(())
979 }));
980
981 registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
983
984 let counter_clone = counter.clone();
986 registry.add_update_callback(Arc::new(move || {
987 counter_clone.fetch_add(10, Ordering::SeqCst);
988 Ok(())
989 }));
990
991 let results = registry.execute_update_callbacks();
993 assert_eq!(results.len(), 3);
994 assert!(results[0].is_ok());
995 assert!(results[1].is_err());
996 assert!(results[2].is_ok());
997
998 assert_eq!(
1000 results[1].as_ref().unwrap_err().to_string(),
1001 "Simulated error"
1002 );
1003
1004 assert_eq!(counter.load(Ordering::SeqCst), 11); let results = registry.execute_update_callbacks();
1009 assert!(results[1].is_err());
1010 assert_eq!(counter.load(Ordering::SeqCst), 22); }
1012
1013 {
1015 let registry = MetricsRegistry::new();
1016 let results = registry.execute_update_callbacks();
1017 assert_eq!(results.len(), 0);
1018 }
1019 }
1020}
1021
1022#[cfg(feature = "integration")]
1023#[cfg(test)]
1024mod test_metricsregistry_prefixes {
1025 use super::*;
1026 use crate::distributed::distributed_test_utils::create_test_drt_async;
1027 use prometheus::core::Collector;
1028
1029 #[tokio::test]
1030 async fn test_hierarchical_prefixes_and_parent_hierarchies() {
1031 let drt = create_test_drt_async().await;
1032
1033 const DRT_NAME: &str = "";
1034 const NAMESPACE_NAME: &str = "ns901";
1035 const COMPONENT_NAME: &str = "comp901";
1036 const ENDPOINT_NAME: &str = "ep901";
1037 let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
1038 let component = namespace.component(COMPONENT_NAME).unwrap();
1039 let endpoint = component.endpoint(ENDPOINT_NAME);
1040
1041 assert_eq!(drt.basename(), DRT_NAME);
1043 assert_eq!(drt.parent_hierarchies().len(), 0);
1044 assert_eq!(namespace.basename(), NAMESPACE_NAME);
1048 assert_eq!(namespace.parent_hierarchies().len(), 1);
1049 assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
1050 assert_eq!(component.basename(), COMPONENT_NAME);
1054 assert_eq!(component.parent_hierarchies().len(), 2);
1055 assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
1056 assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1057 assert_eq!(endpoint.basename(), ENDPOINT_NAME);
1061 assert_eq!(endpoint.parent_hierarchies().len(), 3);
1062 assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
1063 assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1064 assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
1065 assert!(
1069 namespace
1070 .parent_hierarchies()
1071 .iter()
1072 .any(|h| h.basename() == drt.basename())
1073 );
1074 assert!(
1075 component
1076 .parent_hierarchies()
1077 .iter()
1078 .any(|h| h.basename() == namespace.basename())
1079 );
1080 assert!(
1081 endpoint
1082 .parent_hierarchies()
1083 .iter()
1084 .any(|h| h.basename() == component.basename())
1085 );
1086
1087 assert_eq!(drt.parent_hierarchies().len(), 0);
1089 assert_eq!(namespace.parent_hierarchies().len(), 1);
1090 assert_eq!(component.parent_hierarchies().len(), 2);
1091 assert_eq!(endpoint.parent_hierarchies().len(), 3);
1092
1093 let invalid_namespace = drt.namespace("@@123").unwrap();
1097 let result =
1098 invalid_namespace
1099 .metrics()
1100 .create_counter("test_counter", "A test counter", &[]);
1101 assert!(result.is_ok());
1102 if let Ok(counter) = &result {
1103 let desc = counter.desc();
1105 let namespace_label = desc[0]
1106 .const_label_pairs
1107 .iter()
1108 .find(|l| l.name() == "dynamo_namespace")
1109 .expect("Should have dynamo_namespace label");
1110 assert_eq!(namespace_label.value(), "_123");
1111 }
1112
1113 let valid_namespace = drt.namespace("ns567").unwrap();
1115 assert!(
1116 valid_namespace
1117 .metrics()
1118 .create_counter("test_counter", "A test counter", &[])
1119 .is_ok()
1120 );
1121 }
1122
1123 #[tokio::test]
1124 async fn test_recursive_namespace() {
1125 let drt = create_test_drt_async().await;
1127
1128 let ns1 = drt.namespace("ns1").unwrap();
1130 let ns2 = ns1.namespace("ns2").unwrap();
1131 let ns3 = ns2.namespace("ns3").unwrap();
1132
1133 let component = ns3.component("test-component").unwrap();
1135
1136 assert_eq!(ns1.basename(), "ns1");
1138 assert_eq!(ns1.parent_hierarchies().len(), 1);
1139 assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
1140 assert_eq!(ns2.basename(), "ns2");
1143 assert_eq!(ns2.parent_hierarchies().len(), 2);
1144 assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
1145 assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
1146 assert_eq!(ns3.basename(), "ns3");
1149 assert_eq!(ns3.parent_hierarchies().len(), 3);
1150 assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
1151 assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
1152 assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
1153 assert_eq!(component.basename(), "test-component");
1156 assert_eq!(component.parent_hierarchies().len(), 4);
1157 assert_eq!(component.parent_hierarchies()[0].basename(), "");
1158 assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
1159 assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
1160 assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
1161 println!("✓ Chained namespace test passed - all prefixes correct");
1164 }
1165}
1166
1167#[cfg(feature = "integration")]
1168#[cfg(test)]
1169mod test_metricsregistry_prometheus_fmt_outputs {
1170 use super::prometheus_names::name_prefix;
1171 use super::*;
1172 use crate::distributed::distributed_test_utils::create_test_drt_async;
1173 use prometheus::Counter;
1174 use std::sync::Arc;
1175
1176 #[tokio::test]
1177 async fn test_prometheusfactory_using_metrics_registry_trait() {
1178 let drt = create_test_drt_async().await;
1180
1181 let namespace_name = "ns345";
1183
1184 let namespace = drt.namespace(namespace_name).unwrap();
1185 let component = namespace.component("comp345").unwrap();
1186 let endpoint = component.endpoint("ep345");
1187
1188 let counter = endpoint
1190 .metrics()
1191 .create_counter("testcounter", "A test counter", &[])
1192 .unwrap();
1193 counter.inc_by(123.456789);
1194 let epsilon = 0.01;
1195 assert!((counter.get() - 123.456789).abs() < epsilon);
1196
1197 let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1198 println!("Endpoint output:");
1199 println!("{}", endpoint_output_raw);
1200
1201 let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
1202# TYPE dynamo_component_testcounter counter
1203dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
1204
1205 assert_eq!(
1206 endpoint_output_raw, expected_endpoint_output,
1207 "\n=== ENDPOINT COMPARISON FAILED ===\n\
1208 Actual:\n{}\n\
1209 Expected:\n{}\n\
1210 ==============================",
1211 endpoint_output_raw, expected_endpoint_output
1212 );
1213
1214 let gauge = component
1216 .metrics()
1217 .create_gauge("testgauge", "A test gauge", &[])
1218 .unwrap();
1219 gauge.set(50000.0);
1220 assert_eq!(gauge.get(), 50000.0);
1221
1222 let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1224 println!("Component output:");
1225 println!("{}", component_output_raw);
1226
1227 let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1228# TYPE dynamo_component_testcounter counter
1229dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1230# HELP dynamo_component_testgauge A test gauge
1231# TYPE dynamo_component_testgauge gauge
1232dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1233
1234 assert_eq!(
1235 component_output_raw, expected_component_output,
1236 "\n=== COMPONENT COMPARISON FAILED ===\n\
1237 Actual:\n{}\n\
1238 Expected:\n{}\n\
1239 ==============================",
1240 component_output_raw, expected_component_output
1241 );
1242
1243 let intcounter = namespace
1244 .metrics()
1245 .create_intcounter("testintcounter", "A test int counter", &[])
1246 .unwrap();
1247 intcounter.inc_by(12345);
1248 assert_eq!(intcounter.get(), 12345);
1249
1250 let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1252 println!("Namespace output:");
1253 println!("{}", namespace_output_raw);
1254
1255 let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1256# TYPE dynamo_component_testcounter counter
1257dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1258# HELP dynamo_component_testgauge A test gauge
1259# TYPE dynamo_component_testgauge gauge
1260dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1261# HELP dynamo_component_testintcounter A test int counter
1262# TYPE dynamo_component_testintcounter counter
1263dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1264
1265 assert_eq!(
1266 namespace_output_raw, expected_namespace_output,
1267 "\n=== NAMESPACE COMPARISON FAILED ===\n\
1268 Actual:\n{}\n\
1269 Expected:\n{}\n\
1270 ==============================",
1271 namespace_output_raw, expected_namespace_output
1272 );
1273
1274 let intgauge = namespace
1276 .metrics()
1277 .create_intgauge("testintgauge", "A test int gauge", &[])
1278 .unwrap();
1279 intgauge.set(42);
1280 assert_eq!(intgauge.get(), 42);
1281
1282 let intgaugevec = namespace
1284 .metrics()
1285 .create_intgaugevec(
1286 "testintgaugevec",
1287 "A test int gauge vector",
1288 &["instance", "status"],
1289 &[("service", "api")],
1290 )
1291 .unwrap();
1292 intgaugevec
1293 .with_label_values(&["server1", "active"])
1294 .set(10);
1295 intgaugevec
1296 .with_label_values(&["server2", "inactive"])
1297 .set(0);
1298
1299 let countervec = endpoint
1301 .metrics()
1302 .create_countervec(
1303 "testcountervec",
1304 "A test counter vector",
1305 &["method", "status"],
1306 &[("service", "api")],
1307 )
1308 .unwrap();
1309 countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1310 countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1311
1312 let histogram = component
1314 .metrics()
1315 .create_histogram("testhistogram", "A test histogram", &[], None)
1316 .unwrap();
1317 histogram.observe(1.0);
1318 histogram.observe(2.5);
1319 histogram.observe(4.0);
1320
1321 let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1323 println!("DRT output:");
1324 println!("{}", drt_output_raw);
1325
1326 let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
1327# TYPE dynamo_component_testcounter counter
1328dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1329# HELP dynamo_component_testcountervec A test counter vector
1330# TYPE dynamo_component_testcountervec counter
1331dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1332dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1333# HELP dynamo_component_testgauge A test gauge
1334# TYPE dynamo_component_testgauge gauge
1335dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1336# HELP dynamo_component_testhistogram A test histogram
1337# TYPE dynamo_component_testhistogram histogram
1338dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1339dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1340dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1341dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1342dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1343dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1344dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1345dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1346dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1347dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1348dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1349dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1350dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1351dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1352# HELP dynamo_component_testintcounter A test int counter
1353# TYPE dynamo_component_testintcounter counter
1354dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1355# HELP dynamo_component_testintgauge A test int gauge
1356# TYPE dynamo_component_testintgauge gauge
1357dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1358# HELP dynamo_component_testintgaugevec A test int gauge vector
1359# TYPE dynamo_component_testintgaugevec gauge
1360dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1361dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
1362# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
1363# TYPE dynamo_component_uptime_seconds gauge
1364dynamo_component_uptime_seconds 0"#.to_string();
1365
1366 assert_eq!(
1367 drt_output_raw, expected_drt_output,
1368 "\n=== DRT COMPARISON FAILED ===\n\
1369 Expected:\n{}\n\
1370 Actual (filtered):\n{}\n\
1371 ==============================",
1372 expected_drt_output, drt_output_raw
1373 );
1374
1375 println!("✓ All Prometheus format outputs verified successfully!");
1376 }
1377
1378 #[test]
1379 fn test_refactored_filter_functions() {
1380 let test_input = r#"# HELP dynamo_component_requests Total requests
1382# TYPE dynamo_component_requests counter
1383dynamo_component_requests 42
1384# HELP dynamo_component_latency Response latency
1385# TYPE dynamo_component_latency histogram
1386dynamo_component_latency_bucket{le="0.1"} 10
1387dynamo_component_latency_bucket{le="0.5"} 25
1388dynamo_component_errors_total 5"#;
1389
1390 let metrics_only = super::test_helpers::extract_metrics(test_input);
1392 assert_eq!(metrics_only.len(), 4); assert!(
1394 metrics_only
1395 .iter()
1396 .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1397 );
1398
1399 println!("✓ All refactored filter functions work correctly!");
1400 }
1401}