bnto_core/processor.rs
1// =============================================================================
2// NodeProcessor Trait — The Contract Every Node Type Must Implement
3// =============================================================================
4
5use crate::context::ProcessContext;
6use crate::errors::BntoError;
7use crate::metadata::{NodeCategory, NodeMetadata};
8use crate::progress::ProgressReporter;
9
10// Re-export FileData so `processor::FileData` still works for existing imports.
11pub use crate::file_data::FileData;
12
13// =============================================================================
14// Input and Output Types
15// =============================================================================
16
17/// The input data that a node receives for processing.
18pub struct NodeInput {
19 /// File content — in-memory bytes or a path on disk.
20 ///
21 /// Most processors call `input.data.into_bytes()` at the top of `process()`.
22 /// Processors that move/copy files can use `write_to()` / `copy_to()` directly
23 /// for zero-copy operations on `FileData::Path` variants.
24 pub data: FileData,
25
26 /// The original filename (e.g., "photo.jpg", "data.csv").
27 /// Used to determine the file format and to generate output filenames.
28 pub filename: String,
29
30 /// The MIME type of the input (e.g., "image/jpeg", "text/csv").
31 /// `None` when the MIME type wasn't provided by the caller.
32 pub mime_type: Option<String>,
33
34 /// Configuration parameters for the node (e.g., quality level, target
35 /// format, dimensions). A JSON-compatible map where keys are parameter
36 /// names from @bnto/nodes schemas.
37 pub params: serde_json::Map<String, serde_json::Value>,
38}
39
40/// The output from a node after processing.
41///
42/// A node can produce one or more output files. For example, the
43/// compress-images node takes one image in and produces one compressed
44/// image out. A future "split PDF" node might produce many pages.
45pub struct NodeOutput {
46 /// The processed file data. Each entry is one output file.
47 pub files: Vec<OutputFile>,
48
49 /// Optional metadata about the processing (timing, compression ratio,
50 /// rows removed, etc.). Displayed in the UI's results panel.
51 pub metadata: serde_json::Map<String, serde_json::Value>,
52}
53
54/// A single output file produced by a node.
55pub struct OutputFile {
56 /// The file content — in-memory bytes or a path on disk.
57 pub data: FileData,
58
59 /// The filename for this output (e.g., "photo-compressed.jpg").
60 pub filename: String,
61
62 /// The MIME type of the output (e.g., "image/jpeg").
63 pub mime_type: String,
64
65 /// Per-file metadata (e.g., CSV row column values for loop iteration).
66 /// When non-empty, takes priority over `NodeOutput.metadata` in collect_output.
67 pub metadata: serde_json::Map<String, serde_json::Value>,
68}
69
70/// Input for batch processors that need all files at once (e.g., merge, zip).
71///
72/// Unlike `NodeInput` (one file), this carries the full set of pipeline files
73/// plus the shared configuration parameters.
74pub struct BatchInput {
75 /// All files to process as a group.
76 pub files: Vec<BatchFile>,
77
78 /// Configuration parameters for the node (same as `NodeInput.params`).
79 pub params: serde_json::Map<String, serde_json::Value>,
80}
81
/// One member file of a [`BatchInput`].
///
/// Unlike `NodeInput.data`, the contents here are always raw in-memory bytes.
pub struct BatchFile {
    /// The file's contents as raw bytes.
    pub data: Vec<u8>,

    /// Name the file originally had.
    pub filename: String,

    /// MIME type of the file, or `None` if it is not known.
    pub mime_type: Option<String>,
}
93
94// =============================================================================
95// The NodeProcessor Trait
96// =============================================================================
97
98/// The contract that every node type must implement.
99///
100/// Currently synchronous -- async is handled at the Web Worker level.
101/// wasm-bindgen doesn't support async trait methods across the WASM boundary.
102pub trait NodeProcessor {
103 /// The processor's type key (e.g., "image-compress").
104 /// Must match the key used in `registry.register()`.
105 /// Convention: category-operation, kebab-case.
106 fn name(&self) -> &str;
107
108 /// Process a single input file and produce output.
109 ///
110 /// Arguments:
111 /// - `&self` — reference to the node processor instance
112 /// - `input` — the file data, filename, MIME type, and config params
113 /// - `progress` — callback to report progress to the UI (0-100%)
114 /// - `ctx` — system access boundary (commands, temp files, env vars)
115 ///
116 /// Returns:
117 /// - `Ok(NodeOutput)` — processing succeeded, here are the results
118 /// - `Err(BntoError)` — processing failed, here's what went wrong
119 fn process(
120 &self,
121 input: NodeInput,
122 progress: &ProgressReporter,
123 ctx: &dyn ProcessContext,
124 ) -> Result<NodeOutput, BntoError>;
125
126 /// Validate the input parameters before processing.
127 ///
128 /// This is called BEFORE `process()` to catch configuration errors
129 /// early (missing required params, invalid values, etc.) without
130 /// doing any expensive file processing.
131 ///
132 /// Returns a list of validation errors (empty = valid).
133 ///
134 /// Default implementation passes validation. Override in specific
135 /// node types to add parameter validation.
136 fn validate(&self, _params: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
137 Vec::new()
138 }
139
140 /// Process a batch of files together, producing combined output.
141 ///
142 /// Override this for processors with `InputCardinality::Batch` (merge, zip,
143 /// concat). The default falls back to calling `process()` per file and
144 /// concatenating results — suitable for `PerFile` processors.
145 fn process_batch(
146 &self,
147 input: BatchInput,
148 progress: &ProgressReporter,
149 ctx: &dyn ProcessContext,
150 ) -> Result<NodeOutput, BntoError> {
151 let total = input.files.len();
152 let mut all_files = Vec::new();
153 let mut combined_metadata = serde_json::Map::new();
154
155 for (i, file) in input.files.into_iter().enumerate() {
156 let pct = ((i as u32) * 100) / (total as u32).max(1);
157 progress.report(pct, &format!("Processing file {} of {total}...", i + 1));
158
159 let single_input = NodeInput {
160 data: FileData::Bytes(file.data),
161 filename: file.filename,
162 mime_type: file.mime_type,
163 params: input.params.clone(),
164 };
165 let output = self.process(single_input, progress, ctx)?;
166 all_files.extend(output.files);
167 // Merge metadata from the last file processed.
168 combined_metadata = output.metadata;
169 }
170
171 Ok(NodeOutput {
172 files: all_files,
173 metadata: combined_metadata,
174 })
175 }
176
177 /// Return the processor's self-describing metadata.
178 ///
179 /// This tells consumers everything about this processor: what it's called,
180 /// what category it belongs to, what parameters it accepts, what files it
181 /// handles, and whether it runs in the browser.
182 ///
183 /// Every concrete processor SHOULD override this with its real metadata.
184 /// The default returns a placeholder "unknown" metadata — useful for tests
185 /// and mocks that don't need real metadata.
186 fn metadata(&self) -> NodeMetadata {
187 NodeMetadata {
188 node_type: "unknown".to_string(),
189 name: self.name().to_string(),
190 description: String::new(),
191 category: NodeCategory::Data,
192 accepts: vec![],
193 platforms: vec![],
194 parameters: vec![],
195 input_cardinality: Default::default(),
196 requires: vec![],
197 }
198 }
199}
200
201// =============================================================================
202// Tests
203// =============================================================================
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::NoopContext;

    // --- Test helpers ---
    // Simple mock processors used to exercise the trait contract.

    /// A mock node processor that does no work and returns its input
    /// unchanged as the output.
    struct EchoProcessor;

    impl NodeProcessor for EchoProcessor {
        fn name(&self) -> &str {
            "echo"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            // Echo the input data straight back as the only output file.
            Ok(NodeOutput {
                files: vec![OutputFile {
                    data: input.data,
                    filename: input.filename,
                    mime_type: input
                        .mime_type
                        .unwrap_or_else(|| "application/octet-stream".to_string()),
                    metadata: serde_json::Map::new(),
                }],
                metadata: serde_json::Map::new(),
            })
        }
    }

    /// A mock processor that always fails, for testing error handling.
    struct FailProcessor;

    impl NodeProcessor for FailProcessor {
        fn name(&self) -> &str {
            "fail"
        }

        fn process(
            &self,
            _input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            Err(BntoError::ProcessingFailed(
                "intentional test failure".to_string(),
            ))
        }
    }

    /// A mock processor that records each input's filename in the node-level
    /// metadata under "source". Used to observe how `process_batch` combines
    /// metadata.
    struct TagProcessor;

    impl NodeProcessor for TagProcessor {
        fn name(&self) -> &str {
            "tag"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            let mut metadata = serde_json::Map::new();
            metadata.insert(
                "source".to_string(),
                serde_json::Value::String(input.filename.clone()),
            );
            Ok(NodeOutput {
                files: vec![OutputFile {
                    data: input.data,
                    filename: input.filename,
                    mime_type: "text/plain".to_string(),
                    metadata: serde_json::Map::new(),
                }],
                metadata,
            })
        }
    }

    /// Helper to create a simple single-file test input.
    fn make_test_input(data: &[u8], filename: &str) -> NodeInput {
        NodeInput {
            data: FileData::Bytes(data.to_vec()),
            filename: filename.to_string(),
            mime_type: None,
            params: serde_json::Map::new(),
        }
    }

    /// Helper to create a batch member with no MIME type.
    fn make_batch_file(data: &[u8], filename: &str) -> BatchFile {
        BatchFile {
            data: data.to_vec(),
            filename: filename.to_string(),
            mime_type: None,
        }
    }

    // --- Tests ---

    #[test]
    fn test_echo_processor_name() {
        let processor = EchoProcessor;
        assert_eq!(processor.name(), "echo");
    }

    #[test]
    fn test_echo_processor_echoes_data() {
        let processor = EchoProcessor;
        let progress = ProgressReporter::new_noop();
        let input = make_test_input(b"hello world", "test.txt");

        let output = processor.process(input, &progress, &NoopContext).unwrap();

        assert_eq!(output.files.len(), 1);
        assert!(matches!(&output.files[0].data, FileData::Bytes(d) if d == b"hello world"));
        assert_eq!(output.files[0].filename, "test.txt");
    }

    #[test]
    fn test_fail_processor_returns_error() {
        let processor = FailProcessor;
        let progress = ProgressReporter::new_noop();
        let input = make_test_input(b"data", "test.txt");

        // `unwrap_err` would need `NodeOutput: Debug`, so match explicitly.
        let err = match processor.process(input, &progress, &NoopContext) {
            Ok(_) => panic!("FailProcessor must return an error"),
            Err(e) => e,
        };
        assert!(err.to_string().contains("intentional test failure"));
    }

    #[test]
    fn test_default_validate_returns_empty() {
        let processor = EchoProcessor;
        let params = serde_json::Map::new();

        // The default validate() should return no errors.
        let errors = processor.validate(&params);
        assert!(errors.is_empty());
    }

    #[test]
    fn test_default_metadata_is_placeholder() {
        let meta = EchoProcessor.metadata();
        assert_eq!(meta.node_type, "unknown");
        assert_eq!(meta.name, "echo");
        assert!(meta.description.is_empty());
        assert!(meta.parameters.is_empty());
    }

    // --- Batch Processing Tests ---

    #[test]
    fn test_default_process_batch_falls_back_to_per_file() {
        let processor = EchoProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![
                make_batch_file(b"file1", "a.txt"),
                make_batch_file(b"file2", "b.txt"),
            ],
            params: serde_json::Map::new(),
        };

        let output = processor
            .process_batch(input, &progress, &NoopContext)
            .unwrap();

        // Default batch falls back to per-file: 2 inputs → 2 outputs, in order.
        assert_eq!(output.files.len(), 2);
        assert_eq!(output.files[0].filename, "a.txt");
        assert!(matches!(&output.files[0].data, FileData::Bytes(d) if d == b"file1"));
        assert_eq!(output.files[1].filename, "b.txt");
        assert!(matches!(&output.files[1].data, FileData::Bytes(d) if d == b"file2"));
    }

    #[test]
    fn test_default_process_batch_keeps_last_file_metadata() {
        let processor = TagProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![
                make_batch_file(b"file1", "a.txt"),
                make_batch_file(b"file2", "b.txt"),
            ],
            params: serde_json::Map::new(),
        };

        let output = processor
            .process_batch(input, &progress, &NoopContext)
            .unwrap();

        // Node-level metadata is replaced per file, so the last one wins.
        assert_eq!(output.files.len(), 2);
        assert_eq!(
            output.metadata.get("source"),
            Some(&serde_json::Value::String("b.txt".to_string()))
        );
    }

    #[test]
    fn test_default_process_batch_empty_input() {
        let processor = EchoProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![],
            params: serde_json::Map::new(),
        };

        let output = processor
            .process_batch(input, &progress, &NoopContext)
            .unwrap();
        assert_eq!(output.files.len(), 0);
        assert!(output.metadata.is_empty());
    }

    #[test]
    fn test_default_process_batch_propagates_errors() {
        let processor = FailProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![make_batch_file(b"data", "test.txt")],
            params: serde_json::Map::new(),
        };

        let result = processor.process_batch(input, &progress, &NoopContext);
        assert!(result.is_err());
    }
}