use crate::config::register_plugin;
use crate::config::ItemType;
use crate::config::{ConfigSpec, ExecutionType};
use crate::Error;
use crate::{Closer, MetricEntry, Metrics};
use async_trait::async_trait;
use fiddler_macros::fiddler_registration_func;
use metrics::{counter, gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use serde::Deserialize;
use serde_yaml::Value;
use std::sync::Once;
use tracing::{debug, warn};
/// Guard used by `PrometheusMetrics::new` so that the Prometheus exporter
/// installation is attempted at most once per process.
static PROMETHEUS_INIT: Once = Once::new();
/// Configuration for the Prometheus metrics backend.
///
/// The struct currently declares no fields. It is deserialized from the
/// plugin's YAML configuration value in `PrometheusMetrics::new`.
#[derive(Debug, Deserialize, Clone, Default)]
pub struct PrometheusConfig {
}
/// Metrics backend that reports pipeline statistics through the `metrics`
/// crate macros (`counter!` / `gauge!`), with a Prometheus exporter installed
/// by `PrometheusMetrics::new`.
#[derive(Debug)]
pub struct PrometheusMetrics {
/// Set by `new` from the outcome of installing the Prometheus exporter.
/// Not read anywhere else in this file.
_initialized: bool,
/// Configuration deserialized in `new`. Not read anywhere else in this file.
_config: PrometheusConfig,
}
impl PrometheusMetrics {
    /// Creates a Prometheus metrics backend from its YAML configuration.
    ///
    /// The first call in the process attempts to install the Prometheus
    /// exporter via `PrometheusBuilder::install`; later calls skip the
    /// installation. A failed installation is logged as a warning and does
    /// not fail construction.
    ///
    /// Every returned instance reports the outcome of that one-time
    /// installation in its `_initialized` field, regardless of which call
    /// actually ran the installer.
    ///
    /// # Errors
    ///
    /// Returns an error when `config` cannot be deserialized into
    /// [`PrometheusConfig`]. In that case the installation is not attempted.
    pub fn new(config: Value) -> Result<Self, Error> {
        // Outcome of the one-time installation, kept process-wide. A plain
        // local captured by the `call_once` closure would only ever be set for
        // the single call that runs the closure, so every later instance would
        // wrongly report "not initialized".
        static EXPORTER_INSTALLED: std::sync::atomic::AtomicBool =
            std::sync::atomic::AtomicBool::new(false);

        let prometheus_config: PrometheusConfig = serde_yaml::from_value(config)?;

        PROMETHEUS_INIT.call_once(|| match PrometheusBuilder::new().install() {
            Ok(_) => {
                debug!("Prometheus metrics exporter initialized");
                EXPORTER_INSTALLED.store(true, std::sync::atomic::Ordering::SeqCst);
            }
            Err(e) => {
                warn!(error = %e, "Failed to initialize Prometheus exporter, metrics will be recorded but not exposed");
            }
        });

        // `call_once` returns only after the initializer has completed (on this
        // or another thread), so the flag is final by the time it is read here.
        Ok(Self {
            _initialized: EXPORTER_INSTALLED.load(std::sync::atomic::Ordering::SeqCst),
            _config: prometheus_config,
        })
    }
}
impl Default for PrometheusMetrics {
    /// Builds a backend from a null configuration value.
    ///
    /// If `new` reports an error, falls back to an instance holding the
    /// default configuration and marked as not initialized.
    fn default() -> Self {
        Self::new(Value::Null).unwrap_or_else(|_| Self {
            _config: PrometheusConfig::default(),
            _initialized: false,
        })
    }
}
#[async_trait]
impl Metrics for PrometheusMetrics {
    /// Publishes one `MetricEntry` snapshot through the `metrics` macros.
    ///
    /// Cumulative totals are written with `Counter::absolute`; point-in-time
    /// values are written with `Gauge::set`. The optional system gauges (CPU
    /// and memory) are only updated when the entry carries a value for them.
    fn record(&mut self, metric: MetricEntry) {
        // NOTE(review): `MetricEntry`, as constructed in this file's tests,
        // also carries `total_retries` and `total_retries_exhausted`; neither
        // is published here -- confirm whether that is intentional.

        // Cumulative totals, published as absolute counter values.
        let totals = [
            ("fiddler_messages_received_total", metric.total_received),
            ("fiddler_messages_completed_total", metric.total_completed),
            (
                "fiddler_messages_process_errors_total",
                metric.total_process_errors,
            ),
            (
                "fiddler_messages_output_errors_total",
                metric.total_output_errors,
            ),
            ("fiddler_messages_filtered_total", metric.total_filtered),
            ("fiddler_streams_started_total", metric.streams_started),
            ("fiddler_streams_completed_total", metric.streams_completed),
            (
                "fiddler_duplicates_rejected_total",
                metric.duplicates_rejected,
            ),
            (
                "fiddler_stale_entries_removed_total",
                metric.stale_entries_removed,
            ),
            ("fiddler_input_bytes_total", metric.input_bytes),
            ("fiddler_output_bytes_total", metric.output_bytes),
        ];
        for (name, total) in totals {
            counter!(name).absolute(total);
        }

        // Point-in-time pipeline gauges.
        gauge!("fiddler_messages_in_flight").set(metric.in_flight as f64);
        gauge!("fiddler_throughput_per_second").set(metric.throughput_per_sec);
        gauge!("fiddler_bytes_per_second").set(metric.bytes_per_sec);

        // Latency gauges.
        gauge!("fiddler_latency_avg_ms").set(metric.latency_avg_ms);
        gauge!("fiddler_latency_min_ms").set(metric.latency_min_ms);
        gauge!("fiddler_latency_max_ms").set(metric.latency_max_ms);

        // System gauges are left untouched when the entry has no reading.
        if let Some(cpu_percent) = metric.cpu_usage_percent {
            gauge!("fiddler_cpu_usage_percent").set(cpu_percent as f64);
        }
        if let Some(used_bytes) = metric.memory_used_bytes {
            gauge!("fiddler_memory_used_bytes").set(used_bytes as f64);
        }
        if let Some(total_bytes) = metric.memory_total_bytes {
            gauge!("fiddler_memory_total_bytes").set(total_bytes as f64);
        }
    }
}
#[async_trait]
impl Closer for PrometheusMetrics {
/// Logs that the backend is closing and returns `Ok(())`.
///
/// No exporter teardown or other cleanup is performed in this method.
async fn close(&mut self) -> Result<(), Error> {
debug!("Prometheus metrics backend closing");
Ok(())
}
}
/// Plugin factory: builds a `PrometheusMetrics` from `conf` and wraps it as
/// `ExecutionType::Metrics`.
///
/// Propagates any error returned by `PrometheusMetrics::new`.
#[fiddler_registration_func]
fn create_prometheus(conf: Value) -> Result<ExecutionType, Error> {
    let backend = PrometheusMetrics::new(conf)?;
    Ok(ExecutionType::Metrics(Box::new(backend)))
}
/// Registers the Prometheus backend as a metrics plugin named `"prometheus"`.
///
/// The plugin's configuration schema is `type: object`.
///
/// # Errors
///
/// Propagates failures from `ConfigSpec::from_schema` and `register_plugin`.
pub(crate) fn register_prometheus() -> Result<(), Error> {
    let schema = ConfigSpec::from_schema("type: object")?;
    register_plugin(
        "prometheus".into(),
        ItemType::Metrics,
        schema,
        create_prometheus,
    )
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: recording a populated entry (without system readings)
    /// must complete without panicking.
    #[test]
    fn test_prometheus_metrics_record() {
        let sample = MetricEntry {
            total_received: 100,
            total_completed: 90,
            total_process_errors: 5,
            total_output_errors: 5,
            total_filtered: 0,
            streams_started: 10,
            streams_completed: 8,
            duplicates_rejected: 2,
            stale_entries_removed: 1,
            in_flight: 50,
            throughput_per_sec: 123.45,
            cpu_usage_percent: None,
            memory_used_bytes: None,
            memory_total_bytes: None,
            input_bytes: 1000,
            output_bytes: 900,
            bytes_per_sec: 90.0,
            latency_avg_ms: 5.5,
            latency_min_ms: 1.0,
            latency_max_ms: 15.0,
            total_retries: 0,
            total_retries_exhausted: 0,
        };

        let mut backend = PrometheusMetrics::default();
        backend.record(sample);
    }

    /// Closing the backend reports success.
    #[tokio::test]
    async fn test_prometheus_metrics_close() {
        let mut backend = PrometheusMetrics::default();
        let outcome = backend.close().await;
        assert!(outcome.is_ok());
    }

    /// Registration either succeeds or reports that the name is already
    /// registered; both outcomes are accepted.
    #[test]
    fn test_register_prometheus() {
        let outcome = register_prometheus();
        let acceptable = matches!(
            outcome,
            Ok(_) | Err(crate::Error::DuplicateRegisteredName(_))
        );
        assert!(acceptable);
    }
}