Skip to main content

bnto_core/executor/
mod.rs

1// Pipeline Executor — top-level orchestrator for pipeline execution.
2//
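// Walks nodes in order, chaining outputs to inputs. The core is split across three
// files (helper modules `auto_iteration`, `loop_config`, `resolve`, and `template`
// are declared below):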
4//   - `mod.rs` — public API, node dispatch, shared types
5//   - `primitive.rs` — leaf node execution (image compress, file rename, etc.)
6//   - `container.rs` — container node execution (loop, group, parallel)
7//
8// Pure Rust — no WASM deps. Works with `cargo test` natively.
9// The WASM bridge (`bnto-wasm/src/execute.rs`) handles JS type conversions.
10
11mod auto_iteration;
12mod container;
13pub mod loop_config;
14mod primitive;
15pub mod resolve;
16pub mod template;
17
18use crate::context::ProcessContext;
19use crate::errors::BntoError;
20use crate::events::{NodeInfo, PipelineEvent, PipelineReporter};
21use crate::pipeline::{
22    IterationMode, PipelineDefinition, PipelineFile, PipelineFileResult, PipelineNode,
23    PipelineResult, is_container_node, is_io_node,
24};
25use crate::registry::NodeRegistry;
26
27#[cfg(test)]
28use crate::processor::NodeInput;
29#[cfg(test)]
30use crate::progress::ProgressReporter;
31
32// --- Shared Types ---
33
34type PipelineNodeRef<'a> = &'a PipelineNode;
35
36/// Bundles shared immutable state for executor functions, keeping
37/// signatures clean and under clippy's 7-argument limit.
38struct PipelineContext<'a, F: Fn() -> u64 + Copy> {
39    registry: &'a NodeRegistry,
40    reporter: &'a PipelineReporter,
41    /// System access boundary for processors (commands, temp files, env vars).
42    process_ctx: &'a dyn ProcessContext,
43    /// Original input file count — used in progress events so the UI
44    /// reports global position even inside loop container iterations.
45    pipeline_total_files: usize,
46    /// Returns current time in ms (injected for testability in WASM).
47    now_ms: F,
48    /// Current loop iteration's per-file metadata (CSV row columns, etc.).
49    /// Set by loop containers from PipelineFile.metadata each iteration.
50    loop_item: Option<serde_json::Map<String, serde_json::Value>>,
51    /// If executing inside a container, the container's node ID.
52    /// Used to set `parent_node_id` on child `NodeStarted` events.
53    parent_node_id: Option<String>,
54    /// Per-node output metadata for {{node.<id>.*}} templates.
55    /// Seeded with input file metadata before execution begins.
56    node_outputs: std::collections::BTreeMap<String, serde_json::Value>,
57}
58
59/// Result of executing a single node or container sub-pipeline.
60struct NodeExecutionResult {
61    output_files: Vec<PipelineFile>,
62    files_processed: usize,
63    /// Non-fatal warnings (e.g. skipped loop iterations with continue-on-error).
64    warnings: Vec<String>,
65}
66
67// --- Public API ---
68
69/// Filter out I/O marker nodes (input/output) that don't perform processing.
70fn filter_processing_nodes(definition: &PipelineDefinition) -> Vec<&PipelineNode> {
71    definition
72        .nodes
73        .iter()
74        .filter(|n| !is_io_node(&n.node_type))
75        .collect()
76}
77
78/// Convert final pipeline files to result format, preserving processor metadata.
79fn build_pipeline_result(
80    files: Vec<PipelineFile>,
81    duration_ms: u64,
82    warnings: Vec<String>,
83) -> PipelineResult {
84    let result_files = files
85        .into_iter()
86        .map(|f| PipelineFileResult {
87            name: f.name,
88            data: f.data,
89            mime_type: f.mime_type,
90            metadata: f.metadata,
91        })
92        .collect();
93    PipelineResult {
94        files: result_files,
95        duration_ms,
96        warnings,
97    }
98}
99
100/// Execute a complete pipeline: walk nodes, iterate files, chain outputs.
101///
102/// Main entry point for the engine. `now_ms` is injected for testability
103/// (no `std::time::Instant` in WASM — uses `js_sys::Date::now()` instead).
104pub fn execute_pipeline(
105    definition: &PipelineDefinition,
106    files: Vec<PipelineFile>,
107    registry: &NodeRegistry,
108    reporter: &PipelineReporter,
109    process_ctx: &dyn ProcessContext,
110    now_ms: impl Fn() -> u64 + Copy,
111) -> Result<PipelineResult, BntoError> {
112    let start_ms = now_ms();
113    let processing_nodes = filter_processing_nodes(definition);
114    let node_outputs = template::build_node_outputs_for_input(&definition.nodes, &files);
115    let ctx = PipelineContext {
116        registry,
117        reporter,
118        process_ctx,
119        pipeline_total_files: files.len(),
120        now_ms,
121        loop_item: None,
122        parent_node_id: None,
123        node_outputs,
124    };
125
126    let node_infos: Vec<NodeInfo> = processing_nodes
127        .iter()
128        .map(|n| NodeInfo {
129            id: n.id.clone(),
130            node_type: n.node_type.clone(),
131        })
132        .collect();
133
134    ctx.reporter.emit(PipelineEvent::PipelineStarted {
135        total_nodes: processing_nodes.len(),
136        total_files: files.len(),
137        nodes: node_infos,
138    });
139
140    let (current_files, total_files_processed, warnings) = match definition.resolved_iteration() {
141        IterationMode::Explicit => run_node_chain(&ctx, &processing_nodes, files, 0)?,
142        IterationMode::Auto => auto_iteration::run_auto_iteration(&ctx, &processing_nodes, files)?,
143    };
144
145    let duration_ms = (ctx.now_ms)() - start_ms;
146    ctx.reporter.emit(PipelineEvent::PipelineCompleted {
147        duration_ms,
148        total_files_processed,
149    });
150
151    Ok(build_pipeline_result(current_files, duration_ms, warnings))
152}
153
154/// Chain nodes sequentially, passing each node's output as the next node's input.
155fn run_node_chain<F: Fn() -> u64 + Copy>(
156    ctx: &PipelineContext<F>,
157    nodes: &[&PipelineNode],
158    files: Vec<PipelineFile>,
159    file_offset: usize,
160) -> Result<(Vec<PipelineFile>, usize, Vec<String>), BntoError> {
161    let total_nodes = nodes.len();
162    let mut current_files = files;
163    let mut total_files_processed: usize = 0;
164    let mut all_warnings: Vec<String> = Vec::new();
165
166    for (node_index, node) in nodes.iter().enumerate() {
167        let result = execute_node(
168            ctx,
169            node,
170            current_files,
171            node_index,
172            total_nodes,
173            file_offset,
174        )?;
175        total_files_processed += result.files_processed;
176        all_warnings.extend(result.warnings);
177        current_files = result.output_files;
178    }
179
180    Ok((current_files, total_files_processed, all_warnings))
181}
182
183// --- Internal: Node Dispatch ---
184
185/// Emit failure events (NodeFailed + PipelineFailed) and return the error.
186fn emit_node_failure<F: Fn() -> u64 + Copy>(
187    ctx: &PipelineContext<F>,
188    node_id: &str,
189    error: BntoError,
190) -> BntoError {
191    let error_msg = error.to_string();
192    ctx.reporter.emit(PipelineEvent::NodeFailed {
193        node_id: node_id.to_string(),
194        error: error_msg.clone(),
195        parent_node_id: ctx.parent_node_id.clone(),
196    });
197    ctx.reporter.emit(PipelineEvent::PipelineFailed {
198        node_id: node_id.to_string(),
199        error: error_msg,
200    });
201    error
202}
203
204/// Execute a single node — dispatches to primitive or container handler.
205///
206/// Emits NodeStarted, delegates to the appropriate handler, then emits
207/// NodeCompleted or NodeFailed. Container children call back into this
208/// for recursive execution.
209fn execute_node<F: Fn() -> u64 + Copy>(
210    ctx: &PipelineContext<F>,
211    node: &PipelineNode,
212    files: Vec<PipelineFile>,
213    node_index: usize,
214    total_nodes: usize,
215    file_offset: usize,
216) -> Result<NodeExecutionResult, BntoError> {
217    let node_start = (ctx.now_ms)();
218    emit_node_started(ctx, node, node_index, total_nodes);
219
220    let result = dispatch_node(ctx, node, files, file_offset);
221
222    match result {
223        Ok(exec_result) => {
224            emit_node_completed(ctx, &node.id, node_start, exec_result.files_processed);
225            Ok(exec_result)
226        }
227        Err(error) => Err(emit_node_failure(ctx, &node.id, error)),
228    }
229}
230
231fn emit_node_started<F: Fn() -> u64 + Copy>(
232    ctx: &PipelineContext<F>,
233    node: &PipelineNode,
234    node_index: usize,
235    total_nodes: usize,
236) {
237    ctx.reporter.emit(PipelineEvent::NodeStarted {
238        node_id: node.id.clone(),
239        node_index,
240        total_nodes,
241        node_type: node.node_type.clone(),
242        parent_node_id: ctx.parent_node_id.clone(),
243    });
244}
245
246fn emit_node_completed<F: Fn() -> u64 + Copy>(
247    ctx: &PipelineContext<F>,
248    node_id: &str,
249    node_start: u64,
250    files_processed: usize,
251) {
252    let duration_ms = (ctx.now_ms)() - node_start;
253    ctx.reporter.emit(PipelineEvent::NodeCompleted {
254        node_id: node_id.to_string(),
255        duration_ms,
256        files_processed,
257        parent_node_id: ctx.parent_node_id.clone(),
258    });
259}
260
261fn dispatch_node<F: Fn() -> u64 + Copy>(
262    ctx: &PipelineContext<F>,
263    node: &PipelineNode,
264    files: Vec<PipelineFile>,
265    file_offset: usize,
266) -> Result<NodeExecutionResult, BntoError> {
267    if is_container_node(&node.node_type) {
268        container::execute_container_node(ctx, node, files, file_offset)
269    } else {
270        primitive::execute_primitive_node(ctx, node, files, file_offset)
271    }
272}
273
274#[cfg(test)]
275mod tests;