1use adaptive_pipeline_domain::error::PipelineError;
17use adaptive_pipeline_domain::services::datetime_serde;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::fmt::Debug;
22use std::sync::RwLock;
23use std::time::{Duration, Instant};
24
25pub trait CollectibleMetrics: Clone + Debug + Send + Sync + Default + 'static {
53 fn reset(&mut self);
55
56 fn merge(&mut self, other: &Self);
58
59 fn summary(&self) -> HashMap<String, String>;
61
62 fn metric_type(&self) -> String;
64
65 fn validate(&self) -> Result<(), PipelineError>;
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct MetricEntry<T>
72where
73 T: CollectibleMetrics,
74{
75 pub operation_id: String,
76 pub operation_type: String,
77 pub metrics: T,
78 #[serde(with = "datetime_serde")]
79 pub started_at: chrono::DateTime<chrono::Utc>,
80 #[serde(with = "datetime_serde")]
81 pub completed_at: chrono::DateTime<chrono::Utc>,
82 pub duration_ms: u64,
83 pub success: bool,
84 pub error_message: Option<String>,
85 pub metadata: HashMap<String, String>,
86 pub tags: Vec<String>,
87}
88
89impl<T> MetricEntry<T>
90where
91 T: CollectibleMetrics,
92{
93 pub fn new(operation_id: String, operation_type: String, metrics: T) -> Self {
94 let now = chrono::Utc::now();
95 Self {
96 operation_id,
97 operation_type,
98 metrics,
99 started_at: now,
100 completed_at: now,
101 duration_ms: 0,
102 success: true,
103 error_message: None,
104 metadata: HashMap::new(),
105 tags: Vec::new(),
106 }
107 }
108
109 pub fn with_duration(mut self, duration: Duration) -> Self {
110 self.duration_ms = duration.as_millis() as u64;
111 self.completed_at = self.started_at + chrono::Duration::milliseconds(self.duration_ms as i64);
112 self
113 }
114
115 pub fn with_error(mut self, error: String) -> Self {
116 self.error_message = Some(error);
117 self.success = false;
118 self
119 }
120
121 pub fn with_metadata(mut self, key: String, value: String) -> Self {
122 self.metadata.insert(key, value);
123 self
124 }
125
126 pub fn with_tags(mut self, tags: Vec<String>) -> Self {
127 self.tags = tags;
128 self
129 }
130}
131
132pub struct GenericMetricsCollector<T>
134where
135 T: CollectibleMetrics,
136{
137 collector_name: String,
138 entries: RwLock<Vec<MetricEntry<T>>>,
139 aggregated_metrics: RwLock<T>,
140 active_operations: RwLock<HashMap<String, Instant>>,
141 max_entries: usize,
142 auto_aggregate: bool,
143}
144
145impl<T> GenericMetricsCollector<T>
146where
147 T: CollectibleMetrics,
148{
149 pub fn new(collector_name: String) -> Self {
151 Self {
152 collector_name,
153 entries: RwLock::new(Vec::new()),
154 aggregated_metrics: RwLock::new(T::default()),
155 active_operations: RwLock::new(HashMap::new()),
156 max_entries: 1000, auto_aggregate: true,
158 }
159 }
160
161 pub fn with_config(collector_name: String, max_entries: usize, auto_aggregate: bool) -> Self {
163 Self {
164 collector_name,
165 entries: RwLock::new(Vec::new()),
166 aggregated_metrics: RwLock::new(T::default()),
167 active_operations: RwLock::new(HashMap::new()),
168 max_entries,
169 auto_aggregate,
170 }
171 }
172
173 pub fn start_operation(&self, operation_id: String) -> Result<(), PipelineError> {
175 let mut active_ops = self
176 .active_operations
177 .write()
178 .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
179
180 active_ops.insert(operation_id, Instant::now());
181 Ok(())
182 }
183
184 pub fn complete_operation(
186 &self,
187 operation_id: String,
188 operation_type: String,
189 metrics: T,
190 ) -> Result<(), PipelineError> {
191 let start_time = {
192 let mut active_ops = self
193 .active_operations
194 .write()
195 .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
196
197 active_ops.remove(&operation_id)
198 };
199
200 let duration = start_time
201 .map(|start| start.elapsed())
202 .unwrap_or_else(|| Duration::from_millis(0));
203
204 let entry = MetricEntry::new(operation_id, operation_type, metrics.clone()).with_duration(duration);
205
206 self.record_entry(entry)?;
207
208 if self.auto_aggregate {
209 self.aggregate_metrics(&metrics)?;
210 }
211
212 Ok(())
213 }
214
215 pub fn record_entry(&self, entry: MetricEntry<T>) -> Result<(), PipelineError> {
217 let mut entries = self
218 .entries
219 .write()
220 .map_err(|e| PipelineError::InternalError(format!("Failed to write entries: {}", e)))?;
221
222 entries.push(entry);
223
224 if entries.len() > self.max_entries {
226 entries.remove(0);
227 }
228
229 Ok(())
230 }
231
232 pub fn record_failure(
234 &self,
235 operation_id: String,
236 operation_type: String,
237 error: PipelineError,
238 ) -> Result<(), PipelineError> {
239 let start_time = {
240 let mut active_ops = self
241 .active_operations
242 .write()
243 .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
244
245 active_ops.remove(&operation_id)
246 };
247
248 let duration = start_time
249 .map(|start| start.elapsed())
250 .unwrap_or_else(|| Duration::from_millis(0));
251
252 let entry = MetricEntry::new(operation_id, operation_type, T::default())
253 .with_duration(duration)
254 .with_error(error.to_string());
255
256 self.record_entry(entry)
257 }
258
259 fn aggregate_metrics(&self, metrics: &T) -> Result<(), PipelineError> {
261 let mut aggregated = self
262 .aggregated_metrics
263 .write()
264 .map_err(|e| PipelineError::InternalError(format!("Failed to write aggregated metrics: {}", e)))?;
265
266 aggregated.merge(metrics);
267 Ok(())
268 }
269
270 pub fn get_aggregated_metrics(&self) -> Result<T, PipelineError> {
272 self.aggregated_metrics
273 .read()
274 .map_err(|e| PipelineError::InternalError(format!("Failed to read aggregated metrics: {}", e)))
275 .map(|metrics| metrics.clone())
276 }
277
278 pub fn get_entries(&self) -> Result<Vec<MetricEntry<T>>, PipelineError> {
280 self.entries
281 .read()
282 .map_err(|e| PipelineError::InternalError(format!("Failed to read entries: {}", e)))
283 .map(|entries| entries.clone())
284 }
285
286 pub fn get_entries_by_type(&self, operation_type: &str) -> Result<Vec<MetricEntry<T>>, PipelineError> {
288 let entries = self.get_entries()?;
289 Ok(entries
290 .into_iter()
291 .filter(|entry| entry.operation_type == operation_type)
292 .collect())
293 }
294
295 pub fn get_entries_in_range(
297 &self,
298 start: chrono::DateTime<chrono::Utc>,
299 end: chrono::DateTime<chrono::Utc>,
300 ) -> Result<Vec<MetricEntry<T>>, PipelineError> {
301 let entries = self.get_entries()?;
302 Ok(entries
303 .into_iter()
304 .filter(|entry| entry.started_at >= start && entry.completed_at <= end)
305 .collect())
306 }
307
308 pub fn reset(&self) -> Result<(), PipelineError> {
310 let mut entries = self
311 .entries
312 .write()
313 .map_err(|e| PipelineError::InternalError(format!("Failed to write entries: {}", e)))?;
314
315 let mut aggregated = self
316 .aggregated_metrics
317 .write()
318 .map_err(|e| PipelineError::InternalError(format!("Failed to write aggregated metrics: {}", e)))?;
319
320 let mut active_ops = self
321 .active_operations
322 .write()
323 .map_err(|e| PipelineError::InternalError(format!("Failed to write active operations: {}", e)))?;
324
325 entries.clear();
326 aggregated.reset();
327 active_ops.clear();
328
329 Ok(())
330 }
331
332 pub fn get_summary(&self) -> Result<HashMap<String, String>, PipelineError> {
334 let entries = self.get_entries()?;
335 let aggregated = self.get_aggregated_metrics()?;
336
337 let mut summary = HashMap::new();
338 summary.insert("collector_name".to_string(), self.collector_name.clone());
339 summary.insert("total_entries".to_string(), entries.len().to_string());
340 summary.insert(
341 "successful_operations".to_string(),
342 entries.iter().filter(|e| e.success).count().to_string(),
343 );
344 summary.insert(
345 "failed_operations".to_string(),
346 entries.iter().filter(|e| !e.success).count().to_string(),
347 );
348
349 if !entries.is_empty() {
350 let avg_duration = entries.iter().map(|e| e.duration_ms).sum::<u64>() / (entries.len() as u64);
351 summary.insert("average_duration_ms".to_string(), avg_duration.to_string());
352
353 let max_duration = entries.iter().map(|e| e.duration_ms).max().unwrap_or(0);
354 summary.insert("max_duration_ms".to_string(), max_duration.to_string());
355
356 let min_duration = entries.iter().map(|e| e.duration_ms).min().unwrap_or(0);
357 summary.insert("min_duration_ms".to_string(), min_duration.to_string());
358 }
359
360 let aggregated_summary = aggregated.summary();
362 summary.extend(aggregated_summary);
363
364 Ok(summary)
365 }
366
367 pub fn name(&self) -> &str {
369 &self.collector_name
370 }
371
372 pub fn active_operations_count(&self) -> Result<usize, PipelineError> {
374 self.active_operations
375 .read()
376 .map_err(|e| PipelineError::InternalError(format!("Failed to read active operations: {}", e)))
377 .map(|ops| ops.len())
378 }
379}
380
381#[async_trait]
383pub trait MetricsEnabled<T>
384where
385 T: CollectibleMetrics,
386{
387 fn metrics_collector(&self) -> &GenericMetricsCollector<T>;
389
390 async fn record_success(
392 &self,
393 operation_id: String,
394 operation_type: String,
395 metrics: T,
396 ) -> Result<(), PipelineError> {
397 self.metrics_collector()
398 .complete_operation(operation_id, operation_type, metrics)
399 }
400
401 async fn record_failure(
403 &self,
404 operation_id: String,
405 operation_type: String,
406 error: PipelineError,
407 ) -> Result<(), PipelineError> {
408 self.metrics_collector()
409 .record_failure(operation_id, operation_type, error)
410 }
411
412 async fn get_metrics_summary(&self) -> Result<HashMap<String, String>, PipelineError> {
414 self.metrics_collector().get_summary()
415 }
416}
417
418#[macro_export]
420macro_rules! metrics_collector {
421 ($metrics_type:ty, $name:expr) => {
422 $crate::infrastructure::metrics::GenericMetricsCollector::<$metrics_type>::new($name.to_string())
423 };
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429
430 #[derive(Clone, Debug, Default)]
431 struct TestMetrics {
432 bytes_processed: u64,
433 operations_count: u64,
434 errors_count: u64,
435 }
436
437 impl CollectibleMetrics for TestMetrics {
438 fn reset(&mut self) {
439 self.bytes_processed = 0;
440 self.operations_count = 0;
441 self.errors_count = 0;
442 }
443
444 fn merge(&mut self, other: &Self) {
445 self.bytes_processed += other.bytes_processed;
446 self.operations_count += other.operations_count;
447 self.errors_count += other.errors_count;
448 }
449
450 fn summary(&self) -> HashMap<String, String> {
451 let mut summary = HashMap::new();
452 summary.insert("bytes_processed".to_string(), self.bytes_processed.to_string());
453 summary.insert("operations_count".to_string(), self.operations_count.to_string());
454 summary.insert("errors_count".to_string(), self.errors_count.to_string());
455 summary
456 }
457
458 fn metric_type(&self) -> String {
459 "test_metrics".to_string()
460 }
461
462 fn validate(&self) -> Result<(), PipelineError> {
463 if self.operations_count < self.errors_count {
464 return Err(PipelineError::InternalError(
465 "Error count cannot exceed operations count".to_string(),
466 ));
467 }
468 Ok(())
469 }
470 }
471
472 #[test]
474 fn test_metrics_collector_creation() {
475 let collector = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());
476 assert_eq!(collector.name(), "test_collector");
477 assert_eq!(collector.active_operations_count().unwrap(), 0);
478 }
479
480 #[test]
482 fn test_operation_tracking() {
483 let collector = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());
484
485 collector.start_operation("op1".to_string()).unwrap();
487 assert_eq!(collector.active_operations_count().unwrap(), 1);
488
489 let metrics = TestMetrics {
491 bytes_processed: 1024,
492 operations_count: 1,
493 errors_count: 0,
494 };
495
496 collector
497 .complete_operation("op1".to_string(), "test_operation".to_string(), metrics)
498 .unwrap();
499
500 assert_eq!(collector.active_operations_count().unwrap(), 0);
501
502 let entries = collector.get_entries().unwrap();
503 assert_eq!(entries.len(), 1);
504 assert_eq!(entries[0].operation_id, "op1");
505 assert_eq!(entries[0].metrics.bytes_processed, 1024);
506 }
507
508 #[test]
510 fn test_metrics_aggregation() {
511 let collector = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());
512
513 let metrics1 = TestMetrics {
514 bytes_processed: 1024,
515 operations_count: 1,
516 errors_count: 0,
517 };
518
519 let metrics2 = TestMetrics {
520 bytes_processed: 2048,
521 operations_count: 1,
522 errors_count: 1,
523 };
524
525 collector
526 .complete_operation("op1".to_string(), "test".to_string(), metrics1)
527 .unwrap();
528 collector
529 .complete_operation("op2".to_string(), "test".to_string(), metrics2)
530 .unwrap();
531
532 let aggregated = collector.get_aggregated_metrics().unwrap();
533 assert_eq!(aggregated.bytes_processed, 3072);
534 assert_eq!(aggregated.operations_count, 2);
535 assert_eq!(aggregated.errors_count, 1);
536 }
537
538 #[test]
540 fn test_summary_generation() {
541 let collector = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());
542
543 let metrics = TestMetrics {
544 bytes_processed: 1024,
545 operations_count: 1,
546 errors_count: 0,
547 };
548
549 collector
550 .complete_operation("op1".to_string(), "test".to_string(), metrics)
551 .unwrap();
552
553 let summary = collector.get_summary().unwrap();
554 assert_eq!(summary.get("collector_name").unwrap(), "test_collector");
555 assert_eq!(summary.get("total_entries").unwrap(), "1");
556 assert_eq!(summary.get("successful_operations").unwrap(), "1");
557 assert_eq!(summary.get("failed_operations").unwrap(), "0");
558 assert!(summary.contains_key("bytes_processed"));
559 }
560
561 #[test]
563 fn test_failure_recording() {
564 let collector = GenericMetricsCollector::<TestMetrics>::new("test_collector".to_string());
565
566 collector.start_operation("op1".to_string()).unwrap();
567 collector
568 .record_failure(
569 "op1".to_string(),
570 "test_operation".to_string(),
571 PipelineError::InternalError("Test error".to_string()),
572 )
573 .unwrap();
574
575 let entries = collector.get_entries().unwrap();
576 assert_eq!(entries.len(), 1);
577 assert!(!entries[0].success);
578 assert!(entries[0].error_message.is_some());
579 }
580
581 #[test]
583 fn test_macro_usage() {
584 let collector = metrics_collector!(TestMetrics, "test");
585 assert_eq!(collector.name(), "test");
586 }
587}