mod auto_iteration;
mod container;
mod primitive;
use crate::context::ProcessContext;
use crate::errors::BntoError;
use crate::events::{PipelineEvent, PipelineReporter};
use crate::pipeline::{
IterationMode, PipelineDefinition, PipelineFile, PipelineFileResult, PipelineNode,
PipelineResult, is_container_node, is_io_node,
};
use crate::registry::NodeRegistry;
#[cfg(test)]
use crate::processor::NodeInput;
#[cfg(test)]
use crate::progress::ProgressReporter;
/// Convenience alias for a borrowed pipeline node. Not referenced in this
/// file; presumably consumed by the executor submodules — TODO confirm.
type PipelineNodeRef<'a> = &'a PipelineNode;
/// Shared, read-only state threaded through every node execution.
///
/// `F` is the caller-supplied millisecond clock; it is bounded `Copy` so the
/// context can be borrowed freely while durations are sampled.
struct PipelineContext<'a, F: Fn() -> u64 + Copy> {
    // Registry the executor resolves node implementations from — exact use
    // lives in the container/primitive submodules.
    registry: &'a NodeRegistry,
    // Sink for pipeline lifecycle events (started/completed/failed).
    reporter: &'a PipelineReporter,
    // Host/process services made available to nodes during execution.
    process_ctx: &'a dyn ProcessContext,
    // File count the pipeline started with (set from `files.len()` in
    // `execute_pipeline`); presumably used by submodules for progress — verify.
    pipeline_total_files: usize,
    // Injected clock returning milliseconds; injection keeps timing testable.
    now_ms: F,
}
/// Outcome of running a single node over a batch of files.
struct NodeExecutionResult {
    // Files the node produced; fed as input to the next node in the chain.
    output_files: Vec<PipelineFile>,
    // Number of files this node processed; accumulated into the pipeline total.
    files_processed: usize,
}
fn filter_processing_nodes(definition: &PipelineDefinition) -> Vec<&PipelineNode> {
definition
.nodes
.iter()
.filter(|n| !is_io_node(&n.node_type))
.collect()
}
/// Converts the pipeline's final files into the public result type,
/// attaching the measured wall-clock duration.
fn build_pipeline_result(files: Vec<PipelineFile>, duration_ms: u64) -> PipelineResult {
    // Size the output up front; every input file maps to exactly one result.
    let mut converted = Vec::with_capacity(files.len());
    for file in files {
        converted.push(PipelineFileResult {
            name: file.name,
            data: file.data,
            mime_type: file.mime_type,
            metadata: file.metadata,
        });
    }
    PipelineResult {
        files: converted,
        duration_ms,
    }
}
/// Runs `definition` over `files` and returns the final files plus timing.
///
/// IO nodes are stripped up front; the remaining processing nodes run either
/// as one explicit chain or under auto-iteration, depending on the
/// definition's resolved iteration mode. Lifecycle events
/// (`PipelineStarted`/`PipelineCompleted`, plus per-node events) are emitted
/// through `reporter`. `now_ms` is the caller's millisecond clock, injected
/// for testability.
///
/// # Errors
/// Propagates the first `BntoError` raised by any node; the corresponding
/// failure events are emitted before the error is returned.
pub fn execute_pipeline(
    definition: &PipelineDefinition,
    files: Vec<PipelineFile>,
    registry: &NodeRegistry,
    reporter: &PipelineReporter,
    process_ctx: &dyn ProcessContext,
    now_ms: impl Fn() -> u64 + Copy,
) -> Result<PipelineResult, BntoError> {
    let start_ms = now_ms();
    let processing_nodes = filter_processing_nodes(definition);
    let ctx = PipelineContext {
        registry,
        reporter,
        process_ctx,
        pipeline_total_files: files.len(),
        now_ms,
    };
    ctx.reporter.emit(PipelineEvent::PipelineStarted {
        total_nodes: processing_nodes.len(),
        total_files: files.len(),
    });
    let (current_files, total_files_processed) = match definition.resolved_iteration() {
        IterationMode::Explicit => run_node_chain(&ctx, &processing_nodes, files, 0)?,
        IterationMode::Auto => auto_iteration::run_auto_iteration(&ctx, &processing_nodes, files)?,
    };
    // `now_ms` is caller-supplied and may not be monotonic (e.g. a wall clock
    // stepped backwards by NTP); saturate rather than panic on debug-build
    // u64 underflow.
    let duration_ms = (ctx.now_ms)().saturating_sub(start_ms);
    ctx.reporter.emit(PipelineEvent::PipelineCompleted {
        duration_ms,
        total_files_processed,
    });
    Ok(build_pipeline_result(current_files, duration_ms))
}
/// Runs `nodes` sequentially, threading each node's output files into the
/// next node's input. Returns the final files together with the total number
/// of files processed across the whole chain. Stops at the first node error.
fn run_node_chain<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    nodes: &[&PipelineNode],
    files: Vec<PipelineFile>,
    file_offset: usize,
) -> Result<(Vec<PipelineFile>, usize), BntoError> {
    let node_count = nodes.len();
    // Fold the (pending files, processed count) pair through the chain;
    // try_fold short-circuits on the first failing node.
    nodes.iter().enumerate().try_fold(
        (files, 0usize),
        |(pending, processed), (index, node)| {
            let outcome = execute_node(ctx, node, pending, index, node_count, file_offset)?;
            Ok((outcome.output_files, processed + outcome.files_processed))
        },
    )
}
/// Reports a node failure on both the node and pipeline channels, then hands
/// the error back so the caller can propagate it.
fn emit_node_failure<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    node_id: &str,
    error: BntoError,
) -> BntoError {
    let message = error.to_string();
    // A node failure implies the whole pipeline failed; emit both events
    // with the same id and message.
    let events = [
        PipelineEvent::NodeFailed {
            node_id: node_id.to_string(),
            error: message.clone(),
        },
        PipelineEvent::PipelineFailed {
            node_id: node_id.to_string(),
            error: message,
        },
    ];
    for event in events {
        ctx.reporter.emit(event);
    }
    error
}
fn execute_node<F: Fn() -> u64 + Copy>(
ctx: &PipelineContext<F>,
node: &PipelineNode,
files: Vec<PipelineFile>,
node_index: usize,
total_nodes: usize,
file_offset: usize,
) -> Result<NodeExecutionResult, BntoError> {
let node_start = (ctx.now_ms)();
emit_node_started(ctx, node, node_index, total_nodes);
let result = dispatch_node(ctx, node, files, file_offset);
match result {
Ok(exec_result) => {
emit_node_completed(ctx, &node.id, node_start, exec_result.files_processed);
Ok(exec_result)
}
Err(error) => Err(emit_node_failure(ctx, &node.id, error)),
}
}
fn emit_node_started<F: Fn() -> u64 + Copy>(
ctx: &PipelineContext<F>,
node: &PipelineNode,
node_index: usize,
total_nodes: usize,
) {
ctx.reporter.emit(PipelineEvent::NodeStarted {
node_id: node.id.clone(),
node_index,
total_nodes,
node_type: node.node_type.clone(),
});
}
/// Announces that a node finished, with its measured duration and the number
/// of files it processed. `node_start` is the `now_ms` sample taken just
/// before the node began.
fn emit_node_completed<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    node_id: &str,
    node_start: u64,
    files_processed: usize,
) {
    // `now_ms` is caller-supplied and may not be monotonic; saturate rather
    // than panic on debug-build u64 underflow if the clock stepped backwards.
    let duration_ms = (ctx.now_ms)().saturating_sub(node_start);
    ctx.reporter.emit(PipelineEvent::NodeCompleted {
        node_id: node_id.to_string(),
        duration_ms,
        files_processed,
    });
}
/// Routes a node to the container or primitive execution path based on its
/// node type.
fn dispatch_node<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    node: &PipelineNode,
    files: Vec<PipelineFile>,
    file_offset: usize,
) -> Result<NodeExecutionResult, BntoError> {
    // Container nodes get their own execution strategy; everything else is a
    // primitive node.
    if is_container_node(&node.node_type) {
        return container::execute_container_node(ctx, node, files, file_offset);
    }
    primitive::execute_primitive_node(ctx, node, files, file_offset)
}
#[cfg(test)]
mod tests;