1pub mod prometheus_names;
10
11use parking_lot::Mutex;
12use std::collections::HashSet;
13use std::sync::Arc;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22use prometheus_names::{
24 COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
25 nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
26};
27
28use crate::pipeline::{
30 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31 network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
/// Switch for automatic hierarchy labels in [`create_metric`].
///
/// When `true`, `create_metric` rejects caller-supplied labels whose key is
/// one of `labels::NAMESPACE`, `labels::COMPONENT` or `labels::ENDPOINT`, and
/// instead derives those labels from the hierarchy the metric is created on.
pub const USE_AUTO_LABELS: bool = true;
40
41use prometheus::Encoder;
43
44fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47 let mut seen_keys = std::collections::HashSet::new();
48 for (key, _) in labels {
49 if !seen_keys.insert(*key) {
50 return Err(anyhow::anyhow!(
51 "Duplicate label key '{}' found in labels",
52 key
53 ));
54 }
55 }
56 Ok(())
57}
58
59pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
64 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
66 where
67 Self: Sized;
68
69 fn with_histogram_opts_and_buckets(
72 _opts: prometheus::HistogramOpts,
73 _buckets: Option<Vec<f64>>,
74 ) -> Result<Self, prometheus::Error>
75 where
76 Self: Sized,
77 {
78 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
79 }
80
81 fn with_opts_and_label_names(
84 _opts: prometheus::Opts,
85 _label_names: &[&str],
86 ) -> Result<Self, prometheus::Error>
87 where
88 Self: Sized,
89 {
90 panic!("with_opts_and_label_names is not implemented for this metric type");
91 }
92}
93
94impl PrometheusMetric for prometheus::Counter {
96 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
97 prometheus::Counter::with_opts(opts)
98 }
99}
100
101impl PrometheusMetric for prometheus::IntCounter {
102 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
103 prometheus::IntCounter::with_opts(opts)
104 }
105}
106
107impl PrometheusMetric for prometheus::Gauge {
108 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
109 prometheus::Gauge::with_opts(opts)
110 }
111}
112
113impl PrometheusMetric for prometheus::IntGauge {
114 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
115 prometheus::IntGauge::with_opts(opts)
116 }
117}
118
119impl PrometheusMetric for prometheus::GaugeVec {
120 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
121 Err(prometheus::Error::Msg(
122 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
123 ))
124 }
125
126 fn with_opts_and_label_names(
127 opts: prometheus::Opts,
128 label_names: &[&str],
129 ) -> Result<Self, prometheus::Error> {
130 prometheus::GaugeVec::new(opts, label_names)
131 }
132}
133
134impl PrometheusMetric for prometheus::IntGaugeVec {
135 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
136 Err(prometheus::Error::Msg(
137 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
138 ))
139 }
140
141 fn with_opts_and_label_names(
142 opts: prometheus::Opts,
143 label_names: &[&str],
144 ) -> Result<Self, prometheus::Error> {
145 prometheus::IntGaugeVec::new(opts, label_names)
146 }
147}
148
149impl PrometheusMetric for prometheus::IntCounterVec {
150 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
151 Err(prometheus::Error::Msg(
152 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
153 ))
154 }
155
156 fn with_opts_and_label_names(
157 opts: prometheus::Opts,
158 label_names: &[&str],
159 ) -> Result<Self, prometheus::Error> {
160 prometheus::IntCounterVec::new(opts, label_names)
161 }
162}
163
164impl PrometheusMetric for prometheus::Histogram {
166 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
167 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
169 prometheus::Histogram::with_opts(histogram_opts)
170 }
171
172 fn with_histogram_opts_and_buckets(
173 mut opts: prometheus::HistogramOpts,
174 buckets: Option<Vec<f64>>,
175 ) -> Result<Self, prometheus::Error> {
176 if let Some(custom_buckets) = buckets {
177 opts = opts.buckets(custom_buckets);
178 }
179 prometheus::Histogram::with_opts(opts)
180 }
181}
182
183impl PrometheusMetric for prometheus::CounterVec {
185 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
186 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
188 }
189
190 fn with_opts_and_label_names(
191 opts: prometheus::Opts,
192 label_names: &[&str],
193 ) -> Result<Self, prometheus::Error> {
194 prometheus::CounterVec::new(opts, label_names)
195 }
196}
197
198pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
203 hierarchy: &H,
204 metric_name: &str,
205 metric_desc: &str,
206 labels: &[(&str, &str)],
207 buckets: Option<Vec<f64>>,
208 const_labels: Option<&[&str]>,
209) -> anyhow::Result<T> {
210 validate_no_duplicate_label_keys(labels)?;
212 let basename = hierarchy.basename();
215 let parent_hierarchies = hierarchy.parent_hierarchies();
216
217 let mut hierarchy_names: Vec<String> =
219 parent_hierarchies.iter().map(|p| p.basename()).collect();
220 hierarchy_names.push(basename.clone());
221
222 let metric_name = build_component_metric_name(metric_name);
223
224 let mut updated_labels: Vec<(String, String)> = Vec::new();
226
227 if USE_AUTO_LABELS {
228 for (key, _) in labels {
230 if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
231 return Err(anyhow::anyhow!(
232 "Label '{}' is automatically added by auto_label feature and cannot be manually set",
233 key
234 ));
235 }
236 }
237
238 if hierarchy_names.len() > 1 {
240 let namespace = &hierarchy_names[1];
241 if !namespace.is_empty() {
242 let valid_namespace = sanitize_prometheus_label(namespace)?;
243 if !valid_namespace.is_empty() {
244 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
245 }
246 }
247 }
248 if hierarchy_names.len() > 2 {
249 let component = &hierarchy_names[2];
250 if !component.is_empty() {
251 let valid_component = sanitize_prometheus_label(component)?;
252 if !valid_component.is_empty() {
253 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
254 }
255 }
256 }
257 if hierarchy_names.len() > 3 {
258 let endpoint = &hierarchy_names[3];
259 if !endpoint.is_empty() {
260 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
261 if !valid_endpoint.is_empty() {
262 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
263 }
264 }
265 }
266 }
267
268 updated_labels.extend(
270 labels
271 .iter()
272 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
273 );
274 let prometheus_metric = if std::any::TypeId::of::<T>()
278 == std::any::TypeId::of::<prometheus::CounterVec>()
279 {
280 if buckets.is_some() {
283 return Err(anyhow::anyhow!(
284 "buckets parameter is not valid for CounterVec"
285 ));
286 }
287 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
288 for (key, value) in &updated_labels {
289 opts = opts.const_label(key.clone(), value.clone());
290 }
291 let label_names = const_labels
292 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
293 T::with_opts_and_label_names(opts, label_names)?
294 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
295 if buckets.is_some() {
298 return Err(anyhow::anyhow!(
299 "buckets parameter is not valid for GaugeVec"
300 ));
301 }
302 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
303 for (key, value) in &updated_labels {
304 opts = opts.const_label(key.clone(), value.clone());
305 }
306 let label_names = const_labels
307 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
308 T::with_opts_and_label_names(opts, label_names)?
309 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
310 if const_labels.is_some() {
313 return Err(anyhow::anyhow!(
314 "const_labels parameter is not valid for Histogram"
315 ));
316 }
317 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
318 for (key, value) in &updated_labels {
319 opts = opts.const_label(key.clone(), value.clone());
320 }
321 T::with_histogram_opts_and_buckets(opts, buckets)?
322 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
323 if buckets.is_some() {
326 return Err(anyhow::anyhow!(
327 "buckets parameter is not valid for IntCounterVec"
328 ));
329 }
330 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
331 for (key, value) in &updated_labels {
332 opts = opts.const_label(key.clone(), value.clone());
333 }
334 let label_names = const_labels
335 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
336 T::with_opts_and_label_names(opts, label_names)?
337 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
338 if buckets.is_some() {
341 return Err(anyhow::anyhow!(
342 "buckets parameter is not valid for IntGaugeVec"
343 ));
344 }
345 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
346 for (key, value) in &updated_labels {
347 opts = opts.const_label(key.clone(), value.clone());
348 }
349 let label_names = const_labels
350 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
351 T::with_opts_and_label_names(opts, label_names)?
352 } else {
353 if buckets.is_some() {
356 return Err(anyhow::anyhow!(
357 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
358 ));
359 }
360 if const_labels.is_some() {
361 return Err(anyhow::anyhow!(
362 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
363 ));
364 }
365 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
366 for (key, value) in &updated_labels {
367 opts = opts.const_label(key.clone(), value.clone());
368 }
369 T::with_opts(opts)?
370 };
371
372 for parent in parent_hierarchies {
375 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
376 parent.get_metrics_registry().add_metric(collector)?;
377 }
378
379 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
381 hierarchy.get_metrics_registry().add_metric(collector)?;
382
383 Ok(prometheus_metric)
384}
385
/// Metric factory bound to one level of the metrics hierarchy.
///
/// Every `create_*` method forwards to [`create_metric`] with the wrapped
/// hierarchy.
pub struct Metrics<H: MetricsHierarchy> {
    /// The hierarchy level metrics are created on and gathered from.
    hierarchy: H,
}
391
392impl<H: MetricsHierarchy> Metrics<H> {
393 pub fn new(hierarchy: H) -> Self {
394 Self { hierarchy }
395 }
396
397 pub fn create_counter(
420 &self,
421 name: &str,
422 description: &str,
423 labels: &[(&str, &str)],
424 ) -> anyhow::Result<prometheus::Counter> {
425 create_metric(&self.hierarchy, name, description, labels, None, None)
426 }
427
428 pub fn create_countervec(
430 &self,
431 name: &str,
432 description: &str,
433 const_labels: &[&str],
434 const_label_values: &[(&str, &str)],
435 ) -> anyhow::Result<prometheus::CounterVec> {
436 create_metric(
437 &self.hierarchy,
438 name,
439 description,
440 const_label_values,
441 None,
442 Some(const_labels),
443 )
444 }
445
446 pub fn create_gauge(
448 &self,
449 name: &str,
450 description: &str,
451 labels: &[(&str, &str)],
452 ) -> anyhow::Result<prometheus::Gauge> {
453 create_metric(&self.hierarchy, name, description, labels, None, None)
454 }
455
456 pub fn create_gaugevec(
458 &self,
459 name: &str,
460 description: &str,
461 const_labels: &[&str],
462 const_label_values: &[(&str, &str)],
463 ) -> anyhow::Result<prometheus::GaugeVec> {
464 create_metric(
465 &self.hierarchy,
466 name,
467 description,
468 const_label_values,
469 None,
470 Some(const_labels),
471 )
472 }
473
474 pub fn create_histogram(
476 &self,
477 name: &str,
478 description: &str,
479 labels: &[(&str, &str)],
480 buckets: Option<Vec<f64>>,
481 ) -> anyhow::Result<prometheus::Histogram> {
482 create_metric(&self.hierarchy, name, description, labels, buckets, None)
483 }
484
485 pub fn create_intcounter(
487 &self,
488 name: &str,
489 description: &str,
490 labels: &[(&str, &str)],
491 ) -> anyhow::Result<prometheus::IntCounter> {
492 create_metric(&self.hierarchy, name, description, labels, None, None)
493 }
494
495 pub fn create_intcountervec(
497 &self,
498 name: &str,
499 description: &str,
500 const_labels: &[&str],
501 const_label_values: &[(&str, &str)],
502 ) -> anyhow::Result<prometheus::IntCounterVec> {
503 create_metric(
504 &self.hierarchy,
505 name,
506 description,
507 const_label_values,
508 None,
509 Some(const_labels),
510 )
511 }
512
513 pub fn create_intgauge(
515 &self,
516 name: &str,
517 description: &str,
518 labels: &[(&str, &str)],
519 ) -> anyhow::Result<prometheus::IntGauge> {
520 create_metric(&self.hierarchy, name, description, labels, None, None)
521 }
522
523 pub fn create_intgaugevec(
525 &self,
526 name: &str,
527 description: &str,
528 const_labels: &[&str],
529 const_label_values: &[(&str, &str)],
530 ) -> anyhow::Result<prometheus::IntGaugeVec> {
531 create_metric(
532 &self.hierarchy,
533 name,
534 description,
535 const_label_values,
536 None,
537 Some(const_labels),
538 )
539 }
540
541 pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
543 let callback_results = self
545 .hierarchy
546 .get_metrics_registry()
547 .execute_update_callbacks();
548
549 for result in callback_results {
551 if let Err(e) = result {
552 tracing::error!("Error executing metrics callback: {}", e);
553 }
554 }
555
556 let prometheus_registry = self
558 .hierarchy
559 .get_metrics_registry()
560 .get_prometheus_registry();
561
562 let metric_families = prometheus_registry.gather();
564 let encoder = prometheus::TextEncoder::new();
565 let mut buffer = Vec::new();
566 encoder.encode(&metric_families, &mut buffer)?;
567 let mut result = String::from_utf8(buffer)?;
568
569 let expfmt = self
571 .hierarchy
572 .get_metrics_registry()
573 .execute_expfmt_callbacks();
574 if !expfmt.is_empty() {
575 if !result.ends_with('\n') {
576 result.push('\n');
577 }
578 result.push_str(&expfmt);
579 }
580
581 Ok(result)
582 }
583}
584
585use crate::traits::DistributedRuntimeProvider;
589
/// A node in the metrics hierarchy that owns a [`MetricsRegistry`].
///
/// [`create_metric`] uses this trait to derive automatic labels and to find
/// every registry a new metric has to be registered on.
pub trait MetricsHierarchy: Send + Sync {
    /// Name of this level. `create_metric` uses the basenames of the levels
    /// below the first one as automatic label values.
    fn basename(&self) -> String;

    /// The ancestors of this level. `create_metric` treats index 0 as the
    /// outermost level and registers each new metric on every entry.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;

    /// The registry that holds this level's metrics and callbacks.
    fn get_metrics_registry(&self) -> &MetricsRegistry;

    /// Returns a [`Metrics`] factory that borrows this hierarchy level.
    fn metrics(&self) -> Metrics<&Self>
    where
        Self: Sized,
    {
        Metrics::new(self)
    }
}
619
620impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
622 fn basename(&self) -> String {
623 (**self).basename()
624 }
625
626 fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
627 (**self).parent_hierarchies()
628 }
629
630 fn get_metrics_registry(&self) -> &MetricsRegistry {
631 (**self).get_metrics_registry()
632 }
633}
634
/// Shared callback run by [`MetricsRegistry::execute_update_callbacks`];
/// `Metrics::prometheus_expfmt` runs these before it gathers metrics.
/// A returned `Err` is reported back to the caller of that method.
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
644
/// Shared callback that returns extra exposition text. The text from all such
/// callbacks is concatenated by [`MetricsRegistry::execute_expfmt_callbacks`]
/// and appended to the encoded metrics by `Metrics::prometheus_expfmt`.
pub type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
648
/// A Prometheus registry together with the callbacks tied to it.
///
/// Cloning shares both callback lists (they are behind `Arc`), while the
/// registry field gets a new lock around a clone of the `prometheus::Registry`.
pub struct MetricsRegistry {
    /// The Prometheus registry metrics are registered on and gathered from.
    pub prometheus_registry: std::sync::RwLock<prometheus::Registry>,

    /// Callbacks run by `execute_update_callbacks`, in insertion order.
    pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,

    /// Callbacks run by `execute_expfmt_callbacks`, in insertion order; each
    /// one contributes extra exposition text.
    pub prometheus_expfmt_callbacks:
        Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
663
664impl std::fmt::Debug for MetricsRegistry {
665 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
666 f.debug_struct("MetricsRegistry")
667 .field("prometheus_registry", &"<RwLock<Registry>>")
668 .field(
669 "prometheus_update_callbacks",
670 &format!(
671 "<RwLock<Vec<Callback>>> with {} callbacks",
672 self.prometheus_update_callbacks.read().unwrap().len()
673 ),
674 )
675 .field(
676 "prometheus_expfmt_callbacks",
677 &format!(
678 "<RwLock<Vec<Callback>>> with {} callbacks",
679 self.prometheus_expfmt_callbacks.read().unwrap().len()
680 ),
681 )
682 .finish()
683 }
684}
685
686impl Clone for MetricsRegistry {
687 fn clone(&self) -> Self {
688 Self {
689 prometheus_registry: std::sync::RwLock::new(
690 self.prometheus_registry.read().unwrap().clone(),
691 ),
692 prometheus_update_callbacks: Arc::clone(&self.prometheus_update_callbacks),
695 prometheus_expfmt_callbacks: Arc::clone(&self.prometheus_expfmt_callbacks),
696 }
697 }
698}
699
700impl MetricsRegistry {
701 pub fn new() -> Self {
703 Self {
704 prometheus_registry: std::sync::RwLock::new(prometheus::Registry::new()),
705 prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
706 prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
707 }
708 }
709
710 pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
712 self.prometheus_update_callbacks
713 .write()
714 .unwrap()
715 .push(callback);
716 }
717
718 pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
720 self.prometheus_expfmt_callbacks
721 .write()
722 .unwrap()
723 .push(callback);
724 }
725
726 pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
728 self.prometheus_update_callbacks
729 .read()
730 .unwrap()
731 .iter()
732 .map(|callback| callback())
733 .collect()
734 }
735
736 pub fn execute_expfmt_callbacks(&self) -> String {
738 let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
739 let mut result = String::new();
740 for callback in callbacks.iter() {
741 match callback() {
742 Ok(text) => {
743 if !text.is_empty() {
744 if !result.is_empty() && !result.ends_with('\n') {
745 result.push('\n');
746 }
747 result.push_str(&text);
748 }
749 }
750 Err(e) => {
751 tracing::error!("Error executing exposition text callback: {}", e);
752 }
753 }
754 }
755 result
756 }
757
758 pub fn add_metric(
760 &self,
761 collector: Box<dyn prometheus::core::Collector>,
762 ) -> anyhow::Result<()> {
763 self.prometheus_registry
764 .write()
765 .unwrap()
766 .register(collector)
767 .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
768 }
769
770 pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
772 self.prometheus_registry.read().unwrap()
773 }
774
775 pub fn has_metric_named(&self, metric_name: &str) -> bool {
777 self.prometheus_registry
778 .read()
779 .unwrap()
780 .gather()
781 .iter()
782 .any(|mf| mf.name() == metric_name)
783 }
784}
785
786impl Default for MetricsRegistry {
787 fn default() -> Self {
788 Self::new()
789 }
790}
791
792#[cfg(test)]
793mod test_helpers {
794 use super::prometheus_names::name_prefix;
795 use super::prometheus_names::{nats_client, nats_service};
796 use super::*;
797
798 fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
801 where
802 F: FnMut(&str) -> bool,
803 {
804 input
805 .lines()
806 .filter(|line| predicate(line))
807 .map(|line| line.to_string())
808 .collect::<Vec<_>>()
809 }
810
811 pub fn remove_nats_lines(input: &str) -> Vec<String> {
813 filter_prometheus_lines(input, |line| {
814 !line.contains(&format!(
815 "{}_{}",
816 name_prefix::COMPONENT,
817 nats_client::PREFIX
818 )) && !line.contains(&format!(
819 "{}_{}",
820 name_prefix::COMPONENT,
821 nats_service::PREFIX
822 )) && !line.trim().is_empty()
823 })
824 }
825
826 pub fn extract_nats_lines(input: &str) -> Vec<String> {
828 filter_prometheus_lines(input, |line| {
829 line.contains(&format!(
830 "{}_{}",
831 name_prefix::COMPONENT,
832 nats_client::PREFIX
833 )) || line.contains(&format!(
834 "{}_{}",
835 name_prefix::COMPONENT,
836 nats_service::PREFIX
837 ))
838 })
839 }
840
841 pub fn extract_metrics(input: &str) -> Vec<String> {
844 filter_prometheus_lines(input, |line| {
845 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
846 && !line.starts_with("#")
847 && !line.trim().is_empty()
848 })
849 }
850
851 pub fn parse_prometheus_metric(
863 line: &str,
864 ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
865 if line.trim().is_empty() || line.starts_with('#') {
866 return None;
867 }
868
869 let parts: Vec<&str> = line.split_whitespace().collect();
870 if parts.len() < 2 {
871 return None;
872 }
873
874 let metric_part = parts[0];
875 let value: f64 = parts[1].parse().ok()?;
876
877 let (name, labels) = if metric_part.contains('{') {
878 let brace_start = metric_part.find('{').unwrap();
879 let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
880 let name = &metric_part[..brace_start];
881 let labels_str = &metric_part[brace_start + 1..brace_end];
882
883 let mut labels = std::collections::HashMap::new();
884 for pair in labels_str.split(',') {
885 if let Some((k, v)) = pair.split_once('=') {
886 let v = v.trim_matches('"');
887 labels.insert(k.trim().to_string(), v.to_string());
888 }
889 }
890 (name.to_string(), labels)
891 } else {
892 (metric_part.to_string(), std::collections::HashMap::new())
893 };
894
895 Some((name, labels, value))
896 }
897}
898
899#[cfg(test)]
900mod test_metricsregistry_units {
901 use super::*;
902
    /// Checks that `build_component_metric_name` prepends the
    /// `dynamo_component_` prefix to a short metric name.
    #[test]
    fn test_build_component_metric_name_with_prefix() {
        let result = build_component_metric_name("requests");
        assert_eq!(result, "dynamo_component_requests");

        let result = build_component_metric_name("counter");
        assert_eq!(result, "dynamo_component_counter");
    }
912
    /// Exercises the `parse_prometheus_metric` test helper on labeled,
    /// unlabeled and non-sample lines.
    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // Sample with two labels and an integer value.
        let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
        let parsed = parse_prometheus_metric(line);
        assert!(parsed.is_some());

        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "http_requests_total");

        let mut expected_labels = HashMap::new();
        expected_labels.insert("method".to_string(), "GET".to_string());
        expected_labels.insert("status".to_string(), "200".to_string());
        assert_eq!(labels, expected_labels);

        assert_eq!(value, 1234.0);

        // Sample without any labels.
        let line = "cpu_usage 98.5";
        let parsed = parse_prometheus_metric(line);
        assert!(parsed.is_some());

        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "cpu_usage");
        assert!(labels.is_empty());
        assert_eq!(value, 98.5);

        // Sample with a single label and a fractional value.
        let line = "response_time{service=\"api\"} 0.123";
        let parsed = parse_prometheus_metric(line);
        assert!(parsed.is_some());

        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "response_time");

        let mut expected_labels = HashMap::new();
        expected_labels.insert("service".to_string(), "api".to_string());
        assert_eq!(labels, expected_labels);

        assert_eq!(value, 0.123);

        // Empty lines, comment lines and lines without a value yield `None`.
        assert!(parse_prometheus_metric("").is_none());
        assert!(parse_prometheus_metric("# HELP metric description").is_none());
        assert!(parse_prometheus_metric("# TYPE metric counter").is_none());
        assert!(parse_prometheus_metric("metric_name").is_none());
        println!("✓ Prometheus metric parsing works correctly!");
    }
965
    /// Covers update-callback registration and execution on
    /// `MetricsRegistry`: repeated execution, sharing through `clone`, error
    /// reporting, and the empty case.
    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: three callbacks adding 1, 10 and 100 to one counter.
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            for increment in [1, 10, 100] {
                let counter_clone = counter.clone();
                registry.add_update_callback(Arc::new(move || {
                    counter_clone.fetch_add(increment, Ordering::SeqCst);
                    Ok(())
                }));
            }

            // Registering a callback must not run it.
            assert_eq!(counter.load(Ordering::SeqCst), 0);

            // Each execution runs all three callbacks once (+111).
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(|r| r.is_ok()));
            assert_eq!(counter.load(Ordering::SeqCst), 111);
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 222);
            // A clone runs the same callbacks against the same counter.
            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(counter.load(Ordering::SeqCst), 333);
            // The original still works after it has been cloned.
            registry.execute_update_callbacks();
            assert_eq!(counter.load(Ordering::SeqCst), 444);
        }

        // Scenario 2: a failing callback between two succeeding ones.
        {
            let registry = MetricsRegistry::new();
            let counter = Arc::new(AtomicUsize::new(0));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));

            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));

            let counter_clone = counter.clone();
            registry.add_update_callback(Arc::new(move || {
                counter_clone.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            // Results come back in registration order; the error does not
            // stop the callback registered after it.
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 3);
            assert!(results[0].is_ok());
            assert!(results[1].is_err());
            assert!(results[2].is_ok());

            assert_eq!(
                results[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );

            assert_eq!(counter.load(Ordering::SeqCst), 11);
            // The failing callback keeps failing on later executions.
            let results = registry.execute_update_callbacks();
            assert!(results[1].is_err());
            assert_eq!(counter.load(Ordering::SeqCst), 22);
        }

        // Scenario 3: a registry without callbacks returns no results.
        {
            let registry = MetricsRegistry::new();
            let results = registry.execute_update_callbacks();
            assert_eq!(results.len(), 0);
        }
    }
1060}
1061
1062#[cfg(feature = "integration")]
1063#[cfg(test)]
1064mod test_metricsregistry_prefixes {
1065 use super::*;
1066 use crate::distributed::distributed_test_utils::create_test_drt_async;
1067 use prometheus::core::Collector;
1068
    /// Verifies `basename` and `parent_hierarchies` for the runtime,
    /// namespace, component and endpoint levels, and that a namespace name
    /// with invalid characters is sanitized when used as a label value.
    #[tokio::test]
    async fn test_hierarchical_prefixes_and_parent_hierarchies() {
        let drt = create_test_drt_async().await;

        const DRT_NAME: &str = "";
        const NAMESPACE_NAME: &str = "ns901";
        const COMPONENT_NAME: &str = "comp901";
        const ENDPOINT_NAME: &str = "ep901";
        let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
        let component = namespace.component(COMPONENT_NAME).unwrap();
        let endpoint = component.endpoint(ENDPOINT_NAME);

        // The runtime is the root: empty basename, no parents.
        assert_eq!(drt.basename(), DRT_NAME);
        assert_eq!(drt.parent_hierarchies().len(), 0);
        // Namespace: one parent, the runtime.
        assert_eq!(namespace.basename(), NAMESPACE_NAME);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
        // Component: parents are listed root first.
        assert_eq!(component.basename(), COMPONENT_NAME);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        // Endpoint: runtime, namespace, component.
        assert_eq!(endpoint.basename(), ENDPOINT_NAME);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);
        assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
        assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
        assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
        // Each level lists its direct parent among its parents.
        assert!(
            namespace
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == drt.basename())
        );
        assert!(
            component
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == namespace.basename())
        );
        assert!(
            endpoint
                .parent_hierarchies()
                .iter()
                .any(|h| h.basename() == component.basename())
        );

        // The parent counts are re-checked after the lookups above.
        assert_eq!(drt.parent_hierarchies().len(), 0);
        assert_eq!(namespace.parent_hierarchies().len(), 1);
        assert_eq!(component.parent_hierarchies().len(), 2);
        assert_eq!(endpoint.parent_hierarchies().len(), 3);

        // A namespace named "@@123" still yields a metric; the namespace
        // label on it is expected to read "_123".
        let invalid_namespace = drt.namespace("@@123").unwrap();
        let result =
            invalid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[]);
        assert!(result.is_ok());
        if let Ok(counter) = &result {
            let desc = counter.desc();
            let namespace_label = desc[0]
                .const_label_pairs
                .iter()
                .find(|l| l.name() == "dynamo_namespace")
                .expect("Should have dynamo_namespace label");
            assert_eq!(namespace_label.value(), "_123");
        }

        // A namespace with a valid name works as well.
        let valid_namespace = drt.namespace("ns567").unwrap();
        assert!(
            valid_namespace
                .metrics()
                .create_counter("test_counter", "A test counter", &[])
                .is_ok()
        );
    }
1162
    /// Verifies that namespaces nested three deep, and a component under the
    /// innermost one, report their parents root first.
    #[tokio::test]
    async fn test_recursive_namespace() {
        let drt = create_test_drt_async().await;

        // Chain: runtime -> ns1 -> ns2 -> ns3 -> test-component.
        let ns1 = drt.namespace("ns1").unwrap();
        let ns2 = ns1.namespace("ns2").unwrap();
        let ns3 = ns2.namespace("ns3").unwrap();

        let component = ns3.component("test-component").unwrap();

        // Each nesting level adds one parent; index 0 is always the runtime,
        // whose basename is empty.
        assert_eq!(ns1.basename(), "ns1");
        assert_eq!(ns1.parent_hierarchies().len(), 1);
        assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns2.basename(), "ns2");
        assert_eq!(ns2.parent_hierarchies().len(), 2);
        assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(ns3.basename(), "ns3");
        assert_eq!(ns3.parent_hierarchies().len(), 3);
        assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
        assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
        assert_eq!(component.basename(), "test-component");
        assert_eq!(component.parent_hierarchies().len(), 4);
        assert_eq!(component.parent_hierarchies()[0].basename(), "");
        assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
        assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
        assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
        println!("✓ Chained namespace test passed - all prefixes correct");
    }
1205}
1206
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs {
    //! Integration tests that pin the exact Prometheus text exposition produced at
    //! each level of the metrics hierarchy (endpoint, component, namespace, DRT).

    use super::prometheus_names::{
        COMPONENT_NATS_METRICS, DRT_NATS_METRICS, name_prefix, nats_client, nats_service,
    };
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use prometheus::Counter;
    use std::sync::Arc;

    /// Runs `test_helpers::remove_nats_lines` over `raw` and joins the surviving
    /// lines with `\n`, so the result can be compared against a literal.
    fn without_nats(raw: &str) -> String {
        super::test_helpers::remove_nats_lines(raw).join("\n")
    }

    /// Creates metrics at the endpoint, component and namespace levels and checks
    /// the filtered exposition text of every level, then of the whole runtime.
    #[tokio::test]
    async fn test_prometheusfactory_using_metrics_registry_trait() {
        let drt = create_test_drt_async().await;

        // Build the namespace -> component -> endpoint chain under test.
        let ns_name = "ns345";
        let ns = drt.namespace(ns_name).unwrap();
        let comp = ns.component("comp345").unwrap();
        let ep = comp.endpoint("ep345");

        // --- Endpoint level: a float counter -------------------------------------
        let counter = ep
            .metrics()
            .create_counter("testcounter", "A test counter", &[])
            .unwrap();
        counter.inc_by(123.456789);
        let tolerance = 0.01;
        let drift = (counter.get() - 123.456789).abs();
        assert!(drift < tolerance);

        let endpoint_raw = ep.metrics().prometheus_expfmt().unwrap();
        println!("Endpoint output:");
        println!("{}", endpoint_raw);

        let endpoint_actual = without_nats(&endpoint_raw);

        let endpoint_expected = String::from(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#,
        );

        assert_eq!(
            endpoint_actual, endpoint_expected,
            "\n=== ENDPOINT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            endpoint_expected, endpoint_actual
        );

        // --- Component level: a gauge, plus the endpoint counter from above ------
        let gauge = comp
            .metrics()
            .create_gauge("testgauge", "A test gauge", &[])
            .unwrap();
        gauge.set(50000.0);
        assert_eq!(gauge.get(), 50000.0);

        let component_raw = comp.metrics().prometheus_expfmt().unwrap();
        println!("Component output:");
        println!("{}", component_raw);

        let component_actual = without_nats(&component_raw);

        let component_expected = String::from(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#,
        );

        assert_eq!(
            component_actual, component_expected,
            "\n=== COMPONENT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            component_expected, component_actual
        );

        // --- Namespace level: an integer counter, plus everything below it -------
        let intcounter = ns
            .metrics()
            .create_intcounter("testintcounter", "A test int counter", &[])
            .unwrap();
        intcounter.inc_by(12345);
        assert_eq!(intcounter.get(), 12345);

        let namespace_raw = ns.metrics().prometheus_expfmt().unwrap();
        println!("Namespace output:");
        println!("{}", namespace_raw);

        let namespace_actual = without_nats(&namespace_raw);

        let namespace_expected = String::from(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#,
        );

        assert_eq!(
            namespace_actual, namespace_expected,
            "\n=== NAMESPACE COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual:\n{}\n\
             ==============================",
            namespace_expected, namespace_actual
        );

        // --- Remaining metric kinds, registered before the final DRT snapshot ----
        let intgauge = ns
            .metrics()
            .create_intgauge("testintgauge", "A test int gauge", &[])
            .unwrap();
        intgauge.set(42);
        assert_eq!(intgauge.get(), 42);

        // Integer gauge vector: two variable labels plus one constant label.
        let intgaugevec = ns
            .metrics()
            .create_intgaugevec(
                "testintgaugevec",
                "A test int gauge vector",
                &["instance", "status"],
                &[("service", "api")],
            )
            .unwrap();
        intgaugevec.with_label_values(&["server1", "active"]).set(10);
        intgaugevec.with_label_values(&["server2", "inactive"]).set(0);

        // Counter vector on the endpoint: two variable labels plus one constant label.
        let countervec = ep
            .metrics()
            .create_countervec(
                "testcountervec",
                "A test counter vector",
                &["method", "status"],
                &[("service", "api")],
            )
            .unwrap();
        countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
        countervec.with_label_values(&["POST", "201"]).inc_by(5.0);

        // Histogram on the component; `None` means no custom bucket list is passed.
        let histogram = comp
            .metrics()
            .create_histogram("testhistogram", "A test histogram", &[], None)
            .unwrap();
        for sample in [1.0, 2.5, 4.0] {
            histogram.observe(sample);
        }

        // --- DRT level: everything registered above must show up -----------------
        let drt_raw = drt.metrics().prometheus_expfmt().unwrap();
        println!("DRT output:");
        println!("{}", drt_raw);

        let drt_actual = without_nats(&drt_raw);

        let drt_expected = String::from(
            r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#,
        );

        assert_eq!(
            drt_actual, drt_expected,
            "\n=== DRT COMPARISON FAILED ===\n\
             Expected:\n{}\n\
             Actual (filtered):\n{}\n\
             ==============================",
            drt_expected, drt_actual
        );

        println!("✓ All Prometheus format outputs verified successfully!");
    }

    /// Exercises the three `test_helpers` line filters against a fixed fixture
    /// with 12 lines: 7 without "nats", 5 with "nats", and 6 non-comment samples.
    #[test]
    fn test_refactored_filter_functions() {
        let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_nats_client_connection_state Connection state
# TYPE dynamo_component_nats_client_connection_state gauge
dynamo_component_nats_client_connection_state 1
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_nats_service_requests_total 100
dynamo_component_nats_service_errors_total 5"#;

        // Everything that does not mention NATS.
        let non_nats = super::test_helpers::remove_nats_lines(test_input);
        assert_eq!(non_nats.len(), 7);
        assert!(non_nats.iter().all(|line| !line.contains("nats")));

        // Only the lines that mention NATS (comments and samples alike).
        let nats_only = super::test_helpers::extract_nats_lines(test_input);
        assert_eq!(nats_only.len(), 5);
        assert!(nats_only.iter().all(|line| line.contains("nats")));

        // Only sample lines, i.e. no `#` comment lines.
        let samples = super::test_helpers::extract_metrics(test_input);
        assert_eq!(samples.len(), 6);
        for line in samples.iter() {
            assert!(line.starts_with("dynamo_component") && !line.starts_with("#"));
        }

        println!("✓ All refactored filter functions work correctly!");
    }
}
1474
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_nats {
    //! Integration tests for the NATS client/service metrics exposed through the
    //! metrics registry of a `DistributedRuntime` and its components.

    use super::prometheus_names::name_prefix;
    use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
    use super::prometheus_names::{nats_client, nats_service};
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use crate::pipeline::PushRouter;
    use crate::{DistributedRuntime, Runtime};
    use tokio::time::{Duration, sleep};

    /// Reduces Prometheus sample lines to their bare metric names, sorted.
    ///
    /// For every line, the text before the first `{` (label set) and before the
    /// first space (sample value) is kept. The result is sorted so callers can
    /// compare it against a sorted expectation without depending on the order in
    /// which the exposition text lists the metrics.
    fn sorted_metric_names<S: AsRef<str>>(sample_lines: &[S]) -> Vec<&str> {
        let mut names: Vec<&str> = sample_lines
            .iter()
            .map(|line| {
                let line: &str = line.as_ref();
                let without_labels = line.split('{').next().unwrap_or(line);
                without_labels.split(' ').next().unwrap_or(without_labels)
            })
            .collect();
        names.sort_unstable();
        names
    }

    /// A freshly created runtime must expose exactly the metrics listed in
    /// `DRT_NATS_METRICS` (after prefixing) among its NATS lines.
    #[tokio::test]
    async fn test_drt_nats_metrics() {
        let drt = create_test_drt_async().await;

        let drt_output = drt.metrics().prometheus_expfmt().unwrap();
        println!("DRT output with NATS metrics:");
        println!("{}", drt_output);

        let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);

        assert!(
            !drt_nats_metrics.is_empty(),
            "NATS client metrics should be present"
        );

        // Keep only the sample lines of the NATS subset, then reduce them to
        // sorted metric names.
        let drt_nats_metric_lines =
            super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
        let actual_drt_nats_metrics_sorted = sorted_metric_names(&drt_nats_metric_lines);

        let expect_drt_nats_metrics_sorted = {
            let mut temp = DRT_NATS_METRICS
                .iter()
                .map(|metric| build_component_metric_name(metric))
                .collect::<Vec<_>>();
            temp.sort();
            temp
        };

        println!(
            "actual_drt_nats_metrics_sorted: {:?}",
            actual_drt_nats_metrics_sorted
        );
        println!(
            "expect_drt_nats_metrics_sorted: {:?}",
            expect_drt_nats_metrics_sorted
        );

        assert_eq!(
            actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
            "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
        );

        println!("✓ DistributedRuntime NATS metrics integration test passed!");
    }

    /// After `add_stats_service`, a component must expose exactly the metrics in
    /// `COMPONENT_NATS_METRICS`, and the runtime must expose the DRT and component
    /// NATS metrics together.
    #[tokio::test]
    async fn test_nats_metric_names() {
        let drt = create_test_drt_async().await;

        let namespace = drt.namespace("ns789").unwrap();
        let mut component = namespace.component("comp789").unwrap();

        component.add_stats_service().await.unwrap();

        let component_nats_metrics = super::test_helpers::extract_nats_lines(
            &component.metrics().prometheus_expfmt().unwrap(),
        );
        println!(
            "Component NATS metrics count: {}",
            component_nats_metrics.len()
        );

        assert!(
            !component_nats_metrics.is_empty(),
            "NATS client metrics should be present"
        );

        // All sample lines of the component output are considered here, not only
        // the NATS subset, so the comparison below also fails if the component
        // exposes any additional metric.
        let component_metrics =
            super::test_helpers::extract_metrics(&component.metrics().prometheus_expfmt().unwrap());
        let actual_component_nats_metrics_sorted = sorted_metric_names(&component_metrics);

        let expect_component_nats_metrics_sorted = {
            let mut temp = COMPONENT_NATS_METRICS
                .iter()
                .map(|metric| build_component_metric_name(metric))
                .collect::<Vec<_>>();
            temp.sort();
            temp
        };

        println!(
            "actual_component_nats_metrics_sorted: {:?}",
            actual_component_nats_metrics_sorted
        );
        println!(
            "expect_component_nats_metrics_sorted: {:?}",
            expect_component_nats_metrics_sorted
        );

        assert_eq!(
            actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
            "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
        );

        // The runtime-level output should now carry both metric sets.
        let drt_output = drt.metrics().prometheus_expfmt().unwrap();
        let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
        let drt_and_component_nats_metrics =
            super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
        println!(
            "DRT and component NATS metrics count: {}",
            drt_and_component_nats_metrics.len()
        );

        assert_eq!(
            drt_and_component_nats_metrics.len(),
            DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
            "DRT at this point should have both the DRT and component NATS metrics"
        );

        println!("✓ Component NATS metrics integration test passed!");
    }

    /// End-to-end check of metric *values*: starts an echo endpoint, verifies the
    /// initial value ranges, sends ten 2000-byte messages through a `PushRouter`,
    /// and verifies the value ranges again.
    ///
    /// # Errors
    ///
    /// Returns an error if a runtime, namespace, component, client or router
    /// cannot be created, or if sending a request fails.
    #[tokio::test]
    async fn test_nats_metrics_values() -> anyhow::Result<()> {
        /// Minimal engine that echoes the request payload back as one response.
        struct MessageHandler {}
        impl MessageHandler {
            fn new() -> std::sync::Arc<Self> {
                std::sync::Arc::new(Self {})
            }
        }

        #[async_trait]
        impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
            async fn generate(
                &self,
                input: SingleIn<String>,
            ) -> Result<ManyOut<Annotated<String>>, Error> {
                let (data, ctx) = input.into_parts();
                let response = data.to_string();
                let stream = stream::iter(vec![Annotated::from_data(response)]);
                Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
            }
        }

        println!("\n=== Initializing DistributedRuntime ===");
        let runtime = Runtime::from_current()?;
        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
        let namespace = drt.namespace("ns123")?;
        let mut component = namespace.component("comp123")?;
        let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();

        // Serve the echo endpoint from a background task; the handle is kept only
        // so the binding is not flagged as unused.
        let _backend_handle = tokio::spawn(async move {
            component.add_stats_service().await.unwrap();
            let endpoint = component
                .endpoint("echo")
                .endpoint_builder()
                .handler(ingress);
            endpoint.start().await.unwrap();
        });

        // Give the background task time to register the endpoint.
        sleep(Duration::from_millis(500)).await;
        println!("✓ Launched endpoint service in background successfully");

        let drt_output = drt.metrics().prometheus_expfmt().unwrap();
        let parsed_metrics: Vec<_> = drt_output
            .lines()
            .filter_map(super::test_helpers::parse_prometheus_metric)
            .collect();

        println!("=== Initial DRT metrics output ===");
        println!("{}", drt_output);

        println!("\n=== Checking Initial Metric Values ===");

        // (metric name, inclusive lower bound, inclusive upper bound)
        let initial_expected_metric_values = [
            // NATS client metrics.
            (build_component_metric_name(nats_client::CONNECTION_STATE), 1.0, 1.0),
            (build_component_metric_name(nats_client::CURRENT_CONNECTIONS), 1.0, 1.0),
            (build_component_metric_name(nats_client::IN_TOTAL_BYTES), 800.0, 4000.0),
            (build_component_metric_name(nats_client::IN_MESSAGES), 0.0, 5.0),
            (build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES), 1500.0, 5000.0),
            (build_component_metric_name(nats_client::OUT_MESSAGES), 0.0, 5.0),
            // NATS service metrics.
            (build_component_metric_name(nats_service::PROCESSING_MS_AVG), 0.0, 0.0),
            (build_component_metric_name(nats_service::ERRORS_TOTAL), 0.0, 0.0),
            (build_component_metric_name(nats_service::REQUESTS_TOTAL), 0.0, 0.0),
            (build_component_metric_name(nats_service::PROCESSING_MS_TOTAL), 0.0, 0.0),
            (build_component_metric_name(nats_service::ACTIVE_SERVICES), 0.0, 2.0),
            (build_component_metric_name(nats_service::ACTIVE_ENDPOINTS), 0.0, 2.0),
        ];

        for (metric_name, min_value, max_value) in &initial_expected_metric_values {
            let actual_value = parsed_metrics
                .iter()
                .find(|(name, _, _)| name == metric_name)
                .map(|(_, _, value)| *value)
                .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));

            assert!(
                (*min_value..=*max_value).contains(&actual_value),
                "Initial metric {} should be between {} and {}, but got {}",
                metric_name,
                min_value,
                max_value,
                actual_value
            );
        }

        println!("\n=== Client Runtime to hit the endpoint ===");
        let client_runtime = Runtime::from_current()?;
        let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
        let namespace = client_distributed.namespace("ns123")?;
        let component = namespace.component("comp123")?;
        let client = component.endpoint("echo").client().await?;

        client.wait_for_instances().await?;
        println!("✓ Connected to endpoint, waiting for instances...");

        let router =
            PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
                .await?;

        // Ten requests; each payload is one decimal digit repeated 2000 times.
        for i in 0..10 {
            let msg = i.to_string().repeat(2000);
            let mut stream = router.random(msg.clone().into()).await?;
            while let Some(resp) = stream.next().await {
                if let Some(data) = &resp.data {
                    // The echo result is only reported, not asserted.
                    let is_same = data == &msg;
                    println!(
                        "Response {}: {} bytes, matches original: {}",
                        i,
                        data.len(),
                        is_same
                    );
                }
            }
        }
        println!("✓ Sent messages and received responses successfully");

        println!("\n=== Waiting 500ms for metrics to update ===");
        sleep(Duration::from_millis(500)).await;
        println!("✓ Wait complete, getting final metrics...");

        let final_drt_output = drt.metrics().prometheus_expfmt().unwrap();
        println!("\n=== Final Prometheus DRT output ===");
        println!("{}", final_drt_output);

        let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
        println!("\n=== Filtered NATS metrics from final DRT output ===");
        for line in &final_drt_nats_output {
            println!("{}", line);
        }

        let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
            .iter()
            .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
            .collect();

        // (metric name, inclusive lower bound, inclusive upper bound)
        let post_expected_metric_values = [
            // NATS client metrics.
            (build_component_metric_name(nats_client::CONNECTION_STATE), 1.0, 1.0),
            (build_component_metric_name(nats_client::CURRENT_CONNECTIONS), 1.0, 1.0),
            (build_component_metric_name(nats_client::IN_TOTAL_BYTES), 20000.0, 32000.0),
            (build_component_metric_name(nats_client::IN_MESSAGES), 8.0, 20.0),
            (build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES), 2500.0, 8000.0),
            (build_component_metric_name(nats_client::OUT_MESSAGES), 8.0, 20.0),
            // NATS service metrics.
            (build_component_metric_name(nats_service::PROCESSING_MS_AVG), 0.0, 1.0),
            (build_component_metric_name(nats_service::ERRORS_TOTAL), 0.0, 0.0),
            (build_component_metric_name(nats_service::REQUESTS_TOTAL), 0.0, 10.0),
            (build_component_metric_name(nats_service::PROCESSING_MS_TOTAL), 0.0, 5.0),
            (build_component_metric_name(nats_service::ACTIVE_SERVICES), 0.0, 2.0),
            (build_component_metric_name(nats_service::ACTIVE_ENDPOINTS), 0.0, 2.0),
            // Work handler metrics.
            (build_component_metric_name(work_handler::REQUESTS_TOTAL), 10.0, 10.0),
            (build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL), 21000.0, 26000.0),
            (build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL), 18000.0, 23000.0),
            (build_component_metric_name(work_handler::INFLIGHT_REQUESTS), 0.0, 1.0),
            // Histogram series of the request duration: sample count and sum.
            (
                format!(
                    "{}_count",
                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
                ),
                10.0,
                10.0,
            ),
            (
                format!(
                    "{}_sum",
                    build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
                ),
                0.0001,
                1.0,
            ),
        ];

        println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
        for (metric_name, min_value, max_value) in &post_expected_metric_values {
            let actual_value = final_parsed_metrics
                .iter()
                .find(|(name, _, _)| name == metric_name)
                .map(|(_, _, value)| *value)
                .unwrap_or_else(|| {
                    panic!(
                        "Could not find expected post-activity metric: {}",
                        metric_name
                    )
                });

            assert!(
                (*min_value..=*max_value).contains(&actual_value),
                "Post-activity metric {} should be between {} and {}, but got {}",
                metric_name,
                min_value,
                max_value,
                actual_value
            );
            println!(
                "✓ {}: {} (range: {} to {})",
                metric_name, actual_value, min_value, max_value
            );
        }

        println!("✓ All NATS and component metrics parsed successfully!");
        println!("✓ Byte metrics verified to be >= 100 bytes!");
        println!("✓ Post-activity metrics verified with higher thresholds!");
        println!("✓ Work handler metrics reflect increased activity!");

        Ok(())
    }
}