1use serde::Serialize;
6
/// Progress events emitted while a pipeline executes.
///
/// Serialized as a JSON object carrying a `"type"` field with the variant
/// name (internally tagged) and camelCase field names, e.g.
/// `{"type": "PipelineStarted", "totalNodes": 3, "totalFiles": 10}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum PipelineEvent {
    /// The pipeline run has started.
    #[serde(rename_all = "camelCase")]
    PipelineStarted {
        /// Number of nodes in the pipeline.
        total_nodes: usize,
        /// Number of files the run will process.
        total_files: usize,
    },

    /// A single node has begun executing.
    #[serde(rename_all = "camelCase")]
    NodeStarted {
        /// Identifier of the node that started.
        node_id: String,
        /// Zero-based position of this node in the pipeline.
        node_index: usize,
        /// Total number of nodes in the pipeline.
        total_nodes: usize,
        /// Node kind, e.g. "image-compress".
        node_type: String,
    },

    /// Per-file progress within a node.
    #[serde(rename_all = "camelCase")]
    FileProgress {
        /// Identifier of the node reporting progress.
        node_id: String,
        /// Zero-based index of the current file.
        file_index: usize,
        /// Total number of files the node will process.
        total_files: usize,
        /// Completion percentage for the current file.
        percent: u32,
        /// Human-readable progress message.
        message: String,
    },

    /// A node finished successfully.
    #[serde(rename_all = "camelCase")]
    NodeCompleted {
        /// Identifier of the node that completed.
        node_id: String,
        /// Wall-clock time the node took, in milliseconds.
        duration_ms: u64,
        /// Number of files the node processed.
        files_processed: usize,
    },

    /// A node failed with an error.
    #[serde(rename_all = "camelCase")]
    NodeFailed {
        /// Identifier of the node that failed.
        node_id: String,
        /// Error description.
        error: String,
    },

    /// The whole pipeline finished successfully.
    #[serde(rename_all = "camelCase")]
    PipelineCompleted {
        /// Wall-clock time of the entire run, in milliseconds.
        duration_ms: u64,
        /// Total number of files processed across all nodes.
        total_files_processed: usize,
    },

    /// The pipeline aborted because a node failed.
    #[serde(rename_all = "camelCase")]
    PipelineFailed {
        /// Identifier of the node whose failure aborted the run.
        node_id: String,
        /// Error description.
        error: String,
    },
}
98
/// Forwards [`PipelineEvent`]s to an optional callback.
///
/// Construct with [`PipelineReporter::new`] to deliver events to a closure,
/// or [`PipelineReporter::new_noop`] to discard them.
pub struct PipelineReporter {
    // None means events are silently dropped (no-op reporter).
    callback: Option<Box<dyn Fn(PipelineEvent)>>,
}
113
114impl PipelineReporter {
115 pub fn new(callback: impl Fn(PipelineEvent) + 'static) -> Self {
126 Self {
127 callback: Some(Box::new(callback)),
128 }
129 }
130
131 pub fn new_noop() -> Self {
134 Self { callback: None }
135 }
136
137 pub fn emit(&self, event: PipelineEvent) {
140 if let Some(cb) = &self.callback {
141 cb(event);
142 }
143 }
144}
145
/// Test helper that records every event emitted through the
/// [`PipelineReporter`] obtained from [`RecordingReporter::reporter`].
#[cfg(test)]
pub struct RecordingReporter {
    // Shared with the closure handed to PipelineReporter::new, so events
    // pushed by the reporter are visible to this recorder.
    events: std::sync::Arc<std::sync::Mutex<Vec<PipelineEvent>>>,
}
158
#[cfg(test)]
impl Default for RecordingReporter {
    /// Equivalent to [`RecordingReporter::new`].
    fn default() -> Self {
        RecordingReporter::new()
    }
}
165
#[cfg(test)]
impl RecordingReporter {
    /// Creates a recorder with an empty event log.
    pub fn new() -> Self {
        let events = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        Self { events }
    }

    /// Returns a [`PipelineReporter`] whose callback appends every emitted
    /// event to this recorder's shared log.
    pub fn reporter(&self) -> PipelineReporter {
        let sink = std::sync::Arc::clone(&self.events);
        PipelineReporter::new(move |event| sink.lock().unwrap().push(event))
    }

    /// Returns a snapshot (clone) of all events recorded so far.
    pub fn events(&self) -> Vec<PipelineEvent> {
        let guard = self.events.lock().unwrap();
        guard.clone()
    }
}
188
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_started_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::PipelineStarted {
            total_nodes: 3,
            total_files: 10,
        })
        .unwrap();

        assert_eq!(value["type"], "PipelineStarted");
        assert_eq!(value["totalNodes"], 3);
        assert_eq!(value["totalFiles"], 10);
    }

    #[test]
    fn test_node_started_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::NodeStarted {
            node_id: "node-1".to_string(),
            node_index: 0,
            total_nodes: 3,
            node_type: "image-compress".to_string(),
        })
        .unwrap();

        assert_eq!(value["type"], "NodeStarted");
        assert_eq!(value["nodeId"], "node-1");
        assert_eq!(value["nodeIndex"], 0);
        assert_eq!(value["totalNodes"], 3);
        assert_eq!(value["nodeType"], "image-compress");
    }

    #[test]
    fn test_file_progress_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::FileProgress {
            node_id: "node-2".to_string(),
            file_index: 2,
            total_files: 5,
            percent: 75,
            message: "Compressing photo.jpg...".to_string(),
        })
        .unwrap();

        assert_eq!(value["type"], "FileProgress");
        assert_eq!(value["nodeId"], "node-2");
        assert_eq!(value["fileIndex"], 2);
        assert_eq!(value["totalFiles"], 5);
        assert_eq!(value["percent"], 75);
        assert_eq!(value["message"], "Compressing photo.jpg...");
    }

    #[test]
    fn test_node_completed_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::NodeCompleted {
            node_id: "node-1".to_string(),
            duration_ms: 1234,
            files_processed: 5,
        })
        .unwrap();

        assert_eq!(value["type"], "NodeCompleted");
        assert_eq!(value["nodeId"], "node-1");
        assert_eq!(value["durationMs"], 1234);
        assert_eq!(value["filesProcessed"], 5);
    }

    #[test]
    fn test_node_failed_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::NodeFailed {
            node_id: "node-3".to_string(),
            error: "Unsupported format: BMP".to_string(),
        })
        .unwrap();

        assert_eq!(value["type"], "NodeFailed");
        assert_eq!(value["nodeId"], "node-3");
        assert_eq!(value["error"], "Unsupported format: BMP");
    }

    #[test]
    fn test_pipeline_completed_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::PipelineCompleted {
            duration_ms: 5678,
            total_files_processed: 10,
        })
        .unwrap();

        assert_eq!(value["type"], "PipelineCompleted");
        assert_eq!(value["durationMs"], 5678);
        assert_eq!(value["totalFilesProcessed"], 10);
    }

    #[test]
    fn test_pipeline_failed_serializes_correctly() {
        let value = serde_json::to_value(PipelineEvent::PipelineFailed {
            node_id: "node-2".to_string(),
            error: "Processing failed: out of memory".to_string(),
        })
        .unwrap();

        assert_eq!(value["type"], "PipelineFailed");
        assert_eq!(value["nodeId"], "node-2");
        assert_eq!(value["error"], "Processing failed: out of memory");
    }

    #[test]
    fn test_noop_reporter_doesnt_panic() {
        let noop = PipelineReporter::new_noop();
        let events = vec![
            PipelineEvent::PipelineStarted {
                total_nodes: 1,
                total_files: 1,
            },
            PipelineEvent::PipelineCompleted {
                duration_ms: 100,
                total_files_processed: 1,
            },
        ];
        for event in events {
            noop.emit(event);
        }
    }

    #[test]
    fn test_recording_reporter_captures_events() {
        let recorder = RecordingReporter::default();
        let reporter = recorder.reporter();

        reporter.emit(PipelineEvent::PipelineStarted {
            total_nodes: 2,
            total_files: 3,
        });
        reporter.emit(PipelineEvent::NodeStarted {
            node_id: "n1".to_string(),
            node_index: 0,
            total_nodes: 2,
            node_type: "image-compress".to_string(),
        });
        reporter.emit(PipelineEvent::PipelineCompleted {
            duration_ms: 500,
            total_files_processed: 3,
        });

        let captured = recorder.events();
        assert_eq!(captured.len(), 3);

        assert!(matches!(captured[0], PipelineEvent::PipelineStarted { .. }));
        assert!(matches!(captured[1], PipelineEvent::NodeStarted { .. }));
        assert!(matches!(captured[2], PipelineEvent::PipelineCompleted { .. }));
    }

    #[test]
    fn test_reporter_calls_callback() {
        let received = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let sink = std::sync::Arc::clone(&received);

        let reporter = PipelineReporter::new(move |event| sink.lock().unwrap().push(event));

        reporter.emit(PipelineEvent::PipelineStarted {
            total_nodes: 1,
            total_files: 1,
        });

        let events = received.lock().unwrap();
        assert_eq!(events.len(), 1);
        match &events[0] {
            PipelineEvent::PipelineStarted {
                total_nodes,
                total_files,
            } => {
                assert_eq!(*total_nodes, 1);
                assert_eq!(*total_files, 1);
            }
            _ => panic!("Expected PipelineStarted event"),
        }
    }
}