1pub mod frontend_perf;
10pub mod prometheus_names;
11pub mod request_plane;
12pub mod tokio_perf;
13pub mod transport_metrics;
14pub mod work_handler_perf;
15
16use parking_lot::Mutex;
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use crate::component::ComponentBuilder;
21use anyhow;
22use once_cell::sync::Lazy;
23use regex::Regex;
24use std::any::Any;
25use std::collections::HashMap;
26
27use prometheus_names::{
29 build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
30 sanitize_prometheus_name, work_handler,
31};
32
33use crate::pipeline::{
35 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
36 network::Ingress,
37};
38use crate::protocols::annotated::Annotated;
39use crate::stream;
40use crate::stream::StreamExt;
41
42use prometheus::Encoder;
44
45fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
48 let mut seen_keys = std::collections::HashSet::new();
49 for (key, _) in labels {
50 if !seen_keys.insert(*key) {
51 return Err(anyhow::anyhow!(
52 "Duplicate label key '{}' found in labels",
53 key
54 ));
55 }
56 }
57 Ok(())
58}
59
60pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
65 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
67 where
68 Self: Sized;
69
70 fn with_histogram_opts_and_buckets(
73 _opts: prometheus::HistogramOpts,
74 _buckets: Option<Vec<f64>>,
75 ) -> Result<Self, prometheus::Error>
76 where
77 Self: Sized,
78 {
79 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
80 }
81
82 fn with_opts_and_label_names(
85 _opts: prometheus::Opts,
86 _label_names: &[&str],
87 ) -> Result<Self, prometheus::Error>
88 where
89 Self: Sized,
90 {
91 panic!("with_opts_and_label_names is not implemented for this metric type");
92 }
93}
94
95impl PrometheusMetric for prometheus::Counter {
97 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
98 prometheus::Counter::with_opts(opts)
99 }
100}
101
102impl PrometheusMetric for prometheus::IntCounter {
103 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
104 prometheus::IntCounter::with_opts(opts)
105 }
106}
107
108impl PrometheusMetric for prometheus::Gauge {
109 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
110 prometheus::Gauge::with_opts(opts)
111 }
112}
113
114impl PrometheusMetric for prometheus::IntGauge {
115 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
116 prometheus::IntGauge::with_opts(opts)
117 }
118}
119
120impl PrometheusMetric for prometheus::GaugeVec {
121 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
122 Err(prometheus::Error::Msg(
123 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
124 ))
125 }
126
127 fn with_opts_and_label_names(
128 opts: prometheus::Opts,
129 label_names: &[&str],
130 ) -> Result<Self, prometheus::Error> {
131 prometheus::GaugeVec::new(opts, label_names)
132 }
133}
134
135impl PrometheusMetric for prometheus::IntGaugeVec {
136 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
137 Err(prometheus::Error::Msg(
138 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
139 ))
140 }
141
142 fn with_opts_and_label_names(
143 opts: prometheus::Opts,
144 label_names: &[&str],
145 ) -> Result<Self, prometheus::Error> {
146 prometheus::IntGaugeVec::new(opts, label_names)
147 }
148}
149
150impl PrometheusMetric for prometheus::IntCounterVec {
151 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
152 Err(prometheus::Error::Msg(
153 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
154 ))
155 }
156
157 fn with_opts_and_label_names(
158 opts: prometheus::Opts,
159 label_names: &[&str],
160 ) -> Result<Self, prometheus::Error> {
161 prometheus::IntCounterVec::new(opts, label_names)
162 }
163}
164
165impl PrometheusMetric for prometheus::Histogram {
167 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
168 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
170 prometheus::Histogram::with_opts(histogram_opts)
171 }
172
173 fn with_histogram_opts_and_buckets(
174 mut opts: prometheus::HistogramOpts,
175 buckets: Option<Vec<f64>>,
176 ) -> Result<Self, prometheus::Error> {
177 if let Some(custom_buckets) = buckets {
178 opts = opts.buckets(custom_buckets);
179 }
180 prometheus::Histogram::with_opts(opts)
181 }
182}
183
184impl PrometheusMetric for prometheus::CounterVec {
186 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
187 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
189 }
190
191 fn with_opts_and_label_names(
192 opts: prometheus::Opts,
193 label_names: &[&str],
194 ) -> Result<Self, prometheus::Error> {
195 prometheus::CounterVec::new(opts, label_names)
196 }
197}
198
199pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
204 hierarchy: &H,
205 metric_name: &str,
206 metric_desc: &str,
207 labels: &[(&str, &str)],
208 buckets: Option<Vec<f64>>,
209 const_labels: Option<&[&str]>,
210) -> anyhow::Result<T> {
211 validate_no_duplicate_label_keys(labels)?;
213 let basename = hierarchy.basename();
216 let parent_hierarchies = hierarchy.parent_hierarchies();
217
218 let mut hierarchy_names: Vec<String> =
220 parent_hierarchies.iter().map(|p| p.basename()).collect();
221 hierarchy_names.push(basename.clone());
222
223 let metric_name = build_component_metric_name(metric_name);
224
225 let mut updated_labels: Vec<(String, String)> = Vec::new();
227
228 for (key, _) in labels {
235 if *key == labels::NAMESPACE
236 || *key == labels::COMPONENT
237 || *key == labels::ENDPOINT
238 || *key == labels::WORKER_ID
239 {
240 return Err(anyhow::anyhow!(
241 "Label '{}' is automatically added by auto-label injection and cannot be manually set",
242 key
243 ));
244 }
245 }
246
247 if let Some(label_names) = const_labels {
251 for name in label_names.iter() {
252 if *name == labels::NAMESPACE
253 || *name == labels::COMPONENT
254 || *name == labels::ENDPOINT
255 || *name == labels::WORKER_ID
256 {
257 return Err(anyhow::anyhow!(
258 "Variable label name '{}' conflicts with auto-injected const label and cannot be used",
259 name
260 ));
261 }
262 }
263 }
264
265 if hierarchy_names.len() > 1 {
268 let namespace = &hierarchy_names[1];
269 if !namespace.is_empty() {
270 let valid_namespace = sanitize_prometheus_label(namespace)?;
271 if !valid_namespace.is_empty() {
272 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
273 }
274 }
275 }
276 if hierarchy_names.len() > 2 {
277 let component = &hierarchy_names[2];
278 if !component.is_empty() {
279 let valid_component = sanitize_prometheus_label(component)?;
280 if !valid_component.is_empty() {
281 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
282 }
283 }
284 }
285 if hierarchy_names.len() > 3 {
286 let endpoint = &hierarchy_names[3];
287 if !endpoint.is_empty() {
288 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
289 if !valid_endpoint.is_empty() {
290 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
291 }
292 }
293 }
294
295 if let Some(conn_id) = hierarchy.connection_id() {
299 updated_labels.push((labels::WORKER_ID.to_string(), format!("{:x}", conn_id)));
300 }
301
302 updated_labels.extend(
304 labels
305 .iter()
306 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
307 );
308 let prometheus_metric = if std::any::TypeId::of::<T>()
312 == std::any::TypeId::of::<prometheus::CounterVec>()
313 {
314 if buckets.is_some() {
317 return Err(anyhow::anyhow!(
318 "buckets parameter is not valid for CounterVec"
319 ));
320 }
321 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
322 for (key, value) in &updated_labels {
323 opts = opts.const_label(key.clone(), value.clone());
324 }
325 let label_names = const_labels
326 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
327 T::with_opts_and_label_names(opts, label_names)?
328 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
329 if buckets.is_some() {
332 return Err(anyhow::anyhow!(
333 "buckets parameter is not valid for GaugeVec"
334 ));
335 }
336 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
337 for (key, value) in &updated_labels {
338 opts = opts.const_label(key.clone(), value.clone());
339 }
340 let label_names = const_labels
341 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
342 T::with_opts_and_label_names(opts, label_names)?
343 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
344 if const_labels.is_some() {
347 return Err(anyhow::anyhow!(
348 "const_labels parameter is not valid for Histogram"
349 ));
350 }
351 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
352 for (key, value) in &updated_labels {
353 opts = opts.const_label(key.clone(), value.clone());
354 }
355 T::with_histogram_opts_and_buckets(opts, buckets)?
356 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
357 if buckets.is_some() {
360 return Err(anyhow::anyhow!(
361 "buckets parameter is not valid for IntCounterVec"
362 ));
363 }
364 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
365 for (key, value) in &updated_labels {
366 opts = opts.const_label(key.clone(), value.clone());
367 }
368 let label_names = const_labels
369 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
370 T::with_opts_and_label_names(opts, label_names)?
371 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
372 if buckets.is_some() {
375 return Err(anyhow::anyhow!(
376 "buckets parameter is not valid for IntGaugeVec"
377 ));
378 }
379 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
380 for (key, value) in &updated_labels {
381 opts = opts.const_label(key.clone(), value.clone());
382 }
383 let label_names = const_labels
384 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
385 T::with_opts_and_label_names(opts, label_names)?
386 } else {
387 if buckets.is_some() {
390 return Err(anyhow::anyhow!(
391 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
392 ));
393 }
394 if const_labels.is_some() {
395 return Err(anyhow::anyhow!(
396 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
397 ));
398 }
399 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
400 for (key, value) in &updated_labels {
401 opts = opts.const_label(key.clone(), value.clone());
402 }
403 T::with_opts(opts)?
404 };
405
406 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
407 hierarchy.get_metrics_registry().add_metric(collector)?;
408
409 Ok(prometheus_metric)
410}
411
/// Helper for creating metrics scoped to one [`MetricsHierarchy`] node.
///
/// Every `create_*` method delegates to [`create_metric`] with the wrapped
/// `hierarchy`, so the created metric gets that node's auto-injected labels
/// and is registered in that node's registry.
pub struct Metrics<H: MetricsHierarchy> {
    /// Node whose names/labels and registry are used for every metric created.
    hierarchy: H,
}
417
418impl<H: MetricsHierarchy> Metrics<H> {
419 pub fn new(hierarchy: H) -> Self {
420 Self { hierarchy }
421 }
422
423 pub fn create_counter(
446 &self,
447 name: &str,
448 description: &str,
449 labels: &[(&str, &str)],
450 ) -> anyhow::Result<prometheus::Counter> {
451 create_metric(&self.hierarchy, name, description, labels, None, None)
452 }
453
454 pub fn create_countervec(
456 &self,
457 name: &str,
458 description: &str,
459 const_labels: &[&str],
460 const_label_values: &[(&str, &str)],
461 ) -> anyhow::Result<prometheus::CounterVec> {
462 create_metric(
463 &self.hierarchy,
464 name,
465 description,
466 const_label_values,
467 None,
468 Some(const_labels),
469 )
470 }
471
472 pub fn create_gauge(
474 &self,
475 name: &str,
476 description: &str,
477 labels: &[(&str, &str)],
478 ) -> anyhow::Result<prometheus::Gauge> {
479 create_metric(&self.hierarchy, name, description, labels, None, None)
480 }
481
482 pub fn create_gaugevec(
484 &self,
485 name: &str,
486 description: &str,
487 const_labels: &[&str],
488 const_label_values: &[(&str, &str)],
489 ) -> anyhow::Result<prometheus::GaugeVec> {
490 create_metric(
491 &self.hierarchy,
492 name,
493 description,
494 const_label_values,
495 None,
496 Some(const_labels),
497 )
498 }
499
500 pub fn create_histogram(
502 &self,
503 name: &str,
504 description: &str,
505 labels: &[(&str, &str)],
506 buckets: Option<Vec<f64>>,
507 ) -> anyhow::Result<prometheus::Histogram> {
508 create_metric(&self.hierarchy, name, description, labels, buckets, None)
509 }
510
511 pub fn create_intcounter(
513 &self,
514 name: &str,
515 description: &str,
516 labels: &[(&str, &str)],
517 ) -> anyhow::Result<prometheus::IntCounter> {
518 create_metric(&self.hierarchy, name, description, labels, None, None)
519 }
520
521 pub fn create_intcountervec(
523 &self,
524 name: &str,
525 description: &str,
526 const_labels: &[&str],
527 const_label_values: &[(&str, &str)],
528 ) -> anyhow::Result<prometheus::IntCounterVec> {
529 create_metric(
530 &self.hierarchy,
531 name,
532 description,
533 const_label_values,
534 None,
535 Some(const_labels),
536 )
537 }
538
539 pub fn create_intgauge(
541 &self,
542 name: &str,
543 description: &str,
544 labels: &[(&str, &str)],
545 ) -> anyhow::Result<prometheus::IntGauge> {
546 create_metric(&self.hierarchy, name, description, labels, None, None)
547 }
548
549 pub fn create_intgaugevec(
551 &self,
552 name: &str,
553 description: &str,
554 const_labels: &[&str],
555 const_label_values: &[(&str, &str)],
556 ) -> anyhow::Result<prometheus::IntGaugeVec> {
557 create_metric(
558 &self.hierarchy,
559 name,
560 description,
561 const_label_values,
562 None,
563 Some(const_labels),
564 )
565 }
566
567 pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
569 self.hierarchy
570 .get_metrics_registry()
571 .prometheus_expfmt_combined()
572 }
573}
574
575use crate::traits::DistributedRuntimeProvider;
579
/// A node in the metrics naming hierarchy. The tests in this file exercise a
/// runtime -> namespace -> component -> endpoint chain.
pub trait MetricsHierarchy: Send + Sync {
    /// Name of this node. `create_metric` combines it with the parents' names
    /// to derive the auto-injected namespace/component/endpoint labels.
    fn basename(&self) -> String;

    /// Ancestors of this node. `create_metric` treats index 0 as the root, and
    /// the tests in this file expect root-first ordering.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;

    /// Registry in which `create_metric` registers metrics for this node.
    fn get_metrics_registry(&self) -> &MetricsRegistry;

    /// Optional connection id. When `Some`, `create_metric` adds it as the
    /// worker-id label, formatted as lowercase hex. Defaults to `None`.
    fn connection_id(&self) -> Option<u64> {
        None
    }

    /// Returns a [`Metrics`] helper that borrows this node.
    fn metrics(&self) -> Metrics<&Self>
    where
        Self: Sized,
    {
        Metrics::new(self)
    }
}
618
619impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
621 fn basename(&self) -> String {
622 (**self).basename()
623 }
624
625 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
626 (**self).parent_hierarchies()
627 }
628
629 fn get_metrics_registry(&self) -> &MetricsRegistry {
630 (**self).get_metrics_registry()
631 }
632
633 fn connection_id(&self) -> Option<u64> {
634 (**self).connection_id()
635 }
636}
637
/// Callback run by `MetricsRegistry::execute_update_callbacks`, which the
/// combined scrape calls before gathering. Errors are logged by the scrape
/// and do not abort it.
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;

/// Callback returning exposition-format text. During a combined scrape, its
/// text is appended after the encoded metrics. Empty results are skipped and
/// errors are logged.
pub type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
651
/// Prometheus registry plus callbacks, optionally linked to child registries
/// whose contents are merged into a combined scrape.
///
/// Every field is behind an `Arc`, so clones share the same underlying state.
#[derive(Clone)]
pub struct MetricsRegistry {
    /// Registry that `add_metric` registers collectors into.
    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,

    /// Registries linked via `add_child_registry`. The combined scrape walks
    /// them (and their children) recursively, visiting each registry once.
    child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,

    /// Callbacks run by `execute_update_callbacks`, and therefore before each
    /// combined scrape gathers metrics.
    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,

    /// Callbacks whose text output is appended after the encoded metrics in a
    /// combined scrape.
    pub prometheus_expfmt_callbacks:
        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
686
687impl std::fmt::Debug for MetricsRegistry {
688 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689 f.debug_struct("MetricsRegistry")
690 .field("prometheus_registry", &"<RwLock<Registry>>")
691 .field(
692 "prometheus_update_callbacks",
693 &format!(
694 "<RwLock<Vec<Callback>>> with {} callbacks",
695 self.prometheus_update_callbacks.read().unwrap().len()
696 ),
697 )
698 .field(
699 "prometheus_expfmt_callbacks",
700 &format!(
701 "<RwLock<Vec<Callback>>> with {} callbacks",
702 self.prometheus_expfmt_callbacks.read().unwrap().len()
703 ),
704 )
705 .finish()
706 }
707}
708
709impl MetricsRegistry {
710 pub fn new() -> Self {
712 Self {
713 prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
714 child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
715 prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
716 prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
717 }
718 }
719
720 pub fn add_child_registry(&self, child: &MetricsRegistry) {
724 let child_ptr = Arc::as_ptr(&child.prometheus_registry);
725 let mut guard = self.child_registries.write().unwrap();
726 if guard
727 .iter()
728 .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
729 {
730 return;
731 }
732 guard.push(child.clone());
733 }
734
735 fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
736 fn visit(
742 registry: &MetricsRegistry,
743 out: &mut Vec<MetricsRegistry>,
744 seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
745 ) {
746 let ptr = Arc::as_ptr(®istry.prometheus_registry);
747 if !seen.insert(ptr) {
748 return;
749 }
750
751 out.push(registry.clone());
752
753 let children: Vec<MetricsRegistry> = registry
754 .child_registries
755 .read()
756 .unwrap()
757 .iter()
758 .cloned()
759 .collect();
760 for child in children {
761 visit(&child, out, seen);
762 }
763 }
764
765 let mut out = Vec::new();
766 let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
767 visit(self, &mut out, &mut seen);
768 out
769 }
770
771 pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
777 let registries = self.registries_for_combined_scrape();
778
779 for registry in ®istries {
781 for result in registry.execute_update_callbacks() {
782 if let Err(e) = result {
783 tracing::error!("Error executing metrics callback: {e}");
784 }
785 }
786 }
787
788 let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
790 let mut seen_series: HashSet<String> = HashSet::new();
791
792 for (registry_idx, registry) in registries.iter().enumerate() {
793 let families = registry.get_prometheus_registry().gather();
794 for mut family in families {
795 let name = family.name().to_string();
796
797 let entry = by_name.entry(name.clone()).or_insert_with(|| {
798 let mut out = prometheus::proto::MetricFamily::new();
799 out.set_name(name.clone());
800 out.set_help(family.help().to_string());
801 out.set_field_type(family.get_field_type());
802 out
803 });
804
805 if entry.help() != family.help()
806 || entry.get_field_type() != family.get_field_type()
807 {
808 return Err(anyhow::anyhow!(
809 "Metric family '{}' has inconsistent help/type across registries (idx={})",
810 name,
811 registry_idx
812 ));
813 }
814
815 let mut metrics = family.take_metric();
816 for metric in metrics.drain(..) {
817 let mut labels: Vec<(String, String)> = metric
818 .get_label()
819 .iter()
820 .map(|lp| (lp.name().to_string(), lp.value().to_string()))
821 .collect();
822 labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
823
824 let key = format!(
825 "{}|{}",
826 name,
827 labels
828 .iter()
829 .map(|(k, v)| format!("{}={}", k, v))
830 .collect::<Vec<_>>()
831 .join(",")
832 );
833
834 if !seen_series.insert(key) {
835 tracing::warn!(
836 metric_name = %name,
837 labels = ?labels,
838 registry_idx,
839 "Duplicate Prometheus series while merging registries; dropping later sample"
840 );
841 continue;
842 }
843
844 entry.mut_metric().push(metric);
845 }
846 }
847 }
848
849 let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
850 merged.sort_by(|a, b| a.name().cmp(b.name()));
851
852 let encoder = prometheus::TextEncoder::new();
853 let mut buffer = Vec::new();
854 encoder.encode(&merged, &mut buffer)?;
855 let mut result = String::from_utf8(buffer)?;
856
857 let mut expfmt = String::new();
859 for registry in registries {
860 let text = registry.execute_expfmt_callbacks();
861 if !text.is_empty() {
862 if !expfmt.is_empty() && !expfmt.ends_with('\n') {
863 expfmt.push('\n');
864 }
865 expfmt.push_str(&text);
866 }
867 }
868
869 if !expfmt.is_empty() {
870 if !result.ends_with('\n') {
871 result.push('\n');
872 }
873 result.push_str(&expfmt);
874 }
875
876 Ok(result)
877 }
878
879 pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
881 self.prometheus_update_callbacks
882 .write()
883 .unwrap()
884 .push(callback);
885 }
886
887 pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
889 self.prometheus_expfmt_callbacks
890 .write()
891 .unwrap()
892 .push(callback);
893 }
894
895 pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
897 self.prometheus_update_callbacks
898 .read()
899 .unwrap()
900 .iter()
901 .map(|callback| callback())
902 .collect()
903 }
904
905 pub fn execute_expfmt_callbacks(&self) -> String {
907 let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
908 let mut result = String::new();
909 for callback in callbacks.iter() {
910 match callback() {
911 Ok(text) => {
912 if !text.is_empty() {
913 if !result.is_empty() && !result.ends_with('\n') {
914 result.push('\n');
915 }
916 result.push_str(&text);
917 }
918 }
919 Err(e) => {
920 tracing::error!("Error executing exposition text callback: {e}");
921 }
922 }
923 }
924 result
925 }
926
927 pub fn add_metric(
929 &self,
930 collector: Box<dyn prometheus::core::Collector>,
931 ) -> anyhow::Result<()> {
932 self.prometheus_registry
933 .write()
934 .unwrap()
935 .register(collector)
936 .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
937 }
938
939 pub fn add_metric_or_warn(&self, collector: Box<dyn prometheus::core::Collector>, name: &str) {
941 if let Err(e) = self.add_metric(collector) {
942 tracing::warn!(error = %e, metric = name, "Failed to register metric");
943 }
944 }
945
946 pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
948 self.prometheus_registry.read().unwrap()
949 }
950
951 pub fn has_metric_named(&self, metric_name: &str) -> bool {
953 self.prometheus_registry
954 .read()
955 .unwrap()
956 .gather()
957 .iter()
958 .any(|mf| mf.name() == metric_name)
959 }
960}
961
962impl Default for MetricsRegistry {
963 fn default() -> Self {
964 Self::new()
965 }
966}
967
#[cfg(test)]
mod test_helpers {
    use super::prometheus_names::name_prefix;
    use super::*;

    /// Returns the lines of `input` that satisfy `predicate`, as owned strings.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(|line| line.to_string())
            .collect::<Vec<_>>()
    }

    /// Returns the non-comment, non-blank lines that start with the component
    /// metric prefix followed by `_`.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        // Built once rather than on every line.
        let prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| {
            line.starts_with(&prefix) && !line.starts_with('#') && !line.trim().is_empty()
        })
    }

    /// Parses one sample line of the Prometheus text exposition format into
    /// `(metric_name, labels, value)`.
    ///
    /// Returns `None` in these cases:
    /// * blank lines and comment lines (`#`);
    /// * lines without a parseable value;
    /// * label sets that are never closed with `}`.
    ///
    /// Label values may contain `,`, `=`, `}` or spaces, and the escapes `\\`,
    /// `\"` and `\n` are decoded.
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        if line.trim().is_empty() || line.starts_with('#') {
            return None;
        }

        let body = line.trim_start();
        let name_end = body
            .find(|c: char| c == '{' || c.is_whitespace())
            .unwrap_or(body.len());
        let (name, rest) = body.split_at(name_end);

        let (labels, after_labels) = match rest.strip_prefix('{') {
            Some(label_set) => {
                let (pieces, consumed) = split_label_set(label_set)?;
                let labels = pieces
                    .into_iter()
                    .filter_map(|piece| piece.split_once('='))
                    .map(|(k, v)| (k.trim().to_string(), unquote_label_value(v.trim())))
                    .collect::<std::collections::HashMap<_, _>>();
                (labels, &label_set[consumed..])
            }
            None => (std::collections::HashMap::new(), rest),
        };

        let value: f64 = after_labels.split_whitespace().next()?.parse().ok()?;
        Some((name.to_string(), labels, value))
    }

    /// Splits the inside of a `{...}` label set (`s` starts just after `{`) at
    /// top-level commas. Commas and braces inside quoted values are ignored.
    ///
    /// Returns the raw `key="value"` pieces and the byte length consumed,
    /// including the closing `}`, or `None` if the set is never closed.
    fn split_label_set(s: &str) -> Option<(Vec<&str>, usize)> {
        let mut pieces = Vec::new();
        let mut start = 0;
        let mut in_quotes = false;
        let mut escaped = false;
        for (idx, c) in s.char_indices() {
            if in_quotes {
                match (escaped, c) {
                    (true, _) => escaped = false,
                    (false, '\\') => escaped = true,
                    (false, '"') => in_quotes = false,
                    _ => {}
                }
                continue;
            }
            match c {
                '"' => in_quotes = true,
                ',' => {
                    pieces.push(&s[start..idx]);
                    start = idx + 1;
                }
                '}' => {
                    pieces.push(&s[start..idx]);
                    return Some((pieces, idx + 1));
                }
                _ => {}
            }
        }
        None
    }

    /// Strips surrounding quotes from a label value and decodes the
    /// exposition-format escapes `\\`, `\"` and `\n`. Other backslash
    /// sequences are kept as-is.
    fn unquote_label_value(raw: &str) -> String {
        let inner = raw
            .strip_prefix('"')
            .and_then(|v| v.strip_suffix('"'))
            .unwrap_or(raw);
        let mut out = String::with_capacity(inner.len());
        let mut chars = inner.chars();
        while let Some(c) = chars.next() {
            if c != '\\' {
                out.push(c);
                continue;
            }
            match chars.next() {
                Some('n') => out.push('\n'),
                Some(other @ ('\\' | '"')) => out.push(other),
                Some(other) => {
                    out.push('\\');
                    out.push(other);
                }
                None => out.push('\\'),
            }
        }
        out
    }

    /// Adds a `worker_id="<wid>"` label to every labelled sample line of `expected`.
    ///
    /// The label goes immediately before `le` when present, since the expected
    /// output places `le` last. Otherwise it is appended before the first `}`.
    /// Comments, blank lines and unlabelled lines are left untouched. Lines are
    /// re-joined with `\n`, so no trailing newline is produced.
    pub fn inject_worker_id(expected: &str, wid: &str) -> String {
        let wid_label = format!(",worker_id=\"{}\"", wid);
        expected
            .lines()
            .map(|line| {
                if line.starts_with('#') || line.trim().is_empty() || !line.contains('{') {
                    line.to_string()
                } else if let Some(le_pos) = line.find(",le=") {
                    let mut s = line.to_string();
                    s.insert_str(le_pos, &wid_label);
                    s
                } else if let Some(brace_pos) = line.find("{le=") {
                    // `le` is the only label, so worker_id becomes the first one.
                    let mut s = line.to_string();
                    s.insert_str(brace_pos + 1, &format!("worker_id=\"{}\",", wid));
                    s
                } else {
                    line.replacen("}", &format!("{}}}", wid_label), 1)
                }
            })
            .collect::<Vec<_>>()
            .join("\n")
    }
}
1069
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    #[test]
    fn test_build_component_metric_name_with_prefix() {
        let cases = [
            ("requests", "dynamo_component_requests"),
            ("counter", "dynamo_component_counter"),
        ];
        for (input, expected) in cases {
            assert_eq!(build_component_metric_name(input), expected);
        }
    }

    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // (input line, expected name, expected label pairs, expected value)
        let well_formed: [(&str, &str, &[(&str, &str)], f64); 3] = [
            (
                "http_requests_total{method=\"GET\",status=\"200\"} 1234",
                "http_requests_total",
                &[("method", "GET"), ("status", "200")],
                1234.0,
            ),
            ("cpu_usage 98.5", "cpu_usage", &[], 98.5),
            (
                "response_time{service=\"api\"} 0.123",
                "response_time",
                &[("service", "api")],
                0.123,
            ),
        ];

        for (line, expected_name, expected_pairs, expected_value) in well_formed {
            let parsed = parse_prometheus_metric(line);
            assert!(parsed.is_some());
            let (name, labels, value) = parsed.unwrap();

            assert_eq!(name, expected_name);
            let expected_labels: HashMap<String, String> = expected_pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect();
            assert_eq!(labels, expected_labels);
            assert_eq!(value, expected_value);
        }

        let rejected = [
            "",
            "# HELP metric description",
            "# TYPE metric counter",
            "metric_name",
        ];
        for line in rejected {
            assert!(parse_prometheus_metric(line).is_none());
        }

        println!("✓ Prometheus metric parsing works correctly!");
    }

    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Registers a callback that adds `step` to `total` each time it runs.
        fn add_counting_callback(registry: &MetricsRegistry, total: &Arc<AtomicUsize>, step: usize) {
            let total = Arc::clone(total);
            registry.add_update_callback(Arc::new(move || {
                total.fetch_add(step, Ordering::SeqCst);
                Ok(())
            }));
        }

        // Successful callbacks run on every execution, including through a clone.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));
            for step in [1, 10, 100] {
                add_counting_callback(&registry, &total, step);
            }
            assert_eq!(total.load(Ordering::SeqCst), 0);

            let first = registry.execute_update_callbacks();
            assert_eq!(first.len(), 3);
            assert!(first.iter().all(|r| r.is_ok()));
            assert_eq!(total.load(Ordering::SeqCst), 111);

            let second = registry.execute_update_callbacks();
            assert_eq!(second.len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 222);

            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 333);

            registry.execute_update_callbacks();
            assert_eq!(total.load(Ordering::SeqCst), 444);
        }

        // A failing callback is reported but does not stop the others.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            add_counting_callback(&registry, &total, 1);
            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
            add_counting_callback(&registry, &total, 10);

            let first = registry.execute_update_callbacks();
            assert_eq!(first.len(), 3);
            assert!(first[0].is_ok());
            assert!(first[1].is_err());
            assert!(first[2].is_ok());
            assert_eq!(first[1].as_ref().unwrap_err().to_string(), "Simulated error");
            assert_eq!(total.load(Ordering::SeqCst), 11);

            let second = registry.execute_update_callbacks();
            assert!(second[1].is_err());
            assert_eq!(total.load(Ordering::SeqCst), 22);
        }

        // No callbacks, no results.
        {
            let registry = MetricsRegistry::new();
            assert_eq!(registry.execute_update_callbacks().len(), 0);
        }
    }
}
1232
1233#[cfg(feature = "integration")]
1234#[cfg(test)]
1235mod test_metricsregistry_prefixes {
1236 use super::*;
1237 use crate::distributed::distributed_test_utils::create_test_drt_async;
1238 use prometheus::core::Collector;
1239
1240 #[tokio::test]
1241 async fn test_hierarchical_prefixes_and_parent_hierarchies() {
1242 let drt = create_test_drt_async().await;
1243
1244 const DRT_NAME: &str = "";
1245 const NAMESPACE_NAME: &str = "ns901";
1246 const COMPONENT_NAME: &str = "comp901";
1247 const ENDPOINT_NAME: &str = "ep901";
1248 let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
1249 let component = namespace.component(COMPONENT_NAME).unwrap();
1250 let endpoint = component.endpoint(ENDPOINT_NAME);
1251
1252 assert_eq!(drt.basename(), DRT_NAME);
1254 assert_eq!(drt.parent_hierarchies().len(), 0);
1255 assert_eq!(namespace.basename(), NAMESPACE_NAME);
1259 assert_eq!(namespace.parent_hierarchies().len(), 1);
1260 assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
1261 assert_eq!(component.basename(), COMPONENT_NAME);
1265 assert_eq!(component.parent_hierarchies().len(), 2);
1266 assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
1267 assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1268 assert_eq!(endpoint.basename(), ENDPOINT_NAME);
1272 assert_eq!(endpoint.parent_hierarchies().len(), 3);
1273 assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
1274 assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
1275 assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
1276 assert!(
1280 namespace
1281 .parent_hierarchies()
1282 .iter()
1283 .any(|h| h.basename() == drt.basename())
1284 );
1285 assert!(
1286 component
1287 .parent_hierarchies()
1288 .iter()
1289 .any(|h| h.basename() == namespace.basename())
1290 );
1291 assert!(
1292 endpoint
1293 .parent_hierarchies()
1294 .iter()
1295 .any(|h| h.basename() == component.basename())
1296 );
1297
1298 assert_eq!(drt.parent_hierarchies().len(), 0);
1300 assert_eq!(namespace.parent_hierarchies().len(), 1);
1301 assert_eq!(component.parent_hierarchies().len(), 2);
1302 assert_eq!(endpoint.parent_hierarchies().len(), 3);
1303
1304 let invalid_namespace = drt.namespace("@@123").unwrap();
1308 let result =
1309 invalid_namespace
1310 .metrics()
1311 .create_counter("test_counter", "A test counter", &[]);
1312 assert!(result.is_ok());
1313 if let Ok(counter) = &result {
1314 let desc = counter.desc();
1316 let namespace_label = desc[0]
1317 .const_label_pairs
1318 .iter()
1319 .find(|l| l.name() == "dynamo_namespace")
1320 .expect("Should have dynamo_namespace label");
1321 assert_eq!(namespace_label.value(), "_123");
1322 }
1323
1324 let valid_namespace = drt.namespace("ns567").unwrap();
1326 assert!(
1327 valid_namespace
1328 .metrics()
1329 .create_counter("test_counter", "A test counter", &[])
1330 .is_ok()
1331 );
1332 }
1333
1334 #[tokio::test]
1335 async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
1336 let drt = create_test_drt_async().await;
1340 let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
1341 let component = namespace.component("comp_expfmt_ep_only").unwrap();
1342 let endpoint = component.endpoint("ep_expfmt_ep_only");
1343
1344 let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
1345 let callback: PrometheusExpositionFormatCallback =
1346 Arc::new(move || Ok(metric_line.to_string()));
1347
1348 endpoint
1349 .get_metrics_registry()
1350 .add_expfmt_callback(callback);
1351
1352 let output = drt.metrics().prometheus_expfmt().unwrap();
1353 let occurrences = output
1354 .lines()
1355 .filter(|line| line == &metric_line.trim_end_matches('\n'))
1356 .count();
1357
1358 assert_eq!(
1359 occurrences, 1,
1360 "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
1361 occurrences, output
1362 );
1363 }
1364
1365 #[tokio::test]
1366 async fn test_recursive_namespace() {
1367 let drt = create_test_drt_async().await;
1369
1370 let ns1 = drt.namespace("ns1").unwrap();
1372 let ns2 = ns1.namespace("ns2").unwrap();
1373 let ns3 = ns2.namespace("ns3").unwrap();
1374
1375 let component = ns3.component("test-component").unwrap();
1377
1378 assert_eq!(ns1.basename(), "ns1");
1380 assert_eq!(ns1.parent_hierarchies().len(), 1);
1381 assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
1382 assert_eq!(ns2.basename(), "ns2");
1385 assert_eq!(ns2.parent_hierarchies().len(), 2);
1386 assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
1387 assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
1388 assert_eq!(ns3.basename(), "ns3");
1391 assert_eq!(ns3.parent_hierarchies().len(), 3);
1392 assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
1393 assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
1394 assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
1395 assert_eq!(component.basename(), "test-component");
1398 assert_eq!(component.parent_hierarchies().len(), 4);
1399 assert_eq!(component.parent_hierarchies()[0].basename(), "");
1400 assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
1401 assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
1402 assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
1403 println!("✓ Chained namespace test passed - all prefixes correct");
1406 }
1407}
1408
// Golden-output tests for the Prometheus exposition text produced at each level of the
// DRT -> namespace -> component -> endpoint hierarchy. Compiled only when both the
// `integration` feature and `cfg(test)` are enabled.
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs {
    use super::test_helpers::{extract_metrics, inject_worker_id};
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;

    /// Asserts that a scraped exposition string matches the expected text, ignoring
    /// trailing newlines on both sides.
    ///
    /// `section` names the comparison in the failure banner (e.g. `"ENDPOINT"`); the
    /// untrimmed `actual` and `expected` strings are printed in full on mismatch.
    fn assert_expfmt_eq(section: &str, actual: &str, expected: &str) {
        assert_eq!(
            actual.trim_end_matches('\n'),
            expected.trim_end_matches('\n'),
            "\n=== {} COMPARISON FAILED ===\n\
             Actual:\n{}\n\
             Expected:\n{}\n\
             ==============================",
            section,
            actual,
            expected
        );
    }

    /// Parses the sample value (the last whitespace-separated field) of an uptime line.
    ///
    /// # Panics
    /// Panics if the line has no fields or its last field is not a valid `f64`.
    fn uptime_value(line: &str) -> f64 {
        line.split_whitespace()
            .last()
            .unwrap()
            .parse()
            .expect("uptime should be a float")
    }

    /// Verifies the exposition text at every hierarchy level. Each level's scrape contains
    /// the metrics registered on it and on all of its descendants, and each series carries
    /// the hierarchy labels of the level that registered it.
    #[tokio::test]
    async fn test_prometheusfactory_using_metrics_registry_trait() {
        let drt = create_test_drt_async().await;

        let namespace_name = "ns345";
        let namespace = drt.namespace(namespace_name).unwrap();
        let component = namespace.component("comp345").unwrap();
        let endpoint = component.endpoint("ep345");

        // Endpoint level: a single float counter.
        let counter = endpoint
            .metrics()
            .create_counter("testcounter", "A test counter", &[])
            .unwrap();
        counter.inc_by(123.456789);
        let epsilon = 0.01;
        assert!((counter.get() - 123.456789).abs() < epsilon);

        let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
        println!("Endpoint output:");
        println!("{}", endpoint_output_raw);

        // Hex connection id, passed to `inject_worker_id` for every expected output below.
        let wid = format!("{:x}", drt.connection_id());

        let expected_endpoint_output = inject_worker_id(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#,
            &wid,
        );
        assert_expfmt_eq("ENDPOINT", &endpoint_output_raw, &expected_endpoint_output);

        // Component level: adds a gauge; the endpoint's counter is still included.
        let gauge = component
            .metrics()
            .create_gauge("testgauge", "A test gauge", &[])
            .unwrap();
        gauge.set(50000.0);
        assert_eq!(gauge.get(), 50000.0);

        let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
        println!("Component output:");
        println!("{}", component_output_raw);

        let expected_component_output = inject_worker_id(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#,
            &wid,
        );
        assert_expfmt_eq("COMPONENT", &component_output_raw, &expected_component_output);

        // Namespace level: adds an int counter carrying only the namespace label.
        let intcounter = namespace
            .metrics()
            .create_intcounter("testintcounter", "A test int counter", &[])
            .unwrap();
        intcounter.inc_by(12345);
        assert_eq!(intcounter.get(), 12345);

        let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
        println!("Namespace output:");
        println!("{}", namespace_output_raw);

        let expected_namespace_output = inject_worker_id(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#,
            &wid,
        );
        assert_expfmt_eq("NAMESPACE", &namespace_output_raw, &expected_namespace_output);

        // Register the remaining metric kinds at various levels before the DRT-wide scrape.
        let intgauge = namespace
            .metrics()
            .create_intgauge("testintgauge", "A test int gauge", &[])
            .unwrap();
        intgauge.set(42);
        assert_eq!(intgauge.get(), 42);

        let intgaugevec = namespace
            .metrics()
            .create_intgaugevec(
                "testintgaugevec",
                "A test int gauge vector",
                &["instance", "status"],
                &[("service", "api")],
            )
            .unwrap();
        intgaugevec
            .with_label_values(&["server1", "active"])
            .set(10);
        intgaugevec
            .with_label_values(&["server2", "inactive"])
            .set(0);

        let countervec = endpoint
            .metrics()
            .create_countervec(
                "testcountervec",
                "A test counter vector",
                &["method", "status"],
                &[("service", "api")],
            )
            .unwrap();
        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);

        let histogram = component
            .metrics()
            .create_histogram("testhistogram", "A test histogram", &[], None)
            .unwrap();
        histogram.observe(1.0);
        histogram.observe(2.5);
        histogram.observe(4.0);

        let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
        println!("DRT output:");
        println!("{}", drt_output_raw);

        // Uptime lines are excluded from the golden text because their value varies per scrape.
        let expected_drt_output_without_uptime = inject_worker_id(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#,
            &wid,
        );

        // Split the scrape into uptime sample lines (value must parse), uptime HELP/TYPE
        // headers (dropped), and everything else (compared against the golden text).
        let mut non_uptime_lines = Vec::new();
        let mut saw_uptime_value = false;
        for line in drt_output_raw.trim_end_matches('\n').lines() {
            if line.starts_with("dynamo_component_uptime_seconds") {
                uptime_value(line);
                saw_uptime_value = true;
            } else if !(line.starts_with("# HELP dynamo_component_uptime_seconds")
                || line.starts_with("# TYPE dynamo_component_uptime_seconds"))
            {
                non_uptime_lines.push(line);
            }
        }
        assert!(
            saw_uptime_value,
            "uptime_seconds metric should be present in initial scrape"
        );

        let actual_without_uptime = non_uptime_lines.join("\n");
        assert_eq!(
            actual_without_uptime,
            expected_drt_output_without_uptime.trim_end_matches('\n'),
            "\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            expected_drt_output_without_uptime,
            actual_without_uptime
        );

        // After some wall-clock time has passed, the uptime sample must be strictly positive.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
        let uptime_line = drt_output_after
            .lines()
            .find(|l| l.starts_with("dynamo_component_uptime_seconds"))
            .expect("uptime_seconds metric should be present after sleep");
        let uptime_after = uptime_value(uptime_line);
        assert!(
            uptime_after > 0.0,
            "uptime_seconds should be > 0 after 10ms sleep, got {}",
            uptime_after
        );

        println!("✓ All Prometheus format outputs verified successfully!");
    }

    /// Checks that `extract_metrics` returns exactly the four sample lines of a mixed
    /// exposition snippet (four `# HELP` / `# TYPE` lines plus four samples).
    #[test]
    fn test_refactored_filter_functions() {
        let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_errors_total 5"#;

        let metrics_only = extract_metrics(test_input);
        assert_eq!(metrics_only.len(), 4);
        assert!(
            metrics_only
                .iter()
                .all(|line| line.starts_with("dynamo_component"))
        );

        println!("✓ All refactored filter functions work correctly!");
    }

    /// Two endpoints of one component may register the same metric name. The component
    /// scrape shows one family with a series per endpoint, told apart by `dynamo_endpoint`.
    #[tokio::test]
    async fn test_same_metric_name_different_endpoints() {
        let drt = create_test_drt_async().await;
        let namespace = drt.namespace("ns_test").unwrap();
        let component = namespace.component("comp_test").unwrap();

        let ep1 = component.endpoint("ep1");
        let ep2 = component.endpoint("ep2");

        let counter1 = ep1
            .metrics()
            .create_counter("requests_total", "Total requests", &[])
            .unwrap();
        counter1.inc_by(100.0);

        let counter2 = ep2
            .metrics()
            .create_counter("requests_total", "Total requests", &[])
            .unwrap();
        counter2.inc_by(200.0);

        let output = component.metrics().prometheus_expfmt().unwrap();

        let wid = format!("{:x}", drt.connection_id());
        let expected_output = inject_worker_id(
            r#"# HELP dynamo_component_requests_total Total requests
# TYPE dynamo_component_requests_total counter
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#,
            &wid,
        );
        assert_expfmt_eq("MULTI-REGISTRY", &output, &expected_output);

        println!("✓ Multi-registry prevents Prometheus collisions!");
    }

    /// Two endpoint handles with the same name register an identical series. The component
    /// scrape must contain it once, with the value of the first registration (50, not 75).
    #[tokio::test]
    async fn test_duplicate_series_warning() {
        let drt = create_test_drt_async().await;
        let namespace = drt.namespace("ns_dup").unwrap();
        let component = namespace.component("comp_dup").unwrap();

        let ep1 = component.endpoint("ep_same");
        let ep2 = component.endpoint("ep_same");

        let counter1 = ep1
            .metrics()
            .create_counter("dup_metric", "Duplicate metric test", &[])
            .unwrap();
        counter1.inc_by(50.0);

        let counter2 = ep2
            .metrics()
            .create_counter("dup_metric", "Duplicate metric test", &[])
            .unwrap();
        counter2.inc_by(75.0);

        let output = component.metrics().prometheus_expfmt().unwrap();

        let wid = format!("{:x}", drt.connection_id());
        let expected_output = inject_worker_id(
            r#"# HELP dynamo_component_dup_metric Duplicate metric test
# TYPE dynamo_component_dup_metric counter
dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#,
            &wid,
        );
        assert_expfmt_eq("DEDUPLICATION", &output, &expected_output);

        println!("✓ Duplicate series detection and deduplication works!");
    }
}