Skip to main content

sklears_preprocessing/
monitoring.rs

1//! Transformation Monitoring and Performance Metrics
2//!
3//! Provides comprehensive monitoring, logging, and performance tracking
4//! for preprocessing transformations.
5
6use scirs2_core::ndarray::Array2;
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
/// Transformation metrics collected during preprocessing.
///
/// A record is created with [`TransformationMetrics::new`], timed with
/// [`start`](TransformationMetrics::start), and closed with either
/// [`complete`](TransformationMetrics::complete) or
/// [`fail`](TransformationMetrics::fail).
#[derive(Debug, Clone)]
pub struct TransformationMetrics {
    /// Transformation name
    pub name: String,
    /// Instant at which `start` was called, if it was
    pub start_time: Option<Instant>,
    /// Instant at which the transformation completed or failed
    pub end_time: Option<Instant>,
    /// Wall-clock duration in milliseconds; only set when the record was
    /// started before being completed or failed
    pub duration_ms: Option<f64>,
    /// Input shape (rows, cols)
    pub input_shape: (usize, usize),
    /// Output shape (rows, cols); `(0, 0)` until `complete` is called
    pub output_shape: (usize, usize),
    /// Memory usage in bytes, if tracked
    pub memory_bytes: Option<usize>,
    /// Number of NaN values in input
    pub input_nan_count: usize,
    /// Number of NaN values in output
    pub output_nan_count: usize,
    /// Whether the transformation completed successfully
    pub success: bool,
    /// Error message if the transformation failed
    pub error_message: Option<String>,
    /// Custom, user-supplied metrics keyed by name
    pub custom_metrics: HashMap<String, f64>,
}

impl TransformationMetrics {
    /// Create a new, not-yet-started metrics record for `name` operating on
    /// input of shape `input_shape` (rows, cols).
    ///
    /// The record starts out unsuccessful, untimed, and with an output shape
    /// of `(0, 0)`.
    pub fn new(name: String, input_shape: (usize, usize)) -> Self {
        Self {
            name,
            start_time: None,
            end_time: None,
            duration_ms: None,
            input_shape,
            output_shape: (0, 0),
            memory_bytes: None,
            input_nan_count: 0,
            output_nan_count: 0,
            success: false,
            error_message: None,
            custom_metrics: HashMap::new(),
        }
    }

    /// Mark the transformation as started (stamps `start_time` with now).
    pub fn start(&mut self) {
        self.start_time = Some(Instant::now());
    }

    /// Mark the transformation as successfully completed with the given
    /// output shape, and record its duration if it was started.
    pub fn complete(&mut self, output_shape: (usize, usize)) {
        self.output_shape = output_shape;
        self.success = true;
        self.record_end();
    }

    /// Mark the transformation as failed with `error`, and record its
    /// duration if it was started.
    pub fn fail(&mut self, error: String) {
        self.success = false;
        self.error_message = Some(error);
        self.record_end();
    }

    /// Add (or overwrite) a custom metric.
    pub fn add_metric(&mut self, name: String, value: f64) {
        self.custom_metrics.insert(name, value);
    }

    /// Throughput in input elements per second.
    ///
    /// Returns `None` when no duration was recorded or the duration is not
    /// strictly positive.
    pub fn throughput(&self) -> Option<f64> {
        let duration_ms = self.duration_ms.filter(|&ms| ms > 0.0)?;
        Some(self.input_elements() as f64 / (duration_ms / 1000.0))
    }

    /// Memory efficiency in bytes per input element.
    ///
    /// Returns `None` when memory was not tracked or the input has no
    /// elements (the ratio would otherwise be infinite or NaN).
    pub fn memory_efficiency(&self) -> Option<f64> {
        let mem = self.memory_bytes?;
        let elements = self.input_elements();
        (elements > 0).then(|| mem as f64 / elements as f64)
    }

    /// Stamp `end_time` with the current instant and, when the record was
    /// started, derive `duration_ms` from it.
    fn record_end(&mut self) {
        let end = Instant::now();
        self.end_time = Some(end);
        if let Some(start) = self.start_time {
            self.duration_ms = Some(end.duration_since(start).as_secs_f64() * 1000.0);
        }
    }

    /// Number of elements in the input (`rows * cols`).
    fn input_elements(&self) -> usize {
        self.input_shape.0 * self.input_shape.1
    }
}
116
117/// Pipeline monitoring session
118#[derive(Debug)]
119pub struct MonitoringSession {
120    /// Session name
121    pub name: String,
122    /// Start time
123    pub start_time: Instant,
124    /// All transformation metrics
125    pub metrics: Vec<TransformationMetrics>,
126    /// Session configuration
127    pub config: MonitoringConfig,
128}
129
/// Monitoring configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoringConfig {
    /// Print progress messages to stdout
    pub enable_logging: bool,
    /// Minimum severity of messages that are printed when logging is enabled
    pub log_level: LogLevel,
    /// Estimate the memory footprint of each transformation's output
    pub track_memory: bool,
    /// Count NaN values in transformation inputs and outputs
    pub track_nan: bool,
    /// Collect custom metrics.
    /// NOTE(review): not consulted by the monitoring code in this module — confirm intended use.
    pub collect_custom_metrics: bool,
}

impl Default for MonitoringConfig {
    /// Logging at `Info` level, memory and NaN tracking on, custom metrics off.
    fn default() -> Self {
        Self {
            enable_logging: true,
            log_level: LogLevel::Info,
            track_memory: true,
            track_nan: true,
            collect_custom_metrics: false,
        }
    }
}

/// Log levels, ordered from most verbose (`Debug`) to most severe (`Error`).
///
/// The derived ordering follows declaration order, so
/// `Debug < Info < Warning < Error`. The type is a plain fieldless enum and is
/// therefore `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Most verbose level
    Debug,
    /// Informational messages
    Info,
    /// Warnings
    Warning,
    /// Errors only
    Error,
}
165
166impl MonitoringSession {
167    /// Create a new monitoring session
168    pub fn new(name: String) -> Self {
169        Self {
170            name,
171            start_time: Instant::now(),
172            metrics: Vec::new(),
173            config: MonitoringConfig::default(),
174        }
175    }
176
177    /// Create a session with custom configuration
178    pub fn with_config(name: String, config: MonitoringConfig) -> Self {
179        Self {
180            name,
181            start_time: Instant::now(),
182            metrics: Vec::new(),
183            config,
184        }
185    }
186
187    /// Start tracking a transformation
188    pub fn start_transformation(&mut self, name: String, input: &Array2<f64>) -> usize {
189        let mut metrics = TransformationMetrics::new(name.clone(), (input.nrows(), input.ncols()));
190        metrics.start();
191
192        if self.config.track_nan {
193            metrics.input_nan_count = input.iter().filter(|v| v.is_nan()).count();
194        }
195
196        if self.config.enable_logging && self.config.log_level <= LogLevel::Debug {
197            println!(
198                "[DEBUG] Starting transformation '{}' on data shape {:?}",
199                name,
200                (input.nrows(), input.ncols())
201            );
202        }
203
204        self.metrics.push(metrics);
205        self.metrics.len() - 1 // Return index
206    }
207
208    /// Complete a transformation
209    pub fn complete_transformation(
210        &mut self,
211        index: usize,
212        output: &Array2<f64>,
213    ) -> Result<(), String> {
214        if index >= self.metrics.len() {
215            return Err("Invalid metrics index".to_string());
216        }
217
218        let metrics = &mut self.metrics[index];
219        metrics.complete((output.nrows(), output.ncols()));
220
221        if self.config.track_nan {
222            metrics.output_nan_count = output.iter().filter(|v| v.is_nan()).count();
223        }
224
225        if self.config.track_memory {
226            let memory = (output.nrows() * output.ncols()) * std::mem::size_of::<f64>();
227            metrics.memory_bytes = Some(memory);
228        }
229
230        if self.config.enable_logging && self.config.log_level <= LogLevel::Info {
231            println!(
232                "[INFO] Completed transformation '{}' in {:.2}ms (throughput: {:.0} elem/s)",
233                metrics.name,
234                metrics.duration_ms.unwrap_or(0.0),
235                metrics.throughput().unwrap_or(0.0)
236            );
237        }
238
239        Ok(())
240    }
241
242    /// Fail a transformation
243    pub fn fail_transformation(&mut self, index: usize, error: String) -> Result<(), String> {
244        if index >= self.metrics.len() {
245            return Err("Invalid metrics index".to_string());
246        }
247
248        let metrics = &mut self.metrics[index];
249        metrics.fail(error.clone());
250
251        if self.config.enable_logging && self.config.log_level <= LogLevel::Error {
252            println!(
253                "[ERROR] Transformation '{}' failed: {}",
254                metrics.name, error
255            );
256        }
257
258        Ok(())
259    }
260
261    /// Get total session duration
262    pub fn total_duration(&self) -> Duration {
263        Instant::now().duration_since(self.start_time)
264    }
265
266    /// Get total processing time (sum of all transformations)
267    pub fn total_processing_time(&self) -> f64 {
268        self.metrics.iter().filter_map(|m| m.duration_ms).sum()
269    }
270
271    /// Get average throughput
272    pub fn average_throughput(&self) -> Option<f64> {
273        let throughputs: Vec<f64> = self.metrics.iter().filter_map(|m| m.throughput()).collect();
274
275        if throughputs.is_empty() {
276            None
277        } else {
278            Some(throughputs.iter().sum::<f64>() / throughputs.len() as f64)
279        }
280    }
281
282    /// Get successful transformations
283    pub fn successful_count(&self) -> usize {
284        self.metrics.iter().filter(|m| m.success).count()
285    }
286
287    /// Get failed transformations
288    pub fn failed_count(&self) -> usize {
289        self.metrics.iter().filter(|m| !m.success).count()
290    }
291
292    /// Generate summary report
293    pub fn summary(&self) -> MonitoringSummary {
294        let total_transformations = self.metrics.len();
295        let successful = self.successful_count();
296        let failed = self.failed_count();
297        let total_duration = self.total_duration();
298        let processing_time = self.total_processing_time();
299        let avg_throughput = self.average_throughput();
300
301        let total_input_elements: usize = self
302            .metrics
303            .iter()
304            .map(|m| m.input_shape.0 * m.input_shape.1)
305            .sum();
306
307        let total_output_elements: usize = self
308            .metrics
309            .iter()
310            .map(|m| m.output_shape.0 * m.output_shape.1)
311            .sum();
312
313        let total_memory: usize = self.metrics.iter().filter_map(|m| m.memory_bytes).sum();
314
315        MonitoringSummary {
316            session_name: self.name.clone(),
317            total_transformations,
318            successful_transformations: successful,
319            failed_transformations: failed,
320            total_duration_ms: total_duration.as_secs_f64() * 1000.0,
321            total_processing_ms: processing_time,
322            overhead_ms: total_duration.as_secs_f64() * 1000.0 - processing_time,
323            average_throughput: avg_throughput,
324            total_input_elements,
325            total_output_elements,
326            total_memory_bytes: total_memory,
327            slowest_transformation: self.find_slowest(),
328            fastest_transformation: self.find_fastest(),
329        }
330    }
331
332    /// Find slowest transformation
333    fn find_slowest(&self) -> Option<String> {
334        self.metrics
335            .iter()
336            .filter(|m| m.duration_ms.is_some())
337            .max_by(|a, b| {
338                a.duration_ms
339                    .expect("operation should succeed")
340                    .partial_cmp(&b.duration_ms.expect("operation should succeed"))
341                    .unwrap_or(std::cmp::Ordering::Equal)
342            })
343            .map(|m| {
344                format!(
345                    "{} ({:.2}ms)",
346                    m.name,
347                    m.duration_ms.expect("operation should succeed")
348                )
349            })
350    }
351
352    /// Find fastest transformation
353    fn find_fastest(&self) -> Option<String> {
354        self.metrics
355            .iter()
356            .filter(|m| m.duration_ms.is_some())
357            .min_by(|a, b| {
358                a.duration_ms
359                    .expect("operation should succeed")
360                    .partial_cmp(&b.duration_ms.expect("operation should succeed"))
361                    .unwrap_or(std::cmp::Ordering::Equal)
362            })
363            .map(|m| {
364                format!(
365                    "{} ({:.2}ms)",
366                    m.name,
367                    m.duration_ms.expect("operation should succeed")
368                )
369            })
370    }
371
372    /// Print summary
373    pub fn print_summary(&self) {
374        let summary = self.summary();
375        summary.print();
376    }
377}
378
/// Monitoring summary report, a snapshot produced from a monitoring session.
#[derive(Debug, Clone)]
pub struct MonitoringSummary {
    /// Name of the session the summary was taken from
    pub session_name: String,
    /// Number of recorded transformations
    pub total_transformations: usize,
    /// Number of transformations that completed successfully
    pub successful_transformations: usize,
    /// Number of transformations that failed
    pub failed_transformations: usize,
    /// Wall-clock session duration (ms) at the time the summary was taken
    pub total_duration_ms: f64,
    /// Sum of recorded transformation durations (ms)
    pub total_processing_ms: f64,
    /// `total_duration_ms - total_processing_ms`
    pub overhead_ms: f64,
    /// Mean per-transformation throughput (elements/s), if any was available
    pub average_throughput: Option<f64>,
    /// Sum of input element counts
    pub total_input_elements: usize,
    /// Sum of output element counts
    pub total_output_elements: usize,
    /// Sum of tracked memory estimates (bytes)
    pub total_memory_bytes: usize,
    /// Slowest transformation, formatted as `"name (X.XXms)"`
    pub slowest_transformation: Option<String>,
    /// Fastest transformation, formatted as `"name (X.XXms)"`
    pub fastest_transformation: Option<String>,
}

impl MonitoringSummary {
    /// Print a formatted summary to stdout.
    pub fn print(&self) {
        let rule = "=".repeat(60);
        println!("\n{}", rule);
        println!("Monitoring Summary: {}", self.session_name);
        println!("{}", rule);
        println!();

        println!("Transformations:");
        println!("  Total: {}", self.total_transformations);
        println!("  Successful: {}", self.successful_transformations);
        println!("  Failed: {}", self.failed_transformations);
        println!();

        println!("Performance:");
        println!("  Total Duration: {:.2} ms", self.total_duration_ms);
        println!("  Processing Time: {:.2} ms", self.total_processing_ms);
        println!(
            "  Overhead: {:.2} ms ({:.1}%)",
            self.overhead_ms,
            self.overhead_percent()
        );
        if let Some(throughput) = self.average_throughput {
            println!("  Average Throughput: {:.0} elements/s", throughput);
        }
        println!();

        println!("Data:");
        println!("  Total Input Elements: {}", self.total_input_elements);
        println!("  Total Output Elements: {}", self.total_output_elements);
        println!(
            "  Total Memory: {:.2} MB",
            self.total_memory_bytes as f64 / 1024.0 / 1024.0
        );
        println!();

        if let Some(slowest) = &self.slowest_transformation {
            println!("Slowest Transformation: {}", slowest);
        }
        if let Some(fastest) = &self.fastest_transformation {
            println!("Fastest Transformation: {}", fastest);
        }

        println!("{}", rule);
    }

    /// Processing time as a percentage of total duration.
    ///
    /// Returns `0.0` when the total duration is not positive.
    pub fn efficiency(&self) -> f64 {
        if self.total_duration_ms > 0.0 {
            (self.total_processing_ms / self.total_duration_ms) * 100.0
        } else {
            0.0
        }
    }

    /// Overhead as a percentage of total duration.
    ///
    /// Returns `0.0` when the total duration is not positive, instead of a
    /// NaN or infinite ratio. This matches the guard used by `efficiency`.
    pub fn overhead_percent(&self) -> f64 {
        if self.total_duration_ms > 0.0 {
            (self.overhead_ms / self.total_duration_ms) * 100.0
        } else {
            0.0
        }
    }

    /// Check whether performance is acceptable.
    ///
    /// Requires an average throughput of at least `min_throughput` (a missing
    /// throughput is unacceptable). It also requires an overhead of at most
    /// `max_overhead_percent` percent of the total duration.
    pub fn is_acceptable(&self, min_throughput: f64, max_overhead_percent: f64) -> bool {
        let throughput_ok = matches!(self.average_throughput, Some(t) if t >= min_throughput);
        let overhead_ok = self.overhead_percent() <= max_overhead_percent;

        throughput_ok && overhead_ok
    }
}
465
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::random::essentials::Normal;
    use scirs2_core::random::{seeded_rng, Distribution};
    use std::thread;

    /// Deterministically build an `nrows` x `ncols` matrix of N(0, 1) samples.
    fn generate_test_data(nrows: usize, ncols: usize, seed: u64) -> Array2<f64> {
        let mut rng = seeded_rng(seed);
        let normal = Normal::new(0.0, 1.0).expect("operation should succeed");

        let samples: Vec<f64> = std::iter::repeat_with(|| normal.sample(&mut rng))
            .take(nrows * ncols)
            .collect();

        Array2::from_shape_vec((nrows, ncols), samples)
            .expect("shape and data length should match")
    }

    /// Block the current thread for `ms` milliseconds.
    fn pause(ms: u64) {
        thread::sleep(Duration::from_millis(ms));
    }

    #[test]
    fn test_transformation_metrics() {
        let mut metrics = TransformationMetrics::new("test".to_string(), (100, 10));
        metrics.start();
        pause(10);
        metrics.complete((100, 10));

        assert!(metrics.success);
        let elapsed_ms = metrics.duration_ms;
        assert!(elapsed_ms.is_some());
        assert!(elapsed_ms.expect("operation should succeed") >= 10.0);
    }

    #[test]
    fn test_monitoring_session() {
        let mut session = MonitoringSession::new("test_session".to_string());
        let input = generate_test_data(100, 10, 42);
        let output = input.clone();

        let idx = session.start_transformation("StandardScaler".to_string(), &input);
        pause(5);
        let outcome = session.complete_transformation(idx, &output);
        outcome.expect("operation should succeed");

        assert_eq!(session.successful_count(), 1);
        assert_eq!(session.failed_count(), 0);
    }

    #[test]
    fn test_monitoring_session_failure() {
        let mut session = MonitoringSession::new("test_session".to_string());
        let input = generate_test_data(100, 10, 123);

        let idx = session.start_transformation("Faulty".to_string(), &input);
        let outcome = session.fail_transformation(idx, "Test error".to_string());
        outcome.expect("operation should succeed");

        assert_eq!(session.successful_count(), 0);
        assert_eq!(session.failed_count(), 1);
    }

    #[test]
    fn test_throughput_calculation() {
        let mut metrics = TransformationMetrics::new("test".to_string(), (1000, 100));
        metrics.start();
        pause(100);
        metrics.complete((1000, 100));

        let throughput = metrics
            .throughput()
            .expect("operation should succeed");
        assert!(throughput > 0.0);
    }

    #[test]
    fn test_monitoring_summary() {
        let mut session = MonitoringSession::new("test".to_string());

        // (step name, data, simulated work in ms); all data is generated up front.
        let steps = [
            ("Step1", generate_test_data(100, 10, 42), 5),
            ("Step2", generate_test_data(200, 20, 123), 10),
        ];

        for (name, data, work_ms) in &steps {
            let idx = session.start_transformation(name.to_string(), data);
            pause(*work_ms);
            session
                .complete_transformation(idx, data)
                .expect("operation should succeed");
        }

        let summary = session.summary();
        assert_eq!(summary.total_transformations, 2);
        assert_eq!(summary.successful_transformations, 2);
        assert!(summary.slowest_transformation.is_some());
        assert!(summary.fastest_transformation.is_some());
    }

    #[test]
    fn test_custom_metrics() {
        let mut metrics = TransformationMetrics::new("test".to_string(), (100, 10));
        for (key, value) in [("accuracy", 0.95), ("loss", 0.05)] {
            metrics.add_metric(key.to_string(), value);
        }

        assert_eq!(metrics.custom_metrics.len(), 2);
        assert_eq!(metrics.custom_metrics.get("accuracy"), Some(&0.95));
    }
}