adaptive_pipeline/infrastructure/metrics/generic_collector.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Generic Metrics Collector
//!
//! Generic, reusable metrics collection system with type-safe metrics,
//! automatic aggregation, and comprehensive performance tracking.
//!
//! Provides thread-safe collection, storage, and reporting of operation metrics
//! for any type implementing `CollectibleMetrics`.
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::RwLock;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use adaptive_pipeline_domain::error::PipelineError;
use adaptive_pipeline_domain::services::datetime_serde;
24
/// Generic trait for metrics that can be collected and aggregated
///
/// This trait defines the interface for metrics that can be collected,
/// aggregated, and reported by the generic metrics collector. It provides a
/// type-safe way to define custom metrics with validation and summarization
/// capabilities.
///
/// # Key Features
///
/// - **Reset Capability**: Reset metrics to initial state for new collection
///   periods
/// - **Merge Operations**: Combine metrics from different sources or time
///   periods
/// - **Summary Generation**: Generate human-readable summaries of metrics
/// - **Type Identification**: Identify metric types for proper handling
/// - **Validation**: Validate metric consistency and correctness
///
/// # Implementation Requirements
///
/// Implementing types must:
/// - Be cloneable for metrics aggregation
/// - Be debuggable for error reporting
/// - Be thread-safe (`Send + Sync`)
/// - Have a default constructor for initialization
/// - Have a stable lifetime (`'static`)
///
/// # Examples
///
/// ```ignore
/// #[derive(Clone, Debug, Default)]
/// struct MyMetrics { ops: u64 }
///
/// impl CollectibleMetrics for MyMetrics {
///     fn reset(&mut self) { self.ops = 0; }
///     fn merge(&mut self, other: &Self) { self.ops += other.ops; }
///     fn summary(&self) -> HashMap<String, String> {
///         HashMap::from([("ops".to_string(), self.ops.to_string())])
///     }
///     fn metric_type(&self) -> String { "my_metrics".to_string() }
///     fn validate(&self) -> Result<(), PipelineError> { Ok(()) }
/// }
/// ```
pub trait CollectibleMetrics: Clone + Debug + Send + Sync + Default + 'static {
    /// Resets all metrics to their initial state, ready for a new
    /// collection period.
    fn reset(&mut self);

    /// Merges metrics from another instance into `self` (e.g. folding a
    /// single operation's metrics into a running aggregate).
    fn merge(&mut self, other: &Self);

    /// Returns a summary of key metrics as key-value pairs suitable for
    /// reporting.
    fn summary(&self) -> HashMap<String, String>;

    /// Returns the metric type identifier.
    fn metric_type(&self) -> String;

    /// Validates that the metrics are in a consistent state.
    fn validate(&self) -> Result<(), PipelineError>;
}
68
/// Generic metric entry with timing and metadata
///
/// One entry is recorded per completed (or failed) operation. Timestamps are
/// serialized through the domain's `datetime_serde` helpers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricEntry<T>
where
    T: CollectibleMetrics,
{
    /// Identifier of the tracked operation.
    pub operation_id: String,
    /// Logical category of the operation (used for filtering).
    pub operation_type: String,
    /// Metrics captured for this operation.
    pub metrics: T,
    /// Timestamp at which the operation started.
    #[serde(with = "datetime_serde")]
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// Timestamp at which the operation completed.
    #[serde(with = "datetime_serde")]
    pub completed_at: chrono::DateTime<chrono::Utc>,
    /// Operation duration in milliseconds.
    pub duration_ms: u64,
    /// Whether the operation succeeded (`false` once an error is recorded).
    pub success: bool,
    /// Error description when `success` is `false`.
    pub error_message: Option<String>,
    /// Arbitrary key-value metadata attached to the entry.
    pub metadata: HashMap<String, String>,
    /// Free-form tags for grouping and searching entries.
    pub tags: Vec<String>,
}
88
89impl<T> MetricEntry<T>
90where
91    T: CollectibleMetrics,
92{
93    pub fn new(operation_id: String, operation_type: String, metrics: T) -> Self {
94        let now = chrono::Utc::now();
95        Self {
96            operation_id,
97            operation_type,
98            metrics,
99            started_at: now,
100            completed_at: now,
101            duration_ms: 0,
102            success: true,
103            error_message: None,
104            metadata: HashMap::new(),
105            tags: Vec::new(),
106        }
107    }
108
109    pub fn with_duration(mut self, duration: Duration) -> Self {
110        self.duration_ms = duration.as_millis() as u64;
111        self.completed_at = self.started_at + chrono::Duration::milliseconds(self.duration_ms as i64);
112        self
113    }
114
115    pub fn with_error(mut self, error: String) -> Self {
116        self.error_message = Some(error);
117        self.success = false;
118        self
119    }
120
121    pub fn with_metadata(mut self, key: String, value: String) -> Self {
122        self.metadata.insert(key, value);
123        self
124    }
125
126    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
127        self.tags = tags;
128        self
129    }
130}
131
/// Generic metrics collector for any operation type
///
/// Thread-safe: all interior state is guarded by `RwLock`s, so a collector
/// can be shared across threads behind `&self`.
pub struct GenericMetricsCollector<T>
where
    T: CollectibleMetrics,
{
    /// Human-readable collector name (reported in summaries).
    collector_name: String,
    /// Recorded entries, oldest first, capped at `max_entries`.
    entries: RwLock<Vec<MetricEntry<T>>>,
    /// Running aggregate of completed operations' metrics.
    aggregated_metrics: RwLock<T>,
    /// Start times of operations started but not yet completed/failed.
    active_operations: RwLock<HashMap<String, Instant>>,
    /// Maximum number of retained entries before the oldest are dropped.
    max_entries: usize,
    /// Whether completed operations' metrics are merged automatically.
    auto_aggregate: bool,
}
144
145impl<T> GenericMetricsCollector<T>
146where
147    T: CollectibleMetrics,
148{
149    /// Creates a new metrics collector
150    pub fn new(collector_name: String) -> Self {
151        Self {
152            collector_name,
153            entries: RwLock::new(Vec::new()),
154            aggregated_metrics: RwLock::new(T::default()),
155            active_operations: RwLock::new(HashMap::new()),
156            max_entries: 1000, // Default max entries
157            auto_aggregate: true,
158        }
159    }
160
161    /// Creates a new metrics collector with custom configuration
162    pub fn with_config(collector_name: String, max_entries: usize, auto_aggregate: bool) -> Self {
163        Self {
164            collector_name,
165            entries: RwLock::new(Vec::new()),
166            aggregated_metrics: RwLock::new(T::default()),
167            active_operations: RwLock::new(HashMap::new()),
168            max_entries,
169            auto_aggregate,
170        }
171    }
172
173    /// Starts tracking an operation
174    pub fn start_operation(&self, operation_id: String) -> Result<(), PipelineError> {
175        let mut active_ops = self
176            .active_operations
177            .write()
178            .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
179
180        active_ops.insert(operation_id, Instant::now());
181        Ok(())
182    }
183
184    /// Completes an operation and records metrics
185    pub fn complete_operation(
186        &self,
187        operation_id: String,
188        operation_type: String,
189        metrics: T,
190    ) -> Result<(), PipelineError> {
191        let start_time = {
192            let mut active_ops = self
193                .active_operations
194                .write()
195                .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
196
197            active_ops.remove(&operation_id)
198        };
199
200        let duration = start_time
201            .map(|start| start.elapsed())
202            .unwrap_or_else(|| Duration::from_millis(0));
203
204        let entry = MetricEntry::new(operation_id, operation_type, metrics.clone()).with_duration(duration);
205
206        self.record_entry(entry)?;
207
208        if self.auto_aggregate {
209            self.aggregate_metrics(&metrics)?;
210        }
211
212        Ok(())
213    }
214
215    /// Records a metric entry directly
216    pub fn record_entry(&self, entry: MetricEntry<T>) -> Result<(), PipelineError> {
217        let mut entries = self
218            .entries
219            .write()
220            .map_err(|e| PipelineError::InternalError(format!("Failed to write entries: {}", e)))?;
221
222        entries.push(entry);
223
224        // Limit the number of entries to prevent memory issues
225        if entries.len() > self.max_entries {
226            entries.remove(0);
227        }
228
229        Ok(())
230    }
231
232    /// Records an operation failure
233    pub fn record_failure(
234        &self,
235        operation_id: String,
236        operation_type: String,
237        error: PipelineError,
238    ) -> Result<(), PipelineError> {
239        let start_time = {
240            let mut active_ops = self
241                .active_operations
242                .write()
243                .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
244
245            active_ops.remove(&operation_id)
246        };
247
248        let duration = start_time
249            .map(|start| start.elapsed())
250            .unwrap_or_else(|| Duration::from_millis(0));
251
252        let entry = MetricEntry::new(operation_id, operation_type, T::default())
253            .with_duration(duration)
254            .with_error(error.to_string());
255
256        self.record_entry(entry)
257    }
258
259    /// Aggregates metrics into the running total
260    fn aggregate_metrics(&self, metrics: &T) -> Result<(), PipelineError> {
261        let mut aggregated = self
262            .aggregated_metrics
263            .write()
264            .map_err(|e| PipelineError::InternalError(format!("Failed to write aggregated metrics: {}", e)))?;
265
266        aggregated.merge(metrics);
267        Ok(())
268    }
269
270    /// Gets the current aggregated metrics
271    pub fn get_aggregated_metrics(&self) -> Result<T, PipelineError> {
272        self.aggregated_metrics
273            .read()
274            .map_err(|e| PipelineError::InternalError(format!("Failed to read aggregated metrics: {}", e)))
275            .map(|metrics| metrics.clone())
276    }
277
278    /// Gets all recorded entries
279    pub fn get_entries(&self) -> Result<Vec<MetricEntry<T>>, PipelineError> {
280        self.entries
281            .read()
282            .map_err(|e| PipelineError::InternalError(format!("Failed to read entries: {}", e)))
283            .map(|entries| entries.clone())
284    }
285
286    /// Gets entries filtered by operation type
287    pub fn get_entries_by_type(&self, operation_type: &str) -> Result<Vec<MetricEntry<T>>, PipelineError> {
288        let entries = self.get_entries()?;
289        Ok(entries
290            .into_iter()
291            .filter(|entry| entry.operation_type == operation_type)
292            .collect())
293    }
294
295    /// Gets entries within a time range
296    pub fn get_entries_in_range(
297        &self,
298        start: chrono::DateTime<chrono::Utc>,
299        end: chrono::DateTime<chrono::Utc>,
300    ) -> Result<Vec<MetricEntry<T>>, PipelineError> {
301        let entries = self.get_entries()?;
302        Ok(entries
303            .into_iter()
304            .filter(|entry| entry.started_at >= start && entry.completed_at <= end)
305            .collect())
306    }
307
308    /// Resets all metrics and entries
309    pub fn reset(&self) -> Result<(), PipelineError> {
310        let mut entries = self
311            .entries
312            .write()
313            .map_err(|e| PipelineError::InternalError(format!("Failed to write entries: {}", e)))?;
314
315        let mut aggregated = self
316            .aggregated_metrics
317            .write()
318            .map_err(|e| PipelineError::InternalError(format!("Failed to write aggregated metrics: {}", e)))?;
319
320        let mut active_ops = self
321            .active_operations
322            .write()
323            .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
324
325        entries.clear();
326        aggregated.reset();
327        active_ops.clear();
328
329        Ok(())
330    }
331
332    /// Gets summary statistics
333    pub fn get_summary(&self) -> Result<HashMap<String, String>, PipelineError> {
334        let entries = self.get_entries()?;
335        let aggregated = self.get_aggregated_metrics()?;
336
337        let mut summary = HashMap::new();
338        summary.insert("collector_name".to_string(), self.collector_name.clone());
339        summary.insert("total_entries".to_string(), entries.len().to_string());
340        summary.insert(
341            "successful_operations".to_string(),
342            entries.iter().filter(|e| e.success).count().to_string(),
343        );
344        summary.insert(
345            "failed_operations".to_string(),
346            entries.iter().filter(|e| !e.success).count().to_string(),
347        );
348
349        if !entries.is_empty() {
350            let avg_duration = entries.iter().map(|e| e.duration_ms).sum::<u64>() / (entries.len() as u64);
351            summary.insert("average_duration_ms".to_string(), avg_duration.to_string());
352
353            let max_duration = entries.iter().map(|e| e.duration_ms).max().unwrap_or(0);
354            summary.insert("max_duration_ms".to_string(), max_duration.to_string());
355
356            let min_duration = entries.iter().map(|e| e.duration_ms).min().unwrap_or(0);
357            summary.insert("min_duration_ms".to_string(), min_duration.to_string());
358        }
359
360        // Add aggregated metrics summary
361        let aggregated_summary = aggregated.summary();
362        summary.extend(aggregated_summary);
363
364        Ok(summary)
365    }
366
367    /// Gets the collector name
368    pub fn name(&self) -> &str {
369        &self.collector_name
370    }
371
372    /// Gets the number of active operations
373    pub fn active_operations_count(&self) -> Result<usize, PipelineError> {
374        self.active_operations
375            .read()
376            .map_err(|e| PipelineError::InternalError(format!("Failed to read active operations: {}", e)))
377            .map(|ops| ops.len())
378    }
379}
380
/// Trait for services that support metrics collection
///
/// Convenience layer: implementors only provide access to their collector;
/// the default methods forward success/failure recording and summary
/// retrieval to it.
#[async_trait]
pub trait MetricsEnabled<T>
where
    T: CollectibleMetrics,
{
    /// Gets the metrics collector for this service
    fn metrics_collector(&self) -> &GenericMetricsCollector<T>;

    /// Records a successful operation
    ///
    /// Delegates to `GenericMetricsCollector::complete_operation`; the
    /// duration comes from a prior `start_operation` call on the collector.
    async fn record_success(
        &self,
        operation_id: String,
        operation_type: String,
        metrics: T,
    ) -> Result<(), PipelineError> {
        self.metrics_collector()
            .complete_operation(operation_id, operation_type, metrics)
    }

    /// Records a failed operation
    ///
    /// Delegates to `GenericMetricsCollector::record_failure`.
    async fn record_failure(
        &self,
        operation_id: String,
        operation_type: String,
        error: PipelineError,
    ) -> Result<(), PipelineError> {
        self.metrics_collector()
            .record_failure(operation_id, operation_type, error)
    }

    /// Gets current metrics summary
    ///
    /// NOTE(review): none of these default bodies await anything; the `async`
    /// signatures presumably exist for interface compatibility with async
    /// services — confirm before changing.
    async fn get_metrics_summary(&self) -> Result<HashMap<String, String>, PipelineError> {
        self.metrics_collector().get_summary()
    }
}
417
/// Convenience macro for creating metrics collectors
///
/// Expands to `GenericMetricsCollector::<$metrics_type>::new($name.to_string())`,
/// so `$name` may be any expression with a `to_string()` method (e.g. `&str`).
#[macro_export]
macro_rules! metrics_collector {
    ($metrics_type:ty, $name:expr) => {
        $crate::infrastructure::metrics::GenericMetricsCollector::<$metrics_type>::new($name.to_string())
    };
}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal metrics implementation used to exercise the generic collector.
    #[derive(Clone, Debug, Default)]
    struct TestMetrics {
        bytes_processed: u64,
        operations_count: u64,
        errors_count: u64,
    }

    impl TestMetrics {
        /// Fixture shorthand: build a `TestMetrics` from its three counters.
        fn sample(bytes_processed: u64, operations_count: u64, errors_count: u64) -> Self {
            Self {
                bytes_processed,
                operations_count,
                errors_count,
            }
        }
    }

    impl CollectibleMetrics for TestMetrics {
        fn reset(&mut self) {
            // All counters back to zero.
            *self = Self::default();
        }

        fn merge(&mut self, other: &Self) {
            self.bytes_processed += other.bytes_processed;
            self.operations_count += other.operations_count;
            self.errors_count += other.errors_count;
        }

        fn summary(&self) -> HashMap<String, String> {
            [
                ("bytes_processed", self.bytes_processed),
                ("operations_count", self.operations_count),
                ("errors_count", self.errors_count),
            ]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
        }

        fn metric_type(&self) -> String {
            "test_metrics".to_string()
        }

        fn validate(&self) -> Result<(), PipelineError> {
            if self.errors_count > self.operations_count {
                Err(PipelineError::InternalError(
                    "Error count cannot exceed operations count".to_string(),
                ))
            } else {
                Ok(())
            }
        }
    }

    /// Tests metrics collector creation with proper initialization state.
    #[test]
    fn test_metrics_collector_creation() {
        let sut = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());

        assert_eq!(sut.name(), "test_collector");
        assert_eq!(sut.active_operations_count().unwrap(), 0);
    }

    /// Tests operation lifecycle tracking from start through completion.
    #[test]
    fn test_operation_tracking() {
        let sut = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());

        sut.start_operation("op1".to_string()).unwrap();
        assert_eq!(sut.active_operations_count().unwrap(), 1);

        sut.complete_operation(
            "op1".to_string(),
            "test_operation".to_string(),
            TestMetrics::sample(1024, 1, 0),
        )
        .unwrap();
        assert_eq!(sut.active_operations_count().unwrap(), 0);

        let recorded = sut.get_entries().unwrap();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].operation_id, "op1");
        assert_eq!(recorded[0].metrics.bytes_processed, 1024);
    }

    /// Tests metrics aggregation across multiple completed operations.
    #[test]
    fn test_metrics_aggregation() {
        let sut = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());

        sut.complete_operation("op1".to_string(), "test".to_string(), TestMetrics::sample(1024, 1, 0))
            .unwrap();
        sut.complete_operation("op2".to_string(), "test".to_string(), TestMetrics::sample(2048, 1, 1))
            .unwrap();

        let totals = sut.get_aggregated_metrics().unwrap();
        assert_eq!(totals.bytes_processed, 3072);
        assert_eq!(totals.operations_count, 2);
        assert_eq!(totals.errors_count, 1);
    }

    /// Tests summary generation with key-value structure for reporting.
    #[test]
    fn test_summary_generation() {
        let sut = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());

        sut.complete_operation("op1".to_string(), "test".to_string(), TestMetrics::sample(1024, 1, 0))
            .unwrap();

        let report = sut.get_summary().unwrap();
        assert_eq!(report.get("collector_name").unwrap(), "test_collector");
        assert_eq!(report.get("total_entries").unwrap(), "1");
        assert_eq!(report.get("successful_operations").unwrap(), "1");
        assert_eq!(report.get("failed_operations").unwrap(), "0");
        assert!(report.contains_key("bytes_processed"));
    }

    /// Tests failure recording with error message capture and storage.
    #[test]
    fn test_failure_recording() {
        let sut = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());

        sut.start_operation("op1".to_string()).unwrap();
        sut.record_failure(
            "op1".to_string(),
            "test_operation".to_string(),
            PipelineError::InternalError("Test error".to_string()),
        )
        .unwrap();

        let recorded = sut.get_entries().unwrap();
        assert_eq!(recorded.len(), 1);
        assert!(!recorded[0].success);
        assert!(recorded[0].error_message.is_some());
    }

    /// Tests macro-based collector creation for simplified usage.
    #[test]
    fn test_macro_usage() {
        let sut = metrics_collector!(TestMetrics, "test");
        assert_eq!(sut.name(), "test");
    }
}