edgeparse_core/pipeline/
logging.rs1use std::time::{Duration, Instant};
7
/// Measurements captured for one finished pipeline stage.
///
/// Records compare equal when name, duration and element count all match,
/// which lets callers and tests compare whole records directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StageRecord {
    /// Name the stage was started under.
    pub name: String,
    /// Time elapsed between the stage being started and being closed.
    pub duration: Duration,
    /// Element count reported when the stage was closed; `0` when the stage
    /// was closed implicitly by the start of the next stage.
    pub element_count: usize,
}
18
19#[derive(Debug, Default)]
21pub struct PipelineTimer {
22 records: Vec<StageRecord>,
23 current_start: Option<(String, Instant)>,
24 pipeline_start: Option<Instant>,
25}
26
27impl PipelineTimer {
28 pub fn new() -> Self {
30 Self {
31 records: Vec::new(),
32 current_start: None,
33 pipeline_start: Some(Instant::now()),
34 }
35 }
36
37 pub fn start_stage(&mut self, name: &str) {
40 if let Some((prev_name, prev_start)) = self.current_start.take() {
41 self.records.push(StageRecord {
42 name: prev_name,
43 duration: prev_start.elapsed(),
44 element_count: 0,
45 });
46 }
47 self.current_start = Some((name.to_string(), Instant::now()));
48 }
49
50 pub fn end_stage(&mut self, element_count: usize) {
52 if let Some((name, start)) = self.current_start.take() {
53 self.records.push(StageRecord {
54 name,
55 duration: start.elapsed(),
56 element_count,
57 });
58 }
59 }
60
61 pub fn total_duration(&self) -> Duration {
63 self.pipeline_start.map(|s| s.elapsed()).unwrap_or_default()
64 }
65
66 pub fn records(&self) -> &[StageRecord] {
68 &self.records
69 }
70
71 pub fn summary(&self) -> String {
73 let mut out = String::from("Pipeline Timing Summary\n");
74 out.push_str(&format!(
75 "{:<40} {:>10} {:>10}\n",
76 "Stage", "Time (ms)", "Elements"
77 ));
78 out.push_str(&"-".repeat(62));
79 out.push('\n');
80 for r in &self.records {
81 out.push_str(&format!(
82 "{:<40} {:>10.2} {:>10}\n",
83 r.name,
84 r.duration.as_secs_f64() * 1000.0,
85 r.element_count,
86 ));
87 }
88 out.push_str(&"-".repeat(62));
89 out.push('\n');
90 out.push_str(&format!(
91 "{:<40} {:>10.2}\n",
92 "TOTAL",
93 self.total_duration().as_secs_f64() * 1000.0
94 ));
95 out
96 }
97
98 pub fn log_summary(&self) {
100 for line in self.summary().lines() {
101 log::info!("{}", line);
102 }
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Explicitly closed stages keep their name and element count, and the
    /// first stage has a non-zero duration.
    #[test]
    fn test_stage_recording() {
        let mut timer = PipelineTimer::new();

        timer.start_stage("Stage A");
        // Give the first stage a measurable duration.
        thread::sleep(Duration::from_millis(5));
        timer.end_stage(100);

        timer.start_stage("Stage B");
        timer.end_stage(200);

        let recorded = timer.records();
        assert_eq!(recorded.len(), 2);

        let expected = [("Stage A", 100), ("Stage B", 200)];
        for (record, &(name, count)) in recorded.iter().zip(expected.iter()) {
            assert_eq!(record.name, name);
            assert_eq!(record.element_count, count);
        }
        assert!(recorded[0].duration.as_nanos() > 0);
    }

    /// Starting a stage while another is open closes the open one with an
    /// element count of zero.
    #[test]
    fn test_auto_close_previous_stage() {
        let mut timer = PipelineTimer::new();
        timer.start_stage("Stage 1");
        // No `end_stage` here: the next start must close "Stage 1" itself.
        timer.start_stage("Stage 2");
        timer.end_stage(50);

        let recorded = timer.records();
        assert_eq!(recorded.len(), 2);

        let (first, second) = (&recorded[0], &recorded[1]);
        assert_eq!(first.name, "Stage 1");
        assert_eq!(first.element_count, 0);
        assert_eq!(second.name, "Stage 2");
    }

    /// The summary mentions the stage name, its element count and the total row.
    #[test]
    fn test_summary_format() {
        let mut timer = PipelineTimer::new();
        timer.start_stage("Test Stage");
        timer.end_stage(42);

        let report = timer.summary();
        for needle in ["Test Stage", "42", "TOTAL"].iter() {
            assert!(report.contains(*needle));
        }
    }

    /// A timer without stages has no records but still renders the total row.
    #[test]
    fn test_empty_timer() {
        let timer = PipelineTimer::new();
        assert!(timer.records().is_empty());
        assert!(timer.summary().contains("TOTAL"));
    }
}