adaptive_pipeline_domain/entities/processing_metrics.rs

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant};

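/// Metrics collected while a pipeline processes a file: byte/chunk progress,
/// timing, throughput, compression ratio, error/warning counts, input/output
/// file information, and per-stage metrics.
///
/// The `Instant` fields are skipped during serialization (they are not
/// serializable); RFC 3339 timestamp strings are stored alongside them so the
/// start/end times survive a round trip through serde.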
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingMetrics {
    bytes_processed: u64,
    bytes_total: u64,
    chunks_processed: u64,
    chunks_total: u64,
    #[serde(skip)]
    start_time: Option<Instant>,
    #[serde(skip)]
    end_time: Option<Instant>,
    start_time_rfc3339: Option<String>,
    end_time_rfc3339: Option<String>,
    processing_duration: Option<Duration>,
    throughput_bytes_per_second: f64,
    compression_ratio: Option<f64>,
    error_count: u64,
    warning_count: u64,
    input_file_size_bytes: u64,
    output_file_size_bytes: u64,
    input_file_checksum: Option<String>,
    output_file_checksum: Option<String>,
    stage_metrics: std::collections::HashMap<String, StageMetrics>,
}

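/// Per-stage metrics tracked for each named pipeline stage: bytes processed,
/// processing time, throughput, error count, success rate, and optional
/// memory/CPU usage samples.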
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageMetrics {
    pub stage_name: String,
    pub bytes_processed: u64,
    pub processing_time: Duration,
    pub throughput: f64,
    pub error_count: u64,
    pub success_rate: f64,
    pub memory_usage: Option<u64>,
    pub cpu_usage: Option<f64>,
}

impl Default for ProcessingMetrics {
    fn default() -> Self {
        Self {
            bytes_processed: 0,
            bytes_total: 0,
            chunks_processed: 0,
            chunks_total: 0,
            start_time: None,
            end_time: None,
            start_time_rfc3339: None,
            end_time_rfc3339: None,
            processing_duration: None,
            throughput_bytes_per_second: 0.0,
            compression_ratio: None,
            error_count: 0,
            warning_count: 0,
            input_file_size_bytes: 0,
            output_file_size_bytes: 0,
            input_file_checksum: None,
            output_file_checksum: None,
            stage_metrics: std::collections::HashMap::new(),
        }
    }
}

impl ProcessingMetrics {
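    /// Creates metrics for a run with known totals; all other counters start
    /// at zero via `Default`.
    ///
    /// Illustrative lifecycle sketch (the import path below is assumed from
    /// this file's location and may differ in your crate layout):
    ///
    /// ```rust,ignore
    /// use adaptive_pipeline_domain::entities::processing_metrics::ProcessingMetrics;
    ///
    /// let mut metrics = ProcessingMetrics::new(1024, 4);
    /// metrics.start();
    /// metrics.add_bytes_processed(512);
    /// metrics.add_chunks_processed(2);
    /// metrics.end();
    /// assert_eq!(metrics.bytes_processed(), 512);
    /// assert!(metrics.processing_duration().is_some());
    /// ```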
    pub fn new(bytes_total: u64, chunks_total: u64) -> Self {
        Self {
            bytes_total,
            chunks_total,
            ..Default::default()
        }
    }

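    /// Marks the start of processing, recording both a monotonic `Instant`
    /// (for duration math) and an RFC 3339 wall-clock timestamp (for
    /// serialization and reporting).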
    pub fn start(&mut self) {
        self.start_time = Some(Instant::now());
        self.start_time_rfc3339 = Some(Utc::now().to_rfc3339());
    }

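    /// Marks the end of processing; if `start()` was called, this also fixes
    /// the total duration and recalculates throughput.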
    pub fn end(&mut self) {
        self.end_time = Some(Instant::now());
        self.end_time_rfc3339 = Some(Utc::now().to_rfc3339());
        if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
            self.processing_duration = Some(end.duration_since(start));
            self.calculate_throughput();
        }
    }

    pub fn update_bytes_processed(&mut self, bytes: u64) {
        self.bytes_processed = bytes;
        self.calculate_throughput();
    }

    pub fn add_bytes_processed(&mut self, bytes: u64) {
        self.bytes_processed += bytes;
        self.calculate_throughput();
    }

    pub fn update_chunks_processed(&mut self, chunks: u64) {
        self.chunks_processed = chunks;
    }

    pub fn add_chunks_processed(&mut self, chunks: u64) {
        self.chunks_processed += chunks;
    }

    pub fn set_compression_ratio(&mut self, ratio: f64) {
        self.compression_ratio = Some(ratio);
    }

    pub fn increment_errors(&mut self) {
        self.error_count += 1;
    }

    pub fn increment_warnings(&mut self) {
        self.warning_count += 1;
    }

    pub fn add_stage_metrics(&mut self, metrics: StageMetrics) {
        self.stage_metrics.insert(metrics.stage_name.clone(), metrics);
    }

    pub fn bytes_processed(&self) -> u64 {
        self.bytes_processed
    }

    pub fn bytes_total(&self) -> u64 {
        self.bytes_total
    }

    pub fn chunks_processed(&self) -> u64 {
        self.chunks_processed
    }

    pub fn chunks_total(&self) -> u64 {
        self.chunks_total
    }

    pub fn processing_duration(&self) -> Option<Duration> {
        self.processing_duration
    }

    pub fn start_time(&self) -> Option<DateTime<Utc>> {
        self.start_time_rfc3339
            .as_ref()
            .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
    }

    pub fn end_time(&self) -> Option<DateTime<Utc>> {
        self.end_time_rfc3339
            .as_ref()
            .and_then(|s| DateTime::parse_from_rfc3339(s).ok().map(|dt| dt.with_timezone(&Utc)))
    }

    pub fn throughput_bytes_per_second(&self) -> f64 {
        self.throughput_bytes_per_second
    }

    pub fn throughput_mb_per_second(&self) -> f64 {
        self.throughput_bytes_per_second / (1024.0 * 1024.0)
    }

    pub fn compression_ratio(&self) -> Option<f64> {
        self.compression_ratio
    }

    pub fn error_count(&self) -> u64 {
        self.error_count
    }

    pub fn warning_count(&self) -> u64 {
        self.warning_count
    }

    pub fn stage_metrics(&self) -> &std::collections::HashMap<String, StageMetrics> {
        &self.stage_metrics
    }

    pub fn input_file_size_bytes(&self) -> u64 {
        self.input_file_size_bytes
    }

    pub fn output_file_size_bytes(&self) -> u64 {
        self.output_file_size_bytes
    }

    pub fn input_file_size_mib(&self) -> f64 {
        (self.input_file_size_bytes as f64) / (1024.0 * 1024.0)
    }

    pub fn output_file_size_mib(&self) -> f64 {
        (self.output_file_size_bytes as f64) / (1024.0 * 1024.0)
    }

    pub fn input_file_checksum(&self) -> &Option<String> {
        &self.input_file_checksum
    }

    pub fn output_file_checksum(&self) -> &Option<String> {
        &self.output_file_checksum
    }

    pub fn progress_percentage(&self) -> f64 {
        if self.bytes_total == 0 {
            return 0.0;
        }
        ((self.bytes_processed as f64) / (self.bytes_total as f64)) * 100.0
    }

    pub fn chunk_progress_percentage(&self) -> f64 {
        if self.chunks_total == 0 {
            return 0.0;
        }
        ((self.chunks_processed as f64) / (self.chunks_total as f64)) * 100.0
    }

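    /// Estimates the remaining processing time from the current throughput.
    /// Returns `None` until some bytes have been processed and a non-zero
    /// throughput is available.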
    pub fn estimated_remaining_time(&self) -> Option<Duration> {
        if self.throughput_bytes_per_second <= 0.0 || self.bytes_processed == 0 {
            return None;
        }

        let remaining_bytes = self.bytes_total.saturating_sub(self.bytes_processed);
        let remaining_seconds = (remaining_bytes as f64) / self.throughput_bytes_per_second;
        Some(Duration::from_secs_f64(remaining_seconds))
    }

    pub fn is_complete(&self) -> bool {
        self.bytes_processed >= self.bytes_total && self.chunks_processed >= self.chunks_total
    }

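    /// Fraction of processed chunks that did not produce an error, in the
    /// range 0.0..=1.0. Returns 0.0 when no chunks have been processed yet.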
    pub fn success_rate(&self) -> f64 {
        if self.chunks_processed == 0 {
            return 0.0;
        }
        let successful_chunks = self.chunks_processed.saturating_sub(self.error_count);
        (successful_chunks as f64) / (self.chunks_processed as f64)
    }

    pub fn set_input_file_info(&mut self, size_bytes: u64, checksum: Option<String>) {
        self.input_file_size_bytes = size_bytes;
        self.input_file_checksum = checksum;
    }

    pub fn set_output_file_info(&mut self, size_bytes: u64, checksum: Option<String>) {
        self.output_file_size_bytes = size_bytes;
        self.output_file_checksum = checksum;
    }

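    /// Recomputes `throughput_bytes_per_second`, preferring the final duration
    /// once processing has ended and falling back to the elapsed time since
    /// `start()` while processing is still in progress.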
    fn calculate_throughput(&mut self) {
        if let Some(duration) = self.processing_duration {
            let seconds = duration.as_secs_f64();
            if seconds > 0.0 {
                self.throughput_bytes_per_second = (self.bytes_processed as f64) / seconds;
            }
        } else if let Some(start) = self.start_time {
            let elapsed = start.elapsed();
            let seconds = elapsed.as_secs_f64();
            if seconds > 0.0 {
                self.throughput_bytes_per_second = (self.bytes_processed as f64) / seconds;
            }
        }
    }

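    /// Merges another metrics snapshot into this one: counters are summed and
    /// stage metrics are copied over (entries with the same stage name are
    /// replaced), after which throughput is recalculated.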
    pub fn merge(&mut self, other: &ProcessingMetrics) {
        self.bytes_processed += other.bytes_processed;
        self.chunks_processed += other.chunks_processed;
        self.error_count += other.error_count;
        self.warning_count += other.warning_count;

        for (stage_name, stage_metrics) in &other.stage_metrics {
            self.stage_metrics.insert(stage_name.clone(), stage_metrics.clone());
        }

        self.calculate_throughput();
    }
}

impl StageMetrics {
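    /// Creates empty metrics for the named stage.
    ///
    /// Minimal usage sketch (stage name chosen for illustration only):
    ///
    /// ```rust,ignore
    /// let mut stage = StageMetrics::new("compression".to_string());
    /// stage.update(1_048_576, std::time::Duration::from_millis(250));
    /// stage.calculate_success_rate(16);
    /// assert!(stage.throughput > 0.0);
    /// ```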
    pub fn new(stage_name: String) -> Self {
        Self {
            stage_name,
            bytes_processed: 0,
            processing_time: Duration::ZERO,
            throughput: 0.0,
            error_count: 0,
            success_rate: 0.0,
            memory_usage: None,
            cpu_usage: None,
        }
    }

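    /// Records the bytes processed and processing time for this stage and
    /// recomputes its throughput in bytes per second.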
    pub fn update(&mut self, bytes_processed: u64, processing_time: Duration) {
        self.bytes_processed = bytes_processed;
        self.processing_time = processing_time;

        let seconds = processing_time.as_secs_f64();
        if seconds > 0.0 {
            self.throughput = (bytes_processed as f64) / seconds;
        }
    }

    pub fn set_memory_usage(&mut self, memory_usage: u64) {
        self.memory_usage = Some(memory_usage);
    }

    pub fn set_cpu_usage(&mut self, cpu_usage: f64) {
        self.cpu_usage = Some(cpu_usage);
    }

    pub fn increment_errors(&mut self) {
        self.error_count += 1;
    }

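    /// Computes the success rate as successful operations over the total,
    /// using the recorded error count; does nothing when `total_operations`
    /// is zero.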
    pub fn calculate_success_rate(&mut self, total_operations: u64) {
        if total_operations > 0 {
            let successful_operations = total_operations.saturating_sub(self.error_count);
            self.success_rate = (successful_operations as f64) / (total_operations as f64);
        }
    }
}