// adaptive_pipeline/infrastructure/metrics/service.rs
//
// Prometheus-backed metrics service for the adaptive pipeline.

use byte_unit::Byte;
use prometheus::{Gauge, GaugeVec, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry};
use std::sync::Arc;
use tracing::debug;

use adaptive_pipeline_domain::entities::processing_metrics::ProcessingMetrics;
use adaptive_pipeline_domain::error::PipelineError;
24#[derive(Clone)]
49pub struct MetricsService {
50 registry: Arc<Registry>,
51
52 pipelines_processed_total: IntCounter,
54 pipeline_processing_duration: Histogram,
55 pipeline_bytes_processed_total: IntCounter,
56 pipeline_chunks_processed_total: IntCounter,
57 pipeline_errors_total: IntCounter,
58 pipeline_warnings_total: IntCounter,
59
60 throughput_mbps: Gauge,
62 compression_ratio: Gauge,
63
64 active_pipelines: IntGauge,
66
67 debug_stage_bytes: GaugeVec,
69 debug_stage_chunks_total: IntCounterVec,
70}
71
72impl MetricsService {
73 pub fn new() -> Result<Self, PipelineError> {
75 let registry = Registry::new();
76
77 let pipelines_processed_total = IntCounter::with_opts(
79 Opts::new("pipeline_processed_total", "Total number of pipelines processed").namespace("adaptive_pipeline"),
80 )
81 .map_err(|e| {
82 PipelineError::metrics_error(format!("Failed to create pipelines_processed_total metric: {}", e))
83 })?;
84
85 let pipeline_processing_duration = Histogram::with_opts(
86 HistogramOpts::new(
87 "pipeline_processing_duration_seconds",
88 "Time spent processing pipelines",
89 )
90 .namespace("adaptive_pipeline")
91 .buckets(vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0]),
92 )
93 .map_err(|e| {
94 PipelineError::metrics_error(format!("Failed to create pipeline_processing_duration metric: {}", e))
95 })?;
96
97 let pipeline_bytes_processed_total = IntCounter::with_opts(
98 Opts::new("pipeline_bytes_processed_total", "Total bytes processed by pipelines")
99 .namespace("adaptive_pipeline"),
100 )
101 .map_err(|e| {
102 PipelineError::metrics_error(format!("Failed to create pipeline_bytes_processed_total metric: {}", e))
103 })?;
104
105 let pipeline_chunks_processed_total = IntCounter::with_opts(
106 Opts::new("pipeline_chunks_processed_total", "Total chunks processed by pipelines")
107 .namespace("adaptive_pipeline"),
108 )
109 .map_err(|e| {
110 PipelineError::metrics_error(format!(
111 "Failed to create pipeline_chunks_processed_total metric: {}",
112 e
113 ))
114 })?;
115
116 let pipeline_errors_total = IntCounter::with_opts(
117 Opts::new("pipeline_errors_total", "Total pipeline processing errors").namespace("adaptive_pipeline"),
118 )
119 .map_err(|e| PipelineError::metrics_error(format!("Failed to create pipeline_errors_total metric: {}", e)))?;
120
121 let pipeline_warnings_total = IntCounter::with_opts(
122 Opts::new("pipeline_warnings_total", "Total pipeline processing warnings").namespace("adaptive_pipeline"),
123 )
124 .map_err(|e| PipelineError::metrics_error(format!("Failed to create pipeline_warnings_total metric: {}", e)))?;
125
126 let throughput_mbps = Gauge::with_opts(
128 Opts::new("pipeline_throughput_mbps", "Current pipeline throughput in MB/s").namespace("adaptive_pipeline"),
129 )
130 .map_err(|e| PipelineError::metrics_error(format!("Failed to create throughput_mbps metric: {}", e)))?;
131
132 let compression_ratio = Gauge::with_opts(
133 Opts::new("pipeline_compression_ratio", "Current compression ratio achieved")
134 .namespace("adaptive_pipeline"),
135 )
136 .map_err(|e| PipelineError::metrics_error(format!("Failed to create compression_ratio metric: {}", e)))?;
137
138 let active_pipelines = IntGauge::with_opts(
140 Opts::new("pipeline_active_count", "Number of currently active pipelines").namespace("adaptive_pipeline"),
141 )
142 .map_err(|e| PipelineError::metrics_error(format!("Failed to create active_pipelines metric: {}", e)))?;
143
144 let debug_stage_bytes = GaugeVec::new(
146 Opts::new("debug_stage_bytes", "Bytes processed by debug stage per chunk").namespace("adaptive_pipeline"),
147 &["label", "chunk_id"],
148 )
149 .map_err(|e| PipelineError::metrics_error(format!("Failed to create debug_stage_bytes metric: {}", e)))?;
150
151 let debug_stage_chunks_total = IntCounterVec::new(
152 Opts::new("debug_stage_chunks_total", "Total chunks processed by debug stage")
153 .namespace("adaptive_pipeline"),
154 &["label"],
155 )
156 .map_err(|e| {
157 PipelineError::metrics_error(format!("Failed to create debug_stage_chunks_total metric: {}", e))
158 })?;
159
160 registry
162 .register(Box::new(pipelines_processed_total.clone()))
163 .map_err(|e| {
164 PipelineError::metrics_error(format!("Failed to register pipelines_processed_total: {}", e))
165 })?;
166 registry
167 .register(Box::new(pipeline_processing_duration.clone()))
168 .map_err(|e| {
169 PipelineError::metrics_error(format!("Failed to register pipeline_processing_duration: {}", e))
170 })?;
171 registry
172 .register(Box::new(pipeline_bytes_processed_total.clone()))
173 .map_err(|e| {
174 PipelineError::metrics_error(format!("Failed to register pipeline_bytes_processed_total: {}", e))
175 })?;
176 registry
177 .register(Box::new(pipeline_chunks_processed_total.clone()))
178 .map_err(|e| {
179 PipelineError::metrics_error(format!("Failed to register pipeline_chunks_processed_total: {}", e))
180 })?;
181 registry
182 .register(Box::new(pipeline_errors_total.clone()))
183 .map_err(|e| PipelineError::metrics_error(format!("Failed to register pipeline_errors_total: {}", e)))?;
184 registry
185 .register(Box::new(pipeline_warnings_total.clone()))
186 .map_err(|e| PipelineError::metrics_error(format!("Failed to register pipeline_warnings_total: {}", e)))?;
187 registry
188 .register(Box::new(throughput_mbps.clone()))
189 .map_err(|e| PipelineError::metrics_error(format!("Failed to register throughput_mbps: {}", e)))?;
190 registry
191 .register(Box::new(compression_ratio.clone()))
192 .map_err(|e| PipelineError::metrics_error(format!("Failed to register compression_ratio: {}", e)))?;
193 registry
194 .register(Box::new(active_pipelines.clone()))
195 .map_err(|e| PipelineError::metrics_error(format!("Failed to register active_pipelines: {}", e)))?;
196 registry
197 .register(Box::new(debug_stage_bytes.clone()))
198 .map_err(|e| PipelineError::metrics_error(format!("Failed to register debug_stage_bytes: {}", e)))?;
199 registry
200 .register(Box::new(debug_stage_chunks_total.clone()))
201 .map_err(|e| PipelineError::metrics_error(format!("Failed to register debug_stage_chunks_total: {}", e)))?;
202
203 debug!("MetricsService initialized with Prometheus registry");
204
205 Ok(Self {
206 registry: Arc::new(registry),
207 pipelines_processed_total,
208 pipeline_processing_duration,
209 pipeline_bytes_processed_total,
210 pipeline_chunks_processed_total,
211 pipeline_errors_total,
212 pipeline_warnings_total,
213 throughput_mbps,
214 compression_ratio,
215 active_pipelines,
216 debug_stage_bytes,
217 debug_stage_chunks_total,
218 })
219 }
220
221 pub fn record_pipeline_completion(&self, metrics: &ProcessingMetrics) {
223 debug!("Recording pipeline completion metrics to Prometheus");
224
225 self.pipelines_processed_total.inc();
227
228 if let Some(duration) = metrics.processing_duration() {
230 self.pipeline_processing_duration.observe(duration.as_secs_f64());
231 }
232
233 self.pipeline_bytes_processed_total.inc_by(metrics.bytes_processed());
235 self.pipeline_chunks_processed_total.inc_by(metrics.chunks_processed());
236
237 self.pipeline_errors_total.inc_by(metrics.error_count());
239 self.pipeline_warnings_total.inc_by(metrics.warning_count());
240
241 self.throughput_mbps.set(metrics.throughput_mb_per_second());
243
244 if let Some(ratio) = metrics.compression_ratio() {
245 self.compression_ratio.set(ratio);
246 }
247
248 debug!(
249 "Recorded metrics: {} bytes, {} chunks, {} errors, {:.2} MB/s throughput",
250 Byte::from_u128(metrics.bytes_processed() as u128)
251 .unwrap_or_else(|| Byte::from_u64(0))
252 .get_appropriate_unit(byte_unit::UnitType::Decimal)
253 .to_string(),
254 metrics.chunks_processed(),
255 metrics.error_count(),
256 metrics.throughput_mb_per_second()
257 );
258 }
259
260 pub fn increment_active_pipelines(&self) {
262 self.active_pipelines.inc();
263 debug!("Incremented active pipelines count");
264 }
265
266 pub fn decrement_active_pipelines(&self) {
268 self.active_pipelines.dec();
269 debug!("Decremented active pipelines count");
270 }
271
272 pub fn increment_processed_pipelines(&self) {
274 self.pipelines_processed_total.inc();
275 debug!("Incremented processed pipelines count");
276 }
277
278 pub fn record_processing_duration(&self, duration: std::time::Duration) {
280 self.pipeline_processing_duration.observe(duration.as_secs_f64());
281 debug!("Recorded processing duration: {:.2}s", duration.as_secs_f64());
282 }
283
284 pub fn update_throughput(&self, throughput_mbps: f64) {
286 self.throughput_mbps.set(throughput_mbps);
287 debug!("Updated throughput: {:.2} MB/s", throughput_mbps);
288 }
289
290 pub fn increment_errors(&self) {
292 self.pipeline_errors_total.inc();
293 debug!("Incremented error count");
294 }
295
296 pub fn add_bytes_processed(&self, chunk_bytes: u64) {
298 self.pipeline_bytes_processed_total.inc_by(chunk_bytes);
299 debug!("Added {} bytes to processed counter", chunk_bytes);
300 }
301
302 pub fn increment_chunks_processed(&self) {
304 self.pipeline_chunks_processed_total.inc();
305 }
306
307 pub fn record_debug_stage_bytes(&self, label: &str, chunk_id: u64, bytes: u64) {
309 self.debug_stage_bytes
310 .with_label_values(&[label, &chunk_id.to_string()])
311 .set(bytes as f64);
312 debug!(
313 "Recorded debug stage bytes: label={}, chunk={}, bytes={}",
314 label, chunk_id, bytes
315 );
316 }
317
318 pub fn increment_debug_stage_chunks(&self, label: &str) {
320 self.debug_stage_chunks_total.with_label_values(&[label]).inc();
321 debug!("Incremented debug stage chunks: label={}", label);
322 }
323
324 pub fn get_metrics(&self) -> Result<String, PipelineError> {
326 let encoder = prometheus::TextEncoder::new();
327 let metric_families = self.registry.gather();
328
329 encoder
330 .encode_to_string(&metric_families)
331 .map_err(|e| PipelineError::metrics_error(format!("Failed to encode metrics: {}", e)))
332 }
333
334 pub fn registry(&self) -> Arc<Registry> {
336 self.registry.clone()
337 }
338}
339
340impl Default for MetricsService {
341 #[allow(clippy::expect_used)]
342 fn default() -> Self {
343 Self::new().expect("Failed to create default MetricsService")
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use super::*;
350 use adaptive_pipeline_domain::ProcessingMetrics;
351
352 #[test]
385 fn test_metrics_service_creation() {
386 let _service = MetricsService::new().unwrap();
387 }
389
390 #[test]
424 fn test_record_pipeline_completion() {
425 let service = MetricsService::new().unwrap();
426 let _metrics = ProcessingMetrics::new(1024, 2048);
427
428 let prometheus_output = service.get_metrics().unwrap();
431 assert!(prometheus_output.contains("adaptive_pipeline_pipeline_processed_total"));
432 assert!(prometheus_output.contains("adaptive_pipeline_pipeline_bytes_processed_total"));
433 }
434
435 #[test]
469 fn test_active_pipeline_tracking() {
470 let service = MetricsService::new().unwrap();
471
472 let prometheus_output = service.get_metrics().unwrap();
477 assert!(prometheus_output.contains("adaptive_pipeline_pipeline_active_count"));
478 }
479
480 #[test]
506 fn test_debug_stage_metrics() {
507 let service = MetricsService::new().unwrap();
508
509 service.record_debug_stage_bytes("test_stage", 0, 1024);
511 service.increment_debug_stage_chunks("test_stage");
512
513 service.record_debug_stage_bytes("test_stage", 1, 2048);
514 service.increment_debug_stage_chunks("test_stage");
515
516 let prometheus_output = service.get_metrics().unwrap();
518
519 assert!(
521 prometheus_output.contains("adaptive_pipeline_debug_stage_bytes"),
522 "Should contain debug_stage_bytes metric"
523 );
524 assert!(
525 prometheus_output.contains("adaptive_pipeline_debug_stage_chunks_total"),
526 "Should contain debug_stage_chunks_total metric"
527 );
528 assert!(
529 prometheus_output.contains("test_stage"),
530 "Should contain stage label 'test_stage'"
531 );
532 }
533}