use std::sync::Arc;
use cano::RedbCheckpointStore;
use cano::prelude::*;
use serde::{Deserialize, Serialize};
/// Workflow states used by this example.
///
/// The variants serve as registration keys for the workflow built in `main`
/// and as the transition targets returned by the tasks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage {
    /// State under which the stepped `Cruncher` task is registered.
    Crunch,
    /// State under which the `Reporter` task is registered.
    Report,
    /// Exit state of the workflow.
    Done,
}
/// Cursor value carried from one `Cruncher::step` call to the next.
///
/// Derives `Serialize`/`Deserialize` so the value can be encoded and decoded
/// with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Progress {
    /// Number of items processed so far.
    processed: u32,
    /// Total number of items to process.
    total: u32,
}
/// Stepped task that works through a fixed number of items, one per step.
struct Cruncher {
    /// Item count used to seed the cursor when a step starts without one.
    total: u32,
}
#[task::stepped(state = Stage)]
impl Cruncher {
    /// Processes a single item and reports whether more work remains.
    ///
    /// # Parameters
    /// * `_res` - workflow resources; unused by this task.
    /// * `cursor` - progress handed in from an earlier step. When it is
    ///   `None`, a fresh cursor is created at zero processed items out of
    ///   `self.total`.
    ///
    /// # Returns
    /// * `StepOutcome::More(next)` with the advanced cursor while items remain.
    /// * `StepOutcome::Done` transitioning to `Stage::Report` once every item
    ///   has been processed, or immediately when there is nothing to process.
    async fn step(
        &self,
        _res: &Resources,
        cursor: Option<Progress>,
    ) -> Result<StepOutcome<Progress, Stage>, CanoError> {
        let p = cursor.unwrap_or(Progress {
            processed: 0,
            total: self.total,
        });

        // Nothing left to do (for example `total == 0`): finish without
        // printing a misleading "item 1/0" line. This guard also guarantees
        // `p.processed < u32::MAX` below, so the increment cannot overflow.
        if p.processed >= p.total {
            return Ok(StepOutcome::Done(TaskResult::Single(Stage::Report)));
        }

        let processed = p.processed + 1;
        println!("crunch : item {}/{} processed", processed, p.total);

        if processed >= p.total {
            Ok(StepOutcome::Done(TaskResult::Single(Stage::Report)))
        } else {
            Ok(StepOutcome::More(Progress {
                processed,
                total: p.total,
            }))
        }
    }
}
/// Task that prints a summary line before the workflow reaches `Stage::Done`.
struct Reporter {
    /// Item count quoted in the summary line.
    total: u32,
}
#[task(state = Stage)]
impl Reporter {
    /// Prints the completion summary and transitions to `Stage::Done`.
    ///
    /// Always returns `Ok`; the only side effect is one line on stdout.
    async fn run_bare(&self) -> Result<TaskResult<Stage>, CanoError> {
        let item_count = self.total;
        println!(
            "report : all {} items processed — transitioning to Done",
            item_count
        );

        let transition = TaskResult::Single(Stage::Done);
        Ok(transition)
    }
}
/// Runs the stepped-task demo end to end.
///
/// Builds a workflow with a stepped `Cruncher` followed by a `Reporter`,
/// backed by a redb checkpoint store in a temporary directory. It runs the
/// workflow from `Stage::Crunch`, then asserts that it ended in `Stage::Done`
/// and that no checkpoint rows remain for the run id.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    const TOTAL: u32 = 8;
    let run_id = "stepped-demo";

    // Keep the temp-dir guard bound for the whole function so the database
    // file's directory outlives every use of the store.
    let scratch = tempfile::tempdir()?;
    let db_path = scratch.path().join("stepped.redb");
    let store = Arc::new(RedbCheckpointStore::new(db_path)?);

    println!("=== stepped_task example ({TOTAL} items) ===\n");

    let cruncher = Cruncher { total: TOTAL };
    let reporter = Reporter { total: TOTAL };
    let flow = Workflow::bare()
        .register_stepped(Stage::Crunch, cruncher)
        .register(Stage::Report, reporter)
        .add_exit_state(Stage::Done)
        .with_checkpoint_store(store.clone())
        .with_workflow_id(run_id);

    let result = flow.orchestrate(Stage::Crunch).await?;
    assert_eq!(result, Stage::Done);
    println!("\ncompleted at {result:?}");

    // Inspect what is left in the checkpoint log for this run.
    let leftover = store.load_run(run_id).await?;
    let leftover_count = leftover.len();
    println!(
        "\ncheckpoint log after successful run: {} row(s) (cleared on success)",
        leftover_count
    );
    assert!(
        leftover.is_empty(),
        "checkpoint log should be empty after a successful run"
    );

    println!("\nNote: to observe cursor persistence and mid-loop resumption,");
    println!("run `cargo test --features recovery --test stepped_resume_e2e`.");

    Ok(())
}