Skip to main content

edgeparse_core/pipeline/
logging.rs

1//! Structured pipeline logging with per-stage timing.
2//!
3//! Provides a [`PipelineTimer`] that records how long each pipeline stage takes
4//! and produces a structured summary at the end.
5
6use std::time::{Duration, Instant};
7
8/// A single recorded stage execution.
9#[derive(Debug, Clone)]
10pub struct StageRecord {
11    /// Human-readable stage name.
12    pub name: String,
13    /// Elapsed wall-clock time.
14    pub duration: Duration,
15    /// Element count after the stage completed.
16    pub element_count: usize,
17}
18
19/// Accumulates timing records for each pipeline stage.
20#[derive(Debug, Default)]
21pub struct PipelineTimer {
22    records: Vec<StageRecord>,
23    current_start: Option<(String, Instant)>,
24    pipeline_start: Option<Instant>,
25}
26
27impl PipelineTimer {
28    /// Create a new timer and record the pipeline start time.
29    pub fn new() -> Self {
30        Self {
31            records: Vec::new(),
32            current_start: None,
33            pipeline_start: Some(Instant::now()),
34        }
35    }
36
37    /// Begin timing a named stage. If a previous stage was still open, it is
38    /// automatically ended with element_count = 0.
39    pub fn start_stage(&mut self, name: &str) {
40        if let Some((prev_name, prev_start)) = self.current_start.take() {
41            self.records.push(StageRecord {
42                name: prev_name,
43                duration: prev_start.elapsed(),
44                element_count: 0,
45            });
46        }
47        self.current_start = Some((name.to_string(), Instant::now()));
48    }
49
50    /// End the current stage and record its element count.
51    pub fn end_stage(&mut self, element_count: usize) {
52        if let Some((name, start)) = self.current_start.take() {
53            self.records.push(StageRecord {
54                name,
55                duration: start.elapsed(),
56                element_count,
57            });
58        }
59    }
60
61    /// Total pipeline wall-clock time.
62    pub fn total_duration(&self) -> Duration {
63        self.pipeline_start.map(|s| s.elapsed()).unwrap_or_default()
64    }
65
66    /// Recorded stage entries.
67    pub fn records(&self) -> &[StageRecord] {
68        &self.records
69    }
70
71    /// Format a human-readable timing summary.
72    pub fn summary(&self) -> String {
73        let mut out = String::from("Pipeline Timing Summary\n");
74        out.push_str(&format!(
75            "{:<40} {:>10} {:>10}\n",
76            "Stage", "Time (ms)", "Elements"
77        ));
78        out.push_str(&"-".repeat(62));
79        out.push('\n');
80        for r in &self.records {
81            out.push_str(&format!(
82                "{:<40} {:>10.2} {:>10}\n",
83                r.name,
84                r.duration.as_secs_f64() * 1000.0,
85                r.element_count,
86            ));
87        }
88        out.push_str(&"-".repeat(62));
89        out.push('\n');
90        out.push_str(&format!(
91            "{:<40} {:>10.2}\n",
92            "TOTAL",
93            self.total_duration().as_secs_f64() * 1000.0
94        ));
95        out
96    }
97
98    /// Log the summary at info level.
99    pub fn log_summary(&self) {
100        for line in self.summary().lines() {
101            log::info!("{}", line);
102        }
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109    use std::thread;
110
111    #[test]
112    fn test_stage_recording() {
113        let mut timer = PipelineTimer::new();
114        timer.start_stage("Stage A");
115        thread::sleep(Duration::from_millis(5));
116        timer.end_stage(100);
117
118        timer.start_stage("Stage B");
119        timer.end_stage(200);
120
121        assert_eq!(timer.records().len(), 2);
122        assert_eq!(timer.records()[0].name, "Stage A");
123        assert_eq!(timer.records()[0].element_count, 100);
124        assert_eq!(timer.records()[1].name, "Stage B");
125        assert_eq!(timer.records()[1].element_count, 200);
126        // Stage A should have non-zero duration
127        assert!(timer.records()[0].duration.as_nanos() > 0);
128    }
129
130    #[test]
131    fn test_auto_close_previous_stage() {
132        let mut timer = PipelineTimer::new();
133        timer.start_stage("Stage 1");
134        // Start another without ending the first
135        timer.start_stage("Stage 2");
136        timer.end_stage(50);
137
138        assert_eq!(timer.records().len(), 2);
139        assert_eq!(timer.records()[0].name, "Stage 1");
140        assert_eq!(timer.records()[0].element_count, 0); // auto-closed
141        assert_eq!(timer.records()[1].name, "Stage 2");
142    }
143
144    #[test]
145    fn test_summary_format() {
146        let mut timer = PipelineTimer::new();
147        timer.start_stage("Test Stage");
148        timer.end_stage(42);
149
150        let summary = timer.summary();
151        assert!(summary.contains("Test Stage"));
152        assert!(summary.contains("42"));
153        assert!(summary.contains("TOTAL"));
154    }
155
156    #[test]
157    fn test_empty_timer() {
158        let timer = PipelineTimer::new();
159        assert!(timer.records().is_empty());
160        let summary = timer.summary();
161        assert!(summary.contains("TOTAL"));
162    }
163}