// adaptive_pipeline_domain/entities/processing_metrics.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Processing Metrics Entities
9//!
10//! This module contains entities for collecting, tracking, and analyzing
11//! performance metrics during pipeline processing operations. The metrics
12//! system provides detailed insights into processing performance, resource
13//! utilization, and operational health.
14//!
15//! ## Overview
16//!
17//! The metrics system captures:
18//!
19//! - **Performance Data**: Throughput, processing times, and completion rates
20//! - **Resource Usage**: Memory consumption, CPU utilization, and I/O
21//!   statistics
22//! - **Operational Health**: Error rates, success rates, and warning counts
23//! - **Stage-Specific Metrics**: Individual performance data for each pipeline
24//!   stage
25//! - **File Processing Stats**: Input/output file sizes, checksums, and
26//!   compression ratios
27//!
28//! ## Metrics Architecture
29//!
30//! ### High-Resolution Timing
31//! Uses `std::time::Instant` for precise internal timing measurements while
32//! providing RFC3339 timestamps for serialization and external reporting.
33//!
34//! ### Hierarchical Structure
35//! - **ProcessingMetrics**: Overall pipeline processing metrics
36//! - **StageMetrics**: Individual stage performance metrics
37//!
38//! ### Real-Time Calculation
39//! Metrics are calculated and updated in real-time as processing progresses,
40//! providing immediate feedback on performance characteristics.
41
42use chrono::{DateTime, Utc};
43use serde::{Deserialize, Serialize};
44use std::time::{Duration, Instant};
45
/// Processing metrics entity for comprehensive performance tracking and
/// analysis.
///
/// `ProcessingMetrics` collects and maintains detailed performance data
/// throughout pipeline execution. It provides real-time insights into
/// processing speed, resource utilization, error rates, and overall
/// operational health.
///
/// ## Metrics Categories
///
/// ### Processing Progress
/// - **Bytes Processed**: Total data processed and remaining
/// - **Chunks Processed**: Number of data chunks completed
/// - **Completion Status**: Progress percentage and estimated time remaining
///
/// ### Performance Metrics
/// - **Throughput**: Bytes per second and MB/s processing rates
/// - **Duration**: Total processing time and stage-specific timings
/// - **Efficiency**: Success rates and error statistics
///
/// ### File Information
/// - **Input/Output Sizes**: File sizes before and after processing
/// - **Checksums**: Integrity verification data
/// - **Compression Ratios**: Data reduction achieved
///
/// ### Stage Analytics
/// - **Individual Stage Performance**: Per-stage timing and throughput
/// - **Resource Usage**: Memory and CPU consumption by stage
/// - **Error Tracking**: Stage-specific error and warning counts
///
/// ## Timing Model
///
/// Two parallel representations of start/end times are kept:
/// high-resolution [`Instant`]s for duration math (skipped during
/// serialization, since `Instant` is process-local), and RFC3339 strings
/// for persistence and external reporting.
///
/// ## Serialization and Persistence
///
/// The struct derives `Serialize`/`Deserialize`; only the RFC3339 string
/// timestamps survive a serialization round-trip — the `Instant` fields
/// come back as `None`.
///
/// ## Performance Considerations
///
/// - High-resolution timing uses `Instant` for accuracy
/// - Throughput calculations are performed on-demand
/// - Memory usage scales with the number of tracked stages
/// - Serialization excludes `Instant` fields for compatibility
/// - Real-time updates have minimal performance overhead
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingMetrics {
    // Progress counters: work completed vs. expected totals.
    bytes_processed: u64,
    bytes_total: u64,
    chunks_processed: u64,
    chunks_total: u64,
    // High-resolution timing for internal use.
    // `Instant` cannot be serialized meaningfully, hence `#[serde(skip)]`;
    // deserialized instances have these as `None`.
    #[serde(skip)]
    start_time: Option<Instant>,
    #[serde(skip)]
    end_time: Option<Instant>,
    // RFC3339 timestamps kept in parallel for serialization/reporting.
    start_time_rfc3339: Option<String>,
    end_time_rfc3339: Option<String>,
    // Total wall-clock processing time; set by `end()` when a start was recorded.
    processing_duration: Option<Duration>,
    // Derived rate in bytes/sec; refreshed as bytes are recorded.
    throughput_bytes_per_second: f64,
    // Ratio supplied by the caller via `set_compression_ratio`
    // (exact semantics, e.g. output/input, are caller-defined — not computed here).
    compression_ratio: Option<f64>,
    error_count: u64,
    warning_count: u64,
    // File information, populated via `set_input_file_info` / `set_output_file_info`.
    input_file_size_bytes: u64,
    output_file_size_bytes: u64,
    input_file_checksum: Option<String>,
    output_file_checksum: Option<String>,
    // Per-stage metrics keyed by stage name; inserting a duplicate name replaces it.
    stage_metrics: std::collections::HashMap<String, StageMetrics>,
}
131
/// Stage-specific metrics entity for detailed performance analysis.
///
/// `StageMetrics` tracks performance data for individual pipeline stages,
/// providing granular insights into the efficiency and resource usage of
/// each processing step within the pipeline.
///
/// ## Stage Performance Data
///
/// ### Processing Metrics
/// - **Bytes Processed**: Amount of data processed by the stage
/// - **Processing Time**: Total time spent in the stage
/// - **Throughput**: Data processing rate (bytes per second), derived from
///   the two values above in `update`
///
/// ### Quality Metrics
/// - **Error Count**: Number of errors encountered
/// - **Success Rate**: Fraction of successful operations (0.0–1.0),
///   computed by `calculate_success_rate`
///
/// ### Resource Metrics
/// - **Memory Usage**: Peak memory consumption (optional)
/// - **CPU Usage**: CPU utilization percentage (optional)
///
/// ## Integration with ProcessingMetrics
///
/// Stage metrics are typically collected and aggregated by
/// `ProcessingMetrics::add_stage_metrics`, which keys them by `stage_name`.
///
/// ## Performance Characteristics
///
/// - Lightweight structure with minimal memory overhead
/// - Throughput calculated from processed bytes and elapsed time on update
/// - Optional resource metrics to avoid unnecessary overhead
/// - Thread-safe when used with appropriate synchronization
/// - Efficient serialization for logging and analysis
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageMetrics {
    /// Name of the pipeline stage these metrics describe; used as the
    /// aggregation key in `ProcessingMetrics`.
    pub stage_name: String,
    /// Bytes processed by this stage.
    pub bytes_processed: u64,
    /// Total wall-clock time spent in this stage.
    pub processing_time: Duration,
    /// Derived rate in bytes per second (0.0 until `update` runs with a
    /// non-zero duration).
    pub throughput: f64,
    /// Number of errors recorded via `increment_errors`.
    pub error_count: u64,
    /// Fraction of successful operations, 0.0–1.0 (0.0 until calculated).
    pub success_rate: f64,
    /// Peak memory usage in bytes, if measured.
    pub memory_usage: Option<u64>,
    /// CPU utilization percentage, if measured.
    pub cpu_usage: Option<f64>,
}
190
191impl Default for ProcessingMetrics {
192    fn default() -> Self {
193        Self {
194            bytes_processed: 0,
195            bytes_total: 0,
196            chunks_processed: 0,
197            chunks_total: 0,
198            start_time: None,
199            end_time: None,
200            start_time_rfc3339: None,
201            end_time_rfc3339: None,
202            processing_duration: None,
203            throughput_bytes_per_second: 0.0,
204            compression_ratio: None,
205            error_count: 0,
206            warning_count: 0,
207            input_file_size_bytes: 0,
208            output_file_size_bytes: 0,
209            input_file_checksum: None,
210            output_file_checksum: None,
211            stage_metrics: std::collections::HashMap::new(),
212        }
213    }
214}
215
216impl ProcessingMetrics {
217    /// Creates new processing metrics
218    pub fn new(bytes_total: u64, chunks_total: u64) -> Self {
219        Self {
220            bytes_total,
221            chunks_total,
222            ..Default::default()
223        }
224    }
225
226    /// Starts the processing timer
227    pub fn start(&mut self) {
228        self.start_time = Some(Instant::now());
229        self.start_time_rfc3339 = Some(Utc::now().to_rfc3339());
230    }
231
232    /// Ends the processing timer
233    pub fn end(&mut self) {
234        self.end_time = Some(Instant::now());
235        self.end_time_rfc3339 = Some(Utc::now().to_rfc3339());
236        if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
237            self.processing_duration = Some(end.duration_since(start));
238            self.calculate_throughput();
239        }
240    }
241
242    /// Updates bytes processed
243    pub fn update_bytes_processed(&mut self, bytes: u64) {
244        self.bytes_processed = bytes;
245        self.calculate_throughput();
246    }
247
248    /// Adds bytes processed
249    pub fn add_bytes_processed(&mut self, bytes: u64) {
250        self.bytes_processed += bytes;
251        self.calculate_throughput();
252    }
253
254    /// Updates chunks processed
255    pub fn update_chunks_processed(&mut self, chunks: u64) {
256        self.chunks_processed = chunks;
257    }
258
259    /// Adds chunks processed
260    pub fn add_chunks_processed(&mut self, chunks: u64) {
261        self.chunks_processed += chunks;
262    }
263
264    /// Sets the compression ratio
265    pub fn set_compression_ratio(&mut self, ratio: f64) {
266        self.compression_ratio = Some(ratio);
267    }
268
269    /// Increments error count
270    pub fn increment_errors(&mut self) {
271        self.error_count += 1;
272    }
273
274    /// Increments warning count
275    pub fn increment_warnings(&mut self) {
276        self.warning_count += 1;
277    }
278
279    /// Adds stage metrics
280    pub fn add_stage_metrics(&mut self, metrics: StageMetrics) {
281        self.stage_metrics.insert(metrics.stage_name.clone(), metrics);
282    }
283
284    /// Gets bytes processed
285    pub fn bytes_processed(&self) -> u64 {
286        self.bytes_processed
287    }
288
289    /// Gets total bytes
290    pub fn bytes_total(&self) -> u64 {
291        self.bytes_total
292    }
293
294    /// Gets chunks processed
295    pub fn chunks_processed(&self) -> u64 {
296        self.chunks_processed
297    }
298
299    /// Gets total chunks
300    pub fn chunks_total(&self) -> u64 {
301        self.chunks_total
302    }
303
304    /// Gets processing duration
305    pub fn processing_duration(&self) -> Option<Duration> {
306        self.processing_duration
307    }
308
309    /// Gets start time as `DateTime<Utc>`
310    pub fn start_time(&self) -> Option<DateTime<Utc>> {
311        self.start_time_rfc3339
312            .as_ref()
313            .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
314    }
315
316    /// Gets end time as `DateTime<Utc>`
317    pub fn end_time(&self) -> Option<DateTime<Utc>> {
318        self.end_time_rfc3339
319            .as_ref()
320            .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
321    }
322
323    /// Gets throughput in bytes per second
324    pub fn throughput_bytes_per_second(&self) -> f64 {
325        self.throughput_bytes_per_second
326    }
327
328    /// Gets throughput in MB/s
329    pub fn throughput_mb_per_second(&self) -> f64 {
330        self.throughput_bytes_per_second / (1024.0 * 1024.0)
331    }
332
333    /// Gets compression ratio
334    pub fn compression_ratio(&self) -> Option<f64> {
335        self.compression_ratio
336    }
337
338    /// Gets error count
339    pub fn error_count(&self) -> u64 {
340        self.error_count
341    }
342
343    /// Gets warning count
344    pub fn warning_count(&self) -> u64 {
345        self.warning_count
346    }
347
348    /// Gets stage metrics
349    pub fn stage_metrics(&self) -> &std::collections::HashMap<String, StageMetrics> {
350        &self.stage_metrics
351    }
352
353    /// Gets input file size in bytes
354    pub fn input_file_size_bytes(&self) -> u64 {
355        self.input_file_size_bytes
356    }
357
358    /// Gets output file size in bytes
359    pub fn output_file_size_bytes(&self) -> u64 {
360        self.output_file_size_bytes
361    }
362
363    /// Gets input file size in MiB
364    pub fn input_file_size_mib(&self) -> f64 {
365        (self.input_file_size_bytes as f64) / (1024.0 * 1024.0)
366    }
367
368    /// Gets output file size in MiB
369    pub fn output_file_size_mib(&self) -> f64 {
370        (self.output_file_size_bytes as f64) / (1024.0 * 1024.0)
371    }
372
373    /// Gets input file checksum
374    pub fn input_file_checksum(&self) -> &Option<String> {
375        &self.input_file_checksum
376    }
377
378    /// Gets output file checksum
379    pub fn output_file_checksum(&self) -> &Option<String> {
380        &self.output_file_checksum
381    }
382
383    /// Calculates processing progress as percentage
384    pub fn progress_percentage(&self) -> f64 {
385        if self.bytes_total == 0 {
386            return 0.0;
387        }
388        ((self.bytes_processed as f64) / (self.bytes_total as f64)) * 100.0
389    }
390
391    /// Calculates chunk progress as percentage
392    pub fn chunk_progress_percentage(&self) -> f64 {
393        if self.chunks_total == 0 {
394            return 0.0;
395        }
396        ((self.chunks_processed as f64) / (self.chunks_total as f64)) * 100.0
397    }
398
399    /// Estimates remaining time
400    pub fn estimated_remaining_time(&self) -> Option<Duration> {
401        if self.throughput_bytes_per_second <= 0.0 || self.bytes_processed == 0 {
402            return None;
403        }
404
405        let remaining_bytes = self.bytes_total.saturating_sub(self.bytes_processed);
406        let remaining_seconds = (remaining_bytes as f64) / self.throughput_bytes_per_second;
407        Some(Duration::from_secs_f64(remaining_seconds))
408    }
409
410    /// Checks if processing is complete
411    pub fn is_complete(&self) -> bool {
412        self.bytes_processed >= self.bytes_total && self.chunks_processed >= self.chunks_total
413    }
414
415    /// Calculates overall success rate
416    pub fn success_rate(&self) -> f64 {
417        if self.chunks_processed == 0 {
418            return 0.0;
419        }
420        let successful_chunks = self.chunks_processed.saturating_sub(self.error_count);
421        (successful_chunks as f64) / (self.chunks_processed as f64)
422    }
423
424    /// Sets input file size and checksum
425    pub fn set_input_file_info(&mut self, size_bytes: u64, checksum: Option<String>) {
426        self.input_file_size_bytes = size_bytes;
427        self.input_file_checksum = checksum;
428    }
429
430    /// Sets output file size and checksum
431    pub fn set_output_file_info(&mut self, size_bytes: u64, checksum: Option<String>) {
432        self.output_file_size_bytes = size_bytes;
433        self.output_file_checksum = checksum;
434    }
435
436    /// Calculates throughput based on current metrics
437    fn calculate_throughput(&mut self) {
438        if let Some(duration) = self.processing_duration {
439            let seconds = duration.as_secs_f64();
440            if seconds > 0.0 {
441                self.throughput_bytes_per_second = (self.bytes_processed as f64) / seconds;
442            }
443        } else if let Some(start) = self.start_time {
444            let elapsed = start.elapsed();
445            let seconds = elapsed.as_secs_f64();
446            if seconds > 0.0 {
447                self.throughput_bytes_per_second = (self.bytes_processed as f64) / seconds;
448            }
449        }
450    }
451
452    /// Merges metrics from another instance
453    pub fn merge(&mut self, other: &ProcessingMetrics) {
454        self.bytes_processed += other.bytes_processed;
455        self.chunks_processed += other.chunks_processed;
456        self.error_count += other.error_count;
457        self.warning_count += other.warning_count;
458
459        // Merge stage metrics
460        for (stage_name, stage_metrics) in &other.stage_metrics {
461            self.stage_metrics.insert(stage_name.clone(), stage_metrics.clone());
462        }
463
464        // Recalculate throughput
465        self.calculate_throughput();
466    }
467}
468
469impl StageMetrics {
470    /// Creates new stage metrics
471    pub fn new(stage_name: String) -> Self {
472        Self {
473            stage_name,
474            bytes_processed: 0,
475            processing_time: Duration::ZERO,
476            throughput: 0.0,
477            error_count: 0,
478            success_rate: 0.0,
479            memory_usage: None,
480            cpu_usage: None,
481        }
482    }
483
484    /// Updates the stage metrics
485    pub fn update(&mut self, bytes_processed: u64, processing_time: Duration) {
486        self.bytes_processed = bytes_processed;
487        self.processing_time = processing_time;
488
489        let seconds = processing_time.as_secs_f64();
490        if seconds > 0.0 {
491            self.throughput = (bytes_processed as f64) / seconds;
492        }
493    }
494
495    /// Sets memory usage
496    pub fn set_memory_usage(&mut self, memory_usage: u64) {
497        self.memory_usage = Some(memory_usage);
498    }
499
500    /// Sets CPU usage
501    pub fn set_cpu_usage(&mut self, cpu_usage: f64) {
502        self.cpu_usage = Some(cpu_usage);
503    }
504
505    /// Increments error count
506    pub fn increment_errors(&mut self) {
507        self.error_count += 1;
508    }
509
510    /// Calculates success rate
511    pub fn calculate_success_rate(&mut self, total_operations: u64) {
512        if total_operations > 0 {
513            let successful_operations = total_operations.saturating_sub(self.error_count);
514            self.success_rate = (successful_operations as f64) / (total_operations as f64);
515        }
516    }
517}