// File: bnto_core/executor/mod.rs
// Pipeline Executor — top-level orchestrator for pipeline execution.
//
// Walks nodes in order, chaining outputs to inputs. Split across three files:
//   - `mod.rs` — public API, node dispatch, shared types
//   - `primitive.rs` — leaf node execution (image compress, file rename, etc.)
//   - `container.rs` — container node execution (loop, group, parallel)
//
// Pure Rust — no WASM deps. Works with `cargo test` natively.
// The WASM bridge (`bnto-wasm/src/execute.rs`) handles JS type conversions.

mod auto_iteration;
mod container;
mod primitive;

use crate::context::ProcessContext;
use crate::errors::BntoError;
use crate::events::{PipelineEvent, PipelineReporter};
use crate::pipeline::{
    IterationMode, PipelineDefinition, PipelineFile, PipelineFileResult, PipelineNode,
    PipelineResult, is_container_node, is_io_node,
};
use crate::registry::NodeRegistry;

#[cfg(test)]
use crate::processor::NodeInput;
#[cfg(test)]
use crate::progress::ProgressReporter;

// --- Shared Types ---

/// Borrowed pipeline node.
/// NOTE(review): not referenced anywhere in this file — presumably consumed
/// by the `container`/`primitive` submodules; confirm before removing.
type PipelineNodeRef<'a> = &'a PipelineNode;

/// Bundles shared immutable state for executor functions, keeping
/// signatures clean and under clippy's 7-argument limit.
///
/// `F` is the injected clock closure; it is `Copy` so the context can be
/// threaded by reference through recursive container execution.
struct PipelineContext<'a, F: Fn() -> u64 + Copy> {
    /// Registry used to resolve node implementations (see [`NodeRegistry`]).
    registry: &'a NodeRegistry,
    /// Sink for lifecycle/progress events emitted during execution.
    reporter: &'a PipelineReporter,
    /// System access boundary for processors (commands, temp files, env vars).
    process_ctx: &'a dyn ProcessContext,
    /// Original input file count — used in progress events so the UI
    /// reports global position even inside loop container iterations.
    pipeline_total_files: usize,
    /// Returns current time in ms (injected for testability in WASM).
    now_ms: F,
}

/// Result of executing a single node or container sub-pipeline.
struct NodeExecutionResult {
    /// Files produced by the node — fed as input to the next node in the chain.
    output_files: Vec<PipelineFile>,
    /// Number of files this node processed; accumulated into the
    /// pipeline-wide total reported in the completion event.
    files_processed: usize,
}

// --- Public API ---

55/// Filter out I/O marker nodes (input/output) that don't perform processing.
56fn filter_processing_nodes(definition: &PipelineDefinition) -> Vec<&PipelineNode> {
57    definition
58        .nodes
59        .iter()
60        .filter(|n| !is_io_node(&n.node_type))
61        .collect()
62}
63
64/// Convert final pipeline files to result format, preserving processor metadata.
65fn build_pipeline_result(files: Vec<PipelineFile>, duration_ms: u64) -> PipelineResult {
66    let result_files = files
67        .into_iter()
68        .map(|f| PipelineFileResult {
69            name: f.name,
70            data: f.data,
71            mime_type: f.mime_type,
72            metadata: f.metadata,
73        })
74        .collect();
75    PipelineResult {
76        files: result_files,
77        duration_ms,
78    }
79}
80
81/// Execute a complete pipeline: walk nodes, iterate files, chain outputs.
82///
83/// Main entry point for the engine. `now_ms` is injected for testability
84/// (no `std::time::Instant` in WASM — uses `js_sys::Date::now()` instead).
85pub fn execute_pipeline(
86    definition: &PipelineDefinition,
87    files: Vec<PipelineFile>,
88    registry: &NodeRegistry,
89    reporter: &PipelineReporter,
90    process_ctx: &dyn ProcessContext,
91    now_ms: impl Fn() -> u64 + Copy,
92) -> Result<PipelineResult, BntoError> {
93    let start_ms = now_ms();
94    let processing_nodes = filter_processing_nodes(definition);
95    let ctx = PipelineContext {
96        registry,
97        reporter,
98        process_ctx,
99        pipeline_total_files: files.len(),
100        now_ms,
101    };
102
103    ctx.reporter.emit(PipelineEvent::PipelineStarted {
104        total_nodes: processing_nodes.len(),
105        total_files: files.len(),
106    });
107
108    let (current_files, total_files_processed) = match definition.resolved_iteration() {
109        IterationMode::Explicit => run_node_chain(&ctx, &processing_nodes, files, 0)?,
110        IterationMode::Auto => auto_iteration::run_auto_iteration(&ctx, &processing_nodes, files)?,
111    };
112
113    let duration_ms = (ctx.now_ms)() - start_ms;
114    ctx.reporter.emit(PipelineEvent::PipelineCompleted {
115        duration_ms,
116        total_files_processed,
117    });
118
119    Ok(build_pipeline_result(current_files, duration_ms))
120}
121
122/// Chain nodes sequentially, passing each node's output as the next node's input.
123fn run_node_chain<F: Fn() -> u64 + Copy>(
124    ctx: &PipelineContext<F>,
125    nodes: &[&PipelineNode],
126    files: Vec<PipelineFile>,
127    file_offset: usize,
128) -> Result<(Vec<PipelineFile>, usize), BntoError> {
129    let total_nodes = nodes.len();
130    let mut current_files = files;
131    let mut total_files_processed: usize = 0;
132
133    for (node_index, node) in nodes.iter().enumerate() {
134        let result = execute_node(
135            ctx,
136            node,
137            current_files,
138            node_index,
139            total_nodes,
140            file_offset,
141        )?;
142        total_files_processed += result.files_processed;
143        current_files = result.output_files;
144    }
145
146    Ok((current_files, total_files_processed))
147}
148
// --- Internal: Node Dispatch ---

151/// Emit failure events (NodeFailed + PipelineFailed) and return the error.
152fn emit_node_failure<F: Fn() -> u64 + Copy>(
153    ctx: &PipelineContext<F>,
154    node_id: &str,
155    error: BntoError,
156) -> BntoError {
157    let error_msg = error.to_string();
158    ctx.reporter.emit(PipelineEvent::NodeFailed {
159        node_id: node_id.to_string(),
160        error: error_msg.clone(),
161    });
162    ctx.reporter.emit(PipelineEvent::PipelineFailed {
163        node_id: node_id.to_string(),
164        error: error_msg,
165    });
166    error
167}
168
169/// Execute a single node — dispatches to primitive or container handler.
170///
171/// Emits NodeStarted, delegates to the appropriate handler, then emits
172/// NodeCompleted or NodeFailed. Container children call back into this
173/// for recursive execution.
174fn execute_node<F: Fn() -> u64 + Copy>(
175    ctx: &PipelineContext<F>,
176    node: &PipelineNode,
177    files: Vec<PipelineFile>,
178    node_index: usize,
179    total_nodes: usize,
180    file_offset: usize,
181) -> Result<NodeExecutionResult, BntoError> {
182    let node_start = (ctx.now_ms)();
183    emit_node_started(ctx, node, node_index, total_nodes);
184
185    let result = dispatch_node(ctx, node, files, file_offset);
186
187    match result {
188        Ok(exec_result) => {
189            emit_node_completed(ctx, &node.id, node_start, exec_result.files_processed);
190            Ok(exec_result)
191        }
192        Err(error) => Err(emit_node_failure(ctx, &node.id, error)),
193    }
194}
195
196fn emit_node_started<F: Fn() -> u64 + Copy>(
197    ctx: &PipelineContext<F>,
198    node: &PipelineNode,
199    node_index: usize,
200    total_nodes: usize,
201) {
202    ctx.reporter.emit(PipelineEvent::NodeStarted {
203        node_id: node.id.clone(),
204        node_index,
205        total_nodes,
206        node_type: node.node_type.clone(),
207    });
208}
209
210fn emit_node_completed<F: Fn() -> u64 + Copy>(
211    ctx: &PipelineContext<F>,
212    node_id: &str,
213    node_start: u64,
214    files_processed: usize,
215) {
216    let duration_ms = (ctx.now_ms)() - node_start;
217    ctx.reporter.emit(PipelineEvent::NodeCompleted {
218        node_id: node_id.to_string(),
219        duration_ms,
220        files_processed,
221    });
222}
223
224fn dispatch_node<F: Fn() -> u64 + Copy>(
225    ctx: &PipelineContext<F>,
226    node: &PipelineNode,
227    files: Vec<PipelineFile>,
228    file_offset: usize,
229) -> Result<NodeExecutionResult, BntoError> {
230    if is_container_node(&node.node_type) {
231        container::execute_container_node(ctx, node, files, file_offset)
232    } else {
233        primitive::execute_primitive_node(ctx, node, files, file_offset)
234    }
235}
236
#[cfg(test)]
mod tests;