1pub mod frontend_perf;
10pub mod prometheus_names;
11pub mod request_plane;
12pub mod tokio_perf;
13pub mod transport_metrics;
14pub mod work_handler_perf;
15pub mod work_handler_pool;
16
17use parking_lot::Mutex;
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use crate::component::ComponentBuilder;
22use anyhow;
23use once_cell::sync::Lazy;
24use regex::Regex;
25use std::any::Any;
26use std::collections::HashMap;
27
28use prometheus_names::{
30 build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
31 sanitize_prometheus_name, work_handler,
32};
33
34use crate::pipeline::{
36 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
37 network::Ingress,
38};
39use crate::protocols::annotated::Annotated;
40use crate::stream;
41use crate::stream::StreamExt;
42
43use prometheus::Encoder;
45
46fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
49 let mut seen_keys = std::collections::HashSet::new();
50 for (key, _) in labels {
51 if !seen_keys.insert(*key) {
52 return Err(anyhow::anyhow!(
53 "Duplicate label key '{}' found in labels",
54 key
55 ));
56 }
57 }
58 Ok(())
59}
60
61pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
66 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
68 where
69 Self: Sized;
70
71 fn with_histogram_opts_and_buckets(
74 _opts: prometheus::HistogramOpts,
75 _buckets: Option<Vec<f64>>,
76 ) -> Result<Self, prometheus::Error>
77 where
78 Self: Sized,
79 {
80 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
81 }
82
83 fn with_opts_and_label_names(
86 _opts: prometheus::Opts,
87 _label_names: &[&str],
88 ) -> Result<Self, prometheus::Error>
89 where
90 Self: Sized,
91 {
92 panic!("with_opts_and_label_names is not implemented for this metric type");
93 }
94}
95
/// `Counter` is built from plain options; the histogram and label-name
/// constructors keep the trait defaults.
impl PrometheusMetric for prometheus::Counter {
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        // Fully qualified path: this calls the constructor provided by the
        // `prometheus` crate for `Counter`.
        prometheus::Counter::with_opts(opts)
    }
}
102
/// `IntCounter` is built from plain options; the histogram and label-name
/// constructors keep the trait defaults.
impl PrometheusMetric for prometheus::IntCounter {
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        // Delegates to the `prometheus` crate constructor for `IntCounter`.
        prometheus::IntCounter::with_opts(opts)
    }
}
108
/// `Gauge` is built from plain options; the histogram and label-name
/// constructors keep the trait defaults.
impl PrometheusMetric for prometheus::Gauge {
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        // Delegates to the `prometheus` crate constructor for `Gauge`.
        prometheus::Gauge::with_opts(opts)
    }
}
114
/// `IntGauge` is built from plain options; the histogram and label-name
/// constructors keep the trait defaults.
impl PrometheusMetric for prometheus::IntGauge {
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        // Delegates to the `prometheus` crate constructor for `IntGauge`.
        prometheus::IntGauge::with_opts(opts)
    }
}
120
121impl PrometheusMetric for prometheus::GaugeVec {
122 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
123 Err(prometheus::Error::Msg(
124 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
125 ))
126 }
127
128 fn with_opts_and_label_names(
129 opts: prometheus::Opts,
130 label_names: &[&str],
131 ) -> Result<Self, prometheus::Error> {
132 prometheus::GaugeVec::new(opts, label_names)
133 }
134}
135
136impl PrometheusMetric for prometheus::IntGaugeVec {
137 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
138 Err(prometheus::Error::Msg(
139 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
140 ))
141 }
142
143 fn with_opts_and_label_names(
144 opts: prometheus::Opts,
145 label_names: &[&str],
146 ) -> Result<Self, prometheus::Error> {
147 prometheus::IntGaugeVec::new(opts, label_names)
148 }
149}
150
151impl PrometheusMetric for prometheus::IntCounterVec {
152 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
153 Err(prometheus::Error::Msg(
154 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
155 ))
156 }
157
158 fn with_opts_and_label_names(
159 opts: prometheus::Opts,
160 label_names: &[&str],
161 ) -> Result<Self, prometheus::Error> {
162 prometheus::IntCounterVec::new(opts, label_names)
163 }
164}
165
166impl PrometheusMetric for prometheus::Histogram {
168 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
169 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
171 prometheus::Histogram::with_opts(histogram_opts)
172 }
173
174 fn with_histogram_opts_and_buckets(
175 mut opts: prometheus::HistogramOpts,
176 buckets: Option<Vec<f64>>,
177 ) -> Result<Self, prometheus::Error> {
178 if let Some(custom_buckets) = buckets {
179 opts = opts.buckets(custom_buckets);
180 }
181 prometheus::Histogram::with_opts(opts)
182 }
183}
184
185impl PrometheusMetric for prometheus::CounterVec {
187 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
188 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
190 }
191
192 fn with_opts_and_label_names(
193 opts: prometheus::Opts,
194 label_names: &[&str],
195 ) -> Result<Self, prometheus::Error> {
196 prometheus::CounterVec::new(opts, label_names)
197 }
198}
199
200pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
205 hierarchy: &H,
206 metric_name: &str,
207 metric_desc: &str,
208 labels: &[(&str, &str)],
209 buckets: Option<Vec<f64>>,
210 const_labels: Option<&[&str]>,
211) -> anyhow::Result<T> {
212 validate_no_duplicate_label_keys(labels)?;
214 let basename = hierarchy.basename();
217 let parent_hierarchies = hierarchy.parent_hierarchies();
218
219 let mut hierarchy_names: Vec<String> =
221 parent_hierarchies.iter().map(|p| p.basename()).collect();
222 hierarchy_names.push(basename.clone());
223
224 let metric_name = build_component_metric_name(metric_name);
225
226 let mut updated_labels: Vec<(String, String)> = Vec::new();
228
229 for (key, _) in labels {
236 if *key == labels::NAMESPACE
237 || *key == labels::COMPONENT
238 || *key == labels::ENDPOINT
239 || *key == labels::WORKER_ID
240 {
241 return Err(anyhow::anyhow!(
242 "Label '{}' is automatically added by auto-label injection and cannot be manually set",
243 key
244 ));
245 }
246 }
247
248 if let Some(label_names) = const_labels {
252 for name in label_names.iter() {
253 if *name == labels::NAMESPACE
254 || *name == labels::COMPONENT
255 || *name == labels::ENDPOINT
256 || *name == labels::WORKER_ID
257 {
258 return Err(anyhow::anyhow!(
259 "Variable label name '{}' conflicts with auto-injected const label and cannot be used",
260 name
261 ));
262 }
263 }
264 }
265
266 if hierarchy_names.len() > 1 {
269 let namespace = &hierarchy_names[1];
270 if !namespace.is_empty() {
271 let valid_namespace = sanitize_prometheus_label(namespace)?;
272 if !valid_namespace.is_empty() {
273 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
274 }
275 }
276 }
277 if hierarchy_names.len() > 2 {
278 let component = &hierarchy_names[2];
279 if !component.is_empty() {
280 let valid_component = sanitize_prometheus_label(component)?;
281 if !valid_component.is_empty() {
282 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
283 }
284 }
285 }
286 if hierarchy_names.len() > 3 {
287 let endpoint = &hierarchy_names[3];
288 if !endpoint.is_empty() {
289 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
290 if !valid_endpoint.is_empty() {
291 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
292 }
293 }
294 }
295
296 if let Some(conn_id) = hierarchy.connection_id() {
300 updated_labels.push((labels::WORKER_ID.to_string(), format!("{:x}", conn_id)));
301 }
302
303 updated_labels.extend(
305 labels
306 .iter()
307 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
308 );
309 let prometheus_metric = if std::any::TypeId::of::<T>()
313 == std::any::TypeId::of::<prometheus::CounterVec>()
314 {
315 if buckets.is_some() {
318 return Err(anyhow::anyhow!(
319 "buckets parameter is not valid for CounterVec"
320 ));
321 }
322 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
323 for (key, value) in &updated_labels {
324 opts = opts.const_label(key.clone(), value.clone());
325 }
326 let label_names = const_labels
327 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
328 T::with_opts_and_label_names(opts, label_names)?
329 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
330 if buckets.is_some() {
333 return Err(anyhow::anyhow!(
334 "buckets parameter is not valid for GaugeVec"
335 ));
336 }
337 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
338 for (key, value) in &updated_labels {
339 opts = opts.const_label(key.clone(), value.clone());
340 }
341 let label_names = const_labels
342 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
343 T::with_opts_and_label_names(opts, label_names)?
344 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
345 if const_labels.is_some() {
348 return Err(anyhow::anyhow!(
349 "const_labels parameter is not valid for Histogram"
350 ));
351 }
352 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
353 for (key, value) in &updated_labels {
354 opts = opts.const_label(key.clone(), value.clone());
355 }
356 T::with_histogram_opts_and_buckets(opts, buckets)?
357 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
358 if buckets.is_some() {
361 return Err(anyhow::anyhow!(
362 "buckets parameter is not valid for IntCounterVec"
363 ));
364 }
365 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366 for (key, value) in &updated_labels {
367 opts = opts.const_label(key.clone(), value.clone());
368 }
369 let label_names = const_labels
370 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
371 T::with_opts_and_label_names(opts, label_names)?
372 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
373 if buckets.is_some() {
376 return Err(anyhow::anyhow!(
377 "buckets parameter is not valid for IntGaugeVec"
378 ));
379 }
380 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
381 for (key, value) in &updated_labels {
382 opts = opts.const_label(key.clone(), value.clone());
383 }
384 let label_names = const_labels
385 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
386 T::with_opts_and_label_names(opts, label_names)?
387 } else {
388 if buckets.is_some() {
391 return Err(anyhow::anyhow!(
392 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
393 ));
394 }
395 if const_labels.is_some() {
396 return Err(anyhow::anyhow!(
397 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
398 ));
399 }
400 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
401 for (key, value) in &updated_labels {
402 opts = opts.const_label(key.clone(), value.clone());
403 }
404 T::with_opts(opts)?
405 };
406
407 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
408 hierarchy.get_metrics_registry().add_metric(collector)?;
409
410 Ok(prometheus_metric)
411}
412
/// Convenience wrapper that creates metrics on behalf of a `MetricsHierarchy`.
///
/// Every `create_*` method forwards to `create_metric` using the wrapped
/// hierarchy.
pub struct Metrics<H: MetricsHierarchy> {
    /// The hierarchy level metrics are created for and registered with.
    hierarchy: H,
}
418
419impl<H: MetricsHierarchy> Metrics<H> {
420 pub fn new(hierarchy: H) -> Self {
421 Self { hierarchy }
422 }
423
424 pub fn create_counter(
447 &self,
448 name: &str,
449 description: &str,
450 labels: &[(&str, &str)],
451 ) -> anyhow::Result<prometheus::Counter> {
452 create_metric(&self.hierarchy, name, description, labels, None, None)
453 }
454
455 pub fn create_countervec(
457 &self,
458 name: &str,
459 description: &str,
460 const_labels: &[&str],
461 const_label_values: &[(&str, &str)],
462 ) -> anyhow::Result<prometheus::CounterVec> {
463 create_metric(
464 &self.hierarchy,
465 name,
466 description,
467 const_label_values,
468 None,
469 Some(const_labels),
470 )
471 }
472
473 pub fn create_gauge(
475 &self,
476 name: &str,
477 description: &str,
478 labels: &[(&str, &str)],
479 ) -> anyhow::Result<prometheus::Gauge> {
480 create_metric(&self.hierarchy, name, description, labels, None, None)
481 }
482
483 pub fn create_gaugevec(
485 &self,
486 name: &str,
487 description: &str,
488 const_labels: &[&str],
489 const_label_values: &[(&str, &str)],
490 ) -> anyhow::Result<prometheus::GaugeVec> {
491 create_metric(
492 &self.hierarchy,
493 name,
494 description,
495 const_label_values,
496 None,
497 Some(const_labels),
498 )
499 }
500
501 pub fn create_histogram(
503 &self,
504 name: &str,
505 description: &str,
506 labels: &[(&str, &str)],
507 buckets: Option<Vec<f64>>,
508 ) -> anyhow::Result<prometheus::Histogram> {
509 create_metric(&self.hierarchy, name, description, labels, buckets, None)
510 }
511
512 pub fn create_intcounter(
514 &self,
515 name: &str,
516 description: &str,
517 labels: &[(&str, &str)],
518 ) -> anyhow::Result<prometheus::IntCounter> {
519 create_metric(&self.hierarchy, name, description, labels, None, None)
520 }
521
522 pub fn create_intcountervec(
524 &self,
525 name: &str,
526 description: &str,
527 const_labels: &[&str],
528 const_label_values: &[(&str, &str)],
529 ) -> anyhow::Result<prometheus::IntCounterVec> {
530 create_metric(
531 &self.hierarchy,
532 name,
533 description,
534 const_label_values,
535 None,
536 Some(const_labels),
537 )
538 }
539
540 pub fn create_intgauge(
542 &self,
543 name: &str,
544 description: &str,
545 labels: &[(&str, &str)],
546 ) -> anyhow::Result<prometheus::IntGauge> {
547 create_metric(&self.hierarchy, name, description, labels, None, None)
548 }
549
550 pub fn create_intgaugevec(
552 &self,
553 name: &str,
554 description: &str,
555 const_labels: &[&str],
556 const_label_values: &[(&str, &str)],
557 ) -> anyhow::Result<prometheus::IntGaugeVec> {
558 create_metric(
559 &self.hierarchy,
560 name,
561 description,
562 const_label_values,
563 None,
564 Some(const_labels),
565 )
566 }
567
568 pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
570 self.hierarchy
571 .get_metrics_registry()
572 .prometheus_expfmt_combined()
573 }
574}
575
576use crate::traits::DistributedRuntimeProvider;
580
/// A level in the metrics hierarchy that metrics can be created under.
///
/// `create_metric` combines `parent_hierarchies()` with `basename()` to derive
/// the auto-injected labels, and registers metrics in `get_metrics_registry()`.
pub trait MetricsHierarchy: Send + Sync {
    /// Name of this level.
    fn basename(&self) -> String;

    /// Ancestors of this level. `create_metric` treats the list as ordered
    /// from the outermost ancestor down to the direct parent.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;

    /// Registry that metrics created at this level are added to.
    fn get_metrics_registry(&self) -> &MetricsRegistry;

    /// Optional connection id. When `Some`, `create_metric` adds it as the
    /// worker id label formatted in lowercase hex. Defaults to `None`.
    fn connection_id(&self) -> Option<u64> {
        None
    }

    /// Returns a `Metrics` helper borrowing this level.
    fn metrics(&self) -> Metrics<&Self>
    where
        Self: Sized,
    {
        Metrics::new(self)
    }
}
619
620impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
622 fn basename(&self) -> String {
623 (**self).basename()
624 }
625
626 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
627 (**self).parent_hierarchies()
628 }
629
630 fn get_metrics_registry(&self) -> &MetricsRegistry {
631 (**self).get_metrics_registry()
632 }
633
634 fn connection_id(&self) -> Option<u64> {
635 (**self).connection_id()
636 }
637}
638
/// Shared, thread-safe callback that returns `Ok(())` or an error.
/// `MetricsRegistry::prometheus_expfmt_combined` runs these before gathering
/// metrics and logs any error they return.
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
648
/// Shared, thread-safe callback that produces text which
/// `MetricsRegistry::prometheus_expfmt_combined` appends after the encoded
/// metrics.
pub type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
652
/// A Prometheus registry plus the callbacks and child registries that are
/// combined when it is scraped.
///
/// Every field is behind an `Arc`, so a clone shares the same underlying
/// registry, children and callbacks as the original.
#[derive(Clone)]
pub struct MetricsRegistry {
    /// The Prometheus registry that collectors are registered in.
    pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,

    /// Registries whose output is merged into this one by
    /// `prometheus_expfmt_combined`; populated through `add_child_registry`.
    child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,

    /// Callbacks run by `execute_update_callbacks`.
    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,

    /// Callbacks whose text output is collected by `execute_expfmt_callbacks`.
    pub prometheus_expfmt_callbacks:
        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
687
688impl std::fmt::Debug for MetricsRegistry {
689 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690 f.debug_struct("MetricsRegistry")
691 .field("prometheus_registry", &"<RwLock<Registry>>")
692 .field(
693 "prometheus_update_callbacks",
694 &format!(
695 "<RwLock<Vec<Callback>>> with {} callbacks",
696 self.prometheus_update_callbacks.read().unwrap().len()
697 ),
698 )
699 .field(
700 "prometheus_expfmt_callbacks",
701 &format!(
702 "<RwLock<Vec<Callback>>> with {} callbacks",
703 self.prometheus_expfmt_callbacks.read().unwrap().len()
704 ),
705 )
706 .finish()
707 }
708}
709
710impl MetricsRegistry {
711 pub fn new() -> Self {
713 Self {
714 prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
715 child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
716 prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
717 prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
718 }
719 }
720
721 pub fn add_child_registry(&self, child: &MetricsRegistry) {
725 let child_ptr = Arc::as_ptr(&child.prometheus_registry);
726 let mut guard = self.child_registries.write().unwrap();
727 if guard
728 .iter()
729 .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
730 {
731 return;
732 }
733 guard.push(child.clone());
734 }
735
736 fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
737 fn visit(
743 registry: &MetricsRegistry,
744 out: &mut Vec<MetricsRegistry>,
745 seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
746 ) {
747 let ptr = Arc::as_ptr(®istry.prometheus_registry);
748 if !seen.insert(ptr) {
749 return;
750 }
751
752 out.push(registry.clone());
753
754 let children: Vec<MetricsRegistry> = registry
755 .child_registries
756 .read()
757 .unwrap()
758 .iter()
759 .cloned()
760 .collect();
761 for child in children {
762 visit(&child, out, seen);
763 }
764 }
765
766 let mut out = Vec::new();
767 let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
768 visit(self, &mut out, &mut seen);
769 out
770 }
771
772 pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
778 let registries = self.registries_for_combined_scrape();
779
780 for registry in ®istries {
782 for result in registry.execute_update_callbacks() {
783 if let Err(e) = result {
784 tracing::error!("Error executing metrics callback: {e}");
785 }
786 }
787 }
788
789 let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
791 let mut seen_series: HashSet<String> = HashSet::new();
792
793 for (registry_idx, registry) in registries.iter().enumerate() {
794 let families = registry.get_prometheus_registry().gather();
795 for mut family in families {
796 let name = family.name().to_string();
797
798 let entry = by_name.entry(name.clone()).or_insert_with(|| {
799 let mut out = prometheus::proto::MetricFamily::new();
800 out.set_name(name.clone());
801 out.set_help(family.help().to_string());
802 out.set_field_type(family.get_field_type());
803 out
804 });
805
806 if entry.help() != family.help()
807 || entry.get_field_type() != family.get_field_type()
808 {
809 return Err(anyhow::anyhow!(
810 "Metric family '{}' has inconsistent help/type across registries (idx={})",
811 name,
812 registry_idx
813 ));
814 }
815
816 let mut metrics = family.take_metric();
817 for metric in metrics.drain(..) {
818 let mut labels: Vec<(String, String)> = metric
819 .get_label()
820 .iter()
821 .map(|lp| (lp.name().to_string(), lp.value().to_string()))
822 .collect();
823 labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
824
825 let key = format!(
826 "{}|{}",
827 name,
828 labels
829 .iter()
830 .map(|(k, v)| format!("{}={}", k, v))
831 .collect::<Vec<_>>()
832 .join(",")
833 );
834
835 if !seen_series.insert(key) {
836 tracing::warn!(
837 metric_name = %name,
838 labels = ?labels,
839 registry_idx,
840 "Duplicate Prometheus series while merging registries; dropping later sample"
841 );
842 continue;
843 }
844
845 entry.mut_metric().push(metric);
846 }
847 }
848 }
849
850 let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
851 merged.sort_by(|a, b| a.name().cmp(b.name()));
852
853 let encoder = prometheus::TextEncoder::new();
854 let mut buffer = Vec::new();
855 encoder.encode(&merged, &mut buffer)?;
856 let mut result = String::from_utf8(buffer)?;
857
858 let mut expfmt = String::new();
860 for registry in registries {
861 let text = registry.execute_expfmt_callbacks();
862 if !text.is_empty() {
863 if !expfmt.is_empty() && !expfmt.ends_with('\n') {
864 expfmt.push('\n');
865 }
866 expfmt.push_str(&text);
867 }
868 }
869
870 if !expfmt.is_empty() {
871 if !result.ends_with('\n') {
872 result.push('\n');
873 }
874 result.push_str(&expfmt);
875 }
876
877 Ok(result)
878 }
879
880 pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
882 self.prometheus_update_callbacks
883 .write()
884 .unwrap()
885 .push(callback);
886 }
887
888 pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
890 self.prometheus_expfmt_callbacks
891 .write()
892 .unwrap()
893 .push(callback);
894 }
895
896 pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
898 self.prometheus_update_callbacks
899 .read()
900 .unwrap()
901 .iter()
902 .map(|callback| callback())
903 .collect()
904 }
905
906 pub fn execute_expfmt_callbacks(&self) -> String {
908 let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
909 let mut result = String::new();
910 for callback in callbacks.iter() {
911 match callback() {
912 Ok(text) => {
913 if !text.is_empty() {
914 if !result.is_empty() && !result.ends_with('\n') {
915 result.push('\n');
916 }
917 result.push_str(&text);
918 }
919 }
920 Err(e) => {
921 tracing::error!("Error executing exposition text callback: {e}");
922 }
923 }
924 }
925 result
926 }
927
928 pub fn add_metric(
930 &self,
931 collector: Box<dyn prometheus::core::Collector>,
932 ) -> anyhow::Result<()> {
933 self.prometheus_registry
934 .write()
935 .unwrap()
936 .register(collector)
937 .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
938 }
939
940 pub fn add_metric_or_warn(&self, collector: Box<dyn prometheus::core::Collector>, name: &str) {
942 if let Err(e) = self.add_metric(collector) {
943 tracing::warn!(error = %e, metric = name, "Failed to register metric");
944 }
945 }
946
    /// Returns a read guard on the underlying Prometheus registry.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
        self.prometheus_registry.read().unwrap()
    }
951
952 pub fn has_metric_named(&self, metric_name: &str) -> bool {
954 self.prometheus_registry
955 .read()
956 .unwrap()
957 .gather()
958 .iter()
959 .any(|mf| mf.name() == metric_name)
960 }
961}
962
/// The default registry is the empty registry produced by `MetricsRegistry::new`.
impl Default for MetricsRegistry {
    fn default() -> Self {
        Self::new()
    }
}
968
969#[cfg(test)]
970mod test_helpers {
971 use super::prometheus_names::name_prefix;
972 use super::*;
973
974 fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
977 where
978 F: FnMut(&str) -> bool,
979 {
980 input
981 .lines()
982 .filter(|line| predicate(line))
983 .map(|line| line.to_string())
984 .collect::<Vec<_>>()
985 }
986
987 pub fn extract_metrics(input: &str) -> Vec<String> {
990 filter_prometheus_lines(input, |line| {
991 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
992 && !line.starts_with("#")
993 && !line.trim().is_empty()
994 })
995 }
996
997 pub fn parse_prometheus_metric(
1009 line: &str,
1010 ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
1011 if line.trim().is_empty() || line.starts_with('#') {
1012 return None;
1013 }
1014
1015 let parts: Vec<&str> = line.split_whitespace().collect();
1016 if parts.len() < 2 {
1017 return None;
1018 }
1019
1020 let metric_part = parts[0];
1021 let value: f64 = parts[1].parse().ok()?;
1022
1023 let (name, labels) = if metric_part.contains('{') {
1024 let brace_start = metric_part.find('{').unwrap();
1025 let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
1026 let name = &metric_part[..brace_start];
1027 let labels_str = &metric_part[brace_start + 1..brace_end];
1028
1029 let mut labels = std::collections::HashMap::new();
1030 for pair in labels_str.split(',') {
1031 if let Some((k, v)) = pair.split_once('=') {
1032 let v = v.trim_matches('"');
1033 labels.insert(k.trim().to_string(), v.to_string());
1034 }
1035 }
1036 (name.to_string(), labels)
1037 } else {
1038 (metric_part.to_string(), std::collections::HashMap::new())
1039 };
1040
1041 Some((name, labels, value))
1042 }
1043
1044 pub fn inject_worker_id(expected: &str, wid: &str) -> String {
1050 let wid_label = format!(",worker_id=\"{}\"", wid);
1051 expected
1052 .lines()
1053 .map(|line| {
1054 if line.starts_with('#') || line.trim().is_empty() || !line.contains('{') {
1055 line.to_string()
1056 } else if let Some(le_pos) = line.find(",le=") {
1057 let mut s = line.to_string();
1060 s.insert_str(le_pos, &wid_label);
1061 s
1062 } else {
1063 line.replacen("}", &format!("{}}}", wid_label), 1)
1064 }
1065 })
1066 .collect::<Vec<_>>()
1067 .join("\n")
1068 }
1069}
1070
1071#[cfg(test)]
1072mod test_metricsregistry_units {
1073 use super::*;
1074
1075 #[test]
1076 fn test_build_component_metric_name_with_prefix() {
1077 let result = build_component_metric_name("requests");
1079 assert_eq!(result, "dynamo_component_requests");
1080
1081 let result = build_component_metric_name("counter");
1082 assert_eq!(result, "dynamo_component_counter");
1083 }
1084
1085 #[test]
1086 fn test_parse_prometheus_metric() {
1087 use super::test_helpers::parse_prometheus_metric;
1088 use std::collections::HashMap;
1089
1090 let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
1092 let parsed = parse_prometheus_metric(line);
1093 assert!(parsed.is_some());
1094
1095 let (name, labels, value) = parsed.unwrap();
1096 assert_eq!(name, "http_requests_total");
1097
1098 let mut expected_labels = HashMap::new();
1099 expected_labels.insert("method".to_string(), "GET".to_string());
1100 expected_labels.insert("status".to_string(), "200".to_string());
1101 assert_eq!(labels, expected_labels);
1102
1103 assert_eq!(value, 1234.0);
1104
1105 let line = "cpu_usage 98.5";
1107 let parsed = parse_prometheus_metric(line);
1108 assert!(parsed.is_some());
1109
1110 let (name, labels, value) = parsed.unwrap();
1111 assert_eq!(name, "cpu_usage");
1112 assert!(labels.is_empty());
1113 assert_eq!(value, 98.5);
1114
1115 let line = "response_time{service=\"api\"} 0.123";
1117 let parsed = parse_prometheus_metric(line);
1118 assert!(parsed.is_some());
1119
1120 let (name, labels, value) = parsed.unwrap();
1121 assert_eq!(name, "response_time");
1122
1123 let mut expected_labels = HashMap::new();
1124 expected_labels.insert("service".to_string(), "api".to_string());
1125 assert_eq!(labels, expected_labels);
1126
1127 assert_eq!(value, 0.123);
1128
1129 assert!(parse_prometheus_metric("").is_none()); assert!(parse_prometheus_metric("# HELP metric description").is_none()); assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); assert!(parse_prometheus_metric("metric_name").is_none()); println!("✓ Prometheus metric parsing works correctly!");
1136 }
1137
    /// Covers `MetricsRegistry` update callbacks: execution order and repeat
    /// execution, sharing through `clone`, error isolation, and the empty case.
    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: three successful callbacks adding 1, 10 and 100.
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            for increment in [1, 10, 100] {
                let counter_clone = counter.clone();
                registry.add_update_callback(Arc::new(move || {
                    counter_clone.fetch_add(increment, Ordering::SeqCst);
                    Ok(())
                }));
            }

            // Registering callbacks must not run them.
            assert_eq!(counter.load(Ordering::SeqCst), 0);

            // First execution: 1 + 10 + 100.
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(|r| r.is_ok()));
            assert_eq!(counter.load(Ordering::SeqCst), 111);

            // Second execution runs all callbacks again.
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 222);

            // A clone shares the same callbacks.
            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 333);

            // The original still works after the clone was used.
            registry.execute_update_callbacks();
            assert_eq!(counter.load(Ordering::SeqCst), 444);
        }

        // Scenario 2: a failing callback between two successful ones.
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));

            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            // Results are reported per callback, in registration order.
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results[0].is_ok());
            assert!(results[1].is_err());
            assert!(results[2].is_ok());

            assert_eq!(
                results[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );

            // The failure did not prevent the other callbacks from running.
            assert_eq!(counter.load(Ordering::SeqCst), 11);

            let results = registry.execute_update_callbacks();
            assert!(results[1].is_err());
            assert_eq!(counter.load(Ordering::SeqCst), 22);
        }

        // Scenario 3: a registry without callbacks yields no results.
        {
            let registry = MetricsRegistry::new();
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 0);
        }
    }
1232}
1233
1234#[cfg(feature = "integration")]
1235#[cfg(test)]
1236mod test_metricsregistry_prefixes {
1237 use super::*;
1238 use crate::distributed::distributed_test_utils::create_test_drt_async;
1239 use prometheus::core::Collector;
1240
    /// Checks `basename` / `parent_hierarchies` at every level of a
    /// runtime -> namespace -> component -> endpoint chain, and that a
    /// namespace name with invalid characters still yields a usable label.
    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";
        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // Runtime: empty basename, no parents.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(drt.parent_hierarchies().len(), 0);
        // Namespace: parent is the runtime.
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
        // Component: parents are runtime, then namespace.
        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        // Endpoint: parents are runtime, namespace, then component.
        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);
        assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
        // Each level's direct parent appears among its parents.
        assert!(
            namespace
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == drt.basename())
        );
        assert!(
            component
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == namespace.basename())
        );
        assert!(
            endpoint
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == component.basename())
        );

        // Parent counts, re-checked after the lookups above.
        assert_eq!(drt.parent_hierarchies().len(), 0);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);

        // A namespace name with characters invalid for Prometheus is sanitized
        // rather than rejected: "@@123" is expected to become "_123".
        let invalid_namespace = drt.namespace("@@123").unwrap();
        let result =
            invalid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[]);
        assert!(result.is_ok());
        if let Ok(counter) = &result {
            let desc = counter.desc();
            let namespace_label = desc[0]
                .const_label_pairs
                .iter()
                .find(|l| l.name() == "dynamo_namespace")
                .expect("Should have dynamo_namespace label");
            assert_eq!(namespace_label.value(), "_123");
        }

        // A valid namespace name works as-is.
        let valid_namespace = drt.namespace("ns567").unwrap();
        assert!(
            valid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[])
                .is_ok()
        );
    }
1334
1335 #[tokio::test]
1336 async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
1337 let drt = create_test_drt_async().await;
1341 let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
1342 let component = namespace.component("comp_expfmt_ep_only").unwrap();
1343 let endpoint = component.endpoint("ep_expfmt_ep_only");
1344
1345 let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
1346 let callback: PrometheusExpositionFormatCallback =
1347 Arc::new(move || Ok(metric_line.to_string()));
1348
1349 endpoint
1350 .get_metrics_registry()
1351 .add_expfmt_callback(callback);
1352
1353 let output = drt.metrics().prometheus_expfmt().unwrap();
1354 let occurrences = output
1355 .lines()
1356 .filter(|line| line == &metric_line.trim_end_matches('\n'))
1357 .count();
1358
1359 assert_eq!(
1360 occurrences, 1,
1361 "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
1362 occurrences, output
1363 );
1364 }
1365
1366 #[tokio::test]
1367 async fn test_recursive_namespace() {
1368 let drt = create_test_drt_async().await;
1370
1371 let ns1 = drt.namespace("ns1").unwrap();
1373 let ns2 = ns1.namespace("ns2").unwrap();
1374 let ns3 = ns2.namespace("ns3").unwrap();
1375
1376 let component = ns3.component("test-component").unwrap();
1378
1379 assert_eq!(ns1.basename(), "ns1");
1381 assert_eq!(ns1.parent_hierarchies().len(), 1);
1382 assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
1383 assert_eq!(ns2.basename(), "ns2");
1386 assert_eq!(ns2.parent_hierarchies().len(), 2);
1387 assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
1388 assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
1389 assert_eq!(ns3.basename(), "ns3");
1392 assert_eq!(ns3.parent_hierarchies().len(), 3);
1393 assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
1394 assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
1395 assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
1396 assert_eq!(component.basename(), "test-component");
1399 assert_eq!(component.parent_hierarchies().len(), 4);
1400 assert_eq!(component.parent_hierarchies()[0].basename(), "");
1401 assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
1402 assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
1403 assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
1404 println!("✓ Chained namespace test passed - all prefixes correct");
1407 }
1408}
1409
1410#[cfg(feature = "integration")]
1411#[cfg(test)]
1412mod test_metricsregistry_prometheus_fmt_outputs {
1413 use super::prometheus_names::name_prefix;
1414 use super::*;
1415 use crate::distributed::distributed_test_utils::create_test_drt_async;
1416 use prometheus::Counter;
1417 use std::sync::Arc;
1418
1419 #[tokio::test]
1420 async fn test_prometheusfactory_using_metrics_registry_trait() {
1421 let drt = create_test_drt_async().await;
1423
1424 let namespace_name = "ns345";
1426
1427 let namespace = drt.namespace(namespace_name).unwrap();
1428 let component = namespace.component("comp345").unwrap();
1429 let endpoint = component.endpoint("ep345");
1430
1431 let counter = endpoint
1433 .metrics()
1434 .create_counter("testcounter", "A test counter", &[])
1435 .unwrap();
1436 counter.inc_by(123.456789);
1437 let epsilon = 0.01;
1438 assert!((counter.get() - 123.456789).abs() < epsilon);
1439
1440 let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
1441 println!("Endpoint output:");
1442 println!("{}", endpoint_output_raw);
1443
1444 let wid = format!("{:x}", drt.connection_id());
1447 use super::test_helpers::inject_worker_id;
1448
1449 let expected_endpoint_output = inject_worker_id(
1450 r#"# HELP dynamo_component_testcounter A test counter
1451# TYPE dynamo_component_testcounter counter
1452dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#,
1453 &wid,
1454 );
1455
1456 assert_eq!(
1457 endpoint_output_raw.trim_end_matches('\n'),
1458 expected_endpoint_output.trim_end_matches('\n'),
1459 "\n=== ENDPOINT COMPARISON FAILED ===\n\
1460 Actual:\n{}\n\
1461 Expected:\n{}\n\
1462 ==============================",
1463 endpoint_output_raw,
1464 expected_endpoint_output
1465 );
1466
1467 let gauge = component
1469 .metrics()
1470 .create_gauge("testgauge", "A test gauge", &[])
1471 .unwrap();
1472 gauge.set(50000.0);
1473 assert_eq!(gauge.get(), 50000.0);
1474
1475 let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
1477 println!("Component output:");
1478 println!("{}", component_output_raw);
1479
1480 let expected_component_output = inject_worker_id(
1481 r#"# HELP dynamo_component_testcounter A test counter
1482# TYPE dynamo_component_testcounter counter
1483dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1484# HELP dynamo_component_testgauge A test gauge
1485# TYPE dynamo_component_testgauge gauge
1486dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#,
1487 &wid,
1488 );
1489
1490 assert_eq!(
1491 component_output_raw.trim_end_matches('\n'),
1492 expected_component_output.trim_end_matches('\n'),
1493 "\n=== COMPONENT COMPARISON FAILED ===\n\
1494 Actual:\n{}\n\
1495 Expected:\n{}\n\
1496 ==============================",
1497 component_output_raw,
1498 expected_component_output
1499 );
1500
1501 let intcounter = namespace
1502 .metrics()
1503 .create_intcounter("testintcounter", "A test int counter", &[])
1504 .unwrap();
1505 intcounter.inc_by(12345);
1506 assert_eq!(intcounter.get(), 12345);
1507
1508 let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
1510 println!("Namespace output:");
1511 println!("{}", namespace_output_raw);
1512
1513 let expected_namespace_output = inject_worker_id(
1514 r#"# HELP dynamo_component_testcounter A test counter
1515# TYPE dynamo_component_testcounter counter
1516dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1517# HELP dynamo_component_testgauge A test gauge
1518# TYPE dynamo_component_testgauge gauge
1519dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1520# HELP dynamo_component_testintcounter A test int counter
1521# TYPE dynamo_component_testintcounter counter
1522dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#,
1523 &wid,
1524 );
1525
1526 assert_eq!(
1527 namespace_output_raw.trim_end_matches('\n'),
1528 expected_namespace_output.trim_end_matches('\n'),
1529 "\n=== NAMESPACE COMPARISON FAILED ===\n\
1530 Actual:\n{}\n\
1531 Expected:\n{}\n\
1532 ==============================",
1533 namespace_output_raw,
1534 expected_namespace_output
1535 );
1536
1537 let intgauge = namespace
1539 .metrics()
1540 .create_intgauge("testintgauge", "A test int gauge", &[])
1541 .unwrap();
1542 intgauge.set(42);
1543 assert_eq!(intgauge.get(), 42);
1544
1545 let intgaugevec = namespace
1547 .metrics()
1548 .create_intgaugevec(
1549 "testintgaugevec",
1550 "A test int gauge vector",
1551 &["instance", "status"],
1552 &[("service", "api")],
1553 )
1554 .unwrap();
1555 intgaugevec
1556 .with_label_values(&["server1", "active"])
1557 .set(10);
1558 intgaugevec
1559 .with_label_values(&["server2", "inactive"])
1560 .set(0);
1561
1562 let countervec = endpoint
1564 .metrics()
1565 .create_countervec(
1566 "testcountervec",
1567 "A test counter vector",
1568 &["method", "status"],
1569 &[("service", "api")],
1570 )
1571 .unwrap();
1572 countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1573 countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1574
1575 let histogram = component
1577 .metrics()
1578 .create_histogram("testhistogram", "A test histogram", &[], None)
1579 .unwrap();
1580 histogram.observe(1.0);
1581 histogram.observe(2.5);
1582 histogram.observe(4.0);
1583
1584 let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
1586 println!("DRT output:");
1587 println!("{}", drt_output_raw);
1588
1589 let expected_drt_output_without_uptime = inject_worker_id(
1592 r#"# HELP dynamo_component_testcounter A test counter
1593# TYPE dynamo_component_testcounter counter
1594dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1595# HELP dynamo_component_testcountervec A test counter vector
1596# TYPE dynamo_component_testcountervec counter
1597dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1598dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1599# HELP dynamo_component_testgauge A test gauge
1600# TYPE dynamo_component_testgauge gauge
1601dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1602# HELP dynamo_component_testhistogram A test histogram
1603# TYPE dynamo_component_testhistogram histogram
1604dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1605dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1606dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1607dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1608dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1609dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1610dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1611dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1612dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1613dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1614dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1615dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1616dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1617dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1618# HELP dynamo_component_testintcounter A test int counter
1619# TYPE dynamo_component_testintcounter counter
1620dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1621# HELP dynamo_component_testintgauge A test int gauge
1622# TYPE dynamo_component_testintgauge gauge
1623dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1624# HELP dynamo_component_testintgaugevec A test int gauge vector
1625# TYPE dynamo_component_testintgaugevec gauge
1626dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1627dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#,
1628 &wid,
1629 );
1630
1631 let mut non_uptime_lines = Vec::new();
1635 let mut saw_uptime_value = false;
1636 for line in drt_output_raw.trim_end_matches('\n').lines() {
1637 if line.starts_with("dynamo_component_uptime_seconds") && !line.starts_with('#') {
1638 let val_str = line.split_whitespace().last().unwrap();
1639 val_str.parse::<f64>().expect("uptime should be a float");
1640 saw_uptime_value = true;
1641 } else if line.starts_with("# HELP dynamo_component_uptime_seconds")
1642 || line.starts_with("# TYPE dynamo_component_uptime_seconds")
1643 {
1644 } else {
1646 non_uptime_lines.push(line);
1647 }
1648 }
1649 assert!(
1650 saw_uptime_value,
1651 "uptime_seconds metric should be present in initial scrape"
1652 );
1653
1654 let actual_without_uptime = non_uptime_lines.join("\n");
1655 assert_eq!(
1656 actual_without_uptime,
1657 expected_drt_output_without_uptime.trim_end_matches('\n'),
1658 "\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
1659 Expected:\n{}\n\
1660 Actual:\n{}\n\
1661 ==============================",
1662 expected_drt_output_without_uptime,
1663 actual_without_uptime
1664 );
1665
1666 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1668 let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
1669 let uptime_line = drt_output_after
1670 .lines()
1671 .find(|l| l.starts_with("dynamo_component_uptime_seconds") && !l.starts_with('#'))
1672 .expect("uptime_seconds metric should be present after sleep");
1673 let uptime_after: f64 = uptime_line
1674 .split_whitespace()
1675 .last()
1676 .unwrap()
1677 .parse()
1678 .expect("uptime should be a float");
1679 assert!(
1680 uptime_after > 0.0,
1681 "uptime_seconds should be > 0 after 10ms sleep, got {}",
1682 uptime_after
1683 );
1684
1685 println!("✓ All Prometheus format outputs verified successfully!");
1686 }
1687
/// Feeds a small exposition-format sample through
/// `test_helpers::extract_metrics` and checks that four lines come back, all
/// of them `dynamo_component*` sample lines and none a `#` comment line.
#[test]
fn test_refactored_filter_functions() {
    // Four HELP/TYPE comment lines and four sample lines.
    let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_errors_total 5"#;

    let metrics_only = super::test_helpers::extract_metrics(test_input);
    assert_eq!(metrics_only.len(), 4);

    // No returned line may lack the metric prefix or be a comment line.
    let has_offending_line = metrics_only
        .iter()
        .any(|line| !line.starts_with("dynamo_component") || line.starts_with("#"));
    assert!(!has_offending_line);

    println!("✓ All refactored filter functions work correctly!");
}
1711
1712 #[tokio::test]
1713 async fn test_same_metric_name_different_endpoints() {
1714 let drt = create_test_drt_async().await;
1718 let namespace = drt.namespace("ns_test").unwrap();
1719 let component = namespace.component("comp_test").unwrap();
1720
1721 let ep1 = component.endpoint("ep1");
1723 let ep2 = component.endpoint("ep2");
1724
1725 let counter1 = ep1
1726 .metrics()
1727 .create_counter("requests_total", "Total requests", &[])
1728 .unwrap();
1729 counter1.inc_by(100.0);
1730
1731 let counter2 = ep2
1732 .metrics()
1733 .create_counter("requests_total", "Total requests", &[])
1734 .unwrap();
1735 counter2.inc_by(200.0);
1736
1737 let output = component.metrics().prometheus_expfmt().unwrap();
1739
1740 let wid = format!("{:x}", drt.connection_id());
1741 use super::test_helpers::inject_worker_id;
1742
1743 let expected_output = inject_worker_id(
1744 r#"# HELP dynamo_component_requests_total Total requests
1745# TYPE dynamo_component_requests_total counter
1746dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
1747dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#,
1748 &wid,
1749 );
1750
1751 assert_eq!(
1752 output.trim_end_matches('\n'),
1753 expected_output.trim_end_matches('\n'),
1754 "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
1755 Actual:\n{}\n\
1756 Expected:\n{}\n\
1757 ==============================",
1758 output,
1759 expected_output
1760 );
1761
1762 println!("✓ Multi-registry prevents Prometheus collisions!");
1763 }
1764
1765 #[tokio::test]
1766 async fn test_duplicate_series_warning() {
1767 let drt = create_test_drt_async().await;
1770 let namespace = drt.namespace("ns_dup").unwrap();
1771 let component = namespace.component("comp_dup").unwrap();
1772
1773 let ep1 = component.endpoint("ep_same");
1775 let ep2 = component.endpoint("ep_same"); let counter1 = ep1
1778 .metrics()
1779 .create_counter("dup_metric", "Duplicate metric test", &[])
1780 .unwrap();
1781 counter1.inc_by(50.0);
1782
1783 let counter2 = ep2
1784 .metrics()
1785 .create_counter("dup_metric", "Duplicate metric test", &[])
1786 .unwrap();
1787 counter2.inc_by(75.0);
1788
1789 let output = component.metrics().prometheus_expfmt().unwrap();
1791
1792 let wid = format!("{:x}", drt.connection_id());
1793 use super::test_helpers::inject_worker_id;
1794
1795 let expected_output = inject_worker_id(
1796 r#"# HELP dynamo_component_dup_metric Duplicate metric test
1797# TYPE dynamo_component_dup_metric counter
1798dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#,
1799 &wid,
1800 );
1801
1802 assert_eq!(
1803 output.trim_end_matches('\n'),
1804 expected_output.trim_end_matches('\n'),
1805 "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
1806 Actual:\n{}\n\
1807 Expected:\n{}\n\
1808 ==============================",
1809 output,
1810 expected_output
1811 );
1812
1813 println!("✓ Duplicate series detection and deduplication works!");
1814 }
1815}