pub mod prometheus_names;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Arc;
use crate::component::ComponentBuilder;
use anyhow;
use once_cell::sync::Lazy;
use regex::Regex;
use std::any::Any;
use std::collections::HashMap;
use prometheus_names::{
build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
sanitize_prometheus_name, work_handler,
};
use crate::pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait,
network::Ingress,
};
use crate::protocols::annotated::Annotated;
use crate::stream;
use crate::stream::StreamExt;
use prometheus::Encoder;
/// Checks that every label key in `labels` is unique.
///
/// # Errors
/// Returns an error naming the first key that repeats an earlier entry.
fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> {
    let mut observed = std::collections::HashSet::new();
    // `insert` reports `false` for a key that is already present, so the first
    // key that fails to insert is the first duplicate in slice order.
    let first_duplicate = labels
        .iter()
        .map(|(name, _)| *name)
        .find(|name| !observed.insert(*name));
    match first_duplicate {
        Some(duplicate) => Err(anyhow::anyhow!(
            "Duplicate label key '{}' found in labels",
            duplicate
        )),
        None => Ok(()),
    }
}
pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error>
where
Self: Sized;
fn with_histogram_opts_and_buckets(
_opts: prometheus::HistogramOpts,
_buckets: Option<Vec<f64>>,
) -> Result<Self, prometheus::Error>
where
Self: Sized,
{
panic!("with_histogram_opts_and_buckets is not implemented for this metric type");
}
fn with_opts_and_label_names(
_opts: prometheus::Opts,
_label_names: &[&str],
) -> Result<Self, prometheus::Error>
where
Self: Sized,
{
panic!("with_opts_and_label_names is not implemented for this metric type");
}
}
/// Float counter: constructed directly from plain options.
impl PrometheusMetric for prometheus::Counter {
    /// Delegates to the `prometheus` crate constructor of the same name.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        <prometheus::Counter>::with_opts(opts)
    }
}
/// Integer counter: constructed directly from plain options.
impl PrometheusMetric for prometheus::IntCounter {
    /// Delegates to the `prometheus` crate constructor of the same name.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        <prometheus::IntCounter>::with_opts(opts)
    }
}
/// Float gauge: constructed directly from plain options.
impl PrometheusMetric for prometheus::Gauge {
    /// Delegates to the `prometheus` crate constructor of the same name.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        <prometheus::Gauge>::with_opts(opts)
    }
}
/// Integer gauge: constructed directly from plain options.
impl PrometheusMetric for prometheus::IntGauge {
    /// Delegates to the `prometheus` crate constructor of the same name.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        <prometheus::IntGauge>::with_opts(opts)
    }
}
impl PrometheusMetric for prometheus::GaugeVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::GaugeVec::new(opts, label_names)
}
}
impl PrometheusMetric for prometheus::IntGaugeVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::IntGaugeVec::new(opts, label_names)
}
}
impl PrometheusMetric for prometheus::IntCounterVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::IntCounterVec::new(opts, label_names)
}
}
/// Histogram: can be built from plain options (default buckets) or from
/// histogram options with optional custom buckets.
impl PrometheusMetric for prometheus::Histogram {
    /// Builds a histogram with the default buckets from plain options.
    ///
    /// The options are converted with `HistogramOpts::from`, which carries the
    /// complete `Opts` across. Rebuilding from only `name` and `help` would
    /// silently drop const labels, namespace, subsystem and variable labels.
    fn with_opts(opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
        prometheus::Histogram::with_opts(prometheus::HistogramOpts::from(opts))
    }

    /// Builds a histogram from `opts`, overriding its buckets when `buckets`
    /// is `Some`.
    fn with_histogram_opts_and_buckets(
        mut opts: prometheus::HistogramOpts,
        buckets: Option<Vec<f64>>,
    ) -> Result<Self, prometheus::Error> {
        if let Some(custom_buckets) = buckets {
            opts = opts.buckets(custom_buckets);
        }
        prometheus::Histogram::with_opts(opts)
    }
}
impl PrometheusMetric for prometheus::CounterVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
panic!("CounterVec requires label names, use with_opts_and_label_names instead");
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::CounterVec::new(opts, label_names)
}
}
pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
hierarchy: &H,
metric_name: &str,
metric_desc: &str,
labels: &[(&str, &str)],
buckets: Option<Vec<f64>>,
const_labels: Option<&[&str]>,
) -> anyhow::Result<T> {
validate_no_duplicate_label_keys(labels)?;
let basename = hierarchy.basename();
let parent_hierarchies = hierarchy.parent_hierarchies();
let mut hierarchy_names: Vec<String> =
parent_hierarchies.iter().map(|p| p.basename()).collect();
hierarchy_names.push(basename.clone());
let metric_name = build_component_metric_name(metric_name);
let mut updated_labels: Vec<(String, String)> = Vec::new();
for (key, _) in labels {
if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT {
return Err(anyhow::anyhow!(
"Label '{}' is automatically added by auto-label injection and cannot be manually set",
key
));
}
}
if hierarchy_names.len() > 1 {
let namespace = &hierarchy_names[1];
if !namespace.is_empty() {
let valid_namespace = sanitize_prometheus_label(namespace)?;
if !valid_namespace.is_empty() {
updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace));
}
}
}
if hierarchy_names.len() > 2 {
let component = &hierarchy_names[2];
if !component.is_empty() {
let valid_component = sanitize_prometheus_label(component)?;
if !valid_component.is_empty() {
updated_labels.push((labels::COMPONENT.to_string(), valid_component));
}
}
}
if hierarchy_names.len() > 3 {
let endpoint = &hierarchy_names[3];
if !endpoint.is_empty() {
let valid_endpoint = sanitize_prometheus_label(endpoint)?;
if !valid_endpoint.is_empty() {
updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint));
}
}
}
updated_labels.extend(
labels
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string())),
);
let prometheus_metric = if std::any::TypeId::of::<T>()
== std::any::TypeId::of::<prometheus::CounterVec>()
{
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for CounterVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for GaugeVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
if const_labels.is_some() {
return Err(anyhow::anyhow!(
"const_labels parameter is not valid for Histogram"
));
}
let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
T::with_histogram_opts_and_buckets(opts, buckets)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntCounterVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntGaugeVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else {
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
));
}
if const_labels.is_some() {
return Err(anyhow::anyhow!(
"const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
T::with_opts(opts)?
};
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
hierarchy.get_metrics_registry().add_metric(collector)?;
Ok(prometheus_metric)
}
/// Factory for Prometheus metrics bound to one level of a
/// [`MetricsHierarchy`].
///
/// Every `create_*` method forwards to [`create_metric`], which names the
/// metric, injects hierarchy labels and registers it in the hierarchy's
/// registry.
pub struct Metrics<H: MetricsHierarchy> {
    /// Hierarchy level whose names and registry are used for new metrics.
    hierarchy: H,
}

impl<H: MetricsHierarchy> Metrics<H> {
    /// Wraps `hierarchy` in a metrics factory.
    pub fn new(hierarchy: H) -> Self {
        Metrics { hierarchy }
    }

    /// Creates and registers a float counter with the given const `labels`.
    pub fn create_counter(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::Counter> {
        let scope = &self.hierarchy;
        create_metric::<prometheus::Counter, H>(scope, name, description, labels, None, None)
    }

    /// Creates and registers a float counter vector.
    ///
    /// `const_labels` are the variable label names of the vector, while
    /// `const_label_values` are fixed key/value pairs attached to every series.
    pub fn create_countervec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::CounterVec> {
        let scope = &self.hierarchy;
        let label_names = Some(const_labels);
        create_metric::<prometheus::CounterVec, H>(
            scope,
            name,
            description,
            const_label_values,
            None,
            label_names,
        )
    }

    /// Creates and registers a float gauge with the given const `labels`.
    pub fn create_gauge(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::Gauge> {
        let scope = &self.hierarchy;
        create_metric::<prometheus::Gauge, H>(scope, name, description, labels, None, None)
    }

    /// Creates and registers a float gauge vector.
    ///
    /// `const_labels` are the variable label names of the vector, while
    /// `const_label_values` are fixed key/value pairs attached to every series.
    pub fn create_gaugevec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::GaugeVec> {
        let scope = &self.hierarchy;
        let label_names = Some(const_labels);
        create_metric::<prometheus::GaugeVec, H>(
            scope,
            name,
            description,
            const_label_values,
            None,
            label_names,
        )
    }

    /// Creates and registers a histogram; `buckets` of `None` keeps the
    /// bucket layout already present in the histogram options.
    pub fn create_histogram(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
        buckets: Option<Vec<f64>>,
    ) -> anyhow::Result<prometheus::Histogram> {
        let scope = &self.hierarchy;
        create_metric::<prometheus::Histogram, H>(scope, name, description, labels, buckets, None)
    }

    /// Creates and registers an integer counter with the given const `labels`.
    pub fn create_intcounter(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntCounter> {
        let scope = &self.hierarchy;
        create_metric::<prometheus::IntCounter, H>(scope, name, description, labels, None, None)
    }

    /// Creates and registers an integer counter vector.
    ///
    /// `const_labels` are the variable label names of the vector, while
    /// `const_label_values` are fixed key/value pairs attached to every series.
    pub fn create_intcountervec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntCounterVec> {
        let scope = &self.hierarchy;
        let label_names = Some(const_labels);
        create_metric::<prometheus::IntCounterVec, H>(
            scope,
            name,
            description,
            const_label_values,
            None,
            label_names,
        )
    }

    /// Creates and registers an integer gauge with the given const `labels`.
    pub fn create_intgauge(
        &self,
        name: &str,
        description: &str,
        labels: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntGauge> {
        let scope = &self.hierarchy;
        create_metric::<prometheus::IntGauge, H>(scope, name, description, labels, None, None)
    }

    /// Creates and registers an integer gauge vector.
    ///
    /// `const_labels` are the variable label names of the vector, while
    /// `const_label_values` are fixed key/value pairs attached to every series.
    pub fn create_intgaugevec(
        &self,
        name: &str,
        description: &str,
        const_labels: &[&str],
        const_label_values: &[(&str, &str)],
    ) -> anyhow::Result<prometheus::IntGaugeVec> {
        let scope = &self.hierarchy;
        let label_names = Some(const_labels);
        create_metric::<prometheus::IntGaugeVec, H>(
            scope,
            name,
            description,
            const_label_values,
            None,
            label_names,
        )
    }

    /// Renders this hierarchy's registry, together with its child registries,
    /// in Prometheus text exposition format.
    pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
        let registry = self.hierarchy.get_metrics_registry();
        registry.prometheus_expfmt_combined()
    }
}
use crate::traits::DistributedRuntimeProvider;
pub trait MetricsHierarchy: Send + Sync {
fn basename(&self) -> String;
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
fn get_metrics_registry(&self) -> &MetricsRegistry;
fn metrics(&self) -> Metrics<&Self>
where
Self: Sized,
{
Metrics::new(self)
}
}
/// Lets a shared reference stand in for the hierarchy it points to, which is
/// what allows `Metrics<&Self>` to be built by `MetricsHierarchy::metrics`.
impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
    fn basename(&self) -> String {
        // `*self` is the inner `&T`; naming `T::` forwards to the referent's
        // implementation rather than back into this impl.
        T::basename(*self)
    }

    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        T::parent_hierarchies(*self)
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        T::get_metrics_registry(*self)
    }
}
/// Shared callback run before metrics are gathered for a scrape; its error,
/// if any, is logged by `MetricsRegistry::prometheus_expfmt_combined`.
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
/// Shared callback that returns extra text to append to the scrape output
/// after the encoded registry metrics.
pub type PrometheusExpositionFormatCallback =
Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
/// Prometheus registry plus the callbacks and child registries that take part
/// in a combined scrape.
///
/// Every field sits behind an `Arc`, so a clone shares state with the
/// original rather than copying it.
#[derive(Clone)]
pub struct MetricsRegistry {
/// The underlying Prometheus registry that collectors are registered in.
pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
/// Registries visited after this one when producing combined scrape output.
child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,
/// Callbacks executed before metrics are gathered.
pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
/// Callbacks whose returned text is appended to the scrape output.
pub prometheus_expfmt_callbacks:
Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
/// Debug output shows placeholders instead of lock contents, plus the number
/// of callbacks of each kind. `child_registries` is not printed.
impl std::fmt::Debug for MetricsRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Callbacks are opaque closures, so only their counts are reported.
        let describe =
            |count: usize| format!("<RwLock<Vec<Callback>>> with {} callbacks", count);
        let update_summary = describe(self.prometheus_update_callbacks.read().unwrap().len());
        let expfmt_summary = describe(self.prometheus_expfmt_callbacks.read().unwrap().len());

        let mut out = f.debug_struct("MetricsRegistry");
        out.field("prometheus_registry", &"<RwLock<Registry>>");
        out.field("prometheus_update_callbacks", &update_summary);
        out.field("prometheus_expfmt_callbacks", &expfmt_summary);
        out.finish()
    }
}
/// Registration, callback management and combined scraping.
///
/// # Panics
/// Every method that takes one of the internal locks calls `unwrap()` on it
/// and therefore panics if that lock is poisoned.
impl MetricsRegistry {
    /// Creates a registry with no metrics, children or callbacks.
    pub fn new() -> Self {
        Self {
            prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
            child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
            prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
            prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
        }
    }

    /// Links `child` so that combined scrapes of `self` include it.
    ///
    /// A child is identified by the address of its shared Prometheus
    /// registry; linking the same child again is a no-op.
    pub fn add_child_registry(&self, child: &MetricsRegistry) {
        let child_ptr = Arc::as_ptr(&child.prometheus_registry);
        let mut guard = self.child_registries.write().unwrap();
        let already_linked = guard
            .iter()
            .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr);
        if !already_linked {
            guard.push(child.clone());
        }
    }

    /// Returns `self` followed by all transitively linked children in
    /// depth-first pre-order, each distinct registry once.
    fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
        fn visit(
            registry: &MetricsRegistry,
            out: &mut Vec<MetricsRegistry>,
            seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
        ) {
            // The pointer identifies the shared registry, so clones of one
            // registry are recognised and cycles terminate.
            let ptr = Arc::as_ptr(&registry.prometheus_registry);
            if !seen.insert(ptr) {
                return;
            }
            out.push(registry.clone());
            // Snapshot the children so the read lock is released before
            // recursing.
            let children: Vec<MetricsRegistry> = registry
                .child_registries
                .read()
                .unwrap()
                .iter()
                .cloned()
                .collect();
            for child in &children {
                visit(child, out, seen);
            }
        }

        let mut out = Vec::new();
        let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
        visit(self, &mut out, &mut seen);
        out
    }

    /// Renders this registry and all linked children as one Prometheus text
    /// exposition document.
    ///
    /// Steps:
    /// 1. Run every update callback; failures are logged and do not abort.
    /// 2. Gather metric families from every registry and merge them by name.
    ///    A series (name plus label set) seen before is dropped with a warning.
    /// 3. Encode the merged families, sorted by name.
    /// 4. Append the text returned by the exposition-format callbacks.
    ///
    /// # Errors
    /// Fails when one family name has differing help text or type across
    /// registries, when encoding fails, or when the encoded bytes are not
    /// valid UTF-8.
    pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
        let registries = self.registries_for_combined_scrape();

        for registry in &registries {
            for result in registry.execute_update_callbacks() {
                if let Err(e) = result {
                    tracing::error!("Error executing metrics callback: {}", e);
                }
            }
        }

        let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
        // Keyed by (family name, sorted label pairs). A structured key cannot
        // collide when a label value itself contains ',' or '=', unlike a
        // joined string.
        let mut seen_series: HashSet<(String, Vec<(String, String)>)> = HashSet::new();

        for (registry_idx, registry) in registries.iter().enumerate() {
            let families = registry.get_prometheus_registry().gather();
            for mut family in families {
                let name = family.name().to_string();
                let entry = by_name.entry(name.clone()).or_insert_with(|| {
                    let mut out = prometheus::proto::MetricFamily::new();
                    out.set_name(name.clone());
                    out.set_help(family.help().to_string());
                    out.set_field_type(family.get_field_type());
                    out
                });
                if entry.help() != family.help()
                    || entry.get_field_type() != family.get_field_type()
                {
                    return Err(anyhow::anyhow!(
                        "Metric family '{}' has inconsistent help/type across registries (idx={})",
                        name,
                        registry_idx
                    ));
                }

                let mut metrics = family.take_metric();
                for metric in metrics.drain(..) {
                    let mut labels: Vec<(String, String)> = metric
                        .get_label()
                        .iter()
                        .map(|lp| (lp.name().to_string(), lp.value().to_string()))
                        .collect();
                    // Sort so that label order does not affect series identity.
                    labels.sort();

                    let series = (name.clone(), labels);
                    if seen_series.contains(&series) {
                        let (_, labels) = &series;
                        tracing::warn!(
                            metric_name = %name,
                            labels = ?labels,
                            registry_idx,
                            "Duplicate Prometheus series while merging registries; dropping later sample"
                        );
                        continue;
                    }
                    seen_series.insert(series);
                    entry.mut_metric().push(metric);
                }
            }
        }

        // HashMap iteration order is arbitrary; sort for stable output.
        let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
        merged.sort_by(|a, b| a.name().cmp(b.name()));

        let encoder = prometheus::TextEncoder::new();
        let mut buffer = Vec::new();
        encoder.encode(&merged, &mut buffer)?;
        let mut result = String::from_utf8(buffer)?;

        let mut expfmt = String::new();
        for registry in &registries {
            let text = registry.execute_expfmt_callbacks();
            if !text.is_empty() {
                if !expfmt.is_empty() && !expfmt.ends_with('\n') {
                    expfmt.push('\n');
                }
                expfmt.push_str(&text);
            }
        }
        if !expfmt.is_empty() {
            if !result.ends_with('\n') {
                result.push('\n');
            }
            result.push_str(&expfmt);
        }
        Ok(result)
    }

    /// Adds a callback to run before metrics are gathered.
    pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
        self.prometheus_update_callbacks
            .write()
            .unwrap()
            .push(callback);
    }

    /// Adds a callback whose returned text is appended to scrape output.
    pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
        self.prometheus_expfmt_callbacks
            .write()
            .unwrap()
            .push(callback);
    }

    /// Runs every update callback in registration order and returns one
    /// result per callback; a failing callback does not stop later ones.
    pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
        self.prometheus_update_callbacks
            .read()
            .unwrap()
            .iter()
            .map(|callback| callback())
            .collect()
    }

    /// Runs every exposition-format callback and concatenates the non-empty
    /// outputs, inserting a newline between pieces when one is missing.
    /// Failing callbacks are logged and skipped.
    pub fn execute_expfmt_callbacks(&self) -> String {
        let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
        let mut result = String::new();
        for callback in callbacks.iter() {
            match callback() {
                Ok(text) => {
                    if !text.is_empty() {
                        if !result.is_empty() && !result.ends_with('\n') {
                            result.push('\n');
                        }
                        result.push_str(&text);
                    }
                }
                Err(e) => {
                    tracing::error!("Error executing exposition text callback: {}", e);
                }
            }
        }
        result
    }

    /// Registers `collector` in the underlying Prometheus registry.
    ///
    /// # Errors
    /// Wraps the registry's error when registration is rejected.
    pub fn add_metric(
        &self,
        collector: Box<dyn prometheus::core::Collector>,
    ) -> anyhow::Result<()> {
        self.prometheus_registry
            .write()
            .unwrap()
            .register(collector)
            .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
    }

    /// Returns a read guard on the underlying Prometheus registry.
    pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
        self.prometheus_registry.read().unwrap()
    }

    /// Reports whether this registry alone (children excluded) currently
    /// gathers a metric family named `metric_name`.
    pub fn has_metric_named(&self, metric_name: &str) -> bool {
        self.prometheus_registry
            .read()
            .unwrap()
            .gather()
            .iter()
            .any(|mf| mf.name() == metric_name)
    }
}
impl Default for MetricsRegistry {
fn default() -> Self {
Self::new()
}
}
/// Helpers shared by the test modules for picking apart Prometheus text
/// exposition output.
#[cfg(test)]
mod test_helpers {
    use super::prometheus_names::name_prefix;
    use super::*;

    /// Returns the lines of `input` for which `predicate` is true, as owned
    /// strings in their original order.
    fn filter_prometheus_lines<F>(input: &str, mut predicate: F) -> Vec<String>
    where
        F: FnMut(&str) -> bool,
    {
        input
            .lines()
            .filter(|line| predicate(line))
            .map(String::from)
            .collect()
    }

    /// Keeps only sample lines whose metric name starts with
    /// `"<name_prefix::COMPONENT>_"`, dropping comments and blank lines.
    pub fn extract_metrics(input: &str) -> Vec<String> {
        // Built once, not once per line.
        let component_prefix = format!("{}_", name_prefix::COMPONENT);
        filter_prometheus_lines(input, |line| {
            line.starts_with(&component_prefix)
                && !line.starts_with("#")
                && !line.trim().is_empty()
        })
    }

    /// Parses one sample line into `(name, labels, value)`.
    ///
    /// Returns `None` for blank lines, `#` comment lines, lines with fewer
    /// than two whitespace-separated fields, and values that do not parse as
    /// `f64`.
    ///
    /// This is a deliberately simple test parser: the line is split on
    /// whitespace and the label list on `,`, so label values that contain
    /// spaces or commas are not handled.
    pub fn parse_prometheus_metric(
        line: &str,
    ) -> Option<(String, std::collections::HashMap<String, String>, f64)> {
        if line.trim().is_empty() || line.starts_with('#') {
            return None;
        }

        let mut fields = line.split_whitespace();
        let metric_part = fields.next()?;
        let value: f64 = fields.next()?.parse().ok()?;

        let (name, labels) = match metric_part.find('{') {
            Some(brace_start) => {
                // A missing closing brace means the labels run to the end.
                let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len());
                let labels_str = &metric_part[brace_start + 1..brace_end];
                let mut labels = std::collections::HashMap::new();
                for pair in labels_str.split(',') {
                    if let Some((k, v)) = pair.split_once('=') {
                        let v = v.trim_matches('"');
                        labels.insert(k.trim().to_string(), v.to_string());
                    }
                }
                (metric_part[..brace_start].to_string(), labels)
            }
            None => (metric_part.to_string(), std::collections::HashMap::new()),
        };
        Some((name, labels, value))
    }
}
/// Unit tests that need no distributed runtime: metric naming, the test-only
/// exposition parser, and update-callback execution.
#[cfg(test)]
mod test_metricsregistry_units {
    use super::*;

    /// Short metric names are expanded with the component prefix.
    #[test]
    fn test_build_component_metric_name_with_prefix() {
        let cases = [
            ("requests", "dynamo_component_requests"),
            ("counter", "dynamo_component_counter"),
        ];
        for (short_name, expected) in cases {
            let result = build_component_metric_name(short_name);
            assert_eq!(result, expected);
        }
    }

    /// The helper parser handles labelled and unlabelled samples and rejects
    /// non-sample lines.
    #[test]
    fn test_parse_prometheus_metric() {
        use super::test_helpers::parse_prometheus_metric;
        use std::collections::HashMap;

        // Builds an owned label map from borrowed pairs.
        let label_map = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
            pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect()
        };

        // Two labels.
        let parsed =
            parse_prometheus_metric("http_requests_total{method=\"GET\",status=\"200\"} 1234");
        assert!(parsed.is_some());
        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "http_requests_total");
        assert_eq!(labels, label_map(&[("method", "GET"), ("status", "200")]));
        assert_eq!(value, 1234.0);

        // No labels.
        let parsed = parse_prometheus_metric("cpu_usage 98.5");
        assert!(parsed.is_some());
        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "cpu_usage");
        assert!(labels.is_empty());
        assert_eq!(value, 98.5);

        // One label with a fractional value.
        let parsed = parse_prometheus_metric("response_time{service=\"api\"} 0.123");
        assert!(parsed.is_some());
        let (name, labels, value) = parsed.unwrap();
        assert_eq!(name, "response_time");
        assert_eq!(labels, label_map(&[("service", "api")]));
        assert_eq!(value, 0.123);

        // Blank, comment, and value-less lines are not samples.
        let rejected = [
            "",
            "# HELP metric description",
            "# TYPE metric counter",
            "metric_name",
        ];
        for line in rejected {
            assert!(parse_prometheus_metric(line).is_none());
        }

        println!("✓ Prometheus metric parsing works correctly!");
    }

    /// Update callbacks run in order on every execution, are shared by
    /// clones, and keep running after one of them fails.
    #[test]
    fn test_metrics_registry_entry_callbacks() {
        use crate::MetricsRegistry;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Scenario 1: all callbacks succeed; a clone shares the callbacks.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));
            for step in [1, 10, 100] {
                let total_handle = total.clone();
                registry.add_update_callback(Arc::new(move || {
                    total_handle.fetch_add(step, Ordering::SeqCst);
                    Ok(())
                }));
            }
            // Registering a callback must not invoke it.
            assert_eq!(total.load(Ordering::SeqCst), 0);

            let outcomes = registry.execute_update_callbacks();
            assert_eq!(outcomes.len(), 3);
            assert!(outcomes.iter().all(|r| r.is_ok()));
            assert_eq!(total.load(Ordering::SeqCst), 111);

            let outcomes = registry.execute_update_callbacks();
            assert_eq!(outcomes.len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 222);

            let cloned = registry.clone();
            assert_eq!(cloned.execute_update_callbacks().len(), 3);
            assert_eq!(total.load(Ordering::SeqCst), 333);

            registry.execute_update_callbacks();
            assert_eq!(total.load(Ordering::SeqCst), 444);
        }

        // Scenario 2: a failing callback does not block its neighbours.
        {
            let registry = MetricsRegistry::new();
            let total = Arc::new(AtomicUsize::new(0));

            let first_handle = total.clone();
            registry.add_update_callback(Arc::new(move || {
                first_handle.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }));
            registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
            let last_handle = total.clone();
            registry.add_update_callback(Arc::new(move || {
                last_handle.fetch_add(10, Ordering::SeqCst);
                Ok(())
            }));

            let outcomes = registry.execute_update_callbacks();
            assert_eq!(outcomes.len(), 3);
            assert!(outcomes[0].is_ok());
            assert!(outcomes[1].is_err());
            assert!(outcomes[2].is_ok());
            assert_eq!(
                outcomes[1].as_ref().unwrap_err().to_string(),
                "Simulated error"
            );
            assert_eq!(total.load(Ordering::SeqCst), 11);

            let outcomes = registry.execute_update_callbacks();
            assert!(outcomes[1].is_err());
            assert_eq!(total.load(Ordering::SeqCst), 22);
        }

        // Scenario 3: a registry without callbacks yields no results.
        {
            let registry = MetricsRegistry::new();
            let outcomes = registry.execute_update_callbacks();
            assert_eq!(outcomes.len(), 0);
        }
    }
}
// Integration tests (enabled by the "integration" feature) that check how the
// runtime, namespace, component and endpoint levels report their basenames
// and parent hierarchies, and how exposition callbacks are scraped.
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prefixes {
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::core::Collector;
// Walks runtime -> namespace -> component -> endpoint and checks each level's
// basename and ordered parent list, then checks label sanitization for a
// namespace name containing invalid characters.
#[tokio::test]
async fn test_hierarchical_prefixes_and_parent_hierarchies() {
let drt = create_test_drt_async().await;
// The runtime level is expected to report an empty basename.
const DRT_NAME: &str = "";
const NAMESPACE_NAME: &str = "ns901";
const COMPONENT_NAME: &str = "comp901";
const ENDPOINT_NAME: &str = "ep901";
let namespace = drt.namespace(NAMESPACE_NAME).unwrap();
let component = namespace.component(COMPONENT_NAME).unwrap();
let endpoint = component.endpoint(ENDPOINT_NAME);
// Each level lists its ancestors outermost-first.
assert_eq!(drt.basename(), DRT_NAME);
assert_eq!(drt.parent_hierarchies().len(), 0);
assert_eq!(namespace.basename(), NAMESPACE_NAME);
assert_eq!(namespace.parent_hierarchies().len(), 1);
assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(component.basename(), COMPONENT_NAME);
assert_eq!(component.parent_hierarchies().len(), 2);
assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
assert_eq!(endpoint.basename(), ENDPOINT_NAME);
assert_eq!(endpoint.parent_hierarchies().len(), 3);
assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
// Each level's direct parent appears among its ancestors.
assert!(
namespace
.parent_hierarchies()
.iter()
.any(|h| h.basename() == drt.basename())
);
assert!(
component
.parent_hierarchies()
.iter()
.any(|h| h.basename() == namespace.basename())
);
assert!(
endpoint
.parent_hierarchies()
.iter()
.any(|h| h.basename() == component.basename())
);
assert_eq!(drt.parent_hierarchies().len(), 0);
assert_eq!(namespace.parent_hierarchies().len(), 1);
assert_eq!(component.parent_hierarchies().len(), 2);
assert_eq!(endpoint.parent_hierarchies().len(), 3);
// A namespace name with invalid characters still yields a metric; the
// injected namespace label carries the sanitized form "_123".
let invalid_namespace = drt.namespace("@@123").unwrap();
let result =
invalid_namespace
.metrics()
.create_counter("test_counter", "A test counter", &[]);
assert!(result.is_ok());
if let Ok(counter) = &result {
let desc = counter.desc();
let namespace_label = desc[0]
.const_label_pairs
.iter()
.find(|l| l.name() == "dynamo_namespace")
.expect("Should have dynamo_namespace label");
assert_eq!(namespace_label.value(), "_123");
}
// The same metric name can be created again under another namespace.
let valid_namespace = drt.namespace("ns567").unwrap();
assert!(
valid_namespace
.metrics()
.create_counter("test_counter", "A test counter", &[])
.is_ok()
);
}
// An exposition callback registered only on an endpoint's registry must show
// up exactly once when scraping from the runtime level.
#[tokio::test]
async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() {
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns_expfmt_ep_only").unwrap();
let component = namespace.component("comp_expfmt_ep_only").unwrap();
let endpoint = component.endpoint("ep_expfmt_ep_only");
let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n";
let callback: PrometheusExpositionFormatCallback =
Arc::new(move || Ok(metric_line.to_string()));
endpoint
.get_metrics_registry()
.add_expfmt_callback(callback);
let output = drt.metrics().prometheus_expfmt().unwrap();
// Count exact matches of the callback's line, ignoring its trailing newline.
let occurrences = output
.lines()
.filter(|line| line == &metric_line.trim_end_matches('\n'))
.count();
assert_eq!(
occurrences, 1,
"endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}",
occurrences, output
);
}
// Namespaces nested inside namespaces each add one more entry to the parent
// list of everything created beneath them.
#[tokio::test]
async fn test_recursive_namespace() {
let drt = create_test_drt_async().await;
let ns1 = drt.namespace("ns1").unwrap();
let ns2 = ns1.namespace("ns2").unwrap();
let ns3 = ns2.namespace("ns3").unwrap();
let component = ns3.component("test-component").unwrap();
assert_eq!(ns1.basename(), "ns1");
assert_eq!(ns1.parent_hierarchies().len(), 1);
assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
assert_eq!(ns2.basename(), "ns2");
assert_eq!(ns2.parent_hierarchies().len(), 2);
assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(ns3.basename(), "ns3");
assert_eq!(ns3.parent_hierarchies().len(), 3);
assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
assert_eq!(component.basename(), "test-component");
assert_eq!(component.parent_hierarchies().len(), 4);
assert_eq!(component.parent_hierarchies()[0].basename(), "");
assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
println!("✓ Chained namespace test passed - all prefixes correct");
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::name_prefix;
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::Counter;
use std::sync::Arc;
#[tokio::test]
async fn test_prometheusfactory_using_metrics_registry_trait() {
let drt = create_test_drt_async().await;
let namespace_name = "ns345";
let namespace = drt.namespace(namespace_name).unwrap();
let component = namespace.component("comp345").unwrap();
let endpoint = component.endpoint("ep345");
let counter = endpoint
.metrics()
.create_counter("testcounter", "A test counter", &[])
.unwrap();
counter.inc_by(123.456789);
let epsilon = 0.01;
assert!((counter.get() - 123.456789).abs() < epsilon);
let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
println!("Endpoint output:");
println!("{}", endpoint_output_raw);
let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
assert_eq!(
endpoint_output_raw.trim_end_matches('\n'),
expected_endpoint_output.trim_end_matches('\n'),
"\n=== ENDPOINT COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
endpoint_output_raw,
expected_endpoint_output
);
let gauge = component
.metrics()
.create_gauge("testgauge", "A test gauge", &[])
.unwrap();
gauge.set(50000.0);
assert_eq!(gauge.get(), 50000.0);
let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
println!("Component output:");
println!("{}", component_output_raw);
let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
assert_eq!(
component_output_raw.trim_end_matches('\n'),
expected_component_output.trim_end_matches('\n'),
"\n=== COMPONENT COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
component_output_raw,
expected_component_output
);
let intcounter = namespace
.metrics()
.create_intcounter("testintcounter", "A test int counter", &[])
.unwrap();
intcounter.inc_by(12345);
assert_eq!(intcounter.get(), 12345);
let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
println!("Namespace output:");
println!("{}", namespace_output_raw);
let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
assert_eq!(
namespace_output_raw.trim_end_matches('\n'),
expected_namespace_output.trim_end_matches('\n'),
"\n=== NAMESPACE COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
namespace_output_raw,
expected_namespace_output
);
let intgauge = namespace
.metrics()
.create_intgauge("testintgauge", "A test int gauge", &[])
.unwrap();
intgauge.set(42);
assert_eq!(intgauge.get(), 42);
let intgaugevec = namespace
.metrics()
.create_intgaugevec(
"testintgaugevec",
"A test int gauge vector",
&["instance", "status"],
&[("service", "api")],
)
.unwrap();
intgaugevec
.with_label_values(&["server1", "active"])
.set(10);
intgaugevec
.with_label_values(&["server2", "inactive"])
.set(0);
let countervec = endpoint
.metrics()
.create_countervec(
"testcountervec",
"A test counter vector",
&["method", "status"],
&[("service", "api")],
)
.unwrap();
countervec.with_label_values(&["GET", "200"]).inc_by(10.0);
countervec.with_label_values(&["POST", "201"]).inc_by(5.0);
let histogram = component
.metrics()
.create_histogram("testhistogram", "A test histogram", &[], None)
.unwrap();
histogram.observe(1.0);
histogram.observe(2.5);
histogram.observe(4.0);
let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
println!("DRT output:");
println!("{}", drt_output_raw);
let expected_drt_output_without_uptime = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#;
let mut non_uptime_lines = Vec::new();
let mut uptime_value: Option<f64> = None;
for line in drt_output_raw.trim_end_matches('\n').lines() {
if line.starts_with("dynamo_component_uptime_seconds ") {
let val_str = line
.strip_prefix("dynamo_component_uptime_seconds ")
.unwrap();
uptime_value = Some(val_str.parse::<f64>().expect("uptime should be a float"));
} else if line.starts_with("# HELP dynamo_component_uptime_seconds")
|| line.starts_with("# TYPE dynamo_component_uptime_seconds")
{
} else {
non_uptime_lines.push(line);
}
}
let actual_without_uptime = non_uptime_lines.join("\n");
assert_eq!(
actual_without_uptime,
expected_drt_output_without_uptime.trim_end_matches('\n'),
"\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
Expected:\n{}\n\
Actual:\n{}\n\
==============================",
expected_drt_output_without_uptime,
actual_without_uptime
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
let uptime_after: f64 = drt_output_after
.lines()
.find(|l| l.starts_with("dynamo_component_uptime_seconds "))
.expect("uptime_seconds metric should be present after sleep")
.strip_prefix("dynamo_component_uptime_seconds ")
.unwrap()
.parse()
.expect("uptime should be a float");
assert!(
uptime_after > 0.0,
"uptime_seconds should be > 0 after 10ms sleep, got {}",
uptime_after
);
println!("✓ All Prometheus format outputs verified successfully!");
}
/// Checks `test_helpers::extract_metrics` against a small exposition-format
/// sample: exactly four lines must come back, and each of them must start with
/// `dynamo_component` and must not be a `#` comment line.
#[test]
fn test_refactored_filter_functions() {
    // Sample containing both `# HELP` / `# TYPE` comment lines and metric sample lines.
    let sample_expfmt = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_errors_total 5"#;

    let sample_lines = super::test_helpers::extract_metrics(sample_expfmt);

    // The input holds four non-comment lines.
    assert_eq!(sample_lines.len(), 4);

    // No returned line may lack the metric prefix or be a comment line.
    let found_invalid_line = sample_lines
        .iter()
        .any(|line| !line.starts_with("dynamo_component") || line.starts_with("#"));
    assert!(!found_invalid_line);

    println!("✓ All refactored filter functions work correctly!");
}
/// Registers a counter named `requests_total` on two different endpoints
/// (`ep1`, `ep2`) of the same component and expects the component-level
/// exposition output to list one series per endpoint under a single
/// HELP/TYPE header.
#[tokio::test]
async fn test_same_metric_name_different_endpoints() {
    let drt = create_test_drt_async().await;
    let namespace = drt.namespace("ns_test").unwrap();
    let component = namespace.component("comp_test").unwrap();
    let first_endpoint = component.endpoint("ep1");
    let second_endpoint = component.endpoint("ep2");

    // Same metric name on both endpoints; only the endpoint differs.
    let first_counter = first_endpoint
        .metrics()
        .create_counter("requests_total", "Total requests", &[])
        .unwrap();
    first_counter.inc_by(100.0);

    let second_counter = second_endpoint
        .metrics()
        .create_counter("requests_total", "Total requests", &[])
        .unwrap();
    second_counter.inc_by(200.0);

    let rendered = component.metrics().prometheus_expfmt().unwrap();
    let expected = r#"# HELP dynamo_component_requests_total Total requests
# TYPE dynamo_component_requests_total counter
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#;

    // Compare with trailing newlines ignored on both sides.
    let rendered_trimmed = rendered.trim_end_matches('\n');
    let expected_trimmed = expected.trim_end_matches('\n');
    assert_eq!(
        rendered_trimmed,
        expected_trimmed,
        "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
        Actual:\n{}\n\
        Expected:\n{}\n\
        ==============================",
        rendered,
        expected
    );

    println!("✓ Multi-registry prevents Prometheus collisions!");
}
/// Creates the counter `dup_metric` twice through two endpoint handles that
/// share the name `ep_same`, then expects the component-level exposition
/// output to contain a single series whose value is 50 (the amount added to
/// the first counter); the 75 added to the second counter must not appear.
#[tokio::test]
async fn test_duplicate_series_warning() {
    let drt = create_test_drt_async().await;
    let namespace = drt.namespace("ns_dup").unwrap();
    let component = namespace.component("comp_dup").unwrap();

    // Two handles that carry the same endpoint name.
    let first_handle = component.endpoint("ep_same");
    let second_handle = component.endpoint("ep_same");

    let first_counter = first_handle
        .metrics()
        .create_counter("dup_metric", "Duplicate metric test", &[])
        .unwrap();
    first_counter.inc_by(50.0);

    let second_counter = second_handle
        .metrics()
        .create_counter("dup_metric", "Duplicate metric test", &[])
        .unwrap();
    second_counter.inc_by(75.0);

    let rendered = component.metrics().prometheus_expfmt().unwrap();
    let expected = r#"# HELP dynamo_component_dup_metric Duplicate metric test
# TYPE dynamo_component_dup_metric counter
dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#;

    // Compare with trailing newlines ignored on both sides.
    let rendered_trimmed = rendered.trim_end_matches('\n');
    let expected_trimmed = expected.trim_end_matches('\n');
    assert_eq!(
        rendered_trimmed,
        expected_trimmed,
        "\n=== DEDUPLICATION COMPARISON FAILED ===\n\
        Actual:\n{}\n\
        Expected:\n{}\n\
        ==============================",
        rendered,
        expected
    );

    println!("✓ Duplicate series detection and deduplication works!");
}
}