use crate::config::{ExecutionType, ItemType};
use crate::{Closer, Error, MetricEntry, Metrics};
use async_trait::async_trait;
use tracing::debug;
/// Canonical list of metric names emitted by this subsystem — presumably
/// consumed by backends to enumerate/register the series they export; verify
/// against the `prometheus`/`clickhouse` backend modules.
/// NOTE(review): `MetricEntry` also carries `total_retries` and
/// `total_retries_exhausted` (see the test fixtures below), which are not
/// listed here — confirm whether they should be exported as well.
pub(crate) const ALL_METRICS: &[&str] = &[
"total_received",
"total_completed",
"total_process_errors",
"total_output_errors",
"total_filtered",
"streams_started",
"streams_completed",
"duplicates_rejected",
"stale_entries_removed",
"in_flight",
"throughput_per_sec",
"cpu_usage_percent",
"memory_used_bytes",
"memory_total_bytes",
"input_bytes",
"output_bytes",
"bytes_per_sec",
"latency_avg_ms",
"latency_min_ms",
"latency_max_ms",
];
// Optional backends, compiled in only when the matching cargo feature is enabled.
#[cfg(feature = "prometheus")]
pub mod prometheus;
#[cfg(feature = "clickhouse")]
pub mod clickhouse;
// Always-available backend; no feature gate.
pub mod stdout;
/// Registers every compiled-in metrics backend with the plugin registry.
///
/// Feature-gated backends (`prometheus`, `clickhouse`) are registered first
/// when their cargo features are enabled; the always-available `stdout`
/// backend is registered last. The first registration failure is propagated.
pub(crate) fn register_plugins() -> Result<(), Error> {
    #[cfg(feature = "prometheus")]
    {
        prometheus::register_prometheus()?;
    }
    #[cfg(feature = "clickhouse")]
    {
        clickhouse::register_clickhouse()?;
    }
    stdout::register_stdout()?;
    Ok(())
}
/// Metrics backend that silently drops every recorded entry.
///
/// Serves as the fallback when no metrics backend is configured
/// (see `create_metrics`).
#[derive(Debug, Default)]
pub struct NoOpMetrics;

impl NoOpMetrics {
    /// Creates a new no-op metrics backend; equivalent to `Self::default()`.
    pub fn new() -> Self {
        NoOpMetrics
    }
}
#[async_trait]
impl Metrics for NoOpMetrics {
    /// Discards the entry without recording it anywhere.
    fn record(&mut self, _metric: MetricEntry) {
    }
}
#[async_trait]
impl Closer for NoOpMetrics {
    /// Nothing to flush or release; logs at debug level and always succeeds.
    async fn close(&mut self) -> Result<(), Error> {
        debug!("noop metrics backend closing");
        Ok(())
    }
}
/// Builds the metrics backend selected by `config`.
///
/// Returns a [`NoOpMetrics`] backend when `config` is `None` or when its
/// `extra` map names no backend. Otherwise the plugin configuration is parsed
/// and the registered creator is invoked; a plugin that yields anything other
/// than `ExecutionType::Metrics` is rejected with a validation error.
pub async fn create_metrics(
    config: Option<&crate::config::MetricsConfig>,
) -> Result<Box<dyn Metrics>, Error> {
    // Guard clause: fall back to the no-op backend when nothing is configured.
    let cfg = match config {
        Some(cfg) if !cfg.extra.is_empty() => cfg,
        _ => return Ok(Box::new(NoOpMetrics::new())),
    };
    let parsed_item =
        crate::config::parse_configuration_item(ItemType::Metrics, &cfg.extra).await?;
    match (parsed_item.creator)(parsed_item.config).await? {
        ExecutionType::Metrics(metrics) => Ok(metrics),
        _ => Err(Error::Validation(
            "Metrics plugin returned invalid execution type".into(),
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a `MetricEntry` populated with representative non-zero values,
    /// shared by the tests that exercise a real backend. Extracted to remove
    /// the five near-identical 22-field literals the tests previously carried.
    fn sample_entry() -> MetricEntry {
        MetricEntry {
            total_received: 100,
            total_completed: 90,
            total_process_errors: 5,
            total_output_errors: 5,
            total_filtered: 0,
            streams_started: 10,
            streams_completed: 8,
            duplicates_rejected: 2,
            stale_entries_removed: 1,
            in_flight: 50,
            throughput_per_sec: 123.45,
            cpu_usage_percent: None,
            memory_used_bytes: None,
            memory_total_bytes: None,
            input_bytes: 1000,
            output_bytes: 900,
            bytes_per_sec: 90.0,
            latency_avg_ms: 5.5,
            latency_min_ms: 1.0,
            latency_max_ms: 15.0,
            total_retries: 0,
            total_retries_exhausted: 0,
        }
    }

    /// Builds a `MetricEntry` with every counter and gauge zeroed/empty.
    fn zero_entry() -> MetricEntry {
        MetricEntry {
            total_received: 0,
            total_completed: 0,
            total_process_errors: 0,
            total_output_errors: 0,
            total_filtered: 0,
            streams_started: 0,
            streams_completed: 0,
            duplicates_rejected: 0,
            stale_entries_removed: 0,
            in_flight: 0,
            throughput_per_sec: 0.0,
            cpu_usage_percent: None,
            memory_used_bytes: None,
            memory_total_bytes: None,
            input_bytes: 0,
            output_bytes: 0,
            bytes_per_sec: 0.0,
            latency_avg_ms: 0.0,
            latency_min_ms: 0.0,
            latency_max_ms: 0.0,
            total_retries: 0,
            total_retries_exhausted: 0,
        }
    }

    /// Builds a `MetricsConfig` selecting a single backend by name with an
    /// empty (all-defaults) backend configuration mapping.
    fn backend_config(backend: &str) -> crate::config::MetricsConfig {
        let mut extra = HashMap::new();
        extra.insert(
            backend.to_string(),
            serde_yaml::Value::Mapping(serde_yaml::Mapping::new()),
        );
        crate::config::MetricsConfig {
            label: None,
            interval: 30,
            collect_system_metrics: false,
            extra,
        }
    }

    #[test]
    fn test_noop_metrics_record() {
        // Recording into the no-op backend must simply succeed (no panic).
        let mut metrics = NoOpMetrics::new();
        metrics.record(sample_entry());
    }

    #[tokio::test]
    async fn test_noop_metrics_close() {
        let mut metrics = NoOpMetrics::new();
        assert!(metrics.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_create_metrics_without_config() {
        // `None` config yields the no-op backend; recording must succeed.
        let mut metrics = create_metrics(None).await.unwrap();
        metrics.record(zero_entry());
    }

    #[tokio::test]
    async fn test_create_metrics_with_empty_config() {
        // A config with an empty `extra` map also yields the no-op backend.
        let config = crate::config::MetricsConfig::default();
        let mut metrics = create_metrics(Some(&config)).await.unwrap();
        metrics.record(zero_entry());
    }

    #[cfg(feature = "prometheus")]
    #[tokio::test]
    async fn test_create_metrics_with_prometheus_config() {
        // Registration may already have happened in another test; a duplicate
        // name is the only acceptable failure.
        let result = prometheus::register_prometheus();
        assert!(result.is_ok() || matches!(result, Err(crate::Error::DuplicateRegisteredName(_))));
        let config = backend_config("prometheus");
        let mut metrics = create_metrics(Some(&config)).await.unwrap();
        metrics.record(sample_entry());
    }

    #[tokio::test]
    async fn test_create_metrics_with_stdout_config() {
        let result = stdout::register_stdout();
        assert!(result.is_ok() || matches!(result, Err(crate::Error::DuplicateRegisteredName(_))));
        let config = backend_config("stdout");
        let mut metrics = create_metrics(Some(&config)).await.unwrap();
        metrics.record(sample_entry());
    }

    #[tokio::test]
    async fn test_create_metrics_with_unknown_backend() {
        // An unregistered backend name must surface as ConfigurationItemNotFound.
        let config = backend_config("unknown_backend");
        let result = create_metrics(Some(&config)).await;
        assert!(result.is_err());
        match result {
            Err(crate::Error::ConfigurationItemNotFound(name)) => {
                assert_eq!(name, "unknown_backend");
            }
            _ => panic!("Expected ConfigurationItemNotFound error"),
        }
    }
}