sklears_preprocessing/
monitoring.rs

//! Transformation Monitoring and Performance Metrics
//!
//! Provides comprehensive monitoring, logging, and performance tracking
//! for preprocessing transformations.
5
6use scirs2_core::ndarray::Array2;
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
/// Metrics collected for a single preprocessing transformation.
///
/// Typical lifecycle: create with [`TransformationMetrics::new`], call
/// [`start`](Self::start) before the work, then one of
/// [`complete`](Self::complete) or [`fail`](Self::fail) afterwards.
#[derive(Debug, Clone)]
pub struct TransformationMetrics {
    /// Transformation name
    pub name: String,
    /// Instant recorded by [`start`](Self::start), if it was called.
    pub start_time: Option<Instant>,
    /// Instant recorded by [`complete`](Self::complete) or [`fail`](Self::fail).
    pub end_time: Option<Instant>,
    /// Elapsed time in milliseconds; only set once both a start and an end
    /// instant have been recorded.
    pub duration_ms: Option<f64>,
    /// Input shape (rows, cols)
    pub input_shape: (usize, usize),
    /// Output shape (rows, cols); `(0, 0)` until the transformation completes.
    pub output_shape: (usize, usize),
    /// Memory usage in bytes, if recorded.
    pub memory_bytes: Option<usize>,
    /// Number of NaN values in input
    pub input_nan_count: usize,
    /// Number of NaN values in output
    pub output_nan_count: usize,
    /// `true` once [`complete`](Self::complete) has been called.
    pub success: bool,
    /// Error message recorded by [`fail`](Self::fail).
    pub error_message: Option<String>,
    /// User-defined metrics keyed by name.
    pub custom_metrics: HashMap<String, f64>,
}

impl TransformationMetrics {
    /// Create metrics for a transformation named `name` whose input has shape
    /// `input_shape` (rows, cols).
    ///
    /// No timing is recorded and `success` is `false` until
    /// [`complete`](Self::complete) is called.
    pub fn new(name: String, input_shape: (usize, usize)) -> Self {
        Self {
            name,
            start_time: None,
            end_time: None,
            duration_ms: None,
            input_shape,
            output_shape: (0, 0),
            memory_bytes: None,
            input_nan_count: 0,
            output_nan_count: 0,
            success: false,
            error_message: None,
            custom_metrics: HashMap::new(),
        }
    }

    /// Record the current instant as the start of the transformation.
    pub fn start(&mut self) {
        self.start_time = Some(Instant::now());
    }

    /// Mark the transformation as successfully completed with `output_shape`.
    ///
    /// Records the end instant and, if [`start`](Self::start) was called, the
    /// elapsed duration. Any error message left by an earlier
    /// [`fail`](Self::fail) is cleared so the record is not simultaneously
    /// successful and carrying an error.
    pub fn complete(&mut self, output_shape: (usize, usize)) {
        self.record_end();
        self.output_shape = output_shape;
        self.success = true;
        self.error_message = None;
    }

    /// Mark the transformation as failed with the given error message.
    ///
    /// Records the end instant and, if [`start`](Self::start) was called, the
    /// elapsed duration.
    pub fn fail(&mut self, error: String) {
        self.record_end();
        self.success = false;
        self.error_message = Some(error);
    }

    /// Add (or overwrite) a custom metric.
    pub fn add_metric(&mut self, name: String, value: f64) {
        self.custom_metrics.insert(name, value);
    }

    /// Input throughput in elements per second.
    ///
    /// Returns `None` when no duration has been recorded or the duration is
    /// not positive (which would otherwise divide by zero).
    pub fn throughput(&self) -> Option<f64> {
        let duration_ms = self.duration_ms.filter(|&ms| ms > 0.0)?;
        Some(self.input_elements() as f64 / (duration_ms / 1000.0))
    }

    /// Recorded memory usage divided by the number of input elements
    /// (bytes per element).
    ///
    /// Returns `None` when no memory usage was recorded, or when the input has
    /// no elements (dividing by zero would yield `inf`/`NaN`).
    pub fn memory_efficiency(&self) -> Option<f64> {
        let bytes = self.memory_bytes?;
        match self.input_elements() {
            0 => None,
            elements => Some(bytes as f64 / elements as f64),
        }
    }

    /// Number of elements in the input (`rows * cols`).
    fn input_elements(&self) -> usize {
        self.input_shape.0 * self.input_shape.1
    }

    /// Stamp the end instant and derive `duration_ms` from `start_time`.
    ///
    /// If the transformation was never started, `duration_ms` is left as-is.
    fn record_end(&mut self) {
        let end = Instant::now();
        self.end_time = Some(end);
        if let Some(start) = self.start_time {
            self.duration_ms = Some(end.duration_since(start).as_secs_f64() * 1000.0);
        }
    }
}
116
117/// Pipeline monitoring session
118#[derive(Debug)]
119pub struct MonitoringSession {
120    /// Session name
121    pub name: String,
122    /// Start time
123    pub start_time: Instant,
124    /// All transformation metrics
125    pub metrics: Vec<TransformationMetrics>,
126    /// Session configuration
127    pub config: MonitoringConfig,
128}
129
/// Configuration controlling what a monitoring session records and prints.
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Master switch for printing log messages.
    pub enable_logging: bool,
    /// Minimum severity a message must have to be printed; a message at level
    /// `L` is printed when `log_level <= L`.
    pub log_level: LogLevel,
    /// Record an estimate of output memory usage per transformation.
    pub track_memory: bool,
    /// Count NaN values in transformation inputs and outputs.
    pub track_nan: bool,
    /// Collect custom metrics.
    ///
    /// NOTE(review): not read by any code in this module — confirm whether it
    /// is consulted elsewhere.
    pub collect_custom_metrics: bool,
}

impl Default for MonitoringConfig {
    /// Logging enabled at [`LogLevel::Info`], memory and NaN tracking on,
    /// custom metric collection off.
    fn default() -> Self {
        Self {
            enable_logging: true,
            log_level: LogLevel::Info,
            track_memory: true,
            track_nan: true,
            collect_custom_metrics: false,
        }
    }
}

/// Log levels, ordered from most verbose to most severe.
///
/// The derived `Ord` follows declaration order
/// (`Debug < Info < Warning < Error`), which is what threshold comparisons
/// such as `config.log_level <= LogLevel::Info` rely on — do not reorder the
/// variants.
///
/// The enum is fieldless, so it is `Copy` and `Hash` as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Most verbose level.
    Debug,
    /// Informational messages.
    Info,
    /// Warning messages.
    Warning,
    /// Error messages only.
    Error,
}
165
166impl MonitoringSession {
167    /// Create a new monitoring session
168    pub fn new(name: String) -> Self {
169        Self {
170            name,
171            start_time: Instant::now(),
172            metrics: Vec::new(),
173            config: MonitoringConfig::default(),
174        }
175    }
176
177    /// Create a session with custom configuration
178    pub fn with_config(name: String, config: MonitoringConfig) -> Self {
179        Self {
180            name,
181            start_time: Instant::now(),
182            metrics: Vec::new(),
183            config,
184        }
185    }
186
187    /// Start tracking a transformation
188    pub fn start_transformation(&mut self, name: String, input: &Array2<f64>) -> usize {
189        let mut metrics = TransformationMetrics::new(name.clone(), (input.nrows(), input.ncols()));
190        metrics.start();
191
192        if self.config.track_nan {
193            metrics.input_nan_count = input.iter().filter(|v| v.is_nan()).count();
194        }
195
196        if self.config.enable_logging && self.config.log_level <= LogLevel::Debug {
197            println!(
198                "[DEBUG] Starting transformation '{}' on data shape {:?}",
199                name,
200                (input.nrows(), input.ncols())
201            );
202        }
203
204        self.metrics.push(metrics);
205        self.metrics.len() - 1 // Return index
206    }
207
208    /// Complete a transformation
209    pub fn complete_transformation(
210        &mut self,
211        index: usize,
212        output: &Array2<f64>,
213    ) -> Result<(), String> {
214        if index >= self.metrics.len() {
215            return Err("Invalid metrics index".to_string());
216        }
217
218        let metrics = &mut self.metrics[index];
219        metrics.complete((output.nrows(), output.ncols()));
220
221        if self.config.track_nan {
222            metrics.output_nan_count = output.iter().filter(|v| v.is_nan()).count();
223        }
224
225        if self.config.track_memory {
226            let memory = (output.nrows() * output.ncols()) * std::mem::size_of::<f64>();
227            metrics.memory_bytes = Some(memory);
228        }
229
230        if self.config.enable_logging && self.config.log_level <= LogLevel::Info {
231            println!(
232                "[INFO] Completed transformation '{}' in {:.2}ms (throughput: {:.0} elem/s)",
233                metrics.name,
234                metrics.duration_ms.unwrap_or(0.0),
235                metrics.throughput().unwrap_or(0.0)
236            );
237        }
238
239        Ok(())
240    }
241
242    /// Fail a transformation
243    pub fn fail_transformation(&mut self, index: usize, error: String) -> Result<(), String> {
244        if index >= self.metrics.len() {
245            return Err("Invalid metrics index".to_string());
246        }
247
248        let metrics = &mut self.metrics[index];
249        metrics.fail(error.clone());
250
251        if self.config.enable_logging && self.config.log_level <= LogLevel::Error {
252            println!(
253                "[ERROR] Transformation '{}' failed: {}",
254                metrics.name, error
255            );
256        }
257
258        Ok(())
259    }
260
261    /// Get total session duration
262    pub fn total_duration(&self) -> Duration {
263        Instant::now().duration_since(self.start_time)
264    }
265
266    /// Get total processing time (sum of all transformations)
267    pub fn total_processing_time(&self) -> f64 {
268        self.metrics.iter().filter_map(|m| m.duration_ms).sum()
269    }
270
271    /// Get average throughput
272    pub fn average_throughput(&self) -> Option<f64> {
273        let throughputs: Vec<f64> = self.metrics.iter().filter_map(|m| m.throughput()).collect();
274
275        if throughputs.is_empty() {
276            None
277        } else {
278            Some(throughputs.iter().sum::<f64>() / throughputs.len() as f64)
279        }
280    }
281
282    /// Get successful transformations
283    pub fn successful_count(&self) -> usize {
284        self.metrics.iter().filter(|m| m.success).count()
285    }
286
287    /// Get failed transformations
288    pub fn failed_count(&self) -> usize {
289        self.metrics.iter().filter(|m| !m.success).count()
290    }
291
292    /// Generate summary report
293    pub fn summary(&self) -> MonitoringSummary {
294        let total_transformations = self.metrics.len();
295        let successful = self.successful_count();
296        let failed = self.failed_count();
297        let total_duration = self.total_duration();
298        let processing_time = self.total_processing_time();
299        let avg_throughput = self.average_throughput();
300
301        let total_input_elements: usize = self
302            .metrics
303            .iter()
304            .map(|m| m.input_shape.0 * m.input_shape.1)
305            .sum();
306
307        let total_output_elements: usize = self
308            .metrics
309            .iter()
310            .map(|m| m.output_shape.0 * m.output_shape.1)
311            .sum();
312
313        let total_memory: usize = self.metrics.iter().filter_map(|m| m.memory_bytes).sum();
314
315        MonitoringSummary {
316            session_name: self.name.clone(),
317            total_transformations,
318            successful_transformations: successful,
319            failed_transformations: failed,
320            total_duration_ms: total_duration.as_secs_f64() * 1000.0,
321            total_processing_ms: processing_time,
322            overhead_ms: total_duration.as_secs_f64() * 1000.0 - processing_time,
323            average_throughput: avg_throughput,
324            total_input_elements,
325            total_output_elements,
326            total_memory_bytes: total_memory,
327            slowest_transformation: self.find_slowest(),
328            fastest_transformation: self.find_fastest(),
329        }
330    }
331
332    /// Find slowest transformation
333    fn find_slowest(&self) -> Option<String> {
334        self.metrics
335            .iter()
336            .filter(|m| m.duration_ms.is_some())
337            .max_by(|a, b| {
338                a.duration_ms
339                    .unwrap()
340                    .partial_cmp(&b.duration_ms.unwrap())
341                    .unwrap_or(std::cmp::Ordering::Equal)
342            })
343            .map(|m| format!("{} ({:.2}ms)", m.name, m.duration_ms.unwrap()))
344    }
345
346    /// Find fastest transformation
347    fn find_fastest(&self) -> Option<String> {
348        self.metrics
349            .iter()
350            .filter(|m| m.duration_ms.is_some())
351            .min_by(|a, b| {
352                a.duration_ms
353                    .unwrap()
354                    .partial_cmp(&b.duration_ms.unwrap())
355                    .unwrap_or(std::cmp::Ordering::Equal)
356            })
357            .map(|m| format!("{} ({:.2}ms)", m.name, m.duration_ms.unwrap()))
358    }
359
360    /// Print summary
361    pub fn print_summary(&self) {
362        let summary = self.summary();
363        summary.print();
364    }
365}
366
/// Aggregated report for a monitoring session, as produced by
/// `MonitoringSession::summary`.
#[derive(Debug, Clone)]
pub struct MonitoringSummary {
    /// Name of the session this report describes.
    pub session_name: String,
    /// Number of transformations recorded.
    pub total_transformations: usize,
    /// Transformations that completed successfully.
    pub successful_transformations: usize,
    /// Transformations counted as failed.
    pub failed_transformations: usize,
    /// Wall-clock time covered by the report, in milliseconds.
    pub total_duration_ms: f64,
    /// Sum of recorded transformation durations, in milliseconds.
    pub total_processing_ms: f64,
    /// `total_duration_ms - total_processing_ms`, in milliseconds.
    pub overhead_ms: f64,
    /// Mean per-transformation throughput (elements/s), if any was computable.
    pub average_throughput: Option<f64>,
    /// Total number of input elements across transformations.
    pub total_input_elements: usize,
    /// Total number of output elements across transformations.
    pub total_output_elements: usize,
    /// Total recorded memory usage, in bytes.
    pub total_memory_bytes: usize,
    /// Name and duration of the slowest timed transformation.
    pub slowest_transformation: Option<String>,
    /// Name and duration of the fastest timed transformation.
    pub fastest_transformation: Option<String>,
}

impl MonitoringSummary {
    /// Print a formatted report to stdout.
    pub fn print(&self) {
        let rule = "=".repeat(60);

        println!("\n{}", rule);
        println!("Monitoring Summary: {}", self.session_name);
        println!("{}", rule);
        println!();

        println!("Transformations:");
        println!("  Total: {}", self.total_transformations);
        println!("  Successful: {}", self.successful_transformations);
        println!("  Failed: {}", self.failed_transformations);
        println!();

        println!("Performance:");
        println!("  Total Duration: {:.2} ms", self.total_duration_ms);
        println!("  Processing Time: {:.2} ms", self.total_processing_ms);
        // Uses the zero-guarded percentage so an instantaneous session prints
        // 0.0% instead of NaN%.
        println!(
            "  Overhead: {:.2} ms ({:.1}%)",
            self.overhead_ms,
            self.overhead_percent()
        );
        if let Some(throughput) = self.average_throughput {
            println!("  Average Throughput: {:.0} elements/s", throughput);
        }
        println!();

        println!("Data:");
        println!("  Total Input Elements: {}", self.total_input_elements);
        println!("  Total Output Elements: {}", self.total_output_elements);
        println!(
            "  Total Memory: {:.2} MB",
            self.total_memory_bytes as f64 / 1024.0 / 1024.0
        );
        println!();

        if let Some(slowest) = &self.slowest_transformation {
            println!("Slowest Transformation: {}", slowest);
        }
        if let Some(fastest) = &self.fastest_transformation {
            println!("Fastest Transformation: {}", fastest);
        }

        println!("{}", rule);
    }

    /// Share of wall-clock time spent inside transformations, as a percentage.
    ///
    /// Returns `0.0` when `total_duration_ms` is not positive.
    pub fn efficiency(&self) -> f64 {
        if self.total_duration_ms > 0.0 {
            (self.total_processing_ms / self.total_duration_ms) * 100.0
        } else {
            0.0
        }
    }

    /// Share of wall-clock time spent outside transformations, as a percentage.
    ///
    /// Returns `0.0` when `total_duration_ms` is not positive, mirroring
    /// [`efficiency`](Self::efficiency) instead of producing `NaN`/`inf`.
    pub fn overhead_percent(&self) -> f64 {
        if self.total_duration_ms > 0.0 {
            (self.overhead_ms / self.total_duration_ms) * 100.0
        } else {
            0.0
        }
    }

    /// Check whether average throughput is at least `min_throughput` and the
    /// overhead percentage is at most `max_overhead_percent`.
    ///
    /// Returns `false` when no average throughput is available.
    pub fn is_acceptable(&self, min_throughput: f64, max_overhead_percent: f64) -> bool {
        let throughput_ok = self
            .average_throughput
            .map_or(false, |t| t >= min_throughput);

        throughput_ok && self.overhead_percent() <= max_overhead_percent
    }
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::random::essentials::Normal;
    use scirs2_core::random::{seeded_rng, Distribution};
    use std::thread;

    /// Build an `nrows x ncols` matrix of samples from a standard normal
    /// distribution, drawn from `seeded_rng(seed)`.
    fn generate_test_data(nrows: usize, ncols: usize, seed: u64) -> Array2<f64> {
        let mut rng = seeded_rng(seed);
        let normal = Normal::new(0.0, 1.0).unwrap();

        let data: Vec<f64> = (0..nrows * ncols)
            .map(|_| normal.sample(&mut rng))
            .collect();

        Array2::from_shape_vec((nrows, ncols), data).unwrap()
    }

    #[test]
    fn test_transformation_metrics() {
        let mut metrics = TransformationMetrics::new("test".to_string(), (100, 10));
        metrics.start();

        thread::sleep(Duration::from_millis(10));

        metrics.complete((100, 10));

        assert!(metrics.success);
        assert!(metrics.duration_ms.is_some());
        assert!(metrics.duration_ms.unwrap() >= 10.0);
    }

    #[test]
    fn test_monitoring_session() {
        let mut session = MonitoringSession::new("test_session".to_string());

        let input = generate_test_data(100, 10, 42);
        let output = input.clone();

        let idx = session.start_transformation("StandardScaler".to_string(), &input);
        thread::sleep(Duration::from_millis(5));
        session.complete_transformation(idx, &output).unwrap();

        assert_eq!(session.successful_count(), 1);
        assert_eq!(session.failed_count(), 0);
    }

    #[test]
    fn test_monitoring_session_failure() {
        let mut session = MonitoringSession::new("test_session".to_string());

        let input = generate_test_data(100, 10, 123);

        let idx = session.start_transformation("Faulty".to_string(), &input);
        session
            .fail_transformation(idx, "Test error".to_string())
            .unwrap();

        assert_eq!(session.successful_count(), 0);
        assert_eq!(session.failed_count(), 1);
    }

    #[test]
    fn test_throughput_calculation() {
        let mut metrics = TransformationMetrics::new("test".to_string(), (1000, 100));
        metrics.start();

        // Any non-zero sleep guarantees a positive duration; keep it short so
        // the suite stays fast.
        thread::sleep(Duration::from_millis(10));

        metrics.complete((1000, 100));

        let throughput = metrics.throughput();
        assert!(throughput.is_some());
        assert!(throughput.unwrap() > 0.0);
    }

    #[test]
    fn test_throughput_and_memory_efficiency_from_known_values() {
        // Setting the fields directly avoids timing noise: 100 elements in
        // 0.5 s is exactly 200 elements/s; 800 bytes / 100 elements = 8.
        let mut metrics = TransformationMetrics::new("test".to_string(), (10, 10));
        assert_eq!(metrics.throughput(), None);

        metrics.duration_ms = Some(500.0);
        assert_eq!(metrics.throughput(), Some(200.0));

        metrics.memory_bytes = Some(800);
        assert_eq!(metrics.memory_efficiency(), Some(8.0));
    }

    #[test]
    fn test_invalid_index_is_rejected() {
        let mut session = MonitoringSession::new("test".to_string());
        let output = generate_test_data(2, 2, 7);

        assert!(session.complete_transformation(0, &output).is_err());
        assert!(session
            .fail_transformation(0, "boom".to_string())
            .is_err());
        assert!(session.metrics.is_empty());
    }

    #[test]
    fn test_monitoring_summary() {
        let mut session = MonitoringSession::new("test".to_string());

        let input1 = generate_test_data(100, 10, 42);
        let input2 = generate_test_data(200, 20, 123);

        let idx1 = session.start_transformation("Step1".to_string(), &input1);
        thread::sleep(Duration::from_millis(5));
        session.complete_transformation(idx1, &input1).unwrap();

        let idx2 = session.start_transformation("Step2".to_string(), &input2);
        thread::sleep(Duration::from_millis(10));
        session.complete_transformation(idx2, &input2).unwrap();

        let summary = session.summary();

        assert_eq!(summary.total_transformations, 2);
        assert_eq!(summary.successful_transformations, 2);
        assert_eq!(summary.total_input_elements, 100 * 10 + 200 * 20);
        assert!(summary.slowest_transformation.is_some());
        assert!(summary.fastest_transformation.is_some());
    }

    #[test]
    fn test_summary_efficiency_and_acceptance() {
        let summary = MonitoringSummary {
            session_name: "s".to_string(),
            total_transformations: 1,
            successful_transformations: 1,
            failed_transformations: 0,
            total_duration_ms: 100.0,
            total_processing_ms: 80.0,
            overhead_ms: 20.0,
            average_throughput: Some(1000.0),
            total_input_elements: 10,
            total_output_elements: 10,
            total_memory_bytes: 80,
            slowest_transformation: None,
            fastest_transformation: None,
        };

        assert!((summary.efficiency() - 80.0).abs() < 1e-9);
        assert!(summary.is_acceptable(500.0, 25.0));
        assert!(!summary.is_acceptable(500.0, 10.0));
        assert!(!summary.is_acceptable(2000.0, 25.0));
    }

    #[test]
    fn test_custom_metrics() {
        let mut metrics = TransformationMetrics::new("test".to_string(), (100, 10));
        metrics.add_metric("accuracy".to_string(), 0.95);
        metrics.add_metric("loss".to_string(), 0.05);

        assert_eq!(metrics.custom_metrics.len(), 2);
        assert_eq!(metrics.custom_metrics.get("accuracy"), Some(&0.95));
    }
}