use jobflow::actions::action;
use jobflow::job_ordering::{GraphTraversalTaskOrderer, JobOrderer};
use jobflow::*;
use test_log::test;
use tracing::info;
/// Builds a flow without configuring a task orderer (so whatever ordering
/// `FlowBuilder` picks by default is used), feeds it the numbers `0..32`, and
/// checks the produced vector against `expected_result`.
#[test]
fn number_flow_stepped() -> Result<(), FlowError> {
    let input: Vec<_> = (0..32).collect();
    let mut number_flow = FlowBuilder::new().build();
    populate_flow(&input, &mut number_flow)?;

    // Compute the reference answer before `input` is moved into the flow.
    let want = expected_result(&input);
    let got: Vec<i32> = number_flow
        .apply(input)
        .expect("failed to run flow to produce");
    info!("got result: {:?}", &got);

    assert_eq!(got, want);
    Ok(())
}
/// Runs the same `0..32` scenario as `number_flow_stepped`, but on a flow whose
/// builder was given `GraphTraversalTaskOrderer` as its task orderer.
#[test]
fn number_flow_graph() -> Result<(), FlowError> {
    let input: Vec<_> = (0..32).collect();
    let builder = FlowBuilder::new().with_task_orderer(GraphTraversalTaskOrderer);
    let mut number_flow = builder.build();
    populate_flow(&input, &mut number_flow)?;

    // Compute the reference answer before `input` is moved into the flow.
    let want = expected_result(&input);
    let got: Vec<_> = number_flow
        .apply(input)
        .expect("failed to run flow to produce");

    assert_eq!(got, want);
    Ok(())
}
/// Wires the number-crunching job graph used by the tests in this file into `flow`.
///
/// For an input vector `v` with `test_data.len()` elements the steps are:
///
/// * `init` - hands the flow input through unchanged.
/// * `get[i]` -> `square[i]` - picks `v[i]` and squares it, once per index.
/// * `sum` - takes the squares as a `Vec<i32>` and adds them up.
/// * `addSum[i]` - takes `(v, sum)` and yields `v[i] + sum`, once per index.
/// * `output` - takes the `addSum[i]` results as a `Vec<i32>` and feeds the flow output.
///
/// `test_data` is consulted only for its length, which decides how many
/// per-index steps are created. It is accepted as a slice; callers holding a
/// `Vec<i32>` can keep passing `&vec`.
///
/// # Errors
///
/// Propagates the error of the first `reusable`, `funnelled` or `flows_into`
/// call that fails.
fn populate_flow<T: JobOrderer>(
    test_data: &[i32],
    flow: &mut Flow<Vec<i32>, Vec<i32>, T>,
) -> Result<(), FlowError> {
    let len = test_data.len();

    // NOTE(review): `reusable()` presumably lets one step feed several
    // downstream steps (here: every `get[i]` and every `addSum[i]`) - confirm
    // against the jobflow docs.
    let init = &flow.create("init", move |v: Vec<i32>| v).reusable()?;
    flow.input().flows_into(init)?;

    let mut squares = Vec::with_capacity(len);
    for i in 0..len {
        let get_nth =
            init.flows_into(flow.create(format!("get[{i}]"), move |v: Vec<i32>| v[i]))?;
        let squared = get_nth.flows_into(flow.create(format!("square[{i}]"), action(|n| n * n)))?;
        squares.push(squared);
    }

    // NOTE(review): `funnelled()` presumably collects the outputs of all
    // upstream steps into the single `Vec<i32>` argument - confirm.
    let sum = &squares
        .flows_into(
            flow.create("sum", action(|vs: Vec<i32>| -> i32 { vs.iter().sum() }))
                .funnelled()?,
        )?
        .reusable()?;

    let mut final_sums = Vec::with_capacity(len);
    for i in 0..len {
        let add_sum = flow.create(
            format!("addSum[{i}]"),
            action(move |(vs, sum): (Vec<i32>, i32)| vs[i] + sum),
        );
        final_sums.push((init, sum).flows_into(add_sum)?);
    }

    let output = final_sums.flows_into(flow.create("output", |t: Vec<i32>| t).funnelled()?)?;
    output.flows_into(flow.output())?;
    Ok(())
}
/// Reference implementation of what the number flow should output: every
/// element of `t` increased by the sum of the squares of all elements of `t`.
///
/// An empty slice yields an empty vector.
#[must_use]
fn expected_result(t: &[i32]) -> Vec<i32> {
    let mut sum_of_squares = 0_i32;
    for &value in t {
        sum_of_squares += value * value;
    }

    let mut shifted = Vec::with_capacity(t.len());
    for &value in t {
        shifted.push(value + sum_of_squares);
    }
    shifted
}