// bnto_core/events.rs
1// Pipeline events — structured progress reporting for multi-node pipelines.
2// Complements `progress.rs` (per-file within one node) with pipeline-level
3// events (node started/completed, file progress, pipeline result).
4
5use serde::Serialize;
6
7// =============================================================================
8// Pipeline Event Types
9// =============================================================================
10
/// Every event the pipeline executor can emit during execution.
///
/// Serialized as an internally tagged union: the variant name goes into a
/// `"type"` discriminant field and struct fields are renamed to camelCase,
/// matching the TypeScript `PipelineEvent` discriminated union on the UI side.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum PipelineEvent {
    /// Emitted once at the very start of pipeline execution.
    /// Tells the UI how many nodes and files to expect, so progress
    /// indicators can be sized up front.
    #[serde(rename_all = "camelCase")]
    PipelineStarted {
        /// How many processing nodes will run (excludes I/O markers).
        total_nodes: usize,
        /// How many input files were provided.
        total_files: usize,
    },

    /// Emitted when a processing node begins execution.
    /// The editor uses this to highlight the active node on the canvas.
    #[serde(rename_all = "camelCase")]
    NodeStarted {
        /// The unique ID of the node (from the recipe definition).
        node_id: String,
        /// Zero-based index of this node in the processing sequence.
        node_index: usize,
        /// Total number of processing nodes in the pipeline.
        total_nodes: usize,
        /// The type of node (e.g., "image-compress", "spreadsheet-clean").
        node_type: String,
    },

    /// Emitted during file processing within a node.
    /// Powers progress bars and per-file status indicators.
    #[serde(rename_all = "camelCase")]
    FileProgress {
        /// Which node is processing this file.
        node_id: String,
        /// Zero-based index of the current file within this node's batch.
        file_index: usize,
        /// Total files this node will process.
        total_files: usize,
        /// Processing progress for this specific file (0-100).
        percent: u32,
        /// Human-readable status message (e.g., "Compressing photo.jpg...").
        message: String,
    },

    /// Emitted when a node finishes successfully.
    /// The editor uses this to mark the node as "completed" with a checkmark.
    #[serde(rename_all = "camelCase")]
    NodeCompleted {
        /// Which node completed.
        node_id: String,
        /// How long this node took, in milliseconds.
        duration_ms: u64,
        /// How many files this node processed.
        files_processed: usize,
    },

    /// Emitted when a node fails.
    /// The editor uses this to mark the node as "failed" with an error icon.
    #[serde(rename_all = "camelCase")]
    NodeFailed {
        /// Which node failed.
        node_id: String,
        /// Human-readable error message.
        error: String,
    },

    /// Emitted once when the entire pipeline finishes successfully.
    #[serde(rename_all = "camelCase")]
    PipelineCompleted {
        /// Total wall-clock time for the entire pipeline, in milliseconds.
        duration_ms: u64,
        /// Total number of files processed across all nodes.
        total_files_processed: usize,
    },

    /// Emitted when the pipeline fails (a node error stops execution).
    /// NOTE(review): emitted alongside `NodeFailed` for the failing node,
    /// presumably — confirm against the executor.
    #[serde(rename_all = "camelCase")]
    PipelineFailed {
        /// Which node caused the pipeline to fail.
        node_id: String,
        /// Human-readable error message.
        error: String,
    },
}
98
99// =============================================================================
100// Pipeline Reporter
101// =============================================================================
102
103/// Emits structured pipeline events to a callback.
104///
105/// This is the pipeline-level equivalent of `ProgressReporter`.
106/// The callback receives a `PipelineEvent` which can be serialized
107/// to JSON and sent to the UI (via Web Worker postMessage, CLI stdout,
108/// or any other transport).
109pub struct PipelineReporter {
110    /// The callback that receives events. `None` = no-op mode.
111    callback: Option<Box<dyn Fn(PipelineEvent)>>,
112}
113
114impl PipelineReporter {
115    /// Create a new reporter with a callback that receives pipeline events.
116    ///
117    /// USAGE:
118    /// ```rust
119    /// use bnto_core::PipelineReporter;
120    ///
121    /// let reporter = PipelineReporter::new(|event| {
122    ///     println!("Pipeline event: {:?}", event);
123    /// });
124    /// ```
125    pub fn new(callback: impl Fn(PipelineEvent) + 'static) -> Self {
126        Self {
127            callback: Some(Box::new(callback)),
128        }
129    }
130
131    /// Create a no-op reporter that discards all events.
132    /// Used in tests where we don't need event tracking.
133    pub fn new_noop() -> Self {
134        Self { callback: None }
135    }
136
137    /// Emit a pipeline event. If no callback is set (no-op mode),
138    /// the event is silently discarded.
139    pub fn emit(&self, event: PipelineEvent) {
140        if let Some(cb) = &self.callback {
141            cb(event);
142        }
143    }
144}
145
146// =============================================================================
147// Recording Reporter (for tests)
148// =============================================================================
149
/// A reporter that records all events in a thread-safe Vec.
/// Used in tests to verify the executor emits the right events
/// in the right order.
#[cfg(test)]
pub struct RecordingReporter {
    /// The shared, thread-safe event log. Wrapped in `Arc<Mutex<…>>` so
    /// the closure handed to `PipelineReporter` and this recorder can
    /// both hold a handle to the same Vec.
    events: std::sync::Arc<std::sync::Mutex<Vec<PipelineEvent>>>,
}
158
#[cfg(test)]
impl Default for RecordingReporter {
    /// Equivalent to [`RecordingReporter::new`]: starts with an empty log.
    fn default() -> Self {
        RecordingReporter::new()
    }
}
165
#[cfg(test)]
impl RecordingReporter {
    /// Create a recording reporter with an empty event log.
    pub fn new() -> Self {
        use std::sync::{Arc, Mutex};
        Self {
            events: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Build a `PipelineReporter` whose callback appends every event
    /// to this recorder's shared log.
    pub fn reporter(&self) -> PipelineReporter {
        let log = std::sync::Arc::clone(&self.events);
        PipelineReporter::new(move |event| log.lock().unwrap().push(event))
    }

    /// Get a snapshot (clone) of all recorded events so far.
    pub fn events(&self) -> Vec<PipelineEvent> {
        let log = self.events.lock().unwrap();
        log.clone()
    }
}
188
189// =============================================================================
190// Tests
191// =============================================================================
192
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize an event into a `serde_json::Value` so the tests can
    /// inspect the exact JSON shape the UI will receive.
    fn to_json(evt: &PipelineEvent) -> serde_json::Value {
        serde_json::to_value(evt).unwrap()
    }

    // --- Serialization Tests ---
    // Verify the JSON shape matches what the TypeScript side expects.

    #[test]
    fn test_pipeline_started_serializes_correctly() {
        // Expected shape:
        // { "type": "PipelineStarted", "totalNodes": 3, "totalFiles": 10 }
        let v = to_json(&PipelineEvent::PipelineStarted {
            total_nodes: 3,
            total_files: 10,
        });

        assert_eq!(v["type"], "PipelineStarted");
        assert_eq!(v["totalNodes"], 3);
        assert_eq!(v["totalFiles"], 10);
    }

    #[test]
    fn test_node_started_serializes_correctly() {
        let v = to_json(&PipelineEvent::NodeStarted {
            node_id: "node-1".to_string(),
            node_index: 0,
            total_nodes: 3,
            node_type: "image-compress".to_string(),
        });

        assert_eq!(v["type"], "NodeStarted");
        assert_eq!(v["nodeId"], "node-1");
        assert_eq!(v["nodeIndex"], 0);
        assert_eq!(v["totalNodes"], 3);
        assert_eq!(v["nodeType"], "image-compress");
    }

    #[test]
    fn test_file_progress_serializes_correctly() {
        let v = to_json(&PipelineEvent::FileProgress {
            node_id: "node-2".to_string(),
            file_index: 2,
            total_files: 5,
            percent: 75,
            message: "Compressing photo.jpg...".to_string(),
        });

        assert_eq!(v["type"], "FileProgress");
        assert_eq!(v["nodeId"], "node-2");
        assert_eq!(v["fileIndex"], 2);
        assert_eq!(v["totalFiles"], 5);
        assert_eq!(v["percent"], 75);
        assert_eq!(v["message"], "Compressing photo.jpg...");
    }

    #[test]
    fn test_node_completed_serializes_correctly() {
        let v = to_json(&PipelineEvent::NodeCompleted {
            node_id: "node-1".to_string(),
            duration_ms: 1234,
            files_processed: 5,
        });

        assert_eq!(v["type"], "NodeCompleted");
        assert_eq!(v["nodeId"], "node-1");
        assert_eq!(v["durationMs"], 1234);
        assert_eq!(v["filesProcessed"], 5);
    }

    #[test]
    fn test_node_failed_serializes_correctly() {
        let v = to_json(&PipelineEvent::NodeFailed {
            node_id: "node-3".to_string(),
            error: "Unsupported format: BMP".to_string(),
        });

        assert_eq!(v["type"], "NodeFailed");
        assert_eq!(v["nodeId"], "node-3");
        assert_eq!(v["error"], "Unsupported format: BMP");
    }

    #[test]
    fn test_pipeline_completed_serializes_correctly() {
        let v = to_json(&PipelineEvent::PipelineCompleted {
            duration_ms: 5678,
            total_files_processed: 10,
        });

        assert_eq!(v["type"], "PipelineCompleted");
        assert_eq!(v["durationMs"], 5678);
        assert_eq!(v["totalFilesProcessed"], 10);
    }

    #[test]
    fn test_pipeline_failed_serializes_correctly() {
        let v = to_json(&PipelineEvent::PipelineFailed {
            node_id: "node-2".to_string(),
            error: "Processing failed: out of memory".to_string(),
        });

        assert_eq!(v["type"], "PipelineFailed");
        assert_eq!(v["nodeId"], "node-2");
        assert_eq!(v["error"], "Processing failed: out of memory");
    }

    // --- Reporter Tests ---

    #[test]
    fn test_noop_reporter_doesnt_panic() {
        // A no-op reporter must swallow events without side effects.
        let noop = PipelineReporter::new_noop();
        noop.emit(PipelineEvent::PipelineStarted {
            total_nodes: 1,
            total_files: 1,
        });
        noop.emit(PipelineEvent::PipelineCompleted {
            duration_ms: 100,
            total_files_processed: 1,
        });
        // Reaching this point without a panic is the assertion.
    }

    #[test]
    fn test_recording_reporter_captures_events() {
        // Emit three events through a recording reporter...
        let recorder = RecordingReporter::new();
        let sink = recorder.reporter();

        sink.emit(PipelineEvent::PipelineStarted {
            total_nodes: 2,
            total_files: 3,
        });
        sink.emit(PipelineEvent::NodeStarted {
            node_id: "n1".to_string(),
            node_index: 0,
            total_nodes: 2,
            node_type: "image-compress".to_string(),
        });
        sink.emit(PipelineEvent::PipelineCompleted {
            duration_ms: 500,
            total_files_processed: 3,
        });

        // ...and verify they were all captured, in emission order.
        let captured = recorder.events();
        assert_eq!(captured.len(), 3);
        assert!(matches!(captured[0], PipelineEvent::PipelineStarted { .. }));
        assert!(matches!(captured[1], PipelineEvent::NodeStarted { .. }));
        assert!(matches!(captured[2], PipelineEvent::PipelineCompleted { .. }));
    }

    #[test]
    fn test_reporter_calls_callback() {
        // The callback must receive the exact event we emit.
        let log = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        let writer = std::sync::Arc::clone(&log);

        let reporter = PipelineReporter::new(move |evt| {
            writer.lock().unwrap().push(evt);
        });

        reporter.emit(PipelineEvent::PipelineStarted {
            total_nodes: 1,
            total_files: 1,
        });

        let captured = log.lock().unwrap();
        assert_eq!(captured.len(), 1);
        match &captured[0] {
            PipelineEvent::PipelineStarted {
                total_nodes,
                total_files,
            } => {
                assert_eq!(*total_nodes, 1);
                assert_eq!(*total_files, 1);
            }
            _ => panic!("Expected PipelineStarted event"),
        }
    }
}