carbon_core/
metrics.rs

1//! A trait for collecting and managing performance metrics within the pipeline.
2//!
3//! The `Metrics` trait defines a set of asynchronous methods for initializing,
4//! updating, flushing, and shutting down metrics in the `carbon-core`
5//! framework. This trait is designed for tracking various types of metrics,
6//! such as counters, gauges, and histograms, to monitor performance and
7//! operational health in real time.
8//!
9//! ## Key Concepts
10//!
11//! - **Gauges**: Track the value of a particular metric at a specific point in
12//!   time. Useful for monitoring values that can fluctuate, like the number of
13//!   queued updates.
14//! - **Counters**: Track the number of times an event has occurred, such as
15//!   successful or failed update processing.
16//! - **Histograms**: Measure the distribution of values, such as processing
17//!   times, allowing insights into latency or response times.
18//!
19//! ## Implementing the Trait
20//!
21//! To implement `Metrics`, provide implementations for each method, typically
22//! sending metrics data to a monitoring system or backend service for
23//! visualization and alerting. The trait requires `async` functions, allowing
24//! implementations to perform non-blocking I/O operations, such as network
25//! requests or database writes.
26
27use {crate::error::CarbonResult, async_trait::async_trait, std::sync::Arc};
28
/// Interface a metrics backend implements to receive pipeline measurements.
///
/// Implementors must be `Send + Sync`; `MetricsCollection` in this module
/// stores them as `Arc<dyn Metrics>`. Every method takes `&self`, is declared
/// `async` through `#[async_trait]`, and returns `CarbonResult<()>`. Which
/// conditions produce an error is left to each implementation.
#[async_trait]
pub trait Metrics: Send + Sync {
    /// Initializes the metrics system, preparing it for data collection.
    async fn initialize(&self) -> CarbonResult<()>;
    /// Flushes any buffered metrics data so that everything recorded so far
    /// is reported.
    async fn flush(&self) -> CarbonResult<()>;
    /// Shuts down the metrics system, performing cleanup and ensuring all data
    /// is flushed.
    async fn shutdown(&self) -> CarbonResult<()>;

    /// Updates a gauge metric, setting its value to represent the current
    /// state.
    ///
    /// # Parameters
    ///
    /// - `name`: The name of the gauge metric to update.
    /// - `value`: The current value of the gauge metric.
    async fn update_gauge(&self, name: &str, value: f64) -> CarbonResult<()>;

    /// Increments a counter metric by a specified value.
    ///
    /// # Parameters
    ///
    /// - `name`: The name of the counter metric to increment.
    /// - `value`: The amount by which to increment the counter. The type is
    ///   unsigned, so this method cannot express a decrement.
    async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()>;

    /// Records a value in a histogram metric, representing distribution data.
    ///
    /// # Parameters
    ///
    /// - `name`: The name of the histogram metric to record.
    /// - `value`: The value to add to the histogram, typically representing
    ///   time or size.
    async fn record_histogram(&self, name: &str, value: f64) -> CarbonResult<()>;
}
65
66#[derive(Default)]
67pub struct MetricsCollection {
68    pub metrics: Vec<Arc<dyn Metrics>>,
69}
70
71impl MetricsCollection {
72    pub fn new(metrics: Vec<Arc<dyn Metrics>>) -> Self {
73        Self { metrics }
74    }
75
76    pub async fn initialize_metrics(&self) -> CarbonResult<()> {
77        for metric in &self.metrics {
78            metric.initialize().await?;
79        }
80        Ok(())
81    }
82
83    pub async fn shutdown_metrics(&self) -> CarbonResult<()> {
84        for metric in &self.metrics {
85            metric.shutdown().await?;
86        }
87        Ok(())
88    }
89
90    pub async fn flush_metrics(&self) -> CarbonResult<()> {
91        for metric in &self.metrics {
92            metric.flush().await?;
93        }
94        Ok(())
95    }
96
97    pub async fn update_gauge(&self, name: &str, value: f64) -> CarbonResult<()> {
98        for metric in &self.metrics {
99            metric.update_gauge(name, value).await?;
100        }
101        Ok(())
102    }
103
104    pub async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()> {
105        for metric in &self.metrics {
106            metric.increment_counter(name, value).await?;
107        }
108        Ok(())
109    }
110
111    pub async fn record_histogram(&self, name: &str, value: f64) -> CarbonResult<()> {
112        for metric in &self.metrics {
113            metric.record_histogram(name, value).await?;
114        }
115        Ok(())
116    }
117}