1mod auto_iteration;
12mod container;
13mod primitive;
14
15use crate::context::ProcessContext;
16use crate::errors::BntoError;
17use crate::events::{PipelineEvent, PipelineReporter};
18use crate::pipeline::{
19 IterationMode, PipelineDefinition, PipelineFile, PipelineFileResult, PipelineNode,
20 PipelineResult, is_container_node, is_io_node,
21};
22use crate::registry::NodeRegistry;
23
24#[cfg(test)]
25use crate::processor::NodeInput;
26#[cfg(test)]
27use crate::progress::ProgressReporter;
28
// Convenience alias for a borrowed pipeline node.
// NOTE(review): not referenced anywhere in this file's visible code —
// presumably consumed by the auto_iteration/container/primitive submodules;
// confirm before removing.
type PipelineNodeRef<'a> = &'a PipelineNode;
32
/// Shared, read-only state threaded through every node execution.
///
/// `F` is an injected millisecond clock (`Fn() -> u64 + Copy`), which lets
/// callers (and tests) supply a deterministic time source.
struct PipelineContext<'a, F: Fn() -> u64 + Copy> {
    // Registry of node implementations.
    registry: &'a NodeRegistry,
    // Sink for pipeline lifecycle events (started/completed/failed).
    reporter: &'a PipelineReporter,
    // Host-provided process services; opaque trait object from this file's view.
    process_ctx: &'a dyn ProcessContext,
    // Number of files entering the pipeline, captured at pipeline start.
    pipeline_total_files: usize,
    // Millisecond clock used to compute node and pipeline durations.
    now_ms: F,
}
46
/// Outcome of executing a single pipeline node.
struct NodeExecutionResult {
    // Files produced by the node; fed as input to the next node in the chain.
    output_files: Vec<PipelineFile>,
    // Number of files this node processed; accumulated into the pipeline total.
    files_processed: usize,
}
52
53fn filter_processing_nodes(definition: &PipelineDefinition) -> Vec<&PipelineNode> {
57 definition
58 .nodes
59 .iter()
60 .filter(|n| !is_io_node(&n.node_type))
61 .collect()
62}
63
64fn build_pipeline_result(files: Vec<PipelineFile>, duration_ms: u64) -> PipelineResult {
66 let result_files = files
67 .into_iter()
68 .map(|f| PipelineFileResult {
69 name: f.name,
70 data: f.data,
71 mime_type: f.mime_type,
72 metadata: f.metadata,
73 })
74 .collect();
75 PipelineResult {
76 files: result_files,
77 duration_ms,
78 }
79}
80
81pub fn execute_pipeline(
86 definition: &PipelineDefinition,
87 files: Vec<PipelineFile>,
88 registry: &NodeRegistry,
89 reporter: &PipelineReporter,
90 process_ctx: &dyn ProcessContext,
91 now_ms: impl Fn() -> u64 + Copy,
92) -> Result<PipelineResult, BntoError> {
93 let start_ms = now_ms();
94 let processing_nodes = filter_processing_nodes(definition);
95 let ctx = PipelineContext {
96 registry,
97 reporter,
98 process_ctx,
99 pipeline_total_files: files.len(),
100 now_ms,
101 };
102
103 ctx.reporter.emit(PipelineEvent::PipelineStarted {
104 total_nodes: processing_nodes.len(),
105 total_files: files.len(),
106 });
107
108 let (current_files, total_files_processed) = match definition.resolved_iteration() {
109 IterationMode::Explicit => run_node_chain(&ctx, &processing_nodes, files, 0)?,
110 IterationMode::Auto => auto_iteration::run_auto_iteration(&ctx, &processing_nodes, files)?,
111 };
112
113 let duration_ms = (ctx.now_ms)() - start_ms;
114 ctx.reporter.emit(PipelineEvent::PipelineCompleted {
115 duration_ms,
116 total_files_processed,
117 });
118
119 Ok(build_pipeline_result(current_files, duration_ms))
120}
121
122fn run_node_chain<F: Fn() -> u64 + Copy>(
124 ctx: &PipelineContext<F>,
125 nodes: &[&PipelineNode],
126 files: Vec<PipelineFile>,
127 file_offset: usize,
128) -> Result<(Vec<PipelineFile>, usize), BntoError> {
129 let total_nodes = nodes.len();
130 let mut current_files = files;
131 let mut total_files_processed: usize = 0;
132
133 for (node_index, node) in nodes.iter().enumerate() {
134 let result = execute_node(
135 ctx,
136 node,
137 current_files,
138 node_index,
139 total_nodes,
140 file_offset,
141 )?;
142 total_files_processed += result.files_processed;
143 current_files = result.output_files;
144 }
145
146 Ok((current_files, total_files_processed))
147}
148
149fn emit_node_failure<F: Fn() -> u64 + Copy>(
153 ctx: &PipelineContext<F>,
154 node_id: &str,
155 error: BntoError,
156) -> BntoError {
157 let error_msg = error.to_string();
158 ctx.reporter.emit(PipelineEvent::NodeFailed {
159 node_id: node_id.to_string(),
160 error: error_msg.clone(),
161 });
162 ctx.reporter.emit(PipelineEvent::PipelineFailed {
163 node_id: node_id.to_string(),
164 error: error_msg,
165 });
166 error
167}
168
169fn execute_node<F: Fn() -> u64 + Copy>(
175 ctx: &PipelineContext<F>,
176 node: &PipelineNode,
177 files: Vec<PipelineFile>,
178 node_index: usize,
179 total_nodes: usize,
180 file_offset: usize,
181) -> Result<NodeExecutionResult, BntoError> {
182 let node_start = (ctx.now_ms)();
183 emit_node_started(ctx, node, node_index, total_nodes);
184
185 let result = dispatch_node(ctx, node, files, file_offset);
186
187 match result {
188 Ok(exec_result) => {
189 emit_node_completed(ctx, &node.id, node_start, exec_result.files_processed);
190 Ok(exec_result)
191 }
192 Err(error) => Err(emit_node_failure(ctx, &node.id, error)),
193 }
194}
195
196fn emit_node_started<F: Fn() -> u64 + Copy>(
197 ctx: &PipelineContext<F>,
198 node: &PipelineNode,
199 node_index: usize,
200 total_nodes: usize,
201) {
202 ctx.reporter.emit(PipelineEvent::NodeStarted {
203 node_id: node.id.clone(),
204 node_index,
205 total_nodes,
206 node_type: node.node_type.clone(),
207 });
208}
209
210fn emit_node_completed<F: Fn() -> u64 + Copy>(
211 ctx: &PipelineContext<F>,
212 node_id: &str,
213 node_start: u64,
214 files_processed: usize,
215) {
216 let duration_ms = (ctx.now_ms)() - node_start;
217 ctx.reporter.emit(PipelineEvent::NodeCompleted {
218 node_id: node_id.to_string(),
219 duration_ms,
220 files_processed,
221 });
222}
223
224fn dispatch_node<F: Fn() -> u64 + Copy>(
225 ctx: &PipelineContext<F>,
226 node: &PipelineNode,
227 files: Vec<PipelineFile>,
228 file_offset: usize,
229) -> Result<NodeExecutionResult, BntoError> {
230 if is_container_node(&node.node_type) {
231 container::execute_container_node(ctx, node, files, file_offset)
232 } else {
233 primitive::execute_primitive_node(ctx, node, files, file_offset)
234 }
235}
236
237#[cfg(test)]
238mod tests;