use taskflow_rs::{
Executor, Taskflow, CompositionBuilder, TaskflowComposable,
ParameterizedComposition, CompositionParams,
};
use std::sync::{Arc, Mutex};
fn main() {
println!("=== Enhanced Composition Demo ===\n");
demo_cloneable_work();
println!();
demo_parameterized_composition();
println!();
demo_reusable_patterns();
println!();
demo_complex_pipeline();
}
fn demo_cloneable_work() {
println!("1. Cloneable Work");
println!(" Compositions with properly cloned work\n");
let counter = Arc::new(Mutex::new(0));
let mut builder = CompositionBuilder::new();
let c = counter.clone();
let task1 = builder.emplace_cloneable(move || {
let mut count = c.lock().unwrap();
*count += 1;
println!(" Task 1: Count = {}", *count);
});
let c = counter.clone();
let task2 = builder.emplace_cloneable(move || {
let mut count = c.lock().unwrap();
*count += 10;
println!(" Task 2: Count = {}", *count);
});
task1.precede(&task2);
builder.mark_entries(&[task1]);
builder.mark_exits(&[task2]);
let composition = builder.build();
let mut executor = Executor::new(4);
println!(" First execution:");
let mut taskflow1 = Taskflow::new();
taskflow1.compose(&composition);
executor.run(&taskflow1).wait();
println!("\n Second execution:");
let mut taskflow2 = Taskflow::new();
taskflow2.compose(&composition);
executor.run(&taskflow2).wait();
println!("\n Final counter: {}", *counter.lock().unwrap());
println!(" ✓ Work was properly cloned and reused");
}
fn demo_parameterized_composition() {
println!("2. Parameterized Composition");
println!(" Compositions that adapt based on parameters\n");
let param_comp = ParameterizedComposition::new(|params| {
let mut builder = CompositionBuilder::new();
let num_tasks = params.get_int("num_tasks").unwrap_or(3);
let message = params.get_string("message").unwrap_or("Task").to_string();
let mut tasks = Vec::new();
for i in 0..num_tasks {
let msg = message.clone();
let task = builder.emplace_cloneable(move || {
println!(" {} {}", msg, i);
});
tasks.push(task);
}
for i in 0..tasks.len().saturating_sub(1) {
tasks[i].precede(&tasks[i + 1]);
}
if !tasks.is_empty() {
builder.mark_entries(&[tasks[0].clone()]);
builder.mark_exits(&[tasks.last().unwrap().clone()]);
}
builder.build()
});
let mut executor = Executor::new(4);
println!(" Configuration 1: 3 tasks");
let mut params1 = CompositionParams::new();
params1.set_int("num_tasks", 3);
params1.set_string("message", "Step".to_string());
let mut taskflow1 = Taskflow::new();
param_comp.compose_into(&mut taskflow1, ¶ms1);
executor.run(&taskflow1).wait();
println!("\n Configuration 2: 5 tasks");
let mut params2 = CompositionParams::new();
params2.set_int("num_tasks", 5);
params2.set_string("message", "Phase".to_string());
let mut taskflow2 = Taskflow::new();
param_comp.compose_into(&mut taskflow2, ¶ms2);
executor.run(&taskflow2).wait();
println!("\n ✓ Parameterized composition works");
}
fn demo_reusable_patterns() {
println!("3. Reusable Patterns");
println!(" Common task patterns as compositions\n");
let map_reduce_pattern = ParameterizedComposition::new(|params| {
let mut builder = CompositionBuilder::new();
let num_mappers = params.get_int("num_mappers").unwrap_or(4) as usize;
let mut mappers = Vec::new();
for i in 0..num_mappers {
let mapper = builder.emplace_cloneable(move || {
println!(" [Map {}] Processing data", i);
});
mappers.push(mapper);
}
let reducer = builder.emplace_cloneable(|| {
println!(" [Reduce] Aggregating results");
});
for mapper in &mappers {
mapper.precede(&reducer);
}
builder.mark_entries(&mappers);
builder.mark_exits(&[reducer]);
builder.build()
});
let mut executor = Executor::new(4);
println!(" Map-Reduce with 4 mappers:");
let mut params = CompositionParams::new();
params.set_int("num_mappers", 4);
let mut taskflow = Taskflow::new();
map_reduce_pattern.compose_into(&mut taskflow, ¶ms);
executor.run(&taskflow).wait();
println!("\n ✓ Reusable pattern works");
}
fn demo_complex_pipeline() {
println!("4. Complex Pipeline");
println!(" Multiple compositions chained together\n");
let preprocess = ParameterizedComposition::new(|params| {
let mut builder = CompositionBuilder::new();
let validate = builder.emplace_cloneable(|| {
println!(" [Preprocess] Validating input");
});
let clean = builder.emplace_cloneable(|| {
println!(" [Preprocess] Cleaning data");
});
validate.precede(&clean);
builder.mark_entries(&[validate]);
builder.mark_exits(&[clean]);
builder.build()
});
let process = ParameterizedComposition::new(|params| {
let mut builder = CompositionBuilder::new();
let num_workers = params.get_int("workers").unwrap_or(2) as usize;
let mut workers = Vec::new();
for i in 0..num_workers {
let worker = builder.emplace_cloneable(move || {
println!(" [Process] Worker {} processing", i);
});
workers.push(worker);
}
builder.mark_entries(&workers);
builder.mark_exits(&workers);
builder.build()
});
let postprocess = ParameterizedComposition::new(|_params| {
let mut builder = CompositionBuilder::new();
let aggregate = builder.emplace_cloneable(|| {
println!(" [Postprocess] Aggregating results");
});
let save = builder.emplace_cloneable(|| {
println!(" [Postprocess] Saving output");
});
aggregate.precede(&save);
builder.mark_entries(&[aggregate]);
builder.mark_exits(&[save]);
builder.build()
});
let mut executor = Executor::new(4);
let mut taskflow = Taskflow::new();
let params = CompositionParams::new();
let mut process_params = CompositionParams::new();
process_params.set_int("workers", 3);
let pre_instance = preprocess.compose_into(&mut taskflow, ¶ms);
let proc_instance = process.compose_into(&mut taskflow, &process_params);
let post_instance = postprocess.compose_into(&mut taskflow, ¶ms);
for pre_exit in pre_instance.exits() {
for proc_entry in proc_instance.entries() {
pre_exit.precede(proc_entry);
}
}
for proc_exit in proc_instance.exits() {
for post_entry in post_instance.entries() {
proc_exit.precede(post_entry);
}
}
println!(" Executing complex pipeline:");
executor.run(&taskflow).wait();
println!("\n ✓ Complex pipeline with multiple compositions works");
}