use crate::errors::BntoError;
use crate::metadata::InputCardinality;
use crate::pipeline::{PipelineFile, PipelineNode, is_container_node};
use crate::registry::NodeRegistry;
use super::{NodeExecutionResult, PipelineContext, run_node_chain};
/// Executes `nodes` by splitting them into runs (see `partition_into_runs`)
/// and dispatching each run with the input-file handling it requires.
///
/// Per-file sequences fan out one file at a time; container and batch runs
/// receive the entire current file set in a single invocation; source runs
/// receive no input files. Returns the final file set together with the
/// total number of files processed across all runs.
pub(super) fn run_auto_iteration<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    nodes: &[&PipelineNode],
    files: Vec<PipelineFile>,
) -> Result<(Vec<PipelineFile>, usize), BntoError> {
    let runs = partition_into_runs(nodes, ctx.registry);
    let mut current_files = files;
    let mut total_processed: usize = 0;
    for run in &runs {
        match run {
            Run::PerFileSequence { start, len } => {
                let seq = &nodes[*start..*start + *len];
                let result = run_per_file_sequence(ctx, seq, current_files)?;
                total_processed += result.files_processed;
                current_files = result.output_files;
            }
            // Container and batch runs execute identically: the single node
            // receives the whole current file set at once.
            Run::Container { index } | Run::Batch { index } => {
                let node = &nodes[*index..*index + 1];
                let (output, processed) = run_node_chain(ctx, node, current_files, 0)?;
                total_processed += processed;
                current_files = output;
            }
            Run::Source { index } => {
                // NOTE(review): a source node takes no input, and its output
                // REPLACES `current_files` — any files accumulated before it
                // are dropped here. Confirm that is the intended semantics.
                let node = &nodes[*index..*index + 1];
                let (output, processed) = run_node_chain(ctx, node, vec![], 0)?;
                total_processed += processed;
                current_files = output;
            }
        }
    }
    Ok((current_files, total_processed))
}
/// A maximal segment of the node list that shares one execution strategy.
/// Produced by `partition_into_runs` and consumed by `run_auto_iteration`.
enum Run {
    /// A contiguous stretch of per-file nodes at `nodes[start..start + len]`,
    /// executed once per input file.
    PerFileSequence { start: usize, len: usize },
    /// A single container node at `nodes[index]`, fed the whole file set.
    Container { index: usize },
    /// A single batch-cardinality node at `nodes[index]`, fed the whole file set.
    Batch { index: usize },
    /// A single source-cardinality node at `nodes[index]`, fed no input files.
    Source { index: usize },
}
fn is_batch_node(node: &PipelineNode, registry: &NodeRegistry) -> bool {
let params = &node.params;
registry
.resolve(&node.node_type, params)
.map(|p| p.metadata().input_cardinality == InputCardinality::Batch)
.unwrap_or(false)
}
fn is_source_node(node: &PipelineNode, registry: &NodeRegistry) -> bool {
let params = &node.params;
registry
.resolve(&node.node_type, params)
.map(|p| p.metadata().input_cardinality == InputCardinality::Source)
.unwrap_or(false)
}
/// Splits `nodes` into maximal [`Run`]s: every container, batch, or source
/// node becomes its own single-node run, and each maximal stretch of
/// remaining (per-file) nodes between them becomes one `PerFileSequence`.
fn partition_into_runs(nodes: &[&PipelineNode], registry: &NodeRegistry) -> Vec<Run> {
    // Closes the pending per-file sequence, if any, so it ends just before `end`.
    fn flush(runs: &mut Vec<Run>, seq_start: &mut Option<usize>, end: usize) {
        if let Some(start) = seq_start.take() {
            runs.push(Run::PerFileSequence {
                start,
                len: end - start,
            });
        }
    }
    let mut runs = Vec::new();
    // Start index of the per-file sequence currently being accumulated.
    let mut seq_start: Option<usize> = None;
    for (i, node) in nodes.iter().enumerate() {
        // Classify nodes that break the per-file stream; check order
        // (container, then batch, then source) mirrors the original logic.
        let breaker = if is_container_node(&node.node_type) {
            Some(Run::Container { index: i })
        } else if is_batch_node(node, registry) {
            Some(Run::Batch { index: i })
        } else if is_source_node(node, registry) {
            Some(Run::Source { index: i })
        } else {
            None
        };
        match breaker {
            Some(run) => {
                flush(&mut runs, &mut seq_start, i);
                runs.push(run);
            }
            None => {
                if seq_start.is_none() {
                    seq_start = Some(i);
                }
            }
        }
    }
    // Close out a per-file sequence that runs to the end of the node list.
    flush(&mut runs, &mut seq_start, nodes.len());
    runs
}
/// Runs the per-file node sequence over each input file independently,
/// folding every file's outputs and processed count into one result.
///
/// Each file travels through the whole chain on its own, carrying its
/// position in the original batch as the iteration index passed to
/// `run_node_chain`. Stops at the first error.
fn run_per_file_sequence<F: Fn() -> u64 + Copy>(
    ctx: &PipelineContext<F>,
    nodes: &[&PipelineNode],
    files: Vec<PipelineFile>,
) -> Result<NodeExecutionResult, BntoError> {
    let empty = NodeExecutionResult {
        files_processed: 0,
        output_files: Vec::new(),
    };
    files
        .into_iter()
        .enumerate()
        .try_fold(empty, |mut acc, (file_index, input)| {
            let (outputs, count) = run_node_chain(ctx, nodes, vec![input], file_index)?;
            acc.files_processed += count;
            acc.output_files.extend(outputs);
            Ok(acc)
        })
}