// bnto_core/processor.rs
1// =============================================================================
2// NodeProcessor Trait — The Contract Every Node Type Must Implement
3// =============================================================================
4
5use crate::context::ProcessContext;
6use crate::errors::BntoError;
7use crate::metadata::{NodeCategory, NodeMetadata};
8use crate::progress::ProgressReporter;
9
10// =============================================================================
11// Input and Output Types
12// =============================================================================
13
14/// The input data that a node receives for processing.
15pub struct NodeInput {
16 /// The raw file data (bytes). For an image, this is the JPEG/PNG/WebP
17 /// binary data. For a CSV, this is the UTF-8 text content as bytes.
18 pub data: Vec<u8>,
19
20 /// The original filename (e.g., "photo.jpg", "data.csv").
21 /// Used to determine the file format and to generate output filenames.
22 pub filename: String,
23
24 /// The MIME type of the input (e.g., "image/jpeg", "text/csv").
25 /// `None` when the MIME type wasn't provided by the caller.
26 pub mime_type: Option<String>,
27
28 /// Configuration parameters for the node (e.g., quality level, target
29 /// format, dimensions). A JSON-compatible map where keys are parameter
30 /// names from @bnto/nodes schemas.
31 pub params: serde_json::Map<String, serde_json::Value>,
32}
33
34/// The output from a node after processing.
35///
36/// A node can produce one or more output files. For example, the
37/// compress-images node takes one image in and produces one compressed
38/// image out. A future "split PDF" node might produce many pages.
39pub struct NodeOutput {
40 /// The processed file data. Each entry is one output file.
41 pub files: Vec<OutputFile>,
42
43 /// Optional metadata about the processing (timing, compression ratio,
44 /// rows removed, etc.). Displayed in the UI's results panel.
45 pub metadata: serde_json::Map<String, serde_json::Value>,
46}
47
/// A single output file produced by a node.
#[derive(Debug, Clone)]
pub struct OutputFile {
    /// The raw file data (bytes) of the processed output.
    pub data: Vec<u8>,

    /// The filename for this output (e.g., "photo-compressed.jpg").
    pub filename: String,

    /// The MIME type of the output (e.g., "image/jpeg").
    pub mime_type: String,
}
59
60/// Input for batch processors that need all files at once (e.g., merge, zip).
61///
62/// Unlike `NodeInput` (one file), this carries the full set of pipeline files
63/// plus the shared configuration parameters.
64pub struct BatchInput {
65 /// All files to process as a group.
66 pub files: Vec<BatchFile>,
67
68 /// Configuration parameters for the node (same as `NodeInput.params`).
69 pub params: serde_json::Map<String, serde_json::Value>,
70}
71
/// A single file within a batch input.
#[derive(Debug, Clone)]
pub struct BatchFile {
    /// The raw file data (bytes).
    pub data: Vec<u8>,

    /// The original filename.
    pub filename: String,

    /// The MIME type, if known.
    pub mime_type: Option<String>,
}
83
84// =============================================================================
85// The NodeProcessor Trait
86// =============================================================================
87
88/// The contract that every node type must implement.
89///
90/// Currently synchronous -- async is handled at the Web Worker level.
91/// wasm-bindgen doesn't support async trait methods across the WASM boundary.
92pub trait NodeProcessor {
93 /// The unique name of this node type (e.g., "compress-images").
94 /// Used for logging and progress reporting.
95 fn name(&self) -> &str;
96
97 /// Process a single input file and produce output.
98 ///
99 /// Arguments:
100 /// - `&self` — reference to the node processor instance
101 /// - `input` — the file data, filename, MIME type, and config params
102 /// - `progress` — callback to report progress to the UI (0-100%)
103 /// - `ctx` — system access boundary (commands, temp files, env vars)
104 ///
105 /// Returns:
106 /// - `Ok(NodeOutput)` — processing succeeded, here are the results
107 /// - `Err(BntoError)` — processing failed, here's what went wrong
108 fn process(
109 &self,
110 input: NodeInput,
111 progress: &ProgressReporter,
112 ctx: &dyn ProcessContext,
113 ) -> Result<NodeOutput, BntoError>;
114
115 /// Validate the input parameters before processing.
116 ///
117 /// This is called BEFORE `process()` to catch configuration errors
118 /// early (missing required params, invalid values, etc.) without
119 /// doing any expensive file processing.
120 ///
121 /// Returns a list of validation errors (empty = valid).
122 ///
123 /// Default implementation passes validation. Override in specific
124 /// node types to add parameter validation.
125 fn validate(&self, _params: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
126 Vec::new()
127 }
128
129 /// Process a batch of files together, producing combined output.
130 ///
131 /// Override this for processors with `InputCardinality::Batch` (merge, zip,
132 /// concat). The default falls back to calling `process()` per file and
133 /// concatenating results — suitable for `PerFile` processors.
134 fn process_batch(
135 &self,
136 input: BatchInput,
137 progress: &ProgressReporter,
138 ctx: &dyn ProcessContext,
139 ) -> Result<NodeOutput, BntoError> {
140 let total = input.files.len();
141 let mut all_files = Vec::new();
142 let mut combined_metadata = serde_json::Map::new();
143
144 for (i, file) in input.files.into_iter().enumerate() {
145 let pct = ((i as u32) * 100) / (total as u32).max(1);
146 progress.report(pct, &format!("Processing file {} of {total}...", i + 1));
147
148 let single_input = NodeInput {
149 data: file.data,
150 filename: file.filename,
151 mime_type: file.mime_type,
152 params: input.params.clone(),
153 };
154 let output = self.process(single_input, progress, ctx)?;
155 all_files.extend(output.files);
156 // Merge metadata from the last file processed.
157 combined_metadata = output.metadata;
158 }
159
160 Ok(NodeOutput {
161 files: all_files,
162 metadata: combined_metadata,
163 })
164 }
165
166 /// Return the processor's self-describing metadata.
167 ///
168 /// This tells consumers everything about this processor: what it's called,
169 /// what category it belongs to, what parameters it accepts, what files it
170 /// handles, and whether it runs in the browser.
171 ///
172 /// Every concrete processor SHOULD override this with its real metadata.
173 /// The default returns a placeholder "unknown" metadata — useful for tests
174 /// and mocks that don't need real metadata.
175 fn metadata(&self) -> NodeMetadata {
176 NodeMetadata {
177 node_type: "unknown".to_string(),
178 name: self.name().to_string(),
179 description: String::new(),
180 category: NodeCategory::Data,
181 accepts: vec![],
182 platforms: vec![],
183 parameters: vec![],
184 input_cardinality: Default::default(),
185 }
186 }
187}
188
189// =============================================================================
190// Tests
191// =============================================================================
192
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::NoopContext;

    // --- Test helpers ---
    // We create a simple mock processor to test the trait contract.

    /// A mock node processor for testing. Does nothing — just echoes
    /// the input back as output.
    struct EchoProcessor;

    impl NodeProcessor for EchoProcessor {
        fn name(&self) -> &str {
            "echo"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            // Just echo the input data back as output.
            Ok(NodeOutput {
                files: vec![OutputFile {
                    data: input.data,
                    filename: input.filename,
                    mime_type: input
                        .mime_type
                        .unwrap_or_else(|| "application/octet-stream".to_string()),
                }],
                metadata: serde_json::Map::new(),
            })
        }
    }

    /// A mock processor that always fails — for testing error handling.
    struct FailProcessor;

    impl NodeProcessor for FailProcessor {
        fn name(&self) -> &str {
            "fail"
        }

        fn process(
            &self,
            _input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            Err(BntoError::ProcessingFailed(
                "intentional test failure".to_string(),
            ))
        }
    }

    /// Helper to create a simple test input.
    fn make_test_input(data: &[u8], filename: &str) -> NodeInput {
        NodeInput {
            data: data.to_vec(),
            filename: filename.to_string(),
            mime_type: None,
            params: serde_json::Map::new(),
        }
    }

    // --- Tests ---

    #[test]
    fn test_echo_processor_name() {
        let processor = EchoProcessor;
        assert_eq!(processor.name(), "echo");
    }

    #[test]
    fn test_echo_processor_echoes_data() {
        let processor = EchoProcessor;
        let progress = ProgressReporter::new_noop();
        let input = make_test_input(b"hello world", "test.txt");

        let output = processor.process(input, &progress, &NoopContext).unwrap();

        assert_eq!(output.files.len(), 1);
        assert_eq!(output.files[0].data, b"hello world");
        assert_eq!(output.files[0].filename, "test.txt");
    }

    #[test]
    fn test_fail_processor_returns_error() {
        let processor = FailProcessor;
        let progress = ProgressReporter::new_noop();
        let input = make_test_input(b"data", "test.txt");

        let result = processor.process(input, &progress, &NoopContext);
        assert!(result.is_err());

        if let Err(e) = result {
            assert!(e.to_string().contains("intentional test failure"));
        }
    }

    #[test]
    fn test_default_validate_returns_empty() {
        let processor = EchoProcessor;
        let params = serde_json::Map::new();

        // The default validate() should return no errors.
        // (Fixed: `&params` was mangled to `¶ms` by an HTML-entity pass,
        // which did not compile.)
        let errors = processor.validate(&params);
        assert!(errors.is_empty());
    }

    // --- Batch Processing Tests ---

    #[test]
    fn test_default_process_batch_falls_back_to_per_file() {
        let processor = EchoProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![
                BatchFile {
                    data: b"file1".to_vec(),
                    filename: "a.txt".to_string(),
                    mime_type: None,
                },
                BatchFile {
                    data: b"file2".to_vec(),
                    filename: "b.txt".to_string(),
                    mime_type: None,
                },
            ],
            params: serde_json::Map::new(),
        };

        let output = processor
            .process_batch(input, &progress, &NoopContext)
            .unwrap();

        // Default batch falls back to per-file: 2 inputs → 2 outputs.
        assert_eq!(output.files.len(), 2);
        assert_eq!(output.files[0].filename, "a.txt");
        assert_eq!(output.files[0].data, b"file1");
        assert_eq!(output.files[1].filename, "b.txt");
        assert_eq!(output.files[1].data, b"file2");
    }

    #[test]
    fn test_default_process_batch_empty_input() {
        let processor = EchoProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![],
            params: serde_json::Map::new(),
        };

        let output = processor
            .process_batch(input, &progress, &NoopContext)
            .unwrap();
        assert_eq!(output.files.len(), 0);
    }

    #[test]
    fn test_default_process_batch_propagates_errors() {
        let processor = FailProcessor;
        let progress = ProgressReporter::new_noop();
        let input = BatchInput {
            files: vec![BatchFile {
                data: b"data".to_vec(),
                filename: "test.txt".to_string(),
                mime_type: None,
            }],
            params: serde_json::Map::new(),
        };

        let result = processor.process_batch(input, &progress, &NoopContext);
        assert!(result.is_err());
    }
}
370}