pub mod prometheus_names;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Arc;
use crate::component::ComponentBuilder;
use anyhow;
use once_cell::sync::Lazy;
use regex::Regex;
use std::any::Any;
use std::collections::HashMap;
use prometheus_names::{
COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix,
nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler,
};
use crate::pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
network::Ingress,
};
use crate::protocols::annotated::Annotated;
use crate::stream;
use crate::stream::StreamExt;
/// Switch for automatic hierarchy labels in `create_metric`.
///
/// When `true`, `create_metric` prepends the namespace / component / endpoint
/// labels derived from the hierarchy and rejects user labels that use one of
/// those reserved keys.
pub const USE_AUTO_LABELS: bool = true;
use prometheus::Encoder;
/// Ensures every key in `labels` appears at most once.
///
/// # Errors
/// Returns an error naming the first key (in slice order) that repeats an
/// earlier one.
fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
    let mut observed = std::collections::HashSet::new();
    // `insert` returns `false` for a key that is already present, so `find`
    // stops at the first repeated key.
    let first_repeat = labels
        .iter()
        .map(|(key, _)| key)
        .find(|key| !observed.insert(**key));
    match first_repeat {
        Some(key) => Err(anyhow::anyhow!(
            "Duplicate label key '{}' found in labels",
            key
        )),
        None => Ok(()),
    }
}
pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
where
Self: Sized;
fn with_histogram_opts_and_buckets(
_opts: prometheus::HistogramOpts,
_buckets: Option<Vec<f64>>,
) -> Result<Self, prometheus::Error>
where
Self: Sized,
{
panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
}
fn with_opts_and_label_names(
_opts: prometheus::Opts,
_label_names: &[&str],
) -> Result<Self, prometheus::Error>
where
Self: Sized,
{
panic!("with_opts_and_label_names is not implemented for this metric type");
}
}
// Scalar floating-point counter: built directly from `Opts`.
impl PrometheusMetric for prometheus::Counter {
// Forwards to `prometheus::Counter::with_opts`.
// NOTE(review): this relies on the path resolving to the crate's own
// constructor rather than back to this trait method — confirm if the
// upstream API ever changes.
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
prometheus::Counter::with_opts(opts)
}
}
// Scalar integer counter: built directly from `Opts`.
impl PrometheusMetric for prometheus::IntCounter {
// Forwards to `prometheus::IntCounter::with_opts`.
// NOTE(review): this relies on the path resolving to the crate's own
// constructor rather than back to this trait method — confirm if the
// upstream API ever changes.
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
prometheus::IntCounter::with_opts(opts)
}
}
// Scalar floating-point gauge: built directly from `Opts`.
impl PrometheusMetric for prometheus::Gauge {
// Forwards to `prometheus::Gauge::with_opts`.
// NOTE(review): this relies on the path resolving to the crate's own
// constructor rather than back to this trait method — confirm if the
// upstream API ever changes.
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
prometheus::Gauge::with_opts(opts)
}
}
// Scalar integer gauge: built directly from `Opts`.
impl PrometheusMetric for prometheus::IntGauge {
// Forwards to `prometheus::IntGauge::with_opts`.
// NOTE(review): this relies on the path resolving to the crate's own
// constructor rather than back to this trait method — confirm if the
// upstream API ever changes.
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
prometheus::IntGauge::with_opts(opts)
}
}
impl PrometheusMetric for prometheus::GaugeVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::GaugeVec::new(opts, label_names)
}
}
impl PrometheusMetric for prometheus::IntGaugeVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::IntGaugeVec::new(opts, label_names)
}
}
impl PrometheusMetric for prometheus::IntCounterVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::IntCounterVec::new(opts, label_names)
}
}
// Histogram: can be built from plain `Opts` (default buckets) or from
// `HistogramOpts` with optional custom buckets.
impl PrometheusMetric for prometheus::Histogram {
    // Builds a histogram with the default buckets.
    //
    // The whole `Opts` value is converted, so const labels, namespace and
    // subsystem set by the caller are preserved. Rebuilding the options from
    // `name` and `help` alone would silently drop them.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        prometheus::Histogram::with_opts(prometheus::HistogramOpts::from(opts))
    }

    // Builds a histogram from `opts`, replacing its buckets when `buckets`
    // is `Some`; otherwise the buckets already present in `opts` are used.
    fn with_histogram_opts_and_buckets(
        opts: prometheus::HistogramOpts,
        buckets: Option<Vec<f64>>,
    ) -> Result<Self, prometheus::Error> {
        let opts = match buckets {
            Some(custom_buckets) => opts.buckets(custom_buckets),
            None => opts,
        };
        prometheus::Histogram::with_opts(opts)
    }
}
impl PrometheusMetric for prometheus::CounterVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
panic!("CounterVec requires label names, use with_opts_and_label_names instead");
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::CounterVec::new(opts, label_names)
}
}
pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
hierarchy: &H,
metric_name: &str,
metric_desc: &str,
labels: &[(&str, &str)],
buckets: Option<Vec<f64>>,
const_labels: Option<&[&str]>,
) -> anyhow::Result<T> {
validate_no_duplicate_label_keys(labels)?;
let basename = hierarchy.basename();
let parent_hierarchies = hierarchy.parent_hierarchies();
let mut hierarchy_names: Vec<String> =
parent_hierarchies.iter().map(|p| p.basename()).collect();
hierarchy_names.push(basename.clone());
let metric_name = build_component_metric_name(metric_name);
let mut updated_labels: Vec<(String, String)> = Vec::new();
if USE_AUTO_LABELS {
for (key, _) in labels {
if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
return Err(anyhow::anyhow!(
"Label '{}' is automatically added by auto_label feature and cannot be manually set",
key
));
}
}
if hierarchy_names.len() > 1 {
let namespace = &hierarchy_names[1];
if !namespace.is_empty() {
let valid_namespace = sanitize_prometheus_label(namespace)?;
if !valid_namespace.is_empty() {
updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
}
}
}
if hierarchy_names.len() > 2 {
let component = &hierarchy_names[2];
if !component.is_empty() {
let valid_component = sanitize_prometheus_label(component)?;
if !valid_component.is_empty() {
updated_labels.push((labels::COMPONENT.to_string(), valid_component));
}
}
}
if hierarchy_names.len() > 3 {
let endpoint = &hierarchy_names[3];
if !endpoint.is_empty() {
let valid_endpoint = sanitize_prometheus_label(endpoint)?;
if !valid_endpoint.is_empty() {
updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
}
}
}
}
updated_labels.extend(
labels
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string())),
);
let prometheus_metric = if std::any::TypeId::of::<T>()
== std::any::TypeId::of::<prometheus::CounterVec>()
{
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for CounterVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for GaugeVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
if const_labels.is_some() {
return Err(anyhow::anyhow!(
"const_labels parameter is not valid for Histogram"
));
}
let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
T::with_histogram_opts_and_buckets(opts, buckets)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntCounterVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntGaugeVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
));
}
if const_labels.is_some() {
return Err(anyhow::anyhow!(
"const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
T::with_opts(opts)?
};
for parent in parent_hierarchies {
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
parent.get_metrics_registry().add_metric(collector)?;
}
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
hierarchy.get_metrics_registry().add_metric(collector)?;
Ok(prometheus_metric)
}
/// Metric factory bound to one hierarchy node.
///
/// Every `create_*` method forwards to `create_metric` with this node, so the
/// created metric is registered with the node and its parents.
pub struct Metrics<H: MetricsHierarchy> {
// The hierarchy node (owned or borrowed) that metrics are created against.
hierarchy: H,
}
impl<H: MetricsHierarchy> Metrics<H> {
    /// Wraps `hierarchy` in a metric factory.
    pub fn new(hierarchy: H) -> Self {
        Self { hierarchy }
    }

    /// Creates and registers a floating-point counter with constant `labels`.
    pub fn create_counter(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::Counter> {
        create_metric(&self.hierarchy, name, description, labels, None, None)
    }

    /// Creates and registers a floating-point counter vector.
    ///
    /// `const_labels` is, despite its name, the list of *variable label names*
    /// of the vector; `const_label_values` holds the constant `(key, value)`
    /// pairs attached to every series.
    pub fn create_countervec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::CounterVec> {
        create_metric(
            &self.hierarchy,
            name,
            description,
            const_label_values,
            None,
            Some(const_labels),
        )
    }

    /// Creates and registers a floating-point gauge with constant `labels`.
    pub fn create_gauge(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::Gauge> {
        create_metric(&self.hierarchy, name, description, labels, None, None)
    }

    /// Creates and registers a floating-point gauge vector.
    ///
    /// See `create_countervec` for the meaning of `const_labels` and
    /// `const_label_values`.
    pub fn create_gaugevec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::GaugeVec> {
        create_metric(
            &self.hierarchy,
            name,
            description,
            const_label_values,
            None,
            Some(const_labels),
        )
    }

    /// Creates and registers a histogram; `buckets` of `None` keeps the
    /// default buckets of the `prometheus` crate.
    pub fn create_histogram(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
        buckets: Option<Vec<f64>>,
    ) -> anyhow::Result<prometheus::Histogram> {
        create_metric(&self.hierarchy, name, description, labels, buckets, None)
    }

    /// Creates and registers an integer counter with constant `labels`.
    pub fn create_intcounter(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntCounter> {
        create_metric(&self.hierarchy, name, description, labels, None, None)
    }

    /// Creates and registers an integer counter vector.
    ///
    /// See `create_countervec` for the meaning of `const_labels` and
    /// `const_label_values`.
    pub fn create_intcountervec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntCounterVec> {
        create_metric(
            &self.hierarchy,
            name,
            description,
            const_label_values,
            None,
            Some(const_labels),
        )
    }

    /// Creates and registers an integer gauge with constant `labels`.
    pub fn create_intgauge(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntGauge> {
        create_metric(&self.hierarchy, name, description, labels, None, None)
    }

    /// Creates and registers an integer gauge vector.
    ///
    /// See `create_countervec` for the meaning of `const_labels` and
    /// `const_label_values`.
    pub fn create_intgaugevec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntGaugeVec> {
        create_metric(
            &self.hierarchy,
            name,
            description,
            const_label_values,
            None,
            Some(const_labels),
        )
    }

    /// Renders this hierarchy's metrics in the Prometheus text exposition format.
    ///
    /// Steps: run the update callbacks (failures are logged, not returned),
    /// gather and encode the registered metrics, then append whatever text the
    /// exposition callbacks produce, separated by a newline when needed.
    ///
    /// # Errors
    /// Fails if encoding fails or the encoded bytes are not valid UTF-8.
    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
        let registry = self.hierarchy.get_metrics_registry();

        for outcome in registry.execute_update_callbacks() {
            if let Err(e) = outcome {
                tracing::error!("Error executing metrics callback: {}", e);
            }
        }

        // The read guard is a temporary that is released at the end of this
        // statement. It must not stay alive while the exposition callbacks run
        // below: a callback that registers a metric needs the write lock on
        // the same registry and would otherwise block on this thread's guard.
        let metric_families = registry.get_prometheus_registry().gather();

        let encoder = prometheus::TextEncoder::new();
        let mut buffer = Vec::new();
        encoder.encode(&metric_families, &mut buffer)?;
        let mut result = String::from_utf8(buffer)?;

        let expfmt = registry.execute_expfmt_callbacks();
        if !expfmt.is_empty() {
            if !result.ends_with('\n') {
                result.push('\n');
            }
            result.push_str(&expfmt);
        }
        Ok(result)
    }
}
use crate::traits::DistributedRuntimeProvider;
/// A node in the metrics hierarchy that owns a `MetricsRegistry`.
///
/// `create_metric` uses the basenames of a node and its parents to derive
/// automatic labels, and registers each new metric with the node's registry
/// and with every parent's registry.
pub trait MetricsHierarchy: Send + Sync {
/// Name of this node on its own (no parent prefix). The tests in this file
/// expect the runtime root to report an empty string.
fn basename(&self) -> String;
/// All ancestors of this node. The tests in this file expect them ordered
/// from the root down to the immediate parent.
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
/// The registry that holds this node's metrics and callbacks.
fn get_metrics_registry(&self) -> &MetricsRegistry;
/// Convenience accessor: a `Metrics` factory borrowing this node.
fn metrics(&self) -> Metrics<&Self>
where
Self: Sized,
{
Metrics::new(self)
}
}
// Lets a shared reference stand in for the hierarchy it points to (this is
// what allows `Metrics<&Self>` in `MetricsHierarchy::metrics`). Every method
// simply forwards to the referenced value.
impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
    fn basename(&self) -> String {
        <T as MetricsHierarchy>::basename(*self)
    }

    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        <T as MetricsHierarchy>::parent_hierarchies(*self)
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        <T as MetricsHierarchy>::get_metrics_registry(*self)
    }
}
/// Shared, thread-safe callback run by `MetricsRegistry::execute_update_callbacks`
/// (invoked from `Metrics::prometheus_expfmt` before metrics are gathered).
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
/// Shared, thread-safe callback that returns extra exposition text; the text
/// is appended after the encoded registry output by `Metrics::prometheus_expfmt`.
pub type PrometheusExpositionFormatCallback =
Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
/// A Prometheus registry bundled with two lists of callbacks that are run
/// when the metrics are rendered.
pub struct MetricsRegistry {
// The underlying Prometheus registry, behind a lock so metrics can be
// registered through `&self`.
pub prometheus_registry: std::sync::RwLock<prometheus::Registry>,
// Callbacks run before gathering; held in an `Arc` so clones of this
// struct share the same list.
pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
// Callbacks that produce extra exposition text; also shared between clones.
pub prometheus_expfmt_callbacks:
Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
// Manual `Debug`: the registry and the callback closures are not printed;
// instead a placeholder for the registry and the number of callbacks in each
// list are shown.
impl std::fmt::Debug for MetricsRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let summarize =
            |count: usize| format!("<RwLock<Vec<Callback>>> with {} callbacks", count);
        let update_count = self.prometheus_update_callbacks.read().unwrap().len();
        let expfmt_count = self.prometheus_expfmt_callbacks.read().unwrap().len();

        let mut out = f.debug_struct("MetricsRegistry");
        out.field("prometheus_registry", &"<RwLock<Registry>>");
        out.field("prometheus_update_callbacks", &summarize(update_count));
        out.field("prometheus_expfmt_callbacks", &summarize(expfmt_count));
        out.finish()
    }
}
// Cloning shares both callback lists (same `Arc`s) and wraps a clone of the
// Prometheus registry in a fresh lock.
// NOTE(review): whether the cloned `prometheus::Registry` shares its metrics
// with the original depends on that crate's `Clone` impl — confirm.
impl Clone for MetricsRegistry {
    fn clone(&self) -> Self {
        let registry_copy = self.prometheus_registry.read().unwrap().clone();
        let update_callbacks = Arc::clone(&self.prometheus_update_callbacks);
        let expfmt_callbacks = Arc::clone(&self.prometheus_expfmt_callbacks);
        Self {
            prometheus_registry: std::sync::RwLock::new(registry_copy),
            prometheus_update_callbacks: update_callbacks,
            prometheus_expfmt_callbacks: expfmt_callbacks,
        }
    }
}
// All methods below `unwrap()` their lock results and therefore panic if the
// corresponding lock has been poisoned by a panic in another thread.
impl MetricsRegistry {
    /// Creates an empty registry with no metrics and no callbacks.
    pub fn new() -> Self {
        Self {
            prometheus_registry: std::sync::RwLock::new(prometheus::Registry::new()),
            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
        }
    }

    /// Appends a callback to run before metrics are gathered.
    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
        self.prometheus_update_callbacks
            .write()
            .unwrap()
            .push(callback);
    }

    /// Appends a callback that contributes extra exposition text.
    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
        self.prometheus_expfmt_callbacks
            .write()
            .unwrap()
            .push(callback);
    }

    /// Runs every update callback in registration order and returns one
    /// result per callback; a failing callback does not stop the others.
    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
        // Snapshot the (cheaply clonable) callback handles and release the
        // lock before running user code, so a callback may itself register
        // further callbacks without blocking on this thread's read guard.
        // Callbacks added during a run are first executed on the next run.
        let callbacks: Vec<PrometheusUpdateCallback> =
            self.prometheus_update_callbacks.read().unwrap().clone();
        callbacks.iter().map(|callback| callback()).collect()
    }

    /// Runs every exposition callback in registration order and concatenates
    /// the non-empty texts, inserting a newline between pieces when the text
    /// so far does not already end with one. Failures are logged and skipped.
    pub fn execute_expfmt_callbacks(&self) -> String {
        // Same snapshot-then-release approach as `execute_update_callbacks`.
        let callbacks: Vec<PrometheusExpositionFormatCallback> =
            self.prometheus_expfmt_callbacks.read().unwrap().clone();

        let mut result = String::new();
        for callback in callbacks.iter() {
            match callback() {
                Ok(text) => {
                    if !text.is_empty() {
                        if !result.is_empty() && !result.ends_with('\n') {
                            result.push('\n');
                        }
                        result.push_str(&text);
                    }
                }
                Err(e) => {
                    tracing::error!("Error executing exposition text callback: {}", e);
                }
            }
        }
        result
    }

    /// Registers `collector` with the underlying Prometheus registry.
    ///
    /// # Errors
    /// Wraps the registry's error (for example a duplicate registration) in
    /// an `anyhow` error prefixed with "Failed to register metric".
    pub fn add_metric(
        &self,
        collector: Box<dyn prometheus::core::Collector>,
    ) -> anyhow::Result<()> {
        self.prometheus_registry
            .write()
            .unwrap()
            .register(collector)
            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
    }

    /// Returns a read guard on the underlying registry. Registration through
    /// `add_metric` blocks while the guard is alive, so keep it short-lived.
    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
        self.prometheus_registry.read().unwrap()
    }

    /// Reports whether a metric family called `metric_name` is currently
    /// gathered from this registry. This performs a full `gather()`.
    pub fn has_metric_named(&self, metric_name: &str) -> bool {
        self.prometheus_registry
            .read()
            .unwrap()
            .gather()
            .iter()
            .any(|mf| mf.name() == metric_name)
    }
}
impl Default for MetricsRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod test_helpers {
    //! Text-processing helpers shared by the metrics tests.

    use super::prometheus_names::name_prefix;
    use super::prometheus_names::{nats_client, nats_service};
    use super::*;

    /// Returns the lines of `input` for which `predicate` is true, as owned strings.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(String::from)
            .collect()
    }

    /// The two substrings that identify NATS client / NATS service metrics.
    /// Built once per call site instead of once per filtered line.
    fn nats_metric_prefixes() -> [String; 2] {
        [
            format!("{}_{}", name_prefix::COMPONENT, nats_client::PREFIX),
            format!("{}_{}", name_prefix::COMPONENT, nats_service::PREFIX),
        ]
    }

    /// Keeps the non-blank lines that mention neither NATS prefix.
    pub fn remove_nats_lines(input: &str) -> Vec<String> {
        let nats_prefixes = nats_metric_prefixes();
        filter_prometheus_lines(input, |line| {
            !nats_prefixes
                .iter()
                .any(|prefix| line.contains(prefix.as_str()))
                && !line.trim().is_empty()
        })
    }

    /// Keeps only the lines that mention one of the NATS prefixes.
    pub fn extract_nats_lines(input: &str) -> Vec<String> {
        let nats_prefixes = nats_metric_prefixes();
        filter_prometheus_lines(input, |line| {
            nats_prefixes
                .iter()
                .any(|prefix| line.contains(prefix.as_str()))
        })
    }

    /// Keeps the sample lines (no `#` comment lines, no blank lines) that
    /// start with the component metric prefix.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        let component_prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| {
            line.starts_with(component_prefix.as_str())
                && !line.starts_with("#")
                && !line.trim().is_empty()
        })
    }

    /// Parses one exposition line into `(name, labels, value)`.
    ///
    /// Returns `None` for blank lines, `#` lines, lines with fewer than two
    /// whitespace-separated fields, or a value that is not an `f64`.
    ///
    /// This is a simplified parser for tests: the line is split on
    /// whitespace and the label list on `,`, so label values containing
    /// spaces or commas are not handled.
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        if line.trim().is_empty() || line.starts_with('#') {
            return None;
        }

        let mut fields = line.split_whitespace();
        let metric_part = fields.next()?;
        let value: f64 = fields.next()?.parse().ok()?;

        let mut labels = std::collections::HashMap::new();
        let name = match metric_part.find('{') {
            Some(brace_start) => {
                // A missing closing brace is tolerated: the label list then
                // runs to the end of the field.
                let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
                let labels_str = &metric_part[brace_start + 1..brace_end];
                for pair in labels_str.split(',') {
                    if let Some((k, v)) = pair.split_once('=') {
                        let v = v.trim_matches('"');
                        labels.insert(k.trim().to_string(), v.to_string());
                    }
                }
                &metric_part[..brace_start]
            }
            None => metric_part,
        };

        Some((name.to_string(), labels, value))
    }
}
// Unit tests that need no running runtime: metric-name building, the test
// line parser, and the update-callback machinery of `MetricsRegistry`.
#[cfg(test)]
mod test_metricsregistry_units {
use super::*;
// The component prefix is prepended to the short metric name.
#[test]
fn test_build_component_metric_name_with_prefix() {
let result = build_component_metric_name("requests");
assert_eq!(result, "dynamo_component_requests");
let result = build_component_metric_name("counter");
assert_eq!(result, "dynamo_component_counter");
}
// Exercises `parse_prometheus_metric` on labelled, unlabelled and
// non-sample lines.
#[test]
fn test_parse_prometheus_metric() {
use super::test_helpers::parse_prometheus_metric;
use std::collections::HashMap;
// Sample with two labels.
let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234";
let parsed = parse_prometheus_metric(line);
assert!(parsed.is_some());
let (name, labels, value) = parsed.unwrap();
assert_eq!(name, "http_requests_total");
let mut expected_labels = HashMap::new();
expected_labels.insert("method".to_string(), "GET".to_string());
expected_labels.insert("status".to_string(), "200".to_string());
assert_eq!(labels, expected_labels);
assert_eq!(value, 1234.0);
// Sample without labels.
let line = "cpu_usage 98.5";
let parsed = parse_prometheus_metric(line);
assert!(parsed.is_some());
let (name, labels, value) = parsed.unwrap();
assert_eq!(name, "cpu_usage");
assert!(labels.is_empty());
assert_eq!(value, 98.5);
// Sample with a single label and a fractional value.
let line = "response_time{service=\"api\"} 0.123";
let parsed = parse_prometheus_metric(line);
assert!(parsed.is_some());
let (name, labels, value) = parsed.unwrap();
assert_eq!(name, "response_time");
let mut expected_labels = HashMap::new();
expected_labels.insert("service".to_string(), "api".to_string());
assert_eq!(labels, expected_labels);
assert_eq!(value, 0.123);
// Inputs that must be rejected: empty line, HELP/TYPE comments, and a
// name with no value.
assert!(parse_prometheus_metric("").is_none()); assert!(parse_prometheus_metric("# HELP metric description").is_none()); assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); assert!(parse_prometheus_metric("metric_name").is_none());
println!("✓ Prometheus metric parsing works correctly!");
}
// Covers update callbacks: accumulation across runs, sharing between
// clones, error isolation, and the empty-registry case.
#[test]
fn test_metrics_registry_entry_callbacks() {
use crate::MetricsRegistry;
use std::sync::atomic::{AtomicUsize, Ordering};
// Case 1: three callbacks adding 1, 10 and 100 — each run adds 111.
{
let registry = MetricsRegistry::new();
let counter = Arc::new(AtomicUsize::new(0));
for increment in [1, 10, 100] {
let counter_clone = counter.clone();
registry.add_update_callback(Arc::new(move || {
counter_clone.fetch_add(increment, Ordering::SeqCst);
Ok(())
}));
}
// Registering alone must not run anything.
assert_eq!(counter.load(Ordering::SeqCst), 0);
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 3);
assert!(results.iter().all(|r| r.is_ok()));
assert_eq!(counter.load(Ordering::SeqCst), 111);
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 3);
assert_eq!(counter.load(Ordering::SeqCst), 222);
// A clone sees (and runs) the same callback list as the original.
let cloned = registry.clone();
assert_eq!(cloned.execute_update_callbacks().len(), 3);
assert_eq!(counter.load(Ordering::SeqCst), 333);
registry.execute_update_callbacks();
assert_eq!(counter.load(Ordering::SeqCst), 444); }
// Case 2: a failing callback in the middle does not prevent the
// callbacks before and after it from running.
{
let registry = MetricsRegistry::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
registry.add_update_callback(Arc::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
Ok(())
}));
registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
let counter_clone = counter.clone();
registry.add_update_callback(Arc::new(move || {
counter_clone.fetch_add(10, Ordering::SeqCst);
Ok(())
}));
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 3);
assert!(results[0].is_ok());
assert!(results[1].is_err());
assert!(results[2].is_ok());
assert_eq!(
results[1].as_ref().unwrap_err().to_string(),
"Simulated error"
);
assert_eq!(counter.load(Ordering::SeqCst), 11);
let results = registry.execute_update_callbacks();
assert!(results[1].is_err());
assert_eq!(counter.load(Ordering::SeqCst), 22); }
// Case 3: a registry without callbacks yields an empty result list.
{
let registry = MetricsRegistry::new();
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 0);
}
}
}
// Integration tests (enabled with the `integration` feature) that check the
// basenames and parent chains reported by runtime, namespace, component and
// endpoint objects.
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::core::Collector;
// Checks basename and parent ordering at every level, then label
// sanitization for a namespace with characters that are invalid in labels.
#[tokio::test]
async fn test_hierarchical_prefixes_and_parent_hierarchies() {
let drt = create_test_drt_async().await;
// The runtime root is expected to have an empty basename.
const DRT_NAME: &str = "";
const NAMESPACE_NAME: &str = "ns901";
const COMPONENT_NAME: &str = "comp901";
const ENDPOINT_NAME: &str = "ep901";
let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
let component = namespace.component(COMPONENT_NAME).unwrap();
let endpoint = component.endpoint(ENDPOINT_NAME);
// Parents are listed root-first: DRT, then namespace, then component.
assert_eq!(drt.basename(), DRT_NAME);
assert_eq!(drt.parent_hierarchies().len(), 0);
assert_eq!(namespace.basename(), NAMESPACE_NAME);
assert_eq!(namespace.parent_hierarchies().len(), 1);
assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(component.basename(), COMPONENT_NAME);
assert_eq!(component.parent_hierarchies().len(), 2);
assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
assert_eq!(endpoint.basename(), ENDPOINT_NAME);
assert_eq!(endpoint.parent_hierarchies().len(), 3);
assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
// Each level's parent list contains its direct parent.
assert!(
namespace
.parent_hierarchies()
.iter()
.any(|h| h.basename() == drt.basename())
);
assert!(
component
.parent_hierarchies()
.iter()
.any(|h| h.basename() == namespace.basename())
);
assert!(
endpoint
.parent_hierarchies()
.iter()
.any(|h| h.basename() == component.basename())
);
assert_eq!(drt.parent_hierarchies().len(), 0);
assert_eq!(namespace.parent_hierarchies().len(), 1);
assert_eq!(component.parent_hierarchies().len(), 2);
assert_eq!(endpoint.parent_hierarchies().len(), 3);
// A namespace name with invalid characters still yields a metric; the
// auto-generated namespace label carries the sanitized value "_123".
let invalid_namespace = drt.namespace("@@123").unwrap();
let result =
invalid_namespace
.metrics()
.create_counter("test_counter", "A test counter", &[]);
assert!(result.is_ok());
if let Ok(counter) = &result {
let desc = counter.desc();
let namespace_label = desc[0]
.const_label_pairs
.iter()
.find(|l| l.name() == "dynamo_namespace")
.expect("Should have dynamo_namespace label");
assert_eq!(namespace_label.value(), "_123");
}
// Same metric name under a different namespace is accepted as well.
let valid_namespace = drt.namespace("ns567").unwrap();
assert!(
valid_namespace
.metrics()
.create_counter("test_counter", "A test counter", &[])
.is_ok()
);
}
// Nested namespaces: every additional level appends one more parent, still
// ordered root-first.
#[tokio::test]
async fn test_recursive_namespace() {
let drt = create_test_drt_async().await;
let ns1 = drt.namespace("ns1").unwrap();
let ns2 = ns1.namespace("ns2").unwrap();
let ns3 = ns2.namespace("ns3").unwrap();
let component = ns3.component("test-component").unwrap();
assert_eq!(ns1.basename(), "ns1");
assert_eq!(ns1.parent_hierarchies().len(), 1);
assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
assert_eq!(ns2.basename(), "ns2");
assert_eq!(ns2.parent_hierarchies().len(), 2);
assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(ns3.basename(), "ns3");
assert_eq!(ns3.parent_hierarchies().len(), 3);
assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
assert_eq!(component.basename(), "test-component");
assert_eq!(component.parent_hierarchies().len(), 4);
assert_eq!(component.parent_hierarchies()[0].basename(), "");
assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
println!("✓ Chained namespace test passed - all prefixes correct");
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::name_prefix;
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service};
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::Counter;
use std::sync::Arc;
#[tokio::test]
async fn test_prometheusfactory_using_metrics_registry_trait() {
let drt = create_test_drt_async().await;
let namespace_name = "ns345";
let namespace = drt.namespace(namespace_name).unwrap();
let component = namespace.component("comp345").unwrap();
let endpoint = component.endpoint("ep345");
let counter = endpoint
.metrics()
.create_counter("testcounter", "A test counter", &[])
.unwrap();
counter.inc_by(123.456789);
let epsilon = 0.01;
assert!((counter.get() - 123.456789).abs() < epsilon);
let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
println!("Endpoint output:");
println!("{}", endpoint_output_raw);
let endpoint_output =
super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
assert_eq!(
endpoint_output, expected_endpoint_output,
"\n=== ENDPOINT COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual:\n{}\n\
==============================",
expected_endpoint_output, endpoint_output
);
let gauge = component
.metrics()
.create_gauge("testgauge", "A test gauge", &[])
.unwrap();
gauge.set(50000.0);
assert_eq!(gauge.get(), 50000.0);
let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
println!("Component output:");
println!("{}", component_output_raw);
let component_output =
super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
assert_eq!(
component_output, expected_component_output,
"\n=== COMPONENT COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual:\n{}\n\
==============================",
expected_component_output, component_output
);
let intcounter = namespace
.metrics()
.create_intcounter("testintcounter", "A test int counter", &[])
.unwrap();
intcounter.inc_by(12345);
assert_eq!(intcounter.get(), 12345);
let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
println!("Namespace output:");
println!("{}", namespace_output_raw);
let namespace_output =
super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
assert_eq!(
namespace_output, expected_namespace_output,
"\n=== NAMESPACE COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual:\n{}\n\
==============================",
expected_namespace_output, namespace_output
);
let intgauge = namespace
.metrics()
.create_intgauge("testintgauge", "A test int gauge", &[])
.unwrap();
intgauge.set(42);
assert_eq!(intgauge.get(), 42);
let intgaugevec = namespace
.metrics()
.create_intgaugevec(
"testintgaugevec",
"A test int gauge vector",
&["instance", "status"],
&[("service", "api")],
)
.unwrap();
intgaugevec
.with_label_values(&["server1", "active"])
.set(10);
intgaugevec
.with_label_values(&["server2", "inactive"])
.set(0);
let countervec = endpoint
.metrics()
.create_countervec(
"testcountervec",
"A test counter vector",
&["method", "status"],
&[("service", "api")],
)
.unwrap();
countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
let histogram = component
.metrics()
.create_histogram("testhistogram", "A test histogram", &[], None)
.unwrap();
histogram.observe(1.0);
histogram.observe(2.5);
histogram.observe(4.0);
let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
println!("DRT output:");
println!("{}", drt_output_raw);
let filtered_drt_output =
super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#.to_string();
assert_eq!(
filtered_drt_output, expected_drt_output,
"\n=== DRT COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual (filtered):\n{}\n\
==============================",
expected_drt_output, filtered_drt_output
);
println!("✓ All Prometheus format outputs verified successfully!");
}
/// Exercises the three exposition-text filters in `test_helpers` against one fixed sample.
///
/// The sample has twelve lines: five of them mention `nats`, seven do not, and six are
/// metric samples rather than `#` comment lines.
#[test]
fn test_refactored_filter_functions() {
    let sample = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_nats_client_connection_state Connection state
# TYPE dynamo_component_nats_client_connection_state gauge
dynamo_component_nats_client_connection_state 1
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_nats_service_requests_total 100
dynamo_component_nats_service_errors_total 5"#;

    // Dropping NATS lines must leave the seven unrelated lines and nothing else.
    let without_nats = super::test_helpers::remove_nats_lines(sample);
    assert_eq!(without_nats.len(), 7);
    assert!(without_nats.iter().all(|entry| !entry.contains("nats")));

    // Keeping only NATS lines must yield the five lines that mention `nats`.
    let nats_only = super::test_helpers::extract_nats_lines(sample);
    assert_eq!(nats_only.len(), 5);
    assert!(!nats_only.iter().any(|entry| !entry.contains("nats")));

    // Sample extraction must return the six non-comment metric lines.
    let samples = super::test_helpers::extract_metrics(sample);
    assert_eq!(samples.len(), 6);
    assert!(
        samples
            .iter()
            .all(|entry| !entry.starts_with("#") && entry.starts_with("dynamo_component"))
    );

    println!("✓ All refactored filter functions work correctly!");
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_nats {
use super::prometheus_names::name_prefix;
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service};
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::pipeline::PushRouter;
use crate::{DistributedRuntime, Runtime};
use tokio::time::{Duration, sleep};
/// Checks that the runtime returned by `create_test_drt_async` exposes exactly the NATS
/// metric names listed in `DRT_NATS_METRICS` (after `build_component_metric_name` is
/// applied to each entry).
///
/// Both name lists are sorted before comparison, so the assertion does not depend on the
/// order in which the exposition text happens to list the metrics.
#[tokio::test]
async fn test_drt_nats_metrics() {
    let drt = create_test_drt_async().await;
    let drt_output = drt.metrics().prometheus_expfmt().unwrap();
    println!("DRT output with NATS metrics:");
    println!("{}", drt_output);

    let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
    assert!(
        !drt_nats_metrics.is_empty(),
        "NATS client metrics should be present"
    );

    // Reduce the NATS lines to sample lines, then strip labels and values so only the
    // bare metric name remains.
    let drt_nats_metric_lines =
        super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
    let mut actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
        .iter()
        .map(|line| {
            // Everything before the first `{` (label set), if any...
            let without_labels = line.split('{').next().unwrap_or(line);
            // ...and before the first space (value) for unlabelled samples.
            without_labels.split(' ').next().unwrap_or(without_labels)
        })
        .collect();
    // The expectation below is sorted; sort the observed names too so the comparison
    // really is "identical when sorted", as the assertion message states.
    actual_drt_nats_metrics_sorted.sort_unstable();

    let expect_drt_nats_metrics_sorted = {
        let mut temp = DRT_NATS_METRICS
            .iter()
            .map(|metric| build_component_metric_name(metric))
            .collect::<Vec<_>>();
        temp.sort();
        temp
    };

    println!(
        "actual_drt_nats_metrics_sorted: {:?}",
        actual_drt_nats_metrics_sorted
    );
    println!(
        "expect_drt_nats_metrics_sorted: {:?}",
        expect_drt_nats_metrics_sorted
    );
    assert_eq!(
        actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
        "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
    );
    println!("✓ DistributedRuntime NATS metrics integration test passed!");
}
/// Checks the NATS metric names exposed by a component after `add_stats_service`, and that
/// the runtime-level exposition then carries both the DRT and the component NATS samples.
///
/// The component registry is scraped once. The observed names are taken from the
/// NATS-filtered lines of that scrape (the same way the DRT section below does it) and are
/// sorted before being compared with the sorted expectation built from
/// `COMPONENT_NATS_METRICS`.
#[tokio::test]
async fn test_nats_metric_names() {
    let drt = create_test_drt_async().await;
    let namespace = drt.namespace("ns789").unwrap();
    let mut component = namespace.component("comp789").unwrap();
    component.add_stats_service().await.unwrap();

    // Single scrape of the component registry; everything below derives from it.
    let component_output = component.metrics().prometheus_expfmt().unwrap();
    let component_nats_metrics = super::test_helpers::extract_nats_lines(&component_output);
    println!(
        "Component NATS metrics count: {}",
        component_nats_metrics.len()
    );
    assert!(
        !component_nats_metrics.is_empty(),
        "NATS client metrics should be present"
    );

    // Only the NATS lines are relevant to a comparison against COMPONENT_NATS_METRICS.
    let component_metrics =
        super::test_helpers::extract_metrics(&component_nats_metrics.join("\n"));
    let mut actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
        .iter()
        .map(|line| {
            // Strip the label set, then the value, leaving the bare metric name.
            let without_labels = line.split('{').next().unwrap_or(line);
            without_labels.split(' ').next().unwrap_or(without_labels)
        })
        .collect();
    // Sort so the comparison does not rely on exposition order.
    actual_component_nats_metrics_sorted.sort_unstable();

    let expect_component_nats_metrics_sorted = {
        let mut temp = COMPONENT_NATS_METRICS
            .iter()
            .map(|metric| build_component_metric_name(metric))
            .collect::<Vec<_>>();
        temp.sort();
        temp
    };

    println!(
        "actual_component_nats_metrics_sorted: {:?}",
        actual_component_nats_metrics_sorted
    );
    println!(
        "expect_component_nats_metrics_sorted: {:?}",
        expect_component_nats_metrics_sorted
    );
    assert_eq!(
        actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
        "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
    );

    // The runtime-level exposition is expected to contain one NATS sample per DRT metric
    // plus one per component metric.
    let drt_output = drt.metrics().prometheus_expfmt().unwrap();
    let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
    let drt_and_component_nats_metrics =
        super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
    println!(
        "DRT and component NATS metrics count: {}",
        drt_and_component_nats_metrics.len()
    );
    assert_eq!(
        drt_and_component_nats_metrics.len(),
        DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
        "DRT at this point should have both the DRT and component NATS metrics"
    );
    println!("✓ Component NATS metrics integration test passed!");
}
/// Checks metric values before and after sending ten requests through an "echo" endpoint.
///
/// Flow, in order:
/// 1. Build a `DistributedRuntime`, then spawn a task that adds the stats service and
///    starts an `echo` endpoint on component `comp123` in namespace `ns123`.
/// 2. After a fixed 500 ms sleep, scrape the runtime metrics and check each entry of the
///    initial bounds table.
/// 3. Build a second `DistributedRuntime`, obtain a client for the same endpoint and send
///    ten 2000-byte messages through a `PushRouter`.
/// 4. After another fixed 500 ms sleep, scrape again and check the post-activity bounds
///    table, which additionally covers the `work_handler` metrics.
///
/// Every bounds-table entry is `(metric name, inclusive minimum, inclusive maximum)`.
///
/// # Errors
/// Returns an error if creating either runtime, resolving the client, building the router
/// or sending a request fails. Range violations and missing metrics panic instead.
#[tokio::test]
async fn test_nats_metrics_values() -> anyhow::Result<()> {
// Minimal engine used as the endpoint handler: it echoes the request payload back.
struct MessageHandler {}
impl MessageHandler {
fn new() -> std::sync::Arc<Self> {
std::sync::Arc::new(Self {})
}
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
// Produces a response stream with exactly one item carrying a copy of the input string.
async fn generate(
&self,
input: SingleIn<String>,
) -> Result<ManyOut<Annotated<String>>, Error> {
let (data, ctx) = input.into_parts();
let response = data.to_string();
let stream = stream::iter(vec![Annotated::from_data(response)]);
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
println!("\n=== Initializing DistributedRuntime ===");
let runtime = Runtime::from_current()?;
let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = drt.namespace("ns123").unwrap();
let mut component = namespace.component("comp123").unwrap();
let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
// The backend runs in a spawned task; its handle is kept but never awaited or aborted here.
let _backend_handle = tokio::spawn(async move {
component.add_stats_service().await.unwrap();
let endpoint = component
.endpoint("echo")
.endpoint_builder()
.handler(ingress);
endpoint.start().await.unwrap();
});
// Fixed delay; presumably gives the spawned backend time to come up before the first
// scrape. There is no explicit synchronization with the task. NOTE(review): confirm.
sleep(Duration::from_millis(500)).await;
println!("✓ Launched endpoint service in background successfully");
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
// Lines for which `parse_prometheus_metric` yields nothing are dropped by `filter_map`.
let parsed_metrics: Vec<_> = drt_output
.lines()
.filter_map(super::test_helpers::parse_prometheus_metric)
.collect();
println!("=== Initial DRT metrics output ===");
println!("{}", drt_output);
println!("\n=== Checking Initial Metric Values ===");
// Bounds before any request is sent: (metric name, inclusive min, inclusive max).
// NOTE(review): the byte/message bounds look empirically tuned; their derivation is not
// visible in this function.
let initial_expected_metric_values = [
(
build_component_metric_name(nats_client::CONNECTION_STATE),
1.0,
1.0,
), (
build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1.0,
1.0,
), (
build_component_metric_name(nats_client::IN_TOTAL_BYTES),
800.0,
4000.0,
), (
build_component_metric_name(nats_client::IN_MESSAGES),
0.0,
5.0,
), (
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1500.0,
5000.0,
), (
build_component_metric_name(nats_client::OUT_MESSAGES),
0.0,
5.0,
), (
build_component_metric_name(nats_service::PROCESSING_MS_AVG),
0.0,
0.0,
), (
build_component_metric_name(nats_service::ERRORS_TOTAL),
0.0,
0.0,
), (
build_component_metric_name(nats_service::REQUESTS_TOTAL),
0.0,
0.0,
), (
build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
0.0,
0.0,
), (
build_component_metric_name(nats_service::ACTIVE_SERVICES),
0.0,
2.0,
), (
build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
0.0,
2.0,
), ];
// Each expected metric must be present (first match by name) and within its bounds.
for (metric_name, min_value, max_value) in &initial_expected_metric_values {
let actual_value = parsed_metrics
.iter()
.find(|(name, _, _)| name == metric_name)
.map(|(_, _, value)| *value)
.unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
assert!(
actual_value >= *min_value && actual_value <= *max_value,
"Initial metric {} should be between {} and {}, but got {}",
metric_name,
min_value,
max_value,
actual_value
);
}
println!("\n=== Client Runtime to hit the endpoint ===");
// A second DistributedRuntime value is built for the client side; the metrics asserted
// below are still read from `drt`, the first one.
let client_runtime = Runtime::from_current()?;
let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
let namespace = client_distributed.namespace("ns123")?;
let component = namespace.component("comp123")?;
let client = component.endpoint("echo").client().await?;
client.wait_for_instances().await?;
println!("✓ Connected to endpoint, waiting for instances...");
let router =
PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
.await?;
// Ten requests; each payload is the single decimal digit of `i` repeated 2000 times.
for i in 0..10 {
let msg = i.to_string().repeat(2000); let mut stream = router.random(msg.clone().into()).await?;
while let Some(resp) = stream.next().await {
if let Some(data) = &resp.data {
// The echo comparison is only printed; it is not asserted.
let is_same = data == &msg;
println!(
"Response {}: {} bytes, matches original: {}",
i,
data.len(),
is_same
);
}
}
}
println!("✓ Sent messages and received responses successfully");
println!("\n=== Waiting 500ms for metrics to update ===");
sleep(Duration::from_millis(500)).await;
println!("✓ Wait complete, getting final metrics...");
let final_drt_output = drt.metrics().prometheus_expfmt().unwrap();
println!("\n=== Final Prometheus DRT output ===");
println!("{}", final_drt_output);
// The NATS-only view is used for logging; the assertions below parse the full output.
let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
println!("\n=== Filtered NATS metrics from final DRT output ===");
for line in &final_drt_nats_output {
println!("{}", line);
}
let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
.iter()
.filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
.collect();
// Bounds after the ten requests: (metric name, inclusive min, inclusive max).
// Unlike the initial table, this one also covers the `work_handler` metrics.
let post_expected_metric_values = [
(
build_component_metric_name(nats_client::CONNECTION_STATE),
1.0,
1.0,
), (
build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1.0,
1.0,
), (
build_component_metric_name(nats_client::IN_TOTAL_BYTES),
20000.0,
32000.0,
), (
build_component_metric_name(nats_client::IN_MESSAGES),
8.0,
20.0,
), (
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
2500.0,
8000.0,
), (
build_component_metric_name(nats_client::OUT_MESSAGES),
8.0,
20.0,
), (
build_component_metric_name(nats_service::PROCESSING_MS_AVG),
0.0,
1.0,
), (
build_component_metric_name(nats_service::ERRORS_TOTAL),
0.0,
0.0,
), (
build_component_metric_name(nats_service::REQUESTS_TOTAL),
0.0,
10.0,
), (
build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
0.0,
5.0,
), (
build_component_metric_name(nats_service::ACTIVE_SERVICES),
0.0,
2.0,
), (
build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
0.0,
2.0,
), (
// Exactly ten requests were sent by the loop above.
build_component_metric_name(work_handler::REQUESTS_TOTAL),
10.0,
10.0,
), (
build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
21000.0,
26000.0,
), (
build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
18000.0,
23000.0,
), (
build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
0.0,
1.0,
), (
// Series name formed by appending `_count` to the request-duration metric name.
format!(
"{}_count",
build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
10.0,
10.0,
), (
// Series name formed by appending `_sum` to the request-duration metric name.
format!(
"{}_sum",
build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
0.0001,
1.0,
), ];
println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
// Same presence-and-range check as for the initial table, plus a log line per metric.
for (metric_name, min_value, max_value) in &post_expected_metric_values {
let actual_value = final_parsed_metrics
.iter()
.find(|(name, _, _)| name == metric_name)
.map(|(_, _, value)| *value)
.unwrap_or_else(|| {
panic!(
"Could not find expected post-activity metric: {}",
metric_name
)
});
assert!(
actual_value >= *min_value && actual_value <= *max_value,
"Post-activity metric {} should be between {} and {}, but got {}",
metric_name,
min_value,
max_value,
actual_value
);
println!(
"✓ {}: {} (range: {} to {})",
metric_name, actual_value, min_value, max_value
);
}
println!("✓ All NATS and component metrics parsed successfully!");
// NOTE(review): no check in this function uses a 100-byte threshold; the byte bounds
// come from the tables above, so the next message does not match what was asserted.
println!("✓ Byte metrics verified to be >= 100 bytes!");
println!("✓ Post-activity metrics verified with higher thresholds!");
println!("✓ Work handler metrics reflect increased activity!");
Ok(())
}
}