// adaptive_pipeline/infrastructure/metrics/service.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Metrics Service Implementation
//!
//! Prometheus-based metrics collection and export for pipeline observability.
//! Provides execution metrics (processed pipelines, duration, bytes, chunks),
//! performance metrics (throughput, compression ratio), system health (active
//! pipelines, queue depth, CPU/memory), and error tracking. Thread-safe with
//! low overhead. See mdBook for detailed metric catalog and integration guide.
use byte_unit::Byte;
use prometheus::{Gauge, GaugeVec, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry};
use std::sync::Arc;
use tracing::debug;

use adaptive_pipeline_domain::entities::processing_metrics::ProcessingMetrics;
use adaptive_pipeline_domain::error::PipelineError;

24/// Prometheus metrics service for pipeline observability
25///
26/// This service handles all metrics collection and export to Prometheus,
27/// providing comprehensive monitoring and observability for the adaptive
28/// pipeline system.
29///
30/// # Design Principles
31///
32/// - **Separation of Concerns**: Metrics are handled by dedicated observability
33///   infrastructure
34/// - **Performance**: Low overhead metric collection with minimal impact on
35///   processing
36/// - **Reliability**: Thread-safe operations with automatic error handling
37/// - **Observability**: Comprehensive coverage of all system operations
38///
39/// # Metric Categories
40///
41/// The service tracks metrics across several categories:
42/// - **Execution Metrics**: Pipeline processing statistics
43/// - **Performance Metrics**: Throughput and efficiency measurements
44/// - **System Metrics**: Resource utilization and health indicators
45/// - **Error Metrics**: Error rates and failure analysis
46///
47/// # Examples
48#[derive(Clone)]
49pub struct MetricsService {
50    registry: Arc<Registry>,
51
52    // Pipeline execution metrics
53    pipelines_processed_total: IntCounter,
54    pipeline_processing_duration: Histogram,
55    pipeline_bytes_processed_total: IntCounter,
56    pipeline_chunks_processed_total: IntCounter,
57    pipeline_errors_total: IntCounter,
58    pipeline_warnings_total: IntCounter,
59
60    // Performance metrics
61    throughput_mbps: Gauge,
62    compression_ratio: Gauge,
63
64    // System metrics
65    active_pipelines: IntGauge,
66
67    // Debug stage metrics (for diagnostic stages)
68    debug_stage_bytes: GaugeVec,
69    debug_stage_chunks_total: IntCounterVec,
70}
72impl MetricsService {
73    /// Create a new MetricsService with Prometheus registry
74    pub fn new() -> Result<Self, PipelineError> {
75        let registry = Registry::new();
76
77        // Create pipeline execution counters
78        let pipelines_processed_total = IntCounter::with_opts(
79            Opts::new("pipeline_processed_total", "Total number of pipelines processed").namespace("adaptive_pipeline"),
80        )
81        .map_err(|e| {
82            PipelineError::metrics_error(format!("Failed to create pipelines_processed_total metric: {}", e))
83        })?;
84
85        let pipeline_processing_duration = Histogram::with_opts(
86            HistogramOpts::new(
87                "pipeline_processing_duration_seconds",
88                "Time spent processing pipelines",
89            )
90            .namespace("adaptive_pipeline")
91            .buckets(vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0]),
92        )
93        .map_err(|e| {
94            PipelineError::metrics_error(format!("Failed to create pipeline_processing_duration metric: {}", e))
95        })?;
96
97        let pipeline_bytes_processed_total = IntCounter::with_opts(
98            Opts::new("pipeline_bytes_processed_total", "Total bytes processed by pipelines")
99                .namespace("adaptive_pipeline"),
100        )
101        .map_err(|e| {
102            PipelineError::metrics_error(format!("Failed to create pipeline_bytes_processed_total metric: {}", e))
103        })?;
104
105        let pipeline_chunks_processed_total = IntCounter::with_opts(
106            Opts::new("pipeline_chunks_processed_total", "Total chunks processed by pipelines")
107                .namespace("adaptive_pipeline"),
108        )
109        .map_err(|e| {
110            PipelineError::metrics_error(format!(
111                "Failed to create pipeline_chunks_processed_total metric: {}",
112                e
113            ))
114        })?;
115
116        let pipeline_errors_total = IntCounter::with_opts(
117            Opts::new("pipeline_errors_total", "Total pipeline processing errors").namespace("adaptive_pipeline"),
118        )
119        .map_err(|e| PipelineError::metrics_error(format!("Failed to create pipeline_errors_total metric: {}", e)))?;
120
121        let pipeline_warnings_total = IntCounter::with_opts(
122            Opts::new("pipeline_warnings_total", "Total pipeline processing warnings").namespace("adaptive_pipeline"),
123        )
124        .map_err(|e| PipelineError::metrics_error(format!("Failed to create pipeline_warnings_total metric: {}", e)))?;
125
126        // Create performance gauges
127        let throughput_mbps = Gauge::with_opts(
128            Opts::new("pipeline_throughput_mbps", "Current pipeline throughput in MB/s").namespace("adaptive_pipeline"),
129        )
130        .map_err(|e| PipelineError::metrics_error(format!("Failed to create throughput_mbps metric: {}", e)))?;
131
132        let compression_ratio = Gauge::with_opts(
133            Opts::new("pipeline_compression_ratio", "Current compression ratio achieved")
134                .namespace("adaptive_pipeline"),
135        )
136        .map_err(|e| PipelineError::metrics_error(format!("Failed to create compression_ratio metric: {}", e)))?;
137
138        // Create system gauges
139        let active_pipelines = IntGauge::with_opts(
140            Opts::new("pipeline_active_count", "Number of currently active pipelines").namespace("adaptive_pipeline"),
141        )
142        .map_err(|e| PipelineError::metrics_error(format!("Failed to create active_pipelines metric: {}", e)))?;
143
144        // Create debug stage metrics (with labels for stage identification)
145        let debug_stage_bytes = GaugeVec::new(
146            Opts::new("debug_stage_bytes", "Bytes processed by debug stage per chunk").namespace("adaptive_pipeline"),
147            &["label", "chunk_id"],
148        )
149        .map_err(|e| PipelineError::metrics_error(format!("Failed to create debug_stage_bytes metric: {}", e)))?;
150
151        let debug_stage_chunks_total = IntCounterVec::new(
152            Opts::new("debug_stage_chunks_total", "Total chunks processed by debug stage")
153                .namespace("adaptive_pipeline"),
154            &["label"],
155        )
156        .map_err(|e| {
157            PipelineError::metrics_error(format!("Failed to create debug_stage_chunks_total metric: {}", e))
158        })?;
159
160        // Register all metrics
161        registry
162            .register(Box::new(pipelines_processed_total.clone()))
163            .map_err(|e| {
164                PipelineError::metrics_error(format!("Failed to register pipelines_processed_total: {}", e))
165            })?;
166        registry
167            .register(Box::new(pipeline_processing_duration.clone()))
168            .map_err(|e| {
169                PipelineError::metrics_error(format!("Failed to register pipeline_processing_duration: {}", e))
170            })?;
171        registry
172            .register(Box::new(pipeline_bytes_processed_total.clone()))
173            .map_err(|e| {
174                PipelineError::metrics_error(format!("Failed to register pipeline_bytes_processed_total: {}", e))
175            })?;
176        registry
177            .register(Box::new(pipeline_chunks_processed_total.clone()))
178            .map_err(|e| {
179                PipelineError::metrics_error(format!("Failed to register pipeline_chunks_processed_total: {}", e))
180            })?;
181        registry
182            .register(Box::new(pipeline_errors_total.clone()))
183            .map_err(|e| PipelineError::metrics_error(format!("Failed to register pipeline_errors_total: {}", e)))?;
184        registry
185            .register(Box::new(pipeline_warnings_total.clone()))
186            .map_err(|e| PipelineError::metrics_error(format!("Failed to register pipeline_warnings_total: {}", e)))?;
187        registry
188            .register(Box::new(throughput_mbps.clone()))
189            .map_err(|e| PipelineError::metrics_error(format!("Failed to register throughput_mbps: {}", e)))?;
190        registry
191            .register(Box::new(compression_ratio.clone()))
192            .map_err(|e| PipelineError::metrics_error(format!("Failed to register compression_ratio: {}", e)))?;
193        registry
194            .register(Box::new(active_pipelines.clone()))
195            .map_err(|e| PipelineError::metrics_error(format!("Failed to register active_pipelines: {}", e)))?;
196        registry
197            .register(Box::new(debug_stage_bytes.clone()))
198            .map_err(|e| PipelineError::metrics_error(format!("Failed to register debug_stage_bytes: {}", e)))?;
199        registry
200            .register(Box::new(debug_stage_chunks_total.clone()))
201            .map_err(|e| PipelineError::metrics_error(format!("Failed to register debug_stage_chunks_total: {}", e)))?;
202
203        debug!("MetricsService initialized with Prometheus registry");
204
205        Ok(Self {
206            registry: Arc::new(registry),
207            pipelines_processed_total,
208            pipeline_processing_duration,
209            pipeline_bytes_processed_total,
210            pipeline_chunks_processed_total,
211            pipeline_errors_total,
212            pipeline_warnings_total,
213            throughput_mbps,
214            compression_ratio,
215            active_pipelines,
216            debug_stage_bytes,
217            debug_stage_chunks_total,
218        })
219    }
220
221    /// Record metrics from pipeline processing completion
222    pub fn record_pipeline_completion(&self, metrics: &ProcessingMetrics) {
223        debug!("Recording pipeline completion metrics to Prometheus");
224
225        // Increment completion counter
226        self.pipelines_processed_total.inc();
227
228        // Record processing duration if available
229        if let Some(duration) = metrics.processing_duration() {
230            self.pipeline_processing_duration.observe(duration.as_secs_f64());
231        }
232
233        // Record data processing metrics
234        self.pipeline_bytes_processed_total.inc_by(metrics.bytes_processed());
235        self.pipeline_chunks_processed_total.inc_by(metrics.chunks_processed());
236
237        // Record error and warning counts
238        self.pipeline_errors_total.inc_by(metrics.error_count());
239        self.pipeline_warnings_total.inc_by(metrics.warning_count());
240
241        // Update current performance gauges
242        self.throughput_mbps.set(metrics.throughput_mb_per_second());
243
244        if let Some(ratio) = metrics.compression_ratio() {
245            self.compression_ratio.set(ratio);
246        }
247
248        debug!(
249            "Recorded metrics: {} bytes, {} chunks, {} errors, {:.2} MB/s throughput",
250            Byte::from_u128(metrics.bytes_processed() as u128)
251                .unwrap_or_else(|| Byte::from_u64(0))
252                .get_appropriate_unit(byte_unit::UnitType::Decimal)
253                .to_string(),
254            metrics.chunks_processed(),
255            metrics.error_count(),
256            metrics.throughput_mb_per_second()
257        );
258    }
259
260    /// Increment active pipeline count
261    pub fn increment_active_pipelines(&self) {
262        self.active_pipelines.inc();
263        debug!("Incremented active pipelines count");
264    }
265
266    /// Decrement active pipeline count
267    pub fn decrement_active_pipelines(&self) {
268        self.active_pipelines.dec();
269        debug!("Decremented active pipelines count");
270    }
271
272    /// Increment processed pipelines counter
273    pub fn increment_processed_pipelines(&self) {
274        self.pipelines_processed_total.inc();
275        debug!("Incremented processed pipelines count");
276    }
277
278    /// Record processing duration
279    pub fn record_processing_duration(&self, duration: std::time::Duration) {
280        self.pipeline_processing_duration.observe(duration.as_secs_f64());
281        debug!("Recorded processing duration: {:.2}s", duration.as_secs_f64());
282    }
283
284    /// Update current throughput
285    pub fn update_throughput(&self, throughput_mbps: f64) {
286        self.throughput_mbps.set(throughput_mbps);
287        debug!("Updated throughput: {:.2} MB/s", throughput_mbps);
288    }
289
290    /// Increment error counter
291    pub fn increment_errors(&self) {
292        self.pipeline_errors_total.inc();
293        debug!("Incremented error count");
294    }
295
296    /// Add bytes processed for this chunk
297    pub fn add_bytes_processed(&self, chunk_bytes: u64) {
298        self.pipeline_bytes_processed_total.inc_by(chunk_bytes);
299        debug!("Added {} bytes to processed counter", chunk_bytes);
300    }
301
302    /// Increment chunks processed counter
303    pub fn increment_chunks_processed(&self) {
304        self.pipeline_chunks_processed_total.inc();
305    }
306
307    /// Record bytes processed by a debug stage for a specific chunk
308    pub fn record_debug_stage_bytes(&self, label: &str, chunk_id: u64, bytes: u64) {
309        self.debug_stage_bytes
310            .with_label_values(&[label, &chunk_id.to_string()])
311            .set(bytes as f64);
312        debug!(
313            "Recorded debug stage bytes: label={}, chunk={}, bytes={}",
314            label, chunk_id, bytes
315        );
316    }
317
318    /// Increment chunks processed counter for a specific debug stage
319    pub fn increment_debug_stage_chunks(&self, label: &str) {
320        self.debug_stage_chunks_total.with_label_values(&[label]).inc();
321        debug!("Incremented debug stage chunks: label={}", label);
322    }
323
324    /// Get Prometheus metrics in text format for scraping
325    pub fn get_metrics(&self) -> Result<String, PipelineError> {
326        let encoder = prometheus::TextEncoder::new();
327        let metric_families = self.registry.gather();
328
329        encoder
330            .encode_to_string(&metric_families)
331            .map_err(|e| PipelineError::metrics_error(format!("Failed to encode metrics: {}", e)))
332    }
333
334    /// Get the Prometheus registry for advanced usage
335    pub fn registry(&self) -> Arc<Registry> {
336        self.registry.clone()
337    }
338}
340impl Default for MetricsService {
341    #[allow(clippy::expect_used)]
342    fn default() -> Self {
343        Self::new().expect("Failed to create default MetricsService")
344    }
345}
#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::ProcessingMetrics;

    /// Tests metrics service creation and basic functionality.
    ///
    /// Creates a new metrics service and verifies it can generate a
    /// non-empty Prometheus exposition immediately, indicating that the
    /// registry was populated during construction.
    #[test]
    fn test_metrics_service_creation() {
        let service = MetricsService::new().unwrap();
        // Registered collectors must be exported even before any updates.
        assert!(!service.get_metrics().unwrap().is_empty());
    }

    /// Tests pipeline completion recording and metrics generation.
    ///
    /// Records a pipeline completion with processing metrics and verifies
    /// that the execution counters appear in the Prometheus output with the
    /// expected namespaced names.
    #[test]
    fn test_record_pipeline_completion() {
        let service = MetricsService::new().unwrap();
        let metrics = ProcessingMetrics::new(1024, 2048);

        service.record_pipeline_completion(&metrics);

        let prometheus_output = service.get_metrics().unwrap();
        assert!(prometheus_output.contains("adaptive_pipeline_pipeline_processed_total"));
        assert!(prometheus_output.contains("adaptive_pipeline_pipeline_bytes_processed_total"));
    }

    /// Tests active pipeline tracking and gauge management.
    ///
    /// Performs two increments and one decrement on the active-pipeline
    /// gauge and verifies the gauge is exported with the resulting value.
    #[test]
    fn test_active_pipeline_tracking() {
        let service = MetricsService::new().unwrap();

        service.increment_active_pipelines();
        service.increment_active_pipelines();
        service.decrement_active_pipelines();

        let prometheus_output = service.get_metrics().unwrap();
        assert!(prometheus_output.contains("adaptive_pipeline_pipeline_active_count"));
        // Text exposition format renders the sample as "<name> <value>".
        assert!(prometheus_output.contains("adaptive_pipeline_pipeline_active_count 1"));
    }

    /// Tests debug stage metrics recording.
    ///
    /// Records per-chunk byte gauges and chunk-counter increments for a
    /// labelled debug stage and verifies both metrics — and the stage
    /// label — appear in the Prometheus output.
    #[test]
    fn test_debug_stage_metrics() {
        let service = MetricsService::new().unwrap();

        // Record metrics for debug stage with label "test_stage"
        service.record_debug_stage_bytes("test_stage", 0, 1024);
        service.increment_debug_stage_chunks("test_stage");

        service.record_debug_stage_bytes("test_stage", 1, 2048);
        service.increment_debug_stage_chunks("test_stage");

        // Get Prometheus output
        let prometheus_output = service.get_metrics().unwrap();

        // Verify metrics are present
        assert!(
            prometheus_output.contains("adaptive_pipeline_debug_stage_bytes"),
            "Should contain debug_stage_bytes metric"
        );
        assert!(
            prometheus_output.contains("adaptive_pipeline_debug_stage_chunks_total"),
            "Should contain debug_stage_chunks_total metric"
        );
        assert!(
            prometheus_output.contains("test_stage"),
            "Should contain stage label 'test_stage'"
        );
    }
}