use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use cano::RedbCheckpointStore;
use cano::prelude::*;
/// Workflow states visited by the tour, in the order `main` wires them up.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Stage {
    /// Entry state, handled by `Router`.
    Route,
    /// Polling state, handled by `Waiter`.
    Wait,
    /// Batch state, handled by `Cruncher`.
    Crunch,
    /// Stepped state, handled by `Grinder`.
    Grind,
    /// Exit state; `main` asserts the workflow finishes here.
    Done,
}
/// Static configuration consulted by `Router` before the tour starts.
struct Config {
    /// When `false`, `Router::route` fails with a configuration error instead of routing.
    proceed: bool,
}
// Empty `Resource` impl for `Config`; `main` stores a `Config` in `Resources` under the
// key "config" and `Router::route` reads it back from there.
#[resource]
impl Resource for Config {}
/// Progress counter whose tally is shared with an external incrementer.
struct Counter {
    /// Running tally; the same atomic is handed out by [`Counter::new`].
    done: Arc<AtomicU32>,
    /// Tally value at which the counter reports completion.
    threshold: u32,
}

impl Counter {
    /// Creates a counter starting at zero.
    ///
    /// Returns the counter together with a second handle to the same atomic, so the
    /// caller can bump the tally from elsewhere (for example a spawned task).
    fn new(threshold: u32) -> (Self, Arc<AtomicU32>) {
        let tally = Arc::new(AtomicU32::new(0));
        let handle = Arc::clone(&tally);
        (
            Self {
                done: tally,
                threshold,
            },
            handle,
        )
    }

    /// Returns `true` once the shared tally is at or above `threshold`.
    fn is_done(&self) -> bool {
        // Acquire load: pairs with the Release increments performed through the handle.
        let observed = self.done.load(Ordering::Acquire);
        observed >= self.threshold
    }
}
// Empty `Resource` impl for `Counter`; `main` stores a `Counter` in `Resources` under the
// key "counter" and `Waiter::poll` reads it back from there.
#[resource]
impl Resource for Counter {}
/// Task registered for `Stage::Route`: checks the configuration and routes to `Stage::Wait`.
struct Router;

#[task::router(state = Stage)]
impl Router {
    /// Looks up the `Config` stored under "config" and decides whether the tour may start.
    ///
    /// # Errors
    /// Propagates the lookup error if the resource cannot be fetched, and returns a
    /// configuration error when `proceed` is `false`.
    async fn route(&self, res: &Resources) -> Result<TaskResult<Stage>, CanoError> {
        let cfg = res.get::<Config, _>("config")?;

        // Guard clause: refuse to start unless the tour is explicitly enabled.
        if !cfg.proceed {
            return Err(CanoError::configuration(
                "config.proceed must be true for this tour",
            ));
        }

        println!("route : config.proceed=true → Wait");
        Ok(TaskResult::Single(Stage::Wait))
    }
}
/// Task registered for `Stage::Wait`: polls the shared `Counter` until it reports completion.
struct Waiter;

#[task::poll(state = Stage)]
impl Waiter {
    /// Checks the `Counter` stored under "counter".
    ///
    /// Returns `Pending` with `delay_ms: 5` while the counter is below its threshold
    /// (presumably the delay before the next poll — confirm against cano's docs), and
    /// `Ready` with a transition to `Stage::Crunch` once the threshold is reached.
    ///
    /// # Errors
    /// Propagates the lookup error if the resource cannot be fetched.
    async fn poll(&self, res: &Resources) -> Result<PollOutcome<Stage>, CanoError> {
        let progress = res.get::<Counter, _>("counter")?;

        // Not there yet: ask to be polled again.
        if !progress.is_done() {
            return Ok(PollOutcome::Pending { delay_ms: 5 });
        }

        println!("wait : counter reached threshold → Crunch");
        let next = TaskResult::Single(Stage::Crunch);
        Ok(PollOutcome::Ready(next))
    }
}
/// Task registered for `Stage::Crunch`: squares the integers 1 through 8 and sums the squares.
struct Cruncher;

#[task::batch(state = Stage)]
impl Cruncher {
    /// Concurrency setting for this batch task (presumably the maximum number of items
    /// processed at once — confirm against cano's batch-task docs).
    fn concurrency(&self) -> usize {
        4
    }

    /// Produces the work items: the integers 1 through 8.
    async fn load(&self, _res: &Resources) -> Result<Vec<u32>, CanoError> {
        let items: Vec<u32> = (1..=8).collect();
        println!("crunch : loaded {} items", items.len());
        Ok(items)
    }

    /// Squares a single item.
    async fn process_item(&self, item: &u32) -> Result<u32, CanoError> {
        Ok(item * item)
    }

    /// Sums the per-item results and transitions to `Stage::Grind`.
    ///
    /// # Errors
    /// Returns the first `Err` found in `outputs`. Failed items are not skipped: the
    /// message printed below claims the complete sum of squares, so a partial total
    /// would be misleading.
    async fn finish(
        &self,
        _res: &Resources,
        outputs: Vec<Result<u32, CanoError>>,
    ) -> Result<TaskResult<Stage>, CanoError> {
        // Summing into `Result` short-circuits on the first failed item.
        let sum: u32 = outputs.into_iter().sum::<Result<u32, CanoError>>()?;
        println!("crunch : sum of squares 1²+…+8² = {sum} → Grind");
        Ok(TaskResult::Single(Stage::Grind))
    }
}
/// Task registered for `Stage::Grind`: advances a numeric cursor one step per invocation.
struct Grinder;

#[task::stepped(state = Stage)]
impl Grinder {
    /// Performs one step.
    ///
    /// A missing cursor is treated as 0. While the cursor is below 6 the step returns
    /// `More` with the cursor incremented by one; at 6 or above it returns `Done` with a
    /// transition to `Stage::Done`. Starting from `None`, that is seven invocations.
    async fn step(
        &self,
        _res: &Resources,
        cursor: Option<u32>,
    ) -> Result<StepOutcome<u32, Stage>, CanoError> {
        let current = cursor.unwrap_or(0);
        let next = current + 1;
        println!("grind : step {}", next);

        // Keep stepping until the cursor has reached 6.
        if current < 6 {
            return Ok(StepOutcome::More(next));
        }

        println!("grind : complete → Done");
        Ok(StepOutcome::Done(TaskResult::Single(Stage::Done)))
    }
}
/// Runs the processing-models tour: `Route` → `Wait` → `Crunch` → `Grind` → `Done`.
///
/// Sets up a checkpoint store in a temporary directory, spawns a background task that
/// increments the shared counter, runs the workflow from `Stage::Route`, and finally
/// asserts that the run ended at `Stage::Done` with no checkpoint rows left for this run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Number of background increments; also the threshold `Waiter` waits for.
    const THRESHOLD: u32 = 4;

    let run_id = "processing-models-tour";

    // The checkpoint database lives inside a temporary directory; `dir` stays bound
    // until `main` returns.
    let dir = tempfile::tempdir()?;
    let db_path = dir.path().join("tour.redb");
    let store = Arc::new(RedbCheckpointStore::new(db_path)?);

    // Background task: bump the shared tally THRESHOLD times, 8 ms apart.
    let (counter, done) = Counter::new(THRESHOLD);
    let ticker = async move {
        for _ in 0..THRESHOLD {
            tokio::time::sleep(Duration::from_millis(8)).await;
            done.fetch_add(1, Ordering::Release);
        }
    };
    tokio::spawn(ticker);

    let resources = Resources::new()
        .insert("config", Config { proceed: true })
        .insert("counter", counter);

    println!("=== processing models tour ===\n");

    // One task per state, `Done` as the exit state, checkpoints keyed by `run_id`.
    let workflow = Workflow::new(resources)
        .register_router(Stage::Route, Router)
        .register(Stage::Wait, Waiter)
        .register(Stage::Crunch, Cruncher)
        .register_stepped(Stage::Grind, Grinder)
        .add_exit_state(Stage::Done)
        .with_checkpoint_store(store.clone())
        .with_workflow_id(run_id);

    let outcome = workflow.orchestrate(Stage::Route).await;
    let result = outcome?;
    assert_eq!(result, Stage::Done);
    println!("\ncompleted at {result:?}");

    // The run succeeded, so no checkpoint rows are expected to remain for it.
    let rows = store.load_run(run_id).await?;
    let row_count = rows.len();
    println!(
        "\ncheckpoint log after successful run: {} row(s) (cleared on success)",
        row_count
    );
    assert!(
        rows.is_empty(),
        "checkpoint log should be empty after a successful run"
    );

    println!(
        "\nNote: the `Route` router state never wrote a checkpoint row; `Wait`, `Crunch`,\n\
        `Grind` (entry + cursor rows), and `Done` each did during the run.\n\
        Run `cargo test -p cano --features recovery` for the cross-model interop assertion."
    );

    Ok(())
}