1mod auto_iteration;
12mod container;
13pub mod loop_config;
14mod primitive;
15pub mod resolve;
16pub mod template;
17
18use crate::context::ProcessContext;
19use crate::errors::BntoError;
20use crate::events::{NodeInfo, PipelineEvent, PipelineReporter};
21use crate::pipeline::{
22 IterationMode, PipelineDefinition, PipelineFile, PipelineFileResult, PipelineNode,
23 PipelineResult, is_container_node, is_io_node,
24};
25use crate::registry::NodeRegistry;
26
27#[cfg(test)]
28use crate::processor::NodeInput;
29#[cfg(test)]
30use crate::progress::ProgressReporter;
31
32type PipelineNodeRef<'a> = &'a PipelineNode;
35
36struct PipelineContext<'a, F: Fn() -> u64 + Copy> {
39 registry: &'a NodeRegistry,
40 reporter: &'a PipelineReporter,
41 process_ctx: &'a dyn ProcessContext,
43 pipeline_total_files: usize,
46 now_ms: F,
48 loop_item: Option<serde_json::Map<String, serde_json::Value>>,
51 parent_node_id: Option<String>,
54 node_outputs: std::collections::BTreeMap<String, serde_json::Value>,
57}
58
59struct NodeExecutionResult {
61 output_files: Vec<PipelineFile>,
62 files_processed: usize,
63 warnings: Vec<String>,
65}
66
67fn filter_processing_nodes(definition: &PipelineDefinition) -> Vec<&PipelineNode> {
71 definition
72 .nodes
73 .iter()
74 .filter(|n| !is_io_node(&n.node_type))
75 .collect()
76}
77
78fn build_pipeline_result(
80 files: Vec<PipelineFile>,
81 duration_ms: u64,
82 warnings: Vec<String>,
83) -> PipelineResult {
84 let result_files = files
85 .into_iter()
86 .map(|f| PipelineFileResult {
87 name: f.name,
88 data: f.data,
89 mime_type: f.mime_type,
90 metadata: f.metadata,
91 })
92 .collect();
93 PipelineResult {
94 files: result_files,
95 duration_ms,
96 warnings,
97 }
98}
99
100pub fn execute_pipeline(
105 definition: &PipelineDefinition,
106 files: Vec<PipelineFile>,
107 registry: &NodeRegistry,
108 reporter: &PipelineReporter,
109 process_ctx: &dyn ProcessContext,
110 now_ms: impl Fn() -> u64 + Copy,
111) -> Result<PipelineResult, BntoError> {
112 let start_ms = now_ms();
113 let processing_nodes = filter_processing_nodes(definition);
114 let node_outputs = template::build_node_outputs_for_input(&definition.nodes, &files);
115 let ctx = PipelineContext {
116 registry,
117 reporter,
118 process_ctx,
119 pipeline_total_files: files.len(),
120 now_ms,
121 loop_item: None,
122 parent_node_id: None,
123 node_outputs,
124 };
125
126 let node_infos: Vec<NodeInfo> = processing_nodes
127 .iter()
128 .map(|n| NodeInfo {
129 id: n.id.clone(),
130 node_type: n.node_type.clone(),
131 })
132 .collect();
133
134 ctx.reporter.emit(PipelineEvent::PipelineStarted {
135 total_nodes: processing_nodes.len(),
136 total_files: files.len(),
137 nodes: node_infos,
138 });
139
140 let (current_files, total_files_processed, warnings) = match definition.resolved_iteration() {
141 IterationMode::Explicit => run_node_chain(&ctx, &processing_nodes, files, 0)?,
142 IterationMode::Auto => auto_iteration::run_auto_iteration(&ctx, &processing_nodes, files)?,
143 };
144
145 let duration_ms = (ctx.now_ms)() - start_ms;
146 ctx.reporter.emit(PipelineEvent::PipelineCompleted {
147 duration_ms,
148 total_files_processed,
149 });
150
151 Ok(build_pipeline_result(current_files, duration_ms, warnings))
152}
153
154fn run_node_chain<F: Fn() -> u64 + Copy>(
156 ctx: &PipelineContext<F>,
157 nodes: &[&PipelineNode],
158 files: Vec<PipelineFile>,
159 file_offset: usize,
160) -> Result<(Vec<PipelineFile>, usize, Vec<String>), BntoError> {
161 let total_nodes = nodes.len();
162 let mut current_files = files;
163 let mut total_files_processed: usize = 0;
164 let mut all_warnings: Vec<String> = Vec::new();
165
166 for (node_index, node) in nodes.iter().enumerate() {
167 let result = execute_node(
168 ctx,
169 node,
170 current_files,
171 node_index,
172 total_nodes,
173 file_offset,
174 )?;
175 total_files_processed += result.files_processed;
176 all_warnings.extend(result.warnings);
177 current_files = result.output_files;
178 }
179
180 Ok((current_files, total_files_processed, all_warnings))
181}
182
183fn emit_node_failure<F: Fn() -> u64 + Copy>(
187 ctx: &PipelineContext<F>,
188 node_id: &str,
189 error: BntoError,
190) -> BntoError {
191 let error_msg = error.to_string();
192 ctx.reporter.emit(PipelineEvent::NodeFailed {
193 node_id: node_id.to_string(),
194 error: error_msg.clone(),
195 parent_node_id: ctx.parent_node_id.clone(),
196 });
197 ctx.reporter.emit(PipelineEvent::PipelineFailed {
198 node_id: node_id.to_string(),
199 error: error_msg,
200 });
201 error
202}
203
204fn execute_node<F: Fn() -> u64 + Copy>(
210 ctx: &PipelineContext<F>,
211 node: &PipelineNode,
212 files: Vec<PipelineFile>,
213 node_index: usize,
214 total_nodes: usize,
215 file_offset: usize,
216) -> Result<NodeExecutionResult, BntoError> {
217 let node_start = (ctx.now_ms)();
218 emit_node_started(ctx, node, node_index, total_nodes);
219
220 let result = dispatch_node(ctx, node, files, file_offset);
221
222 match result {
223 Ok(exec_result) => {
224 emit_node_completed(ctx, &node.id, node_start, exec_result.files_processed);
225 Ok(exec_result)
226 }
227 Err(error) => Err(emit_node_failure(ctx, &node.id, error)),
228 }
229}
230
231fn emit_node_started<F: Fn() -> u64 + Copy>(
232 ctx: &PipelineContext<F>,
233 node: &PipelineNode,
234 node_index: usize,
235 total_nodes: usize,
236) {
237 ctx.reporter.emit(PipelineEvent::NodeStarted {
238 node_id: node.id.clone(),
239 node_index,
240 total_nodes,
241 node_type: node.node_type.clone(),
242 parent_node_id: ctx.parent_node_id.clone(),
243 });
244}
245
246fn emit_node_completed<F: Fn() -> u64 + Copy>(
247 ctx: &PipelineContext<F>,
248 node_id: &str,
249 node_start: u64,
250 files_processed: usize,
251) {
252 let duration_ms = (ctx.now_ms)() - node_start;
253 ctx.reporter.emit(PipelineEvent::NodeCompleted {
254 node_id: node_id.to_string(),
255 duration_ms,
256 files_processed,
257 parent_node_id: ctx.parent_node_id.clone(),
258 });
259}
260
261fn dispatch_node<F: Fn() -> u64 + Copy>(
262 ctx: &PipelineContext<F>,
263 node: &PipelineNode,
264 files: Vec<PipelineFile>,
265 file_offset: usize,
266) -> Result<NodeExecutionResult, BntoError> {
267 if is_container_node(&node.node_type) {
268 container::execute_container_node(ctx, node, files, file_offset)
269 } else {
270 primitive::execute_primitive_node(ctx, node, files, file_offset)
271 }
272}
273
274#[cfg(test)]
275mod tests;