Skip to main content

bnto_core/
processor.rs

1// =============================================================================
2// NodeProcessor Trait — The Contract Every Node Type Must Implement
3// =============================================================================
4
5use crate::context::ProcessContext;
6use crate::errors::BntoError;
7use crate::metadata::{NodeCategory, NodeMetadata};
8use crate::progress::ProgressReporter;
9
10// Re-export FileData so `processor::FileData` still works for existing imports.
11pub use crate::file_data::FileData;
12
13// =============================================================================
14// Input and Output Types
15// =============================================================================
16
17/// The input data that a node receives for processing.
18pub struct NodeInput {
19    /// File content — in-memory bytes or a path on disk.
20    ///
21    /// Most processors call `input.data.into_bytes()` at the top of `process()`.
22    /// Processors that move/copy files can use `write_to()` / `copy_to()` directly
23    /// for zero-copy operations on `FileData::Path` variants.
24    pub data: FileData,
25
26    /// The original filename (e.g., "photo.jpg", "data.csv").
27    /// Used to determine the file format and to generate output filenames.
28    pub filename: String,
29
30    /// The MIME type of the input (e.g., "image/jpeg", "text/csv").
31    /// `None` when the MIME type wasn't provided by the caller.
32    pub mime_type: Option<String>,
33
34    /// Configuration parameters for the node (e.g., quality level, target
35    /// format, dimensions). A JSON-compatible map where keys are parameter
36    /// names from @bnto/nodes schemas.
37    pub params: serde_json::Map<String, serde_json::Value>,
38}
39
40/// The output from a node after processing.
41///
42/// A node can produce one or more output files. For example, the
43/// compress-images node takes one image in and produces one compressed
44/// image out. A future "split PDF" node might produce many pages.
45pub struct NodeOutput {
46    /// The processed file data. Each entry is one output file.
47    pub files: Vec<OutputFile>,
48
49    /// Optional metadata about the processing (timing, compression ratio,
50    /// rows removed, etc.). Displayed in the UI's results panel.
51    pub metadata: serde_json::Map<String, serde_json::Value>,
52}
53
54/// A single output file produced by a node.
55pub struct OutputFile {
56    /// The file content — in-memory bytes or a path on disk.
57    pub data: FileData,
58
59    /// The filename for this output (e.g., "photo-compressed.jpg").
60    pub filename: String,
61
62    /// The MIME type of the output (e.g., "image/jpeg").
63    pub mime_type: String,
64
65    /// Per-file metadata (e.g., CSV row column values for loop iteration).
66    /// When non-empty, takes priority over `NodeOutput.metadata` in collect_output.
67    pub metadata: serde_json::Map<String, serde_json::Value>,
68}
69
70/// Input for batch processors that need all files at once (e.g., merge, zip).
71///
72/// Unlike `NodeInput` (one file), this carries the full set of pipeline files
73/// plus the shared configuration parameters.
74pub struct BatchInput {
75    /// All files to process as a group.
76    pub files: Vec<BatchFile>,
77
78    /// Configuration parameters for the node (same as `NodeInput.params`).
79    pub params: serde_json::Map<String, serde_json::Value>,
80}
81
/// A single file within a batch input.
///
/// Derives the common standard traits so batch files can be logged
/// (`Debug`), duplicated (`Clone`), and compared in assertions
/// (`PartialEq` / `Eq`). Note that the derived `Debug` output includes the
/// full byte buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchFile {
    /// The raw file data (bytes).
    pub data: Vec<u8>,

    /// The original filename.
    pub filename: String,

    /// The MIME type, if known.
    pub mime_type: Option<String>,
}
93
94// =============================================================================
95// The NodeProcessor Trait
96// =============================================================================
97
98/// The contract that every node type must implement.
99///
100/// Currently synchronous -- async is handled at the Web Worker level.
101/// wasm-bindgen doesn't support async trait methods across the WASM boundary.
102pub trait NodeProcessor {
103    /// The processor's type key (e.g., "image-compress").
104    /// Must match the key used in `registry.register()`.
105    /// Convention: category-operation, kebab-case.
106    fn name(&self) -> &str;
107
108    /// Process a single input file and produce output.
109    ///
110    /// Arguments:
111    ///   - `&self` — reference to the node processor instance
112    ///   - `input` — the file data, filename, MIME type, and config params
113    ///   - `progress` — callback to report progress to the UI (0-100%)
114    ///   - `ctx` — system access boundary (commands, temp files, env vars)
115    ///
116    /// Returns:
117    ///   - `Ok(NodeOutput)` — processing succeeded, here are the results
118    ///   - `Err(BntoError)` — processing failed, here's what went wrong
119    fn process(
120        &self,
121        input: NodeInput,
122        progress: &ProgressReporter,
123        ctx: &dyn ProcessContext,
124    ) -> Result<NodeOutput, BntoError>;
125
126    /// Validate the input parameters before processing.
127    ///
128    /// This is called BEFORE `process()` to catch configuration errors
129    /// early (missing required params, invalid values, etc.) without
130    /// doing any expensive file processing.
131    ///
132    /// Returns a list of validation errors (empty = valid).
133    ///
134    /// Default implementation passes validation. Override in specific
135    /// node types to add parameter validation.
136    fn validate(&self, _params: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
137        Vec::new()
138    }
139
140    /// Process a batch of files together, producing combined output.
141    ///
142    /// Override this for processors with `InputCardinality::Batch` (merge, zip,
143    /// concat). The default falls back to calling `process()` per file and
144    /// concatenating results — suitable for `PerFile` processors.
145    fn process_batch(
146        &self,
147        input: BatchInput,
148        progress: &ProgressReporter,
149        ctx: &dyn ProcessContext,
150    ) -> Result<NodeOutput, BntoError> {
151        let total = input.files.len();
152        let mut all_files = Vec::new();
153        let mut combined_metadata = serde_json::Map::new();
154
155        for (i, file) in input.files.into_iter().enumerate() {
156            let pct = ((i as u32) * 100) / (total as u32).max(1);
157            progress.report(pct, &format!("Processing file {} of {total}...", i + 1));
158
159            let single_input = NodeInput {
160                data: FileData::Bytes(file.data),
161                filename: file.filename,
162                mime_type: file.mime_type,
163                params: input.params.clone(),
164            };
165            let output = self.process(single_input, progress, ctx)?;
166            all_files.extend(output.files);
167            // Merge metadata from the last file processed.
168            combined_metadata = output.metadata;
169        }
170
171        Ok(NodeOutput {
172            files: all_files,
173            metadata: combined_metadata,
174        })
175    }
176
177    /// Return the processor's self-describing metadata.
178    ///
179    /// This tells consumers everything about this processor: what it's called,
180    /// what category it belongs to, what parameters it accepts, what files it
181    /// handles, and whether it runs in the browser.
182    ///
183    /// Every concrete processor SHOULD override this with its real metadata.
184    /// The default returns a placeholder "unknown" metadata — useful for tests
185    /// and mocks that don't need real metadata.
186    fn metadata(&self) -> NodeMetadata {
187        NodeMetadata {
188            node_type: "unknown".to_string(),
189            name: self.name().to_string(),
190            description: String::new(),
191            category: NodeCategory::Data,
192            accepts: vec![],
193            platforms: vec![],
194            parameters: vec![],
195            input_cardinality: Default::default(),
196            requires: vec![],
197        }
198    }
199}
200
201// =============================================================================
202// Tests
203// =============================================================================
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::NoopContext;

    // --- Test helpers ---
    // Two minimal processors exercise the trait contract: one that always
    // succeeds and one that always errors.

    /// Mock processor that hands its input straight back as the single
    /// output file.
    struct EchoProcessor;

    impl NodeProcessor for EchoProcessor {
        fn name(&self) -> &str {
            "echo"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            // Use a generic binary MIME type when the caller supplied none.
            let mime_type = match input.mime_type {
                Some(mime) => mime,
                None => "application/octet-stream".to_string(),
            };
            let echoed = OutputFile {
                data: input.data,
                filename: input.filename,
                mime_type,
                metadata: serde_json::Map::new(),
            };

            Ok(NodeOutput {
                files: vec![echoed],
                metadata: serde_json::Map::new(),
            })
        }
    }

    /// Mock processor whose `process()` always returns an error — used to
    /// check error propagation.
    struct FailProcessor;

    impl NodeProcessor for FailProcessor {
        fn name(&self) -> &str {
            "fail"
        }

        fn process(
            &self,
            _input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            let reason = "intentional test failure".to_string();
            Err(BntoError::ProcessingFailed(reason))
        }
    }

    /// Builds an in-memory `NodeInput` with no MIME type and no params.
    fn make_test_input(data: &[u8], filename: &str) -> NodeInput {
        NodeInput {
            data: FileData::Bytes(Vec::from(data)),
            filename: String::from(filename),
            mime_type: None,
            params: serde_json::Map::new(),
        }
    }

    /// Builds a `BatchFile` with no MIME type.
    fn batch_file(data: &[u8], filename: &str) -> BatchFile {
        BatchFile {
            data: data.to_vec(),
            filename: filename.to_string(),
            mime_type: None,
        }
    }

    /// Wraps `files` in a `BatchInput` that carries no params.
    fn batch_of(files: Vec<BatchFile>) -> BatchInput {
        BatchInput {
            files,
            params: serde_json::Map::new(),
        }
    }

    /// True when `file` holds in-memory bytes equal to `expected`.
    fn holds_bytes(file: &OutputFile, expected: &[u8]) -> bool {
        matches!(&file.data, FileData::Bytes(actual) if actual.as_slice() == expected)
    }

    // --- Tests ---

    #[test]
    fn test_echo_processor_name() {
        assert_eq!(EchoProcessor.name(), "echo");
    }

    #[test]
    fn test_echo_processor_echoes_data() {
        let reporter = ProgressReporter::new_noop();
        let request = make_test_input(b"hello world", "test.txt");

        let out = EchoProcessor
            .process(request, &reporter, &NoopContext)
            .unwrap();

        assert_eq!(out.files.len(), 1);
        let echoed = &out.files[0];
        assert!(holds_bytes(echoed, b"hello world"));
        assert_eq!(echoed.filename, "test.txt");
    }

    #[test]
    fn test_fail_processor_returns_error() {
        let reporter = ProgressReporter::new_noop();
        let request = make_test_input(b"data", "test.txt");

        let outcome = FailProcessor.process(request, &reporter, &NoopContext);
        assert!(outcome.is_err());

        // The error text must carry the processor's failure reason.
        if let Some(failure) = outcome.err() {
            assert!(failure.to_string().contains("intentional test failure"));
        }
    }

    #[test]
    fn test_default_validate_returns_empty() {
        // With no override, validate() reports no problems.
        let no_params = serde_json::Map::new();
        let problems = EchoProcessor.validate(&no_params);
        assert!(problems.is_empty());
    }

    // --- Batch Processing Tests ---

    #[test]
    fn test_default_process_batch_falls_back_to_per_file() {
        let reporter = ProgressReporter::new_noop();
        let batch = batch_of(vec![
            batch_file(b"file1", "a.txt"),
            batch_file(b"file2", "b.txt"),
        ]);

        let out = EchoProcessor
            .process_batch(batch, &reporter, &NoopContext)
            .unwrap();

        // The default implementation runs per file: 2 inputs → 2 outputs,
        // in input order.
        assert_eq!(out.files.len(), 2);

        let first = &out.files[0];
        assert_eq!(first.filename, "a.txt");
        assert!(holds_bytes(first, b"file1"));

        let second = &out.files[1];
        assert_eq!(second.filename, "b.txt");
        assert!(holds_bytes(second, b"file2"));
    }

    #[test]
    fn test_default_process_batch_empty_input() {
        let reporter = ProgressReporter::new_noop();

        let out = EchoProcessor
            .process_batch(batch_of(Vec::new()), &reporter, &NoopContext)
            .unwrap();

        assert_eq!(out.files.len(), 0);
    }

    #[test]
    fn test_default_process_batch_propagates_errors() {
        let reporter = ProgressReporter::new_noop();
        let batch = batch_of(vec![batch_file(b"data", "test.txt")]);

        let outcome = FailProcessor.process_batch(batch, &reporter, &NoopContext);
        assert!(outcome.is_err());
    }
}