1pub mod prometheus_names;
10
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22use prometheus_names::{
24 COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
25 nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
26};
27
28use crate::pipeline::{
30 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31 network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
37pub const USE_AUTO_LABELS: bool = true;
40
41use prometheus::Encoder;
43
44fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47 let mut seen_keys = std::collections::HashSet::new();
48 for (key, _) in labels {
49 if !seen_keys.insert(*key) {
50 return Err(anyhow::anyhow!(
51 "Duplicate label key '{}' found in labels",
52 key
53 ));
54 }
55 }
56 Ok(())
57}
58
59pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
61 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
63 where
64 Self: Sized;
65
66 fn with_histogram_opts_and_buckets(
69 _opts: prometheus::HistogramOpts,
70 _buckets: Option<Vec<f64>>,
71 ) -> Result<Self, prometheus::Error>
72 where
73 Self: Sized,
74 {
75 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
76 }
77
78 fn with_opts_and_label_names(
81 _opts: prometheus::Opts,
82 _label_names: &[&str],
83 ) -> Result<Self, prometheus::Error>
84 where
85 Self: Sized,
86 {
87 panic!("with_opts_and_label_names is not implemented for this metric type");
88 }
89}
90
91impl PrometheusMetric for prometheus::Counter {
93 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
94 prometheus::Counter::with_opts(opts)
95 }
96}
97
98impl PrometheusMetric for prometheus::IntCounter {
99 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
100 prometheus::IntCounter::with_opts(opts)
101 }
102}
103
104impl PrometheusMetric for prometheus::Gauge {
105 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
106 prometheus::Gauge::with_opts(opts)
107 }
108}
109
110impl PrometheusMetric for prometheus::IntGauge {
111 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
112 prometheus::IntGauge::with_opts(opts)
113 }
114}
115
116impl PrometheusMetric for prometheus::IntGaugeVec {
117 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
118 Err(prometheus::Error::Msg(
119 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
120 ))
121 }
122
123 fn with_opts_and_label_names(
124 opts: prometheus::Opts,
125 label_names: &[&str],
126 ) -> Result<Self, prometheus::Error> {
127 prometheus::IntGaugeVec::new(opts, label_names)
128 }
129}
130
131impl PrometheusMetric for prometheus::IntCounterVec {
132 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
133 Err(prometheus::Error::Msg(
134 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
135 ))
136 }
137
138 fn with_opts_and_label_names(
139 opts: prometheus::Opts,
140 label_names: &[&str],
141 ) -> Result<Self, prometheus::Error> {
142 prometheus::IntCounterVec::new(opts, label_names)
143 }
144}
145
146impl PrometheusMetric for prometheus::Histogram {
148 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
149 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
151 prometheus::Histogram::with_opts(histogram_opts)
152 }
153
154 fn with_histogram_opts_and_buckets(
155 mut opts: prometheus::HistogramOpts,
156 buckets: Option<Vec<f64>>,
157 ) -> Result<Self, prometheus::Error> {
158 if let Some(custom_buckets) = buckets {
159 opts = opts.buckets(custom_buckets);
160 }
161 prometheus::Histogram::with_opts(opts)
162 }
163}
164
165impl PrometheusMetric for prometheus::CounterVec {
167 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
168 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
170 }
171
172 fn with_opts_and_label_names(
173 opts: prometheus::Opts,
174 label_names: &[&str],
175 ) -> Result<Self, prometheus::Error> {
176 prometheus::CounterVec::new(opts, label_names)
177 }
178}
179
180fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
182 registry: &R,
183 metric_name: &str,
184 metric_desc: &str,
185 labels: &[(&str, &str)],
186 buckets: Option<Vec<f64>>,
187 const_labels: Option<&[&str]>,
188) -> anyhow::Result<T> {
189 validate_no_duplicate_label_keys(labels)?;
191 let basename = registry.basename();
194 let parent_hierarchy = registry.parent_hierarchy();
195
196 let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
198
199 let metric_name = build_component_metric_name(metric_name);
200
201 let mut updated_labels: Vec<(String, String)> = Vec::new();
203
204 if USE_AUTO_LABELS {
205 for (key, _) in labels {
207 if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
208 return Err(anyhow::anyhow!(
209 "Label '{}' is automatically added by auto_label feature and cannot be manually set",
210 key
211 ));
212 }
213 }
214
215 if hierarchy.len() > 1 {
217 let namespace = &hierarchy[1];
218 if !namespace.is_empty() {
219 let valid_namespace = sanitize_prometheus_label(namespace)?;
220 if !valid_namespace.is_empty() {
221 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
222 }
223 }
224 }
225 if hierarchy.len() > 2 {
226 let component = &hierarchy[2];
227 if !component.is_empty() {
228 let valid_component = sanitize_prometheus_label(component)?;
229 if !valid_component.is_empty() {
230 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
231 }
232 }
233 }
234 if hierarchy.len() > 3 {
235 let endpoint = &hierarchy[3];
236 if !endpoint.is_empty() {
237 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
238 if !valid_endpoint.is_empty() {
239 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
240 }
241 }
242 }
243 }
244
245 updated_labels.extend(
247 labels
248 .iter()
249 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
250 );
251 let prometheus_metric = if std::any::TypeId::of::<T>()
255 == std::any::TypeId::of::<prometheus::Histogram>()
256 {
257 if const_labels.is_some() {
260 return Err(anyhow::anyhow!(
261 "const_labels parameter is not valid for Histogram"
262 ));
263 }
264 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
265 for (key, value) in &updated_labels {
266 opts = opts.const_label(key.clone(), value.clone());
267 }
268 T::with_histogram_opts_and_buckets(opts, buckets)?
269 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::CounterVec>() {
270 if buckets.is_some() {
273 return Err(anyhow::anyhow!(
274 "buckets parameter is not valid for CounterVec"
275 ));
276 }
277 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
278 for (key, value) in &updated_labels {
279 opts = opts.const_label(key.clone(), value.clone());
280 }
281 let label_names = const_labels
282 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
283 T::with_opts_and_label_names(opts, label_names)?
284 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
285 if buckets.is_some() {
288 return Err(anyhow::anyhow!(
289 "buckets parameter is not valid for IntGaugeVec"
290 ));
291 }
292 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
293 for (key, value) in &updated_labels {
294 opts = opts.const_label(key.clone(), value.clone());
295 }
296 let label_names = const_labels
297 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
298 T::with_opts_and_label_names(opts, label_names)?
299 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
300 if buckets.is_some() {
303 return Err(anyhow::anyhow!(
304 "buckets parameter is not valid for IntCounterVec"
305 ));
306 }
307 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
308 for (key, value) in &updated_labels {
309 opts = opts.const_label(key.clone(), value.clone());
310 }
311 let label_names = const_labels
312 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
313 T::with_opts_and_label_names(opts, label_names)?
314 } else {
315 if buckets.is_some() {
318 return Err(anyhow::anyhow!(
319 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
320 ));
321 }
322 if const_labels.is_some() {
323 return Err(anyhow::anyhow!(
324 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
325 ));
326 }
327 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
328 for (key, value) in &updated_labels {
329 opts = opts.const_label(key.clone(), value.clone());
330 }
331 T::with_opts(opts)?
332 };
333
334 let mut current_hierarchy = String::new();
344 for name in &hierarchy {
345 if !current_hierarchy.is_empty() && !name.is_empty() {
346 current_hierarchy.push('_');
347 }
348 current_hierarchy.push_str(name);
349
350 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
352 registry
353 .drt()
354 .add_prometheus_metric(¤t_hierarchy, collector)?;
355 }
356
357 Ok(prometheus_metric)
358}
359
360use crate::traits::DistributedRuntimeProvider;
364
365pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
366 fn basename(&self) -> String;
368
369 fn hierarchy(&self) -> String {
373 [self.parent_hierarchy(), vec![self.basename()]]
374 .concat()
375 .join("_")
376 .trim_start_matches('_')
377 .to_string()
378 }
379
380 fn parent_hierarchy(&self) -> Vec<String>;
382
383 fn create_counter(
402 &self,
403 name: &str,
404 description: &str,
405 labels: &[(&str, &str)],
406 ) -> anyhow::Result<prometheus::Counter> {
407 create_metric(self, name, description, labels, None, None)
408 }
409
410 fn create_countervec(
412 &self,
413 name: &str,
414 description: &str,
415 const_labels: &[&str],
416 const_label_values: &[(&str, &str)],
417 ) -> anyhow::Result<prometheus::CounterVec> {
418 create_metric(
419 self,
420 name,
421 description,
422 const_label_values,
423 None,
424 Some(const_labels),
425 )
426 }
427
428 fn create_gauge(
430 &self,
431 name: &str,
432 description: &str,
433 labels: &[(&str, &str)],
434 ) -> anyhow::Result<prometheus::Gauge> {
435 create_metric(self, name, description, labels, None, None)
436 }
437
438 fn create_histogram(
440 &self,
441 name: &str,
442 description: &str,
443 labels: &[(&str, &str)],
444 buckets: Option<Vec<f64>>,
445 ) -> anyhow::Result<prometheus::Histogram> {
446 create_metric(self, name, description, labels, buckets, None)
447 }
448
449 fn create_intcounter(
451 &self,
452 name: &str,
453 description: &str,
454 labels: &[(&str, &str)],
455 ) -> anyhow::Result<prometheus::IntCounter> {
456 create_metric(self, name, description, labels, None, None)
457 }
458
459 fn create_intcountervec(
461 &self,
462 name: &str,
463 description: &str,
464 const_labels: &[&str],
465 const_label_values: &[(&str, &str)],
466 ) -> anyhow::Result<prometheus::IntCounterVec> {
467 create_metric(
468 self,
469 name,
470 description,
471 const_label_values,
472 None,
473 Some(const_labels),
474 )
475 }
476
477 fn create_intgauge(
479 &self,
480 name: &str,
481 description: &str,
482 labels: &[(&str, &str)],
483 ) -> anyhow::Result<prometheus::IntGauge> {
484 create_metric(self, name, description, labels, None, None)
485 }
486
487 fn create_intgaugevec(
489 &self,
490 name: &str,
491 description: &str,
492 const_labels: &[&str],
493 const_label_values: &[(&str, &str)],
494 ) -> anyhow::Result<prometheus::IntGaugeVec> {
495 create_metric(
496 self,
497 name,
498 description,
499 const_label_values,
500 None,
501 Some(const_labels),
502 )
503 }
504
505 fn prometheus_metrics_fmt(&self) -> anyhow::Result<String> {
507 let callback_results = self.drt().execute_metrics_callbacks(&self.hierarchy());
509
510 for result in callback_results {
512 if let Err(e) = result {
513 tracing::error!("Error executing metrics callback: {}", e);
514 }
515 }
516
517 let prometheus_registry = {
519 let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap();
520 registry_entry
521 .entry(self.hierarchy())
522 .or_default()
523 .prometheus_registry
524 .clone()
525 };
526 let metric_families = prometheus_registry.gather();
527 let encoder = prometheus::TextEncoder::new();
528 let mut buffer = Vec::new();
529 encoder.encode(&metric_families, &mut buffer)?;
530 Ok(String::from_utf8(buffer)?)
531 }
532}
533
534#[cfg(test)]
535mod test_helpers {
536 use super::prometheus_names::name_prefix;
537 use super::prometheus_names::{nats_client, nats_service};
538 use super::*;
539
540 fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
543 where
544 F: FnMut(&str) -> bool,
545 {
546 input
547 .lines()
548 .filter(|line| predicate(line))
549 .map(|line| line.to_string())
550 .collect::<Vec<_>>()
551 }
552
553 pub fn remove_nats_lines(input: &str) -> Vec<String> {
555 filter_prometheus_lines(input, |line| {
556 !line.contains(&format!(
557 "{}_{}",
558 name_prefix::COMPONENT,
559 nats_client::PREFIX
560 )) && !line.contains(&format!(
561 "{}_{}",
562 name_prefix::COMPONENT,
563 nats_service::PREFIX
564 )) && !line.trim().is_empty()
565 })
566 }
567
568 pub fn extract_nats_lines(input: &str) -> Vec<String> {
570 filter_prometheus_lines(input, |line| {
571 line.contains(&format!(
572 "{}_{}",
573 name_prefix::COMPONENT,
574 nats_client::PREFIX
575 )) || line.contains(&format!(
576 "{}_{}",
577 name_prefix::COMPONENT,
578 nats_service::PREFIX
579 ))
580 })
581 }
582
583 pub fn extract_metrics(input: &str) -> Vec<String> {
586 filter_prometheus_lines(input, |line| {
587 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
588 && !line.starts_with("#")
589 && !line.trim().is_empty()
590 })
591 }
592
593 pub fn parse_prometheus_metric(
605 line: &str,
606 ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
607 if line.trim().is_empty() || line.starts_with('#') {
608 return None;
609 }
610
611 let parts: Vec<&str> = line.split_whitespace().collect();
612 if parts.len() < 2 {
613 return None;
614 }
615
616 let metric_part = parts[0];
617 let value: f64 = parts[1].parse().ok()?;
618
619 let (name, labels) = if metric_part.contains('{') {
620 let brace_start = metric_part.find('{').unwrap();
621 let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
622 let name = &metric_part[..brace_start];
623 let labels_str = &metric_part[brace_start + 1..brace_end];
624
625 let mut labels = std::collections::HashMap::new();
626 for pair in labels_str.split(',') {
627 if let Some((k, v)) = pair.split_once('=') {
628 let v = v.trim_matches('"');
629 labels.insert(k.trim().to_string(), v.to_string());
630 }
631 }
632 (name.to_string(), labels)
633 } else {
634 (metric_part.to_string(), std::collections::HashMap::new())
635 };
636
637 Some((name, labels, value))
638 }
639}
640
641#[cfg(test)]
642mod test_metricsregistry_units {
643 use super::*;
644
645 #[test]
646 fn test_build_component_metric_name_with_prefix() {
647 let result = build_component_metric_name("requests");
649 assert_eq!(result, "dynamo_component_requests");
650
651 let result = build_component_metric_name("counter");
652 assert_eq!(result, "dynamo_component_counter");
653 }
654
655 #[test]
656 fn test_parse_prometheus_metric() {
657 use super::test_helpers::parse_prometheus_metric;
658 use std::collections::HashMap;
659
660 let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
662 let parsed = parse_prometheus_metric(line);
663 assert!(parsed.is_some());
664
665 let (name, labels, value) = parsed.unwrap();
666 assert_eq!(name, "http_requests_total");
667
668 let mut expected_labels = HashMap::new();
669 expected_labels.insert("method".to_string(), "GET".to_string());
670 expected_labels.insert("status".to_string(), "200".to_string());
671 assert_eq!(labels, expected_labels);
672
673 assert_eq!(value, 1234.0);
674
675 let line = "cpu_usage 98.5";
677 let parsed = parse_prometheus_metric(line);
678 assert!(parsed.is_some());
679
680 let (name, labels, value) = parsed.unwrap();
681 assert_eq!(name, "cpu_usage");
682 assert!(labels.is_empty());
683 assert_eq!(value, 98.5);
684
685 let line = "response_time{service=\"api\"} 0.123";
687 let parsed = parse_prometheus_metric(line);
688 assert!(parsed.is_some());
689
690 let (name, labels, value) = parsed.unwrap();
691 assert_eq!(name, "response_time");
692
693 let mut expected_labels = HashMap::new();
694 expected_labels.insert("service".to_string(), "api".to_string());
695 assert_eq!(labels, expected_labels);
696
697 assert_eq!(value, 0.123);
698
699 assert!(parse_prometheus_metric("").is_none()); assert!(parse_prometheus_metric("# HELP metric description").is_none()); assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); assert!(parse_prometheus_metric("metric_name").is_none()); println!("✓ Prometheus metric parsing works correctly!");
706 }
707
708 #[test]
709 fn test_metrics_registry_entry_callbacks() {
710 use crate::MetricsRegistryEntry;
711 use std::sync::atomic::{AtomicUsize, Ordering};
712
713 {
715 let mut entry = MetricsRegistryEntry::new();
716 let counter = Arc::new(AtomicUsize::new(0));
717
718 for increment in [1, 10, 100] {
720 let counter_clone = counter.clone();
721 entry.add_callback(Arc::new(move || {
722 counter_clone.fetch_add(increment, Ordering::SeqCst);
723 Ok(())
724 }));
725 }
726
727 assert_eq!(counter.load(Ordering::SeqCst), 0);
729
730 let results = entry.execute_callbacks();
732 assert_eq!(results.len(), 3);
733 assert!(results.iter().all(|r| r.is_ok()));
734 assert_eq!(counter.load(Ordering::SeqCst), 111); let results = entry.execute_callbacks();
738 assert_eq!(results.len(), 3);
739 assert_eq!(counter.load(Ordering::SeqCst), 222); let cloned = entry.clone();
743 assert_eq!(cloned.execute_callbacks().len(), 0);
744 assert_eq!(counter.load(Ordering::SeqCst), 222); entry.execute_callbacks();
748 assert_eq!(counter.load(Ordering::SeqCst), 333); }
750
751 {
753 let mut entry = MetricsRegistryEntry::new();
754 let counter = Arc::new(AtomicUsize::new(0));
755
756 let counter_clone = counter.clone();
758 entry.add_callback(Arc::new(move || {
759 counter_clone.fetch_add(1, Ordering::SeqCst);
760 Ok(())
761 }));
762
763 entry.add_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
765
766 let counter_clone = counter.clone();
768 entry.add_callback(Arc::new(move || {
769 counter_clone.fetch_add(10, Ordering::SeqCst);
770 Ok(())
771 }));
772
773 let results = entry.execute_callbacks();
775 assert_eq!(results.len(), 3);
776 assert!(results[0].is_ok());
777 assert!(results[1].is_err());
778 assert!(results[2].is_ok());
779
780 assert_eq!(
782 results[1].as_ref().unwrap_err().to_string(),
783 "Simulated error"
784 );
785
786 assert_eq!(counter.load(Ordering::SeqCst), 11); let results = entry.execute_callbacks();
791 assert!(results[1].is_err());
792 assert_eq!(counter.load(Ordering::SeqCst), 22); }
794
795 {
797 let entry = MetricsRegistryEntry::new();
798 let results = entry.execute_callbacks();
799 assert_eq!(results.len(), 0);
800 }
801 }
802}
803
804#[cfg(feature = "integration")]
805#[cfg(test)]
806mod test_metricsregistry_prefixes {
807 use super::*;
808 use crate::distributed::distributed_test_utils::create_test_drt_async;
809 use prometheus::core::Collector;
810
811 #[tokio::test]
812 async fn test_hierarchical_prefixes_and_parent_hierarchies() {
813 let drt = create_test_drt_async().await;
814
815 const DRT_NAME: &str = "";
816 const NAMESPACE_NAME: &str = "ns901";
817 const COMPONENT_NAME: &str = "comp901";
818 const ENDPOINT_NAME: &str = "ep901";
819 let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
820 let component = namespace.component(COMPONENT_NAME).unwrap();
821 let endpoint = component.endpoint(ENDPOINT_NAME);
822
823 assert_eq!(drt.basename(), DRT_NAME);
825 assert_eq!(drt.parent_hierarchy(), Vec::<String>::new());
826 assert_eq!(drt.hierarchy(), DRT_NAME);
827
828 assert_eq!(namespace.basename(), NAMESPACE_NAME);
830 assert_eq!(namespace.parent_hierarchy(), vec!["".to_string()]);
831 assert_eq!(namespace.hierarchy(), NAMESPACE_NAME);
832
833 assert_eq!(component.basename(), COMPONENT_NAME);
835 assert_eq!(
836 component.parent_hierarchy(),
837 vec!["".to_string(), NAMESPACE_NAME.to_string()]
838 );
839 assert_eq!(
840 component.hierarchy(),
841 format!("{}_{}", NAMESPACE_NAME, COMPONENT_NAME)
842 );
843
844 assert_eq!(endpoint.basename(), ENDPOINT_NAME);
846 assert_eq!(
847 endpoint.parent_hierarchy(),
848 vec![
849 "".to_string(),
850 NAMESPACE_NAME.to_string(),
851 COMPONENT_NAME.to_string(),
852 ]
853 );
854 assert_eq!(
855 endpoint.hierarchy(),
856 format!("{}_{}_{}", NAMESPACE_NAME, COMPONENT_NAME, ENDPOINT_NAME)
857 );
858
859 assert!(namespace.parent_hierarchy().contains(&drt.basename()));
861 assert!(component.parent_hierarchy().contains(&namespace.basename()));
862 assert!(endpoint.parent_hierarchy().contains(&component.basename()));
863
864 assert_eq!(drt.parent_hierarchy().len(), 0);
866 assert_eq!(namespace.parent_hierarchy().len(), 1);
867 assert_eq!(component.parent_hierarchy().len(), 2);
868 assert_eq!(endpoint.parent_hierarchy().len(), 3);
869
870 let invalid_namespace = drt.namespace("@@123").unwrap();
874 let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
875 assert!(result.is_ok());
876 if let Ok(counter) = &result {
877 let desc = counter.desc();
879 let namespace_label = desc[0]
880 .const_label_pairs
881 .iter()
882 .find(|l| l.name() == "dynamo_namespace")
883 .expect("Should have dynamo_namespace label");
884 assert_eq!(namespace_label.value(), "_123");
885 }
886
887 let valid_namespace = drt.namespace("ns567").unwrap();
889 assert!(
890 valid_namespace
891 .create_counter("test_counter", "A test counter", &[])
892 .is_ok()
893 );
894 }
895
896 #[tokio::test]
897 async fn test_recursive_namespace() {
898 let drt = create_test_drt_async().await;
900
901 let ns1 = drt.namespace("ns1").unwrap();
903 let ns2 = ns1.namespace("ns2").unwrap();
904 let ns3 = ns2.namespace("ns3").unwrap();
905
906 let component = ns3.component("test-component").unwrap();
908
909 assert_eq!(ns1.basename(), "ns1");
911 assert_eq!(ns1.parent_hierarchy(), vec!("".to_string()));
912 assert_eq!(ns1.hierarchy(), "ns1");
913
914 assert_eq!(ns2.basename(), "ns2");
915 assert_eq!(
916 ns2.parent_hierarchy(),
917 vec!["".to_string(), "ns1".to_string()]
918 );
919 assert_eq!(ns2.hierarchy(), "ns1_ns2");
920
921 assert_eq!(ns3.basename(), "ns3");
922 assert_eq!(
923 ns3.parent_hierarchy(),
924 vec!["".to_string(), "ns1".to_string(), "ns2".to_string()]
925 );
926 assert_eq!(ns3.hierarchy(), "ns1_ns2_ns3");
927
928 assert_eq!(component.basename(), "test-component");
929 assert_eq!(
930 component.parent_hierarchy(),
931 vec![
932 "".to_string(),
933 "ns1".to_string(),
934 "ns2".to_string(),
935 "ns3".to_string()
936 ]
937 );
938 assert_eq!(component.hierarchy(), "ns1_ns2_ns3_test-component");
939
940 println!("✓ Chained namespace test passed - all prefixes correct");
941 }
942}
943
944#[cfg(feature = "integration")]
945#[cfg(test)]
946mod test_metricsregistry_prometheus_fmt_outputs {
947 use super::prometheus_names::name_prefix;
948 use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
949 use super::prometheus_names::{nats_client, nats_service};
950 use super::*;
951 use crate::distributed::distributed_test_utils::create_test_drt_async;
952 use prometheus::Counter;
953 use std::sync::Arc;
954
955 #[tokio::test]
956 async fn test_prometheusfactory_using_metrics_registry_trait() {
957 let drt = create_test_drt_async().await;
959
960 let namespace_name = "ns345";
962
963 let namespace = drt.namespace(namespace_name).unwrap();
964 let component = namespace.component("comp345").unwrap();
965 let endpoint = component.endpoint("ep345");
966
967 let counter = endpoint
969 .create_counter("testcounter", "A test counter", &[])
970 .unwrap();
971 counter.inc_by(123.456789);
972 let epsilon = 0.01;
973 assert!((counter.get() - 123.456789).abs() < epsilon);
974
975 let endpoint_output_raw = endpoint.prometheus_metrics_fmt().unwrap();
976 println!("Endpoint output:");
977 println!("{}", endpoint_output_raw);
978
979 let endpoint_output =
981 super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
982
983 let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
984# TYPE dynamo_component_testcounter counter
985dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
986
987 assert_eq!(
988 endpoint_output, expected_endpoint_output,
989 "\n=== ENDPOINT COMPARISON FAILED ===\n\
990 Expected:\n{}\n\
991 Actual:\n{}\n\
992 ==============================",
993 expected_endpoint_output, endpoint_output
994 );
995
996 let gauge = component
998 .create_gauge("testgauge", "A test gauge", &[])
999 .unwrap();
1000 gauge.set(50000.0);
1001 assert_eq!(gauge.get(), 50000.0);
1002
1003 let component_output_raw = component.prometheus_metrics_fmt().unwrap();
1005 println!("Component output:");
1006 println!("{}", component_output_raw);
1007
1008 let component_output =
1010 super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
1011
1012 let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1013# TYPE dynamo_component_testcounter counter
1014dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1015# HELP dynamo_component_testgauge A test gauge
1016# TYPE dynamo_component_testgauge gauge
1017dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1018
1019 assert_eq!(
1020 component_output, expected_component_output,
1021 "\n=== COMPONENT COMPARISON FAILED ===\n\
1022 Expected:\n{}\n\
1023 Actual:\n{}\n\
1024 ==============================",
1025 expected_component_output, component_output
1026 );
1027
1028 let intcounter = namespace
1029 .create_intcounter("testintcounter", "A test int counter", &[])
1030 .unwrap();
1031 intcounter.inc_by(12345);
1032 assert_eq!(intcounter.get(), 12345);
1033
1034 let namespace_output_raw = namespace.prometheus_metrics_fmt().unwrap();
1036 println!("Namespace output:");
1037 println!("{}", namespace_output_raw);
1038
1039 let namespace_output =
1041 super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
1042
1043 let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1044# TYPE dynamo_component_testcounter counter
1045dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1046# HELP dynamo_component_testgauge A test gauge
1047# TYPE dynamo_component_testgauge gauge
1048dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1049# HELP dynamo_component_testintcounter A test int counter
1050# TYPE dynamo_component_testintcounter counter
1051dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1052
1053 assert_eq!(
1054 namespace_output, expected_namespace_output,
1055 "\n=== NAMESPACE COMPARISON FAILED ===\n\
1056 Expected:\n{}\n\
1057 Actual:\n{}\n\
1058 ==============================",
1059 expected_namespace_output, namespace_output
1060 );
1061
1062 let intgauge = namespace
1064 .create_intgauge("testintgauge", "A test int gauge", &[])
1065 .unwrap();
1066 intgauge.set(42);
1067 assert_eq!(intgauge.get(), 42);
1068
1069 let intgaugevec = namespace
1071 .create_intgaugevec(
1072 "testintgaugevec",
1073 "A test int gauge vector",
1074 &["instance", "status"],
1075 &[("service", "api")],
1076 )
1077 .unwrap();
1078 intgaugevec
1079 .with_label_values(&["server1", "active"])
1080 .set(10);
1081 intgaugevec
1082 .with_label_values(&["server2", "inactive"])
1083 .set(0);
1084
1085 let countervec = endpoint
1087 .create_countervec(
1088 "testcountervec",
1089 "A test counter vector",
1090 &["method", "status"],
1091 &[("service", "api")],
1092 )
1093 .unwrap();
1094 countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1095 countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1096
1097 let histogram = component
1099 .create_histogram("testhistogram", "A test histogram", &[], None)
1100 .unwrap();
1101 histogram.observe(1.0);
1102 histogram.observe(2.5);
1103 histogram.observe(4.0);
1104
1105 let drt_output_raw = drt.prometheus_metrics_fmt().unwrap();
1107 println!("DRT output:");
1108 println!("{}", drt_output_raw);
1109
1110 let filtered_drt_output =
1112 super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
1113
1114 let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
1115# TYPE dynamo_component_testcounter counter
1116dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1117# HELP dynamo_component_testcountervec A test counter vector
1118# TYPE dynamo_component_testcountervec counter
1119dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1120dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1121# HELP dynamo_component_testgauge A test gauge
1122# TYPE dynamo_component_testgauge gauge
1123dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1124# HELP dynamo_component_testhistogram A test histogram
1125# TYPE dynamo_component_testhistogram histogram
1126dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1127dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1128dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1129dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1130dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1131dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1132dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1133dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1134dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1135dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1136dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1137dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1138dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1139dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1140# HELP dynamo_component_testintcounter A test int counter
1141# TYPE dynamo_component_testintcounter counter
1142dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1143# HELP dynamo_component_testintgauge A test int gauge
1144# TYPE dynamo_component_testintgauge gauge
1145dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1146# HELP dynamo_component_testintgaugevec A test int gauge vector
1147# TYPE dynamo_component_testintgaugevec gauge
1148dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1149dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
1150# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
1151# TYPE dynamo_component_uptime_seconds gauge
1152dynamo_component_uptime_seconds 0"#.to_string();
1153
1154 assert_eq!(
1155 filtered_drt_output, expected_drt_output,
1156 "\n=== DRT COMPARISON FAILED ===\n\
1157 Expected:\n{}\n\
1158 Actual (filtered):\n{}\n\
1159 ==============================",
1160 expected_drt_output, filtered_drt_output
1161 );
1162
1163 println!("✓ All Prometheus format outputs verified successfully!");
1164 }
1165
1166 #[test]
1167 fn test_refactored_filter_functions() {
1168 let test_input = r#"# HELP dynamo_component_requests Total requests
1170# TYPE dynamo_component_requests counter
1171dynamo_component_requests 42
1172# HELP dynamo_component_nats_client_connection_state Connection state
1173# TYPE dynamo_component_nats_client_connection_state gauge
1174dynamo_component_nats_client_connection_state 1
1175# HELP dynamo_component_latency Response latency
1176# TYPE dynamo_component_latency histogram
1177dynamo_component_latency_bucket{le="0.1"} 10
1178dynamo_component_latency_bucket{le="0.5"} 25
1179dynamo_component_nats_service_total_requests 100
1180dynamo_component_nats_service_total_errors 5"#;
1181
1182 let filtered_out = super::test_helpers::remove_nats_lines(test_input);
1184 assert_eq!(filtered_out.len(), 7); assert!(!filtered_out.iter().any(|line| line.contains("nats")));
1186
1187 let filtered_only = super::test_helpers::extract_nats_lines(test_input);
1189 assert_eq!(filtered_only.len(), 5); assert!(filtered_only.iter().all(|line| line.contains("nats")));
1191
1192 let metrics_only = super::test_helpers::extract_metrics(test_input);
1194 assert_eq!(metrics_only.len(), 6); assert!(
1196 metrics_only
1197 .iter()
1198 .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1199 );
1200
1201 println!("✓ All refactored filter functions work correctly!");
1202 }
1203}
1204
1205#[cfg(feature = "integration")]
1206#[cfg(test)]
1207mod test_metricsregistry_nats {
1208 use super::prometheus_names::name_prefix;
1209 use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1210 use super::prometheus_names::{nats_client, nats_service};
1211 use super::*;
1212 use crate::distributed::distributed_test_utils::create_test_drt_async;
1213 use crate::pipeline::PushRouter;
1214 use crate::{DistributedRuntime, Runtime};
1215 use tokio::time::{Duration, sleep};
1216 #[tokio::test]
1217 async fn test_drt_nats_metrics() {
1218 let drt = create_test_drt_async().await;
1220
1221 let drt_output = drt.prometheus_metrics_fmt().unwrap();
1223 println!("DRT output with NATS metrics:");
1224 println!("{}", drt_output);
1225
1226 let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
1228
1229 assert!(
1231 !drt_nats_metrics.is_empty(),
1232 "NATS client metrics should be present"
1233 );
1234
1235 let drt_nats_metric_lines =
1238 super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
1239 let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
1240 .iter()
1241 .map(|line| {
1242 let without_labels = line.split('{').next().unwrap_or(line);
1243 without_labels.split(' ').next().unwrap_or(without_labels)
1245 })
1246 .collect();
1247
1248 let expect_drt_nats_metrics_sorted = {
1249 let mut temp = DRT_NATS_METRICS
1250 .iter()
1251 .map(|metric| build_component_metric_name(metric))
1252 .collect::<Vec<_>>();
1253 temp.sort();
1254 temp
1255 };
1256
1257 println!(
1259 "actual_drt_nats_metrics_sorted: {:?}",
1260 actual_drt_nats_metrics_sorted
1261 );
1262 println!(
1263 "expect_drt_nats_metrics_sorted: {:?}",
1264 expect_drt_nats_metrics_sorted
1265 );
1266
1267 assert_eq!(
1269 actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
1270 "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1271 );
1272
1273 println!("✓ DistributedRuntime NATS metrics integration test passed!");
1274 }
1275
1276 #[tokio::test]
1277 async fn test_nats_metric_names() {
1278 let drt = create_test_drt_async().await;
1283
1284 let namespace = drt.namespace("ns789").unwrap();
1286 let components = namespace.component("comp789").unwrap();
1287
1288 let _service = components.service_builder().create().await.unwrap();
1290
1291 let component_nats_metrics =
1294 super::test_helpers::extract_nats_lines(&components.prometheus_metrics_fmt().unwrap());
1295 println!(
1296 "Component NATS metrics count: {}",
1297 component_nats_metrics.len()
1298 );
1299
1300 assert!(
1302 !component_nats_metrics.is_empty(),
1303 "NATS client metrics should be present"
1304 );
1305
1306 let component_metrics =
1308 super::test_helpers::extract_metrics(&components.prometheus_metrics_fmt().unwrap());
1309 let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
1310 .iter()
1311 .map(|line| {
1312 let without_labels = line.split('{').next().unwrap_or(line);
1313 without_labels.split(' ').next().unwrap_or(without_labels)
1315 })
1316 .collect();
1317
1318 let expect_component_nats_metrics_sorted = {
1319 let mut temp = COMPONENT_NATS_METRICS
1320 .iter()
1321 .map(|metric| build_component_metric_name(metric))
1322 .collect::<Vec<_>>();
1323 temp.sort();
1324 temp
1325 };
1326
1327 println!(
1329 "actual_component_nats_metrics_sorted: {:?}",
1330 actual_component_nats_metrics_sorted
1331 );
1332 println!(
1333 "expect_component_nats_metrics_sorted: {:?}",
1334 expect_component_nats_metrics_sorted
1335 );
1336
1337 assert_eq!(
1339 actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
1340 "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1341 );
1342
1343 let drt_output = drt.prometheus_metrics_fmt().unwrap();
1345 let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
1346 let drt_and_component_nats_metrics =
1347 super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
1348 println!(
1349 "DRT and component NATS metrics count: {}",
1350 drt_and_component_nats_metrics.len()
1351 );
1352
1353 assert_eq!(
1355 drt_and_component_nats_metrics.len(),
1356 DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
1357 "DRT at this point should have both the DRT and component NATS metrics"
1358 );
1359
1360 println!("✓ Component NATS metrics integration test passed!");
1362 }
1363
1364 #[tokio::test]
1369 async fn test_nats_metrics_values() -> anyhow::Result<()> {
1370 struct MessageHandler {}
1371 impl MessageHandler {
1372 fn new() -> std::sync::Arc<Self> {
1373 std::sync::Arc::new(Self {})
1374 }
1375 }
1376
1377 #[async_trait]
1378 impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
1379 async fn generate(
1380 &self,
1381 input: SingleIn<String>,
1382 ) -> Result<ManyOut<Annotated<String>>, Error> {
1383 let (data, ctx) = input.into_parts();
1384 let response = data.to_string();
1385 let stream = stream::iter(vec![Annotated::from_data(response)]);
1386 Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
1387 }
1388 }
1389
1390 println!("\n=== Initializing DistributedRuntime ===");
1391 let runtime = Runtime::from_current()?;
1392 let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
1393 let namespace = drt.namespace("ns123").unwrap();
1394 let component = namespace.component("comp123").unwrap();
1395 let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
1396
1397 let _backend_handle = tokio::spawn(async move {
1398 let service = component.service_builder().create().await.unwrap();
1399 let endpoint = service.endpoint("echo").endpoint_builder().handler(ingress);
1400 endpoint.start().await.unwrap();
1401 });
1402
1403 sleep(Duration::from_millis(500)).await;
1404 println!("✓ Launched endpoint service in background successfully");
1405
1406 let drt_output = drt.prometheus_metrics_fmt().unwrap();
1407 let parsed_metrics: Vec<_> = drt_output
1408 .lines()
1409 .filter_map(super::test_helpers::parse_prometheus_metric)
1410 .collect();
1411
1412 println!("=== Initial DRT metrics output ===");
1413 println!("{}", drt_output);
1414
1415 println!("\n=== Checking Initial Metric Values ===");
1416
1417 let initial_expected_metric_values = [
1418 (
1420 build_component_metric_name(nats_client::CONNECTION_STATE),
1421 1.0,
1422 1.0,
1423 ), (build_component_metric_name(nats_client::CONNECTS), 1.0, 1.0), (
1426 build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1427 800.0,
1428 4000.0,
1429 ), (
1431 build_component_metric_name(nats_client::IN_MESSAGES),
1432 0.0,
1433 5.0,
1434 ), (
1436 build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1437 1500.0,
1438 5000.0,
1439 ), (
1441 build_component_metric_name(nats_client::OUT_MESSAGES),
1442 0.0,
1443 5.0,
1444 ), (
1447 build_component_metric_name(nats_service::AVG_PROCESSING_MS),
1448 0.0,
1449 0.0,
1450 ), (
1452 build_component_metric_name(nats_service::TOTAL_ERRORS),
1453 0.0,
1454 0.0,
1455 ), (
1457 build_component_metric_name(nats_service::TOTAL_REQUESTS),
1458 0.0,
1459 0.0,
1460 ), (
1462 build_component_metric_name(nats_service::TOTAL_PROCESSING_MS),
1463 0.0,
1464 0.0,
1465 ), (
1467 build_component_metric_name(nats_service::ACTIVE_SERVICES),
1468 0.0,
1469 2.0,
1470 ), (
1472 build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1473 0.0,
1474 2.0,
1475 ), ];
1477
1478 for (metric_name, min_value, max_value) in &initial_expected_metric_values {
1479 let actual_value = parsed_metrics
1480 .iter()
1481 .find(|(name, _, _)| name == metric_name)
1482 .map(|(_, _, value)| *value)
1483 .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
1484
1485 assert!(
1486 actual_value >= *min_value && actual_value <= *max_value,
1487 "Initial metric {} should be between {} and {}, but got {}",
1488 metric_name,
1489 min_value,
1490 max_value,
1491 actual_value
1492 );
1493 }
1494
1495 println!("\n=== Client Runtime to hit the endpoint ===");
1496 let client_runtime = Runtime::from_current()?;
1497 let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
1498 let namespace = client_distributed.namespace("ns123")?;
1499 let component = namespace.component("comp123")?;
1500 let client = component.endpoint("echo").client().await?;
1501
1502 client.wait_for_instances().await?;
1503 println!("✓ Connected to endpoint, waiting for instances...");
1504
1505 let router =
1506 PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
1507 .await?;
1508
1509 for i in 0..10 {
1510 let msg = i.to_string().repeat(2000); let mut stream = router.random(msg.clone().into()).await?;
1512 while let Some(resp) = stream.next().await {
1513 if let Some(data) = &resp.data {
1515 let is_same = data == &msg;
1516 println!(
1517 "Response {}: {} bytes, matches original: {}",
1518 i,
1519 data.len(),
1520 is_same
1521 );
1522 }
1523 }
1524 }
1525 println!("✓ Sent messages and received responses successfully");
1526
1527 println!("\n=== Waiting 500ms for metrics to update ===");
1528 sleep(Duration::from_millis(500)).await;
1529 println!("✓ Wait complete, getting final metrics...");
1530
1531 let final_drt_output = drt.prometheus_metrics_fmt().unwrap();
1532 println!("\n=== Final Prometheus DRT output ===");
1533 println!("{}", final_drt_output);
1534
1535 let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
1536 println!("\n=== Filtered NATS metrics from final DRT output ===");
1537 for line in &final_drt_nats_output {
1538 println!("{}", line);
1539 }
1540
1541 let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
1542 .iter()
1543 .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
1544 .collect();
1545
1546 let post_expected_metric_values = [
1547 (
1549 build_component_metric_name(nats_client::CONNECTION_STATE),
1550 1.0,
1551 1.0,
1552 ), (build_component_metric_name(nats_client::CONNECTS), 1.0, 1.0), (
1555 build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1556 20000.0,
1557 32000.0,
1558 ), (
1560 build_component_metric_name(nats_client::IN_MESSAGES),
1561 8.0,
1562 20.0,
1563 ), (
1565 build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1566 2500.0,
1567 8000.0,
1568 ), (
1570 build_component_metric_name(nats_client::OUT_MESSAGES),
1571 8.0,
1572 20.0,
1573 ), (
1576 build_component_metric_name(nats_service::AVG_PROCESSING_MS),
1577 0.0,
1578 1.0,
1579 ), (
1581 build_component_metric_name(nats_service::TOTAL_ERRORS),
1582 0.0,
1583 0.0,
1584 ), (
1586 build_component_metric_name(nats_service::TOTAL_REQUESTS),
1587 0.0,
1588 0.0,
1589 ), (
1591 build_component_metric_name(nats_service::TOTAL_PROCESSING_MS),
1592 0.0,
1593 5.0,
1594 ), (
1596 build_component_metric_name(nats_service::ACTIVE_SERVICES),
1597 0.0,
1598 2.0,
1599 ), (
1601 build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1602 0.0,
1603 2.0,
1604 ), (
1607 build_component_metric_name(work_handler::REQUESTS_TOTAL),
1608 10.0,
1609 10.0,
1610 ), (
1612 build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
1613 21000.0,
1614 26000.0,
1615 ), (
1617 build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
1618 18000.0,
1619 23000.0,
1620 ), (
1622 build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
1623 0.0,
1624 1.0,
1625 ), (
1628 format!(
1629 "{}_count",
1630 build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1631 ),
1632 10.0,
1633 10.0,
1634 ), (
1636 format!(
1637 "{}_sum",
1638 build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1639 ),
1640 0.0001,
1641 1.0,
1642 ), ];
1644
1645 println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
1646 for (metric_name, min_value, max_value) in &post_expected_metric_values {
1647 let actual_value = final_parsed_metrics
1648 .iter()
1649 .find(|(name, _, _)| name == metric_name)
1650 .map(|(_, _, value)| *value)
1651 .unwrap_or_else(|| {
1652 panic!(
1653 "Could not find expected post-activity metric: {}",
1654 metric_name
1655 )
1656 });
1657
1658 assert!(
1659 actual_value >= *min_value && actual_value <= *max_value,
1660 "Post-activity metric {} should be between {} and {}, but got {}",
1661 metric_name,
1662 min_value,
1663 max_value,
1664 actual_value
1665 );
1666 println!(
1667 "✓ {}: {} (range: {} to {})",
1668 metric_name, actual_value, min_value, max_value
1669 );
1670 }
1671
1672 println!("✓ All NATS and component metrics parsed successfully!");
1673 println!("✓ Byte metrics verified to be >= 100 bytes!");
1674 println!("✓ Post-activity metrics verified with higher thresholds!");
1675 println!("✓ Work handler metrics reflect increased activity!");
1676
1677 Ok(())
1678 }
1679}