use cano::prelude::*;
use std::time::{Duration, Instant};
use tokio::time::sleep;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
Parallel,
Done,
}
#[derive(Clone)]
struct Worker {
id: &'static str,
delay_ms: u64,
}
impl Worker {
fn new(id: &'static str, delay_ms: u64) -> Self {
Self { id, delay_ms }
}
}
#[task(state = Step)]
impl Worker {
async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
sleep(Duration::from_millis(self.delay_ms)).await;
store.put(self.id, format!("done after {}ms", self.delay_ms))?;
println!(" [+] worker {} done ({}ms)", self.id, self.delay_ms);
Ok(TaskResult::Single(Step::Done))
}
}
fn workers() -> Vec<Worker> {
vec![
Worker::new("A", 50),
Worker::new("B", 150),
Worker::new("C", 300),
Worker::new("D", 500),
]
}
async fn run_strategy(label: &str, strategy: JoinStrategy) -> CanoResult<()> {
println!("--- {} ---", label);
let store = MemoryStore::new();
let join_config = JoinConfig::new(strategy, Step::Done);
let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
.register_split(Step::Parallel, workers(), join_config)
.add_exit_state(Step::Done);
let start = Instant::now();
let result = workflow.orchestrate(Step::Parallel).await?;
let elapsed = start.elapsed();
let completed: usize = ["A", "B", "C", "D"]
.iter()
.filter(|&&id| store.get::<String>(id).is_ok())
.count();
println!(
" => strategy={label}, state={result:?}, elapsed={elapsed:?}, completions={completed}/4\n"
);
Ok(())
}
#[tokio::main]
async fn main() -> CanoResult<()> {
println!("=== Join Strategies Demo ===\n");
run_strategy("Any (early-exit after 1st success)", JoinStrategy::Any).await?;
run_strategy(
"PartialResults(2) (early-exit after 2nd completion)",
JoinStrategy::PartialResults(2),
)
.await?;
run_strategy(
"Quorum(2) (waits for all, then checks threshold)",
JoinStrategy::Quorum(2),
)
.await?;
run_strategy(
"Percentage(0.5) (waits for all, then checks 50 %)",
JoinStrategy::Percentage(0.5),
)
.await?;
println!("=== Done ===");
Ok(())
}