use crate::errors::BntoError;
use crate::pipeline::{PipelineDefinition, PipelineFile, PipelineNode, is_io_node};
use super::{NodeExecutionResult, PipelineContext, PipelineNodeRef, run_node_chain};
/// Forward `files` unchanged while reporting that zero files were processed.
///
/// Used when a container node has no executable children and must behave
/// as a no-op stage in the pipeline.
fn passthrough(files: Vec<PipelineFile>) -> NodeExecutionResult {
    NodeExecutionResult {
        output_files: files,
        files_processed: 0,
    }
}
/// Build a sub-pipeline definition from a node's children, if it has any.
///
/// Returns `None` when the node carries no `children` or the list is empty,
/// signalling to the caller that the container should act as a passthrough.
fn clone_children_definitions(node: PipelineNodeRef) -> Option<PipelineDefinition> {
    match node.children.as_ref() {
        Some(children) if !children.is_empty() => Some(PipelineDefinition {
            nodes: children.clone(),
            settings: None,
        }),
        _ => None,
    }
}
/// Execute a container node by running its children as a sub-pipeline.
///
/// "loop" nodes run their children once per input file; "group" and
/// "parallel" nodes run them once over the whole file set ("parallel" is
/// dispatched identically to "group" here). A container with no children,
/// or an unrecognized container type, passes its input files through
/// untouched.
pub(super) fn execute_container_node<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    node: PipelineNodeRef,
    files: Vec<PipelineFile>,
    file_offset: usize,
) -> Result<NodeExecutionResult, BntoError> {
    // No children means nothing to execute: act as a no-op stage.
    let sub_definition = if let Some(definition) = clone_children_definitions(node) {
        definition
    } else {
        return Ok(passthrough(files));
    };
    let kind = node.node_type.as_str();
    if kind == "loop" {
        execute_loop(ctx, &sub_definition, files, file_offset)
    } else if matches!(kind, "group" | "parallel") {
        execute_group(ctx, &sub_definition, files, file_offset)
    } else {
        // Unknown container types are tolerated rather than rejected.
        Ok(passthrough(files))
    }
}
/// Run the sub-pipeline once per input file, accumulating results.
///
/// Each file is fed through the children on its own, with `file_offset`
/// advanced by the file's position so downstream numbering stays stable.
/// The per-iteration processed counts and output files are merged into a
/// single combined result.
fn execute_loop<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    sub_definition: &PipelineDefinition,
    files: Vec<PipelineFile>,
    file_offset: usize,
) -> Result<NodeExecutionResult, BntoError> {
    let mut combined = NodeExecutionResult {
        files_processed: 0,
        output_files: Vec::new(),
    };
    for (index, file) in files.into_iter().enumerate() {
        let iteration =
            execute_sub_pipeline(ctx, sub_definition, vec![file], file_offset + index)?;
        combined.files_processed += iteration.files_processed;
        combined.output_files.extend(iteration.output_files);
    }
    Ok(combined)
}
/// Run the sub-pipeline once over the entire file set.
///
/// The sub-pipeline already yields the exact `NodeExecutionResult` shape the
/// caller needs, so its result is forwarded directly instead of being
/// destructured and rebuilt field-by-field as before (a redundant repack).
fn execute_group<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    sub_definition: &PipelineDefinition,
    files: Vec<PipelineFile>,
    file_offset: usize,
) -> Result<NodeExecutionResult, BntoError> {
    execute_sub_pipeline(ctx, sub_definition, files, file_offset)
}
/// Execute a pipeline definition's non-I/O nodes against `files`.
///
/// I/O nodes (per `is_io_node`) are skipped: sub-pipelines only run
/// processing stages, and the remaining node chain is driven by
/// `run_node_chain`. Its `(output_files, files_processed)` pair is packed
/// into a `NodeExecutionResult` for the caller.
fn execute_sub_pipeline<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    definition: &PipelineDefinition,
    files: Vec<PipelineFile>,
    file_offset: usize,
) -> Result<NodeExecutionResult, BntoError> {
    // Keep only the processing stages; I/O nodes are not run inside a container.
    let is_processing = |node: &&PipelineNode| !is_io_node(&node.node_type);
    let processing_nodes: Vec<&PipelineNode> =
        definition.nodes.iter().filter(is_processing).collect();
    let (output_files, files_processed) =
        run_node_chain(ctx, &processing_nodes, files, file_offset)?;
    Ok(NodeExecutionResult {
        output_files,
        files_processed,
    })
}