use cano::prelude::*;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
struct TimestampLog {
epoch: Instant,
entries: Arc<Mutex<Vec<(usize, u64, u64)>>>,
}
impl TimestampLog {
fn new() -> Self {
Self {
epoch: Instant::now(),
entries: Arc::new(Mutex::new(Vec::new())),
}
}
fn record(&self, task_id: usize, start: Instant, finish: Instant) {
let start_ms = start.duration_since(self.epoch).as_millis() as u64;
let finish_ms = finish.duration_since(self.epoch).as_millis() as u64;
self.entries
.lock()
.unwrap()
.push((task_id, start_ms, finish_ms));
}
fn snapshot(&self) -> Vec<(usize, u64, u64)> {
let mut v = self.entries.lock().unwrap().clone();
v.sort_by_key(|&(id, start, _)| (start, id));
v
}
}
#[resource]
impl Resource for TimestampLog {}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
ParallelWork,
Summarize,
Done,
}
struct Worker {
id: usize,
}
#[task(state = Step)]
impl Worker {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let log = res.get::<TimestampLog, _>("log")?;
let start = Instant::now();
println!(
" task {:>2}: start (+{}ms)",
self.id,
start.duration_since(log.epoch).as_millis()
);
tokio::time::sleep(Duration::from_millis(50)).await;
let finish = Instant::now();
println!(
" task {:>2}: finish (+{}ms)",
self.id,
finish.duration_since(log.epoch).as_millis()
);
log.record(self.id, start, finish);
Ok(TaskResult::Single(Step::Summarize))
}
}
struct Summarize;
#[task(state = Step)]
impl Summarize {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let log = res.get::<TimestampLog, _>("log")?;
let entries = log.snapshot();
println!("\n Timeline (task_id | start_ms | finish_ms):");
for (id, start, finish) in &entries {
println!(" {:>2} {:>6}ms -> {:>6}ms", id, start, finish);
}
let mut max_concurrent = 0usize;
for &(_, s1, f1) in &entries {
let concurrent = entries
.iter()
.filter(|&&(_, s2, f2)| s2 < f1 && s1 < f2)
.count();
max_concurrent = max_concurrent.max(concurrent);
}
println!("\n max concurrent tasks observed: {max_concurrent}");
assert!(
max_concurrent <= 2,
"bulkhead=2 violated: {max_concurrent} tasks ran concurrently"
);
println!(" bulkhead=2 respected (max_concurrent={max_concurrent} <= 2)");
Ok(TaskResult::Single(Step::Done))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("=== Split / Join with Bulkhead Demo ===\n");
println!("8 tasks, bulkhead=2: at most 2 run at a time (4 waves expected)\n");
let log = TimestampLog::new();
let resources = Resources::new().insert("log", log);
let workers: Vec<Worker> = (0..8).map(|id| Worker { id }).collect();
let join_config = JoinConfig::new(JoinStrategy::All, Step::Summarize).with_bulkhead(2);
let workflow = Workflow::new(resources)
.register_split(Step::ParallelWork, workers, join_config)
.register(Step::Summarize, Summarize)
.add_exit_state(Step::Done);
let result = workflow.orchestrate(Step::ParallelWork).await?;
println!("\ncompleted at {result:?}");
println!("\n=== Done ===");
Ok(())
}