1pub mod prometheus_names;
10
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use crate::component::ComponentBuilder;
16use anyhow;
17use once_cell::sync::Lazy;
18use regex::Regex;
19use std::any::Any;
20use std::collections::HashMap;
21
22use prometheus_names::{
24 COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
25 nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
26};
27
28use crate::pipeline::{
30 AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
31 network::Ingress,
32};
33use crate::protocols::annotated::Annotated;
34use crate::stream;
35use crate::stream::StreamExt;
36
37pub const USE_AUTO_LABELS: bool = true;
40
41use prometheus::Encoder;
43
44fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
47 let mut seen_keys = std::collections::HashSet::new();
48 for (key, _) in labels {
49 if !seen_keys.insert(*key) {
50 return Err(anyhow::anyhow!(
51 "Duplicate label key '{}' found in labels",
52 key
53 ));
54 }
55 }
56 Ok(())
57}
58
59pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
61 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
63 where
64 Self: Sized;
65
66 fn with_histogram_opts_and_buckets(
69 _opts: prometheus::HistogramOpts,
70 _buckets: Option<Vec<f64>>,
71 ) -> Result<Self, prometheus::Error>
72 where
73 Self: Sized,
74 {
75 panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
76 }
77
78 fn with_opts_and_label_names(
81 _opts: prometheus::Opts,
82 _label_names: &[&str],
83 ) -> Result<Self, prometheus::Error>
84 where
85 Self: Sized,
86 {
87 panic!("with_opts_and_label_names is not implemented for this metric type");
88 }
89}
90
91impl PrometheusMetric for prometheus::Counter {
93 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
94 prometheus::Counter::with_opts(opts)
95 }
96}
97
98impl PrometheusMetric for prometheus::IntCounter {
99 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
100 prometheus::IntCounter::with_opts(opts)
101 }
102}
103
104impl PrometheusMetric for prometheus::Gauge {
105 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
106 prometheus::Gauge::with_opts(opts)
107 }
108}
109
110impl PrometheusMetric for prometheus::IntGauge {
111 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
112 prometheus::IntGauge::with_opts(opts)
113 }
114}
115
116impl PrometheusMetric for prometheus::GaugeVec {
117 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
118 Err(prometheus::Error::Msg(
119 "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
120 ))
121 }
122
123 fn with_opts_and_label_names(
124 opts: prometheus::Opts,
125 label_names: &[&str],
126 ) -> Result<Self, prometheus::Error> {
127 prometheus::GaugeVec::new(opts, label_names)
128 }
129}
130
131impl PrometheusMetric for prometheus::IntGaugeVec {
132 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
133 Err(prometheus::Error::Msg(
134 "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
135 ))
136 }
137
138 fn with_opts_and_label_names(
139 opts: prometheus::Opts,
140 label_names: &[&str],
141 ) -> Result<Self, prometheus::Error> {
142 prometheus::IntGaugeVec::new(opts, label_names)
143 }
144}
145
146impl PrometheusMetric for prometheus::IntCounterVec {
147 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
148 Err(prometheus::Error::Msg(
149 "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
150 ))
151 }
152
153 fn with_opts_and_label_names(
154 opts: prometheus::Opts,
155 label_names: &[&str],
156 ) -> Result<Self, prometheus::Error> {
157 prometheus::IntCounterVec::new(opts, label_names)
158 }
159}
160
161impl PrometheusMetric for prometheus::Histogram {
163 fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
164 let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help);
166 prometheus::Histogram::with_opts(histogram_opts)
167 }
168
169 fn with_histogram_opts_and_buckets(
170 mut opts: prometheus::HistogramOpts,
171 buckets: Option<Vec<f64>>,
172 ) -> Result<Self, prometheus::Error> {
173 if let Some(custom_buckets) = buckets {
174 opts = opts.buckets(custom_buckets);
175 }
176 prometheus::Histogram::with_opts(opts)
177 }
178}
179
180impl PrometheusMetric for prometheus::CounterVec {
182 fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
183 panic!("CounterVec requires label names, use with_opts_and_label_names instead");
185 }
186
187 fn with_opts_and_label_names(
188 opts: prometheus::Opts,
189 label_names: &[&str],
190 ) -> Result<Self, prometheus::Error> {
191 prometheus::CounterVec::new(opts, label_names)
192 }
193}
194
195fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
197 registry: &R,
198 metric_name: &str,
199 metric_desc: &str,
200 labels: &[(&str, &str)],
201 buckets: Option<Vec<f64>>,
202 const_labels: Option<&[&str]>,
203) -> anyhow::Result<T> {
204 validate_no_duplicate_label_keys(labels)?;
206 let basename = registry.basename();
209 let parent_hierarchy = registry.parent_hierarchy();
210
211 let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
213
214 let metric_name = build_component_metric_name(metric_name);
215
216 let mut updated_labels: Vec<(String, String)> = Vec::new();
218
219 if USE_AUTO_LABELS {
220 for (key, _) in labels {
222 if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
223 return Err(anyhow::anyhow!(
224 "Label '{}' is automatically added by auto_label feature and cannot be manually set",
225 key
226 ));
227 }
228 }
229
230 if hierarchy.len() > 1 {
232 let namespace = &hierarchy[1];
233 if !namespace.is_empty() {
234 let valid_namespace = sanitize_prometheus_label(namespace)?;
235 if !valid_namespace.is_empty() {
236 updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
237 }
238 }
239 }
240 if hierarchy.len() > 2 {
241 let component = &hierarchy[2];
242 if !component.is_empty() {
243 let valid_component = sanitize_prometheus_label(component)?;
244 if !valid_component.is_empty() {
245 updated_labels.push((labels::COMPONENT.to_string(), valid_component));
246 }
247 }
248 }
249 if hierarchy.len() > 3 {
250 let endpoint = &hierarchy[3];
251 if !endpoint.is_empty() {
252 let valid_endpoint = sanitize_prometheus_label(endpoint)?;
253 if !valid_endpoint.is_empty() {
254 updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
255 }
256 }
257 }
258 }
259
260 updated_labels.extend(
262 labels
263 .iter()
264 .map(|(k, v)| ((*k).to_string(), (*v).to_string())),
265 );
266 let prometheus_metric = if std::any::TypeId::of::<T>()
270 == std::any::TypeId::of::<prometheus::CounterVec>()
271 {
272 if buckets.is_some() {
275 return Err(anyhow::anyhow!(
276 "buckets parameter is not valid for CounterVec"
277 ));
278 }
279 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
280 for (key, value) in &updated_labels {
281 opts = opts.const_label(key.clone(), value.clone());
282 }
283 let label_names = const_labels
284 .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
285 T::with_opts_and_label_names(opts, label_names)?
286 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
287 if buckets.is_some() {
290 return Err(anyhow::anyhow!(
291 "buckets parameter is not valid for GaugeVec"
292 ));
293 }
294 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
295 for (key, value) in &updated_labels {
296 opts = opts.const_label(key.clone(), value.clone());
297 }
298 let label_names = const_labels
299 .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
300 T::with_opts_and_label_names(opts, label_names)?
301 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
302 if const_labels.is_some() {
305 return Err(anyhow::anyhow!(
306 "const_labels parameter is not valid for Histogram"
307 ));
308 }
309 let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
310 for (key, value) in &updated_labels {
311 opts = opts.const_label(key.clone(), value.clone());
312 }
313 T::with_histogram_opts_and_buckets(opts, buckets)?
314 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
315 if buckets.is_some() {
318 return Err(anyhow::anyhow!(
319 "buckets parameter is not valid for IntCounterVec"
320 ));
321 }
322 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
323 for (key, value) in &updated_labels {
324 opts = opts.const_label(key.clone(), value.clone());
325 }
326 let label_names = const_labels
327 .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
328 T::with_opts_and_label_names(opts, label_names)?
329 } else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
330 if buckets.is_some() {
333 return Err(anyhow::anyhow!(
334 "buckets parameter is not valid for IntGaugeVec"
335 ));
336 }
337 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
338 for (key, value) in &updated_labels {
339 opts = opts.const_label(key.clone(), value.clone());
340 }
341 let label_names = const_labels
342 .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
343 T::with_opts_and_label_names(opts, label_names)?
344 } else {
345 if buckets.is_some() {
348 return Err(anyhow::anyhow!(
349 "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
350 ));
351 }
352 if const_labels.is_some() {
353 return Err(anyhow::anyhow!(
354 "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
355 ));
356 }
357 let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
358 for (key, value) in &updated_labels {
359 opts = opts.const_label(key.clone(), value.clone());
360 }
361 T::with_opts(opts)?
362 };
363
364 let mut current_hierarchy = String::new();
374 for name in &hierarchy {
375 if !current_hierarchy.is_empty() && !name.is_empty() {
376 current_hierarchy.push('_');
377 }
378 current_hierarchy.push_str(name);
379
380 let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
382 registry
383 .drt()
384 .add_prometheus_metric(¤t_hierarchy, collector)?;
385 }
386
387 Ok(prometheus_metric)
388}
389
390use crate::traits::DistributedRuntimeProvider;
394
395pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
396 fn basename(&self) -> String;
398
399 fn hierarchy(&self) -> String {
403 [self.parent_hierarchy(), vec![self.basename()]]
404 .concat()
405 .join("_")
406 .trim_start_matches('_')
407 .to_string()
408 }
409
410 fn parent_hierarchy(&self) -> Vec<String>;
412
413 fn create_counter(
435 &self,
436 name: &str,
437 description: &str,
438 labels: &[(&str, &str)],
439 ) -> anyhow::Result<prometheus::Counter> {
440 create_metric(self, name, description, labels, None, None)
441 }
442
443 fn create_countervec(
445 &self,
446 name: &str,
447 description: &str,
448 const_labels: &[&str],
449 const_label_values: &[(&str, &str)],
450 ) -> anyhow::Result<prometheus::CounterVec> {
451 create_metric(
452 self,
453 name,
454 description,
455 const_label_values,
456 None,
457 Some(const_labels),
458 )
459 }
460
461 fn create_gauge(
463 &self,
464 name: &str,
465 description: &str,
466 labels: &[(&str, &str)],
467 ) -> anyhow::Result<prometheus::Gauge> {
468 create_metric(self, name, description, labels, None, None)
469 }
470
471 fn create_gaugevec(
473 &self,
474 name: &str,
475 description: &str,
476 const_labels: &[&str],
477 const_label_values: &[(&str, &str)],
478 ) -> anyhow::Result<prometheus::GaugeVec> {
479 create_metric(
480 self,
481 name,
482 description,
483 const_label_values,
484 None,
485 Some(const_labels),
486 )
487 }
488
489 fn create_histogram(
491 &self,
492 name: &str,
493 description: &str,
494 labels: &[(&str, &str)],
495 buckets: Option<Vec<f64>>,
496 ) -> anyhow::Result<prometheus::Histogram> {
497 create_metric(self, name, description, labels, buckets, None)
498 }
499
500 fn create_intcounter(
502 &self,
503 name: &str,
504 description: &str,
505 labels: &[(&str, &str)],
506 ) -> anyhow::Result<prometheus::IntCounter> {
507 create_metric(self, name, description, labels, None, None)
508 }
509
510 fn create_intcountervec(
512 &self,
513 name: &str,
514 description: &str,
515 const_labels: &[&str],
516 const_label_values: &[(&str, &str)],
517 ) -> anyhow::Result<prometheus::IntCounterVec> {
518 create_metric(
519 self,
520 name,
521 description,
522 const_label_values,
523 None,
524 Some(const_labels),
525 )
526 }
527
528 fn create_intgauge(
530 &self,
531 name: &str,
532 description: &str,
533 labels: &[(&str, &str)],
534 ) -> anyhow::Result<prometheus::IntGauge> {
535 create_metric(self, name, description, labels, None, None)
536 }
537
538 fn create_intgaugevec(
540 &self,
541 name: &str,
542 description: &str,
543 const_labels: &[&str],
544 const_label_values: &[(&str, &str)],
545 ) -> anyhow::Result<prometheus::IntGaugeVec> {
546 create_metric(
547 self,
548 name,
549 description,
550 const_label_values,
551 None,
552 Some(const_labels),
553 )
554 }
555
556 fn prometheus_expfmt(&self) -> anyhow::Result<String> {
558 let callback_results = self
560 .drt()
561 .execute_prometheus_update_callbacks(&self.hierarchy());
562
563 for result in callback_results {
565 if let Err(e) = result {
566 tracing::error!("Error executing metrics callback: {}", e);
567 }
568 }
569
570 let (prometheus_registry, expfmt) = {
572 let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap();
573 let entry = registry_entry.entry(self.hierarchy()).or_default();
574 let registry = entry.prometheus_registry.clone();
575 let text = entry.execute_prometheus_expfmt_callbacks();
576 (registry, text)
577 };
578
579 let metric_families = prometheus_registry.gather();
581 let encoder = prometheus::TextEncoder::new();
582 let mut buffer = Vec::new();
583 encoder.encode(&metric_families, &mut buffer)?;
584 let mut result = String::from_utf8(buffer)?;
585
586 if !expfmt.is_empty() {
588 if !result.ends_with('\n') {
589 result.push('\n');
590 }
591 result.push_str(&expfmt);
592 }
593
594 Ok(result)
595 }
596}
597
598#[cfg(test)]
599mod test_helpers {
600 use super::prometheus_names::name_prefix;
601 use super::prometheus_names::{nats_client, nats_service};
602 use super::*;
603
604 fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
607 where
608 F: FnMut(&str) -> bool,
609 {
610 input
611 .lines()
612 .filter(|line| predicate(line))
613 .map(|line| line.to_string())
614 .collect::<Vec<_>>()
615 }
616
617 pub fn remove_nats_lines(input: &str) -> Vec<String> {
619 filter_prometheus_lines(input, |line| {
620 !line.contains(&format!(
621 "{}_{}",
622 name_prefix::COMPONENT,
623 nats_client::PREFIX
624 )) && !line.contains(&format!(
625 "{}_{}",
626 name_prefix::COMPONENT,
627 nats_service::PREFIX
628 )) && !line.trim().is_empty()
629 })
630 }
631
632 pub fn extract_nats_lines(input: &str) -> Vec<String> {
634 filter_prometheus_lines(input, |line| {
635 line.contains(&format!(
636 "{}_{}",
637 name_prefix::COMPONENT,
638 nats_client::PREFIX
639 )) || line.contains(&format!(
640 "{}_{}",
641 name_prefix::COMPONENT,
642 nats_service::PREFIX
643 ))
644 })
645 }
646
647 pub fn extract_metrics(input: &str) -> Vec<String> {
650 filter_prometheus_lines(input, |line| {
651 line.starts_with(&format!("{}_", name_prefix::COMPONENT))
652 && !line.starts_with("#")
653 && !line.trim().is_empty()
654 })
655 }
656
657 pub fn parse_prometheus_metric(
669 line: &str,
670 ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
671 if line.trim().is_empty() || line.starts_with('#') {
672 return None;
673 }
674
675 let parts: Vec<&str> = line.split_whitespace().collect();
676 if parts.len() < 2 {
677 return None;
678 }
679
680 let metric_part = parts[0];
681 let value: f64 = parts[1].parse().ok()?;
682
683 let (name, labels) = if metric_part.contains('{') {
684 let brace_start = metric_part.find('{').unwrap();
685 let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
686 let name = &metric_part[..brace_start];
687 let labels_str = &metric_part[brace_start + 1..brace_end];
688
689 let mut labels = std::collections::HashMap::new();
690 for pair in labels_str.split(',') {
691 if let Some((k, v)) = pair.split_once('=') {
692 let v = v.trim_matches('"');
693 labels.insert(k.trim().to_string(), v.to_string());
694 }
695 }
696 (name.to_string(), labels)
697 } else {
698 (metric_part.to_string(), std::collections::HashMap::new())
699 };
700
701 Some((name, labels, value))
702 }
703}
704
705#[cfg(test)]
706mod test_metricsregistry_units {
707 use super::*;
708
709 #[test]
710 fn test_build_component_metric_name_with_prefix() {
711 let result = build_component_metric_name("requests");
713 assert_eq!(result, "dynamo_component_requests");
714
715 let result = build_component_metric_name("counter");
716 assert_eq!(result, "dynamo_component_counter");
717 }
718
719 #[test]
720 fn test_parse_prometheus_metric() {
721 use super::test_helpers::parse_prometheus_metric;
722 use std::collections::HashMap;
723
724 let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
726 let parsed = parse_prometheus_metric(line);
727 assert!(parsed.is_some());
728
729 let (name, labels, value) = parsed.unwrap();
730 assert_eq!(name, "http_requests_total");
731
732 let mut expected_labels = HashMap::new();
733 expected_labels.insert("method".to_string(), "GET".to_string());
734 expected_labels.insert("status".to_string(), "200".to_string());
735 assert_eq!(labels, expected_labels);
736
737 assert_eq!(value, 1234.0);
738
739 let line = "cpu_usage 98.5";
741 let parsed = parse_prometheus_metric(line);
742 assert!(parsed.is_some());
743
744 let (name, labels, value) = parsed.unwrap();
745 assert_eq!(name, "cpu_usage");
746 assert!(labels.is_empty());
747 assert_eq!(value, 98.5);
748
749 let line = "response_time{service=\"api\"} 0.123";
751 let parsed = parse_prometheus_metric(line);
752 assert!(parsed.is_some());
753
754 let (name, labels, value) = parsed.unwrap();
755 assert_eq!(name, "response_time");
756
757 let mut expected_labels = HashMap::new();
758 expected_labels.insert("service".to_string(), "api".to_string());
759 assert_eq!(labels, expected_labels);
760
761 assert_eq!(value, 0.123);
762
763 assert!(parse_prometheus_metric("").is_none()); assert!(parse_prometheus_metric("# HELP metric description").is_none()); assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); assert!(parse_prometheus_metric("metric_name").is_none()); println!("✓ Prometheus metric parsing works correctly!");
770 }
771
772 #[test]
773 fn test_metrics_registry_entry_callbacks() {
774 use crate::MetricsRegistryEntry;
775 use std::sync::atomic::{AtomicUsize, Ordering};
776
777 {
779 let mut entry = MetricsRegistryEntry::new();
780 let counter = Arc::new(AtomicUsize::new(0));
781
782 for increment in [1, 10, 100] {
784 let counter_clone = counter.clone();
785 entry.add_prometheus_update_callback(Arc::new(move || {
786 counter_clone.fetch_add(increment, Ordering::SeqCst);
787 Ok(())
788 }));
789 }
790
791 assert_eq!(counter.load(Ordering::SeqCst), 0);
793
794 let results = entry.execute_prometheus_update_callbacks();
796 assert_eq!(results.len(), 3);
797 assert!(results.iter().all(|r| r.is_ok()));
798 assert_eq!(counter.load(Ordering::SeqCst), 111); let results = entry.execute_prometheus_update_callbacks();
802 assert_eq!(results.len(), 3);
803 assert_eq!(counter.load(Ordering::SeqCst), 222); let cloned = entry.clone();
807 assert_eq!(cloned.execute_prometheus_update_callbacks().len(), 0);
808 assert_eq!(counter.load(Ordering::SeqCst), 222); entry.execute_prometheus_update_callbacks();
812 assert_eq!(counter.load(Ordering::SeqCst), 333); }
814
815 {
817 let mut entry = MetricsRegistryEntry::new();
818 let counter = Arc::new(AtomicUsize::new(0));
819
820 let counter_clone = counter.clone();
822 entry.add_prometheus_update_callback(Arc::new(move || {
823 counter_clone.fetch_add(1, Ordering::SeqCst);
824 Ok(())
825 }));
826
827 entry.add_prometheus_update_callback(Arc::new(|| {
829 Err(anyhow::anyhow!("Simulated error"))
830 }));
831
832 let counter_clone = counter.clone();
834 entry.add_prometheus_update_callback(Arc::new(move || {
835 counter_clone.fetch_add(10, Ordering::SeqCst);
836 Ok(())
837 }));
838
839 let results = entry.execute_prometheus_update_callbacks();
841 assert_eq!(results.len(), 3);
842 assert!(results[0].is_ok());
843 assert!(results[1].is_err());
844 assert!(results[2].is_ok());
845
846 assert_eq!(
848 results[1].as_ref().unwrap_err().to_string(),
849 "Simulated error"
850 );
851
852 assert_eq!(counter.load(Ordering::SeqCst), 11); let results = entry.execute_prometheus_update_callbacks();
857 assert!(results[1].is_err());
858 assert_eq!(counter.load(Ordering::SeqCst), 22); }
860
861 {
863 let entry = MetricsRegistryEntry::new();
864 let results = entry.execute_prometheus_update_callbacks();
865 assert_eq!(results.len(), 0);
866 }
867 }
868}
869
870#[cfg(feature = "integration")]
871#[cfg(test)]
872mod test_metricsregistry_prefixes {
873 use super::*;
874 use crate::distributed::distributed_test_utils::create_test_drt_async;
875 use prometheus::core::Collector;
876
877 #[tokio::test]
878 async fn test_hierarchical_prefixes_and_parent_hierarchies() {
879 let drt = create_test_drt_async().await;
880
881 const DRT_NAME: &str = "";
882 const NAMESPACE_NAME: &str = "ns901";
883 const COMPONENT_NAME: &str = "comp901";
884 const ENDPOINT_NAME: &str = "ep901";
885 let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
886 let component = namespace.component(COMPONENT_NAME).unwrap();
887 let endpoint = component.endpoint(ENDPOINT_NAME);
888
889 assert_eq!(drt.basename(), DRT_NAME);
891 assert_eq!(drt.parent_hierarchy(), Vec::<String>::new());
892 assert_eq!(drt.hierarchy(), DRT_NAME);
893
894 assert_eq!(namespace.basename(), NAMESPACE_NAME);
896 assert_eq!(namespace.parent_hierarchy(), vec!["".to_string()]);
897 assert_eq!(namespace.hierarchy(), NAMESPACE_NAME);
898
899 assert_eq!(component.basename(), COMPONENT_NAME);
901 assert_eq!(
902 component.parent_hierarchy(),
903 vec!["".to_string(), NAMESPACE_NAME.to_string()]
904 );
905 assert_eq!(
906 component.hierarchy(),
907 format!("{}_{}", NAMESPACE_NAME, COMPONENT_NAME)
908 );
909
910 assert_eq!(endpoint.basename(), ENDPOINT_NAME);
912 assert_eq!(
913 endpoint.parent_hierarchy(),
914 vec![
915 "".to_string(),
916 NAMESPACE_NAME.to_string(),
917 COMPONENT_NAME.to_string(),
918 ]
919 );
920 assert_eq!(
921 endpoint.hierarchy(),
922 format!("{}_{}_{}", NAMESPACE_NAME, COMPONENT_NAME, ENDPOINT_NAME)
923 );
924
925 assert!(namespace.parent_hierarchy().contains(&drt.basename()));
927 assert!(component.parent_hierarchy().contains(&namespace.basename()));
928 assert!(endpoint.parent_hierarchy().contains(&component.basename()));
929
930 assert_eq!(drt.parent_hierarchy().len(), 0);
932 assert_eq!(namespace.parent_hierarchy().len(), 1);
933 assert_eq!(component.parent_hierarchy().len(), 2);
934 assert_eq!(endpoint.parent_hierarchy().len(), 3);
935
936 let invalid_namespace = drt.namespace("@@123").unwrap();
940 let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
941 assert!(result.is_ok());
942 if let Ok(counter) = &result {
943 let desc = counter.desc();
945 let namespace_label = desc[0]
946 .const_label_pairs
947 .iter()
948 .find(|l| l.name() == "dynamo_namespace")
949 .expect("Should have dynamo_namespace label");
950 assert_eq!(namespace_label.value(), "_123");
951 }
952
953 let valid_namespace = drt.namespace("ns567").unwrap();
955 assert!(
956 valid_namespace
957 .create_counter("test_counter", "A test counter", &[])
958 .is_ok()
959 );
960 }
961
962 #[tokio::test]
963 async fn test_recursive_namespace() {
964 let drt = create_test_drt_async().await;
966
967 let ns1 = drt.namespace("ns1").unwrap();
969 let ns2 = ns1.namespace("ns2").unwrap();
970 let ns3 = ns2.namespace("ns3").unwrap();
971
972 let component = ns3.component("test-component").unwrap();
974
975 assert_eq!(ns1.basename(), "ns1");
977 assert_eq!(ns1.parent_hierarchy(), vec!("".to_string()));
978 assert_eq!(ns1.hierarchy(), "ns1");
979
980 assert_eq!(ns2.basename(), "ns2");
981 assert_eq!(
982 ns2.parent_hierarchy(),
983 vec!["".to_string(), "ns1".to_string()]
984 );
985 assert_eq!(ns2.hierarchy(), "ns1_ns2");
986
987 assert_eq!(ns3.basename(), "ns3");
988 assert_eq!(
989 ns3.parent_hierarchy(),
990 vec!["".to_string(), "ns1".to_string(), "ns2".to_string()]
991 );
992 assert_eq!(ns3.hierarchy(), "ns1_ns2_ns3");
993
994 assert_eq!(component.basename(), "test-component");
995 assert_eq!(
996 component.parent_hierarchy(),
997 vec![
998 "".to_string(),
999 "ns1".to_string(),
1000 "ns2".to_string(),
1001 "ns3".to_string()
1002 ]
1003 );
1004 assert_eq!(component.hierarchy(), "ns1_ns2_ns3_test-component");
1005
1006 println!("✓ Chained namespace test passed - all prefixes correct");
1007 }
1008}
1009
1010#[cfg(feature = "integration")]
1011#[cfg(test)]
1012mod test_metricsregistry_prometheus_fmt_outputs {
1013 use super::prometheus_names::name_prefix;
1014 use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1015 use super::prometheus_names::{nats_client, nats_service};
1016 use super::*;
1017 use crate::distributed::distributed_test_utils::create_test_drt_async;
1018 use prometheus::Counter;
1019 use std::sync::Arc;
1020
1021 #[tokio::test]
1022 async fn test_prometheusfactory_using_metrics_registry_trait() {
1023 let drt = create_test_drt_async().await;
1025
1026 let namespace_name = "ns345";
1028
1029 let namespace = drt.namespace(namespace_name).unwrap();
1030 let component = namespace.component("comp345").unwrap();
1031 let endpoint = component.endpoint("ep345");
1032
1033 let counter = endpoint
1035 .create_counter("testcounter", "A test counter", &[])
1036 .unwrap();
1037 counter.inc_by(123.456789);
1038 let epsilon = 0.01;
1039 assert!((counter.get() - 123.456789).abs() < epsilon);
1040
1041 let endpoint_output_raw = endpoint.prometheus_expfmt().unwrap();
1042 println!("Endpoint output:");
1043 println!("{}", endpoint_output_raw);
1044
1045 let endpoint_output =
1047 super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
1048
1049 let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
1050# TYPE dynamo_component_testcounter counter
1051dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
1052
1053 assert_eq!(
1054 endpoint_output, expected_endpoint_output,
1055 "\n=== ENDPOINT COMPARISON FAILED ===\n\
1056 Expected:\n{}\n\
1057 Actual:\n{}\n\
1058 ==============================",
1059 expected_endpoint_output, endpoint_output
1060 );
1061
1062 let gauge = component
1064 .create_gauge("testgauge", "A test gauge", &[])
1065 .unwrap();
1066 gauge.set(50000.0);
1067 assert_eq!(gauge.get(), 50000.0);
1068
1069 let component_output_raw = component.prometheus_expfmt().unwrap();
1071 println!("Component output:");
1072 println!("{}", component_output_raw);
1073
1074 let component_output =
1076 super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
1077
1078 let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
1079# TYPE dynamo_component_testcounter counter
1080dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1081# HELP dynamo_component_testgauge A test gauge
1082# TYPE dynamo_component_testgauge gauge
1083dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
1084
1085 assert_eq!(
1086 component_output, expected_component_output,
1087 "\n=== COMPONENT COMPARISON FAILED ===\n\
1088 Expected:\n{}\n\
1089 Actual:\n{}\n\
1090 ==============================",
1091 expected_component_output, component_output
1092 );
1093
1094 let intcounter = namespace
1095 .create_intcounter("testintcounter", "A test int counter", &[])
1096 .unwrap();
1097 intcounter.inc_by(12345);
1098 assert_eq!(intcounter.get(), 12345);
1099
1100 let namespace_output_raw = namespace.prometheus_expfmt().unwrap();
1102 println!("Namespace output:");
1103 println!("{}", namespace_output_raw);
1104
1105 let namespace_output =
1107 super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
1108
1109 let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
1110# TYPE dynamo_component_testcounter counter
1111dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1112# HELP dynamo_component_testgauge A test gauge
1113# TYPE dynamo_component_testgauge gauge
1114dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1115# HELP dynamo_component_testintcounter A test int counter
1116# TYPE dynamo_component_testintcounter counter
1117dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
1118
1119 assert_eq!(
1120 namespace_output, expected_namespace_output,
1121 "\n=== NAMESPACE COMPARISON FAILED ===\n\
1122 Expected:\n{}\n\
1123 Actual:\n{}\n\
1124 ==============================",
1125 expected_namespace_output, namespace_output
1126 );
1127
1128 let intgauge = namespace
1130 .create_intgauge("testintgauge", "A test int gauge", &[])
1131 .unwrap();
1132 intgauge.set(42);
1133 assert_eq!(intgauge.get(), 42);
1134
1135 let intgaugevec = namespace
1137 .create_intgaugevec(
1138 "testintgaugevec",
1139 "A test int gauge vector",
1140 &["instance", "status"],
1141 &[("service", "api")],
1142 )
1143 .unwrap();
1144 intgaugevec
1145 .with_label_values(&["server1", "active"])
1146 .set(10);
1147 intgaugevec
1148 .with_label_values(&["server2", "inactive"])
1149 .set(0);
1150
1151 let countervec = endpoint
1153 .create_countervec(
1154 "testcountervec",
1155 "A test counter vector",
1156 &["method", "status"],
1157 &[("service", "api")],
1158 )
1159 .unwrap();
1160 countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
1161 countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
1162
1163 let histogram = component
1165 .create_histogram("testhistogram", "A test histogram", &[], None)
1166 .unwrap();
1167 histogram.observe(1.0);
1168 histogram.observe(2.5);
1169 histogram.observe(4.0);
1170
1171 let drt_output_raw = drt.prometheus_expfmt().unwrap();
1173 println!("DRT output:");
1174 println!("{}", drt_output_raw);
1175
1176 let filtered_drt_output =
1178 super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
1179
1180 let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
1181# TYPE dynamo_component_testcounter counter
1182dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
1183# HELP dynamo_component_testcountervec A test counter vector
1184# TYPE dynamo_component_testcountervec counter
1185dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
1186dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
1187# HELP dynamo_component_testgauge A test gauge
1188# TYPE dynamo_component_testgauge gauge
1189dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
1190# HELP dynamo_component_testhistogram A test histogram
1191# TYPE dynamo_component_testhistogram histogram
1192dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
1193dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
1194dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
1195dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
1196dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
1197dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
1198dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
1199dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
1200dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
1201dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
1202dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
1203dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
1204dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
1205dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
1206# HELP dynamo_component_testintcounter A test int counter
1207# TYPE dynamo_component_testintcounter counter
1208dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
1209# HELP dynamo_component_testintgauge A test int gauge
1210# TYPE dynamo_component_testintgauge gauge
1211dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
1212# HELP dynamo_component_testintgaugevec A test int gauge vector
1213# TYPE dynamo_component_testintgaugevec gauge
1214dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
1215dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
1216# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
1217# TYPE dynamo_component_uptime_seconds gauge
1218dynamo_component_uptime_seconds 0"#.to_string();
1219
1220 assert_eq!(
1221 filtered_drt_output, expected_drt_output,
1222 "\n=== DRT COMPARISON FAILED ===\n\
1223 Expected:\n{}\n\
1224 Actual (filtered):\n{}\n\
1225 ==============================",
1226 expected_drt_output, filtered_drt_output
1227 );
1228
1229 println!("✓ All Prometheus format outputs verified successfully!");
1230 }
1231
1232 #[test]
1233 fn test_refactored_filter_functions() {
1234 let test_input = r#"# HELP dynamo_component_requests Total requests
1236# TYPE dynamo_component_requests counter
1237dynamo_component_requests 42
1238# HELP dynamo_component_nats_client_connection_state Connection state
1239# TYPE dynamo_component_nats_client_connection_state gauge
1240dynamo_component_nats_client_connection_state 1
1241# HELP dynamo_component_latency Response latency
1242# TYPE dynamo_component_latency histogram
1243dynamo_component_latency_bucket{le="0.1"} 10
1244dynamo_component_latency_bucket{le="0.5"} 25
1245dynamo_component_nats_service_requests_total 100
1246dynamo_component_nats_service_errors_total 5"#;
1247
1248 let filtered_out = super::test_helpers::remove_nats_lines(test_input);
1250 assert_eq!(filtered_out.len(), 7); assert!(!filtered_out.iter().any(|line| line.contains("nats")));
1252
1253 let filtered_only = super::test_helpers::extract_nats_lines(test_input);
1255 assert_eq!(filtered_only.len(), 5); assert!(filtered_only.iter().all(|line| line.contains("nats")));
1257
1258 let metrics_only = super::test_helpers::extract_metrics(test_input);
1260 assert_eq!(metrics_only.len(), 6); assert!(
1262 metrics_only
1263 .iter()
1264 .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#"))
1265 );
1266
1267 println!("✓ All refactored filter functions work correctly!");
1268 }
1269}
1270
1271#[cfg(feature = "integration")]
1272#[cfg(test)]
1273mod test_metricsregistry_nats {
1274 use super::prometheus_names::name_prefix;
1275 use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
1276 use super::prometheus_names::{nats_client, nats_service};
1277 use super::*;
1278 use crate::distributed::distributed_test_utils::create_test_drt_async;
1279 use crate::pipeline::PushRouter;
1280 use crate::{DistributedRuntime, Runtime};
1281 use tokio::time::{Duration, sleep};
1282 #[tokio::test]
1283 async fn test_drt_nats_metrics() {
1284 let drt = create_test_drt_async().await;
1286
1287 let drt_output = drt.prometheus_expfmt().unwrap();
1289 println!("DRT output with NATS metrics:");
1290 println!("{}", drt_output);
1291
1292 let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
1294
1295 assert!(
1297 !drt_nats_metrics.is_empty(),
1298 "NATS client metrics should be present"
1299 );
1300
1301 let drt_nats_metric_lines =
1304 super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
1305 let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
1306 .iter()
1307 .map(|line| {
1308 let without_labels = line.split('{').next().unwrap_or(line);
1309 without_labels.split(' ').next().unwrap_or(without_labels)
1311 })
1312 .collect();
1313
1314 let expect_drt_nats_metrics_sorted = {
1315 let mut temp = DRT_NATS_METRICS
1316 .iter()
1317 .map(|metric| build_component_metric_name(metric))
1318 .collect::<Vec<_>>();
1319 temp.sort();
1320 temp
1321 };
1322
1323 println!(
1325 "actual_drt_nats_metrics_sorted: {:?}",
1326 actual_drt_nats_metrics_sorted
1327 );
1328 println!(
1329 "expect_drt_nats_metrics_sorted: {:?}",
1330 expect_drt_nats_metrics_sorted
1331 );
1332
1333 assert_eq!(
1335 actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
1336 "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1337 );
1338
1339 println!("✓ DistributedRuntime NATS metrics integration test passed!");
1340 }
1341
1342 #[tokio::test]
1343 async fn test_nats_metric_names() {
1344 let drt = create_test_drt_async().await;
1349
1350 let namespace = drt.namespace("ns789").unwrap();
1352 let components = namespace.component("comp789").unwrap();
1353
1354 let _service = components.service_builder().create().await.unwrap();
1356
1357 let component_nats_metrics =
1360 super::test_helpers::extract_nats_lines(&components.prometheus_expfmt().unwrap());
1361 println!(
1362 "Component NATS metrics count: {}",
1363 component_nats_metrics.len()
1364 );
1365
1366 assert!(
1368 !component_nats_metrics.is_empty(),
1369 "NATS client metrics should be present"
1370 );
1371
1372 let component_metrics =
1374 super::test_helpers::extract_metrics(&components.prometheus_expfmt().unwrap());
1375 let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
1376 .iter()
1377 .map(|line| {
1378 let without_labels = line.split('{').next().unwrap_or(line);
1379 without_labels.split(' ').next().unwrap_or(without_labels)
1381 })
1382 .collect();
1383
1384 let expect_component_nats_metrics_sorted = {
1385 let mut temp = COMPONENT_NATS_METRICS
1386 .iter()
1387 .map(|metric| build_component_metric_name(metric))
1388 .collect::<Vec<_>>();
1389 temp.sort();
1390 temp
1391 };
1392
1393 println!(
1395 "actual_component_nats_metrics_sorted: {:?}",
1396 actual_component_nats_metrics_sorted
1397 );
1398 println!(
1399 "expect_component_nats_metrics_sorted: {:?}",
1400 expect_component_nats_metrics_sorted
1401 );
1402
1403 assert_eq!(
1405 actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
1406 "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
1407 );
1408
1409 let drt_output = drt.prometheus_expfmt().unwrap();
1411 let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
1412 let drt_and_component_nats_metrics =
1413 super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
1414 println!(
1415 "DRT and component NATS metrics count: {}",
1416 drt_and_component_nats_metrics.len()
1417 );
1418
1419 assert_eq!(
1421 drt_and_component_nats_metrics.len(),
1422 DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
1423 "DRT at this point should have both the DRT and component NATS metrics"
1424 );
1425
1426 println!("✓ Component NATS metrics integration test passed!");
1428 }
1429
1430 #[tokio::test]
1435 async fn test_nats_metrics_values() -> anyhow::Result<()> {
1436 struct MessageHandler {}
1437 impl MessageHandler {
1438 fn new() -> std::sync::Arc<Self> {
1439 std::sync::Arc::new(Self {})
1440 }
1441 }
1442
1443 #[async_trait]
1444 impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
1445 async fn generate(
1446 &self,
1447 input: SingleIn<String>,
1448 ) -> Result<ManyOut<Annotated<String>>, Error> {
1449 let (data, ctx) = input.into_parts();
1450 let response = data.to_string();
1451 let stream = stream::iter(vec![Annotated::from_data(response)]);
1452 Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
1453 }
1454 }
1455
1456 println!("\n=== Initializing DistributedRuntime ===");
1457 let runtime = Runtime::from_current()?;
1458 let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
1459 let namespace = drt.namespace("ns123").unwrap();
1460 let component = namespace.component("comp123").unwrap();
1461 let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
1462
1463 let _backend_handle = tokio::spawn(async move {
1464 let service = component.service_builder().create().await.unwrap();
1465 let endpoint = service.endpoint("echo").endpoint_builder().handler(ingress);
1466 endpoint.start().await.unwrap();
1467 });
1468
1469 sleep(Duration::from_millis(500)).await;
1470 println!("✓ Launched endpoint service in background successfully");
1471
1472 let drt_output = drt.prometheus_expfmt().unwrap();
1473 let parsed_metrics: Vec<_> = drt_output
1474 .lines()
1475 .filter_map(super::test_helpers::parse_prometheus_metric)
1476 .collect();
1477
1478 println!("=== Initial DRT metrics output ===");
1479 println!("{}", drt_output);
1480
1481 println!("\n=== Checking Initial Metric Values ===");
1482
1483 let initial_expected_metric_values = [
1484 (
1486 build_component_metric_name(nats_client::CONNECTION_STATE),
1487 1.0,
1488 1.0,
1489 ), (
1491 build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1492 1.0,
1493 1.0,
1494 ), (
1496 build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1497 800.0,
1498 4000.0,
1499 ), (
1501 build_component_metric_name(nats_client::IN_MESSAGES),
1502 0.0,
1503 5.0,
1504 ), (
1506 build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1507 1500.0,
1508 5000.0,
1509 ), (
1511 build_component_metric_name(nats_client::OUT_MESSAGES),
1512 0.0,
1513 5.0,
1514 ), (
1517 build_component_metric_name(nats_service::PROCESSING_MS_AVG),
1518 0.0,
1519 0.0,
1520 ), (
1522 build_component_metric_name(nats_service::ERRORS_TOTAL),
1523 0.0,
1524 0.0,
1525 ), (
1527 build_component_metric_name(nats_service::REQUESTS_TOTAL),
1528 0.0,
1529 0.0,
1530 ), (
1532 build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
1533 0.0,
1534 0.0,
1535 ), (
1537 build_component_metric_name(nats_service::ACTIVE_SERVICES),
1538 0.0,
1539 2.0,
1540 ), (
1542 build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1543 0.0,
1544 2.0,
1545 ), ];
1547
1548 for (metric_name, min_value, max_value) in &initial_expected_metric_values {
1549 let actual_value = parsed_metrics
1550 .iter()
1551 .find(|(name, _, _)| name == metric_name)
1552 .map(|(_, _, value)| *value)
1553 .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
1554
1555 assert!(
1556 actual_value >= *min_value && actual_value <= *max_value,
1557 "Initial metric {} should be between {} and {}, but got {}",
1558 metric_name,
1559 min_value,
1560 max_value,
1561 actual_value
1562 );
1563 }
1564
1565 println!("\n=== Client Runtime to hit the endpoint ===");
1566 let client_runtime = Runtime::from_current()?;
1567 let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
1568 let namespace = client_distributed.namespace("ns123")?;
1569 let component = namespace.component("comp123")?;
1570 let client = component.endpoint("echo").client().await?;
1571
1572 client.wait_for_instances().await?;
1573 println!("✓ Connected to endpoint, waiting for instances...");
1574
1575 let router =
1576 PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
1577 .await?;
1578
1579 for i in 0..10 {
1580 let msg = i.to_string().repeat(2000); let mut stream = router.random(msg.clone().into()).await?;
1582 while let Some(resp) = stream.next().await {
1583 if let Some(data) = &resp.data {
1585 let is_same = data == &msg;
1586 println!(
1587 "Response {}: {} bytes, matches original: {}",
1588 i,
1589 data.len(),
1590 is_same
1591 );
1592 }
1593 }
1594 }
1595 println!("✓ Sent messages and received responses successfully");
1596
1597 println!("\n=== Waiting 500ms for metrics to update ===");
1598 sleep(Duration::from_millis(500)).await;
1599 println!("✓ Wait complete, getting final metrics...");
1600
1601 let final_drt_output = drt.prometheus_expfmt().unwrap();
1602 println!("\n=== Final Prometheus DRT output ===");
1603 println!("{}", final_drt_output);
1604
1605 let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
1606 println!("\n=== Filtered NATS metrics from final DRT output ===");
1607 for line in &final_drt_nats_output {
1608 println!("{}", line);
1609 }
1610
1611 let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
1612 .iter()
1613 .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
1614 .collect();
1615
1616 let post_expected_metric_values = [
1617 (
1619 build_component_metric_name(nats_client::CONNECTION_STATE),
1620 1.0,
1621 1.0,
1622 ), (
1624 build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1625 1.0,
1626 1.0,
1627 ), (
1629 build_component_metric_name(nats_client::IN_TOTAL_BYTES),
1630 20000.0,
1631 32000.0,
1632 ), (
1634 build_component_metric_name(nats_client::IN_MESSAGES),
1635 8.0,
1636 20.0,
1637 ), (
1639 build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1640 2500.0,
1641 8000.0,
1642 ), (
1644 build_component_metric_name(nats_client::OUT_MESSAGES),
1645 8.0,
1646 20.0,
1647 ), (
1650 build_component_metric_name(nats_service::PROCESSING_MS_AVG),
1651 0.0,
1652 1.0,
1653 ), (
1655 build_component_metric_name(nats_service::ERRORS_TOTAL),
1656 0.0,
1657 0.0,
1658 ), (
1660 build_component_metric_name(nats_service::REQUESTS_TOTAL),
1661 0.0,
1662 10.0,
1663 ), (
1665 build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
1666 0.0,
1667 5.0,
1668 ), (
1670 build_component_metric_name(nats_service::ACTIVE_SERVICES),
1671 0.0,
1672 2.0,
1673 ), (
1675 build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
1676 0.0,
1677 2.0,
1678 ), (
1681 build_component_metric_name(work_handler::REQUESTS_TOTAL),
1682 10.0,
1683 10.0,
1684 ), (
1686 build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
1687 21000.0,
1688 26000.0,
1689 ), (
1691 build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
1692 18000.0,
1693 23000.0,
1694 ), (
1696 build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
1697 0.0,
1698 1.0,
1699 ), (
1702 format!(
1703 "{}_count",
1704 build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1705 ),
1706 10.0,
1707 10.0,
1708 ), (
1710 format!(
1711 "{}_sum",
1712 build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
1713 ),
1714 0.0001,
1715 1.0,
1716 ), ];
1718
1719 println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
1720 for (metric_name, min_value, max_value) in &post_expected_metric_values {
1721 let actual_value = final_parsed_metrics
1722 .iter()
1723 .find(|(name, _, _)| name == metric_name)
1724 .map(|(_, _, value)| *value)
1725 .unwrap_or_else(|| {
1726 panic!(
1727 "Could not find expected post-activity metric: {}",
1728 metric_name
1729 )
1730 });
1731
1732 assert!(
1733 actual_value >= *min_value && actual_value <= *max_value,
1734 "Post-activity metric {} should be between {} and {}, but got {}",
1735 metric_name,
1736 min_value,
1737 max_value,
1738 actual_value
1739 );
1740 println!(
1741 "✓ {}: {} (range: {} to {})",
1742 metric_name, actual_value, min_value, max_value
1743 );
1744 }
1745
1746 println!("✓ All NATS and component metrics parsed successfully!");
1747 println!("✓ Byte metrics verified to be >= 100 bytes!");
1748 println!("✓ Post-activity metrics verified with higher thresholds!");
1749 println!("✓ Work handler metrics reflect increased activity!");
1750
1751 Ok(())
1752 }
1753}