use cano::prelude::*;
use cano::recovery::CheckpointRow;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
/// In-memory `CheckpointStore` backend: maps a workflow id to the list of
/// checkpoint rows appended for that run. `Clone` shares the same map via
/// the inner `Arc`, so cloned handles observe each other's writes.
#[derive(Default, Clone)]
struct MapStore(Arc<Mutex<HashMap<String, Vec<CheckpointRow>>>>);
#[cano::checkpoint_store]
impl MapStore {
    /// Record one checkpoint row for `workflow_id`.
    ///
    /// Rejects a row whose `sequence` is already present for that workflow,
    /// so the (workflow_id, sequence) pair stays unique.
    async fn append(&self, workflow_id: &str, row: CheckpointRow) -> Result<(), CanoError> {
        let mut guard = self.0.lock().expect("MapStore mutex poisoned");
        let entries = guard.entry(workflow_id.to_string()).or_default();
        let duplicate = entries.iter().any(|existing| existing.sequence == row.sequence);
        if duplicate {
            Err(CanoError::checkpoint_store(format!(
                "duplicate checkpoint: workflow={workflow_id:?} sequence={}",
                row.sequence
            )))
        } else {
            entries.push(row);
            Ok(())
        }
    }

    /// Return every checkpoint recorded for `workflow_id`, sorted by
    /// sequence number. Unknown ids yield an empty vector.
    async fn load_run(&self, workflow_id: &str) -> Result<Vec<CheckpointRow>, CanoError> {
        // Clone under the lock, then sort outside it to keep the critical
        // section short.
        let snapshot = {
            let guard = self.0.lock().expect("MapStore mutex poisoned");
            guard.get(workflow_id).cloned()
        };
        let mut rows = snapshot.unwrap_or_default();
        rows.sort_by_key(|r| r.sequence);
        Ok(rows)
    }

    /// Drop all checkpoints for `workflow_id`; a no-op for unknown ids.
    async fn clear(&self, workflow_id: &str) -> Result<(), CanoError> {
        let mut guard = self.0.lock().expect("MapStore mutex poisoned");
        guard.remove(workflow_id);
        Ok(())
    }
}
/// Workflow state machine for the demo; `Done` is registered as the exit
/// state in `main`, the other three each map to one task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
    Init,
    Process,
    Finalize,
    Done,
}
/// Shared resource counting `ProcessTask` attempts, used to make only the
/// first attempt fail (simulated crash) and later attempts succeed.
#[derive(Default)]
struct CrashOnce {
    // Incremented atomically on every ProcessTask::run invocation.
    attempts: AtomicU32,
}

// Marker impl so CrashOnce can be stored in and fetched from `Resources`.
#[resource]
impl Resource for CrashOnce {}
/// First workflow step: announces startup and hands off to `Process`.
struct InitTask;

#[task(state = Step)]
impl InitTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!(" init: preparing workflow");
        let next = Step::Process;
        Ok(TaskResult::Single(next))
    }
}
/// Middle step: deliberately fails on its very first attempt, succeeds on
/// any later attempt — exercising crash recovery via checkpoints.
struct ProcessTask;

#[task(state = Step)]
impl ProcessTask {
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        // fetch_add returns the previous value, so +1 yields this attempt's
        // 1-based number.
        let counter = &res.get::<CrashOnce, _>("crash")?.attempts;
        let n = counter.fetch_add(1, Ordering::SeqCst) + 1;
        if n > 1 {
            println!(" process: attempt {n} — ok");
            return Ok(TaskResult::Single(Step::Finalize));
        }
        println!(" process: attempt {n} — simulated crash");
        Err(CanoError::task_execution("ProcessTask: simulated crash"))
    }
}
/// Last real step: wraps up and transitions into the `Done` exit state.
struct FinalizeTask;

#[task(state = Step)]
impl FinalizeTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        let outcome = TaskResult::Single(Step::Done);
        println!(" finalize: wrapping up");
        Ok(outcome)
    }
}
/// Demo driver: run 1 crashes mid-workflow, leaving checkpoints in the
/// custom store; a duplicate-sequence append is shown to be rejected; run 2
/// resumes from the checkpoints and completes, after which the engine
/// clears the log.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let backing = Arc::new(MapStore::default());
    let workflow_id = "demo-custom-store";

    // Wire the crash-once resource and the custom checkpoint store into the
    // workflow, keyed by a fixed id so the second run can resume it.
    let resources = Resources::new().insert("crash", CrashOnce::default());
    let workflow = Workflow::new(resources)
        .register(Step::Init, InitTask)
        .register(Step::Process, ProcessTask)
        .register(Step::Finalize, FinalizeTask)
        .add_exit_state(Step::Done)
        .with_checkpoint_store(backing.clone() as Arc<dyn CheckpointStore>)
        .with_workflow_id(workflow_id);

    println!("=== run 1: Process will crash ===");
    match workflow.orchestrate(Step::Init).await {
        Err(e) => println!(" stopped with error: {e}"),
        Ok(s) => println!(" completed at {s:?} (unexpected)"),
    }

    println!("\nCheckpoint log after run 1 (crash left it intact):");
    for row in backing.load_run(workflow_id).await? {
        println!(
            " seq={:<3} state={:<9} task={:<20} kind={:?}",
            row.sequence, row.state, row.task_id, row.kind
        );
    }

    // Store contract check: a second row reusing sequence 0 must be refused.
    let duplicate = backing
        .append(workflow_id, CheckpointRow::new(0, "Init", "InitTask"))
        .await;
    assert!(
        duplicate.is_err(),
        "store must reject a duplicate (workflow_id, sequence)"
    );
    println!(
        "\nDuplicate-sequence rejection: {}",
        duplicate.unwrap_err()
    );

    println!("\n=== run 2: resume_from ===");
    let final_state = workflow.resume_from(workflow_id).await?;
    println!(" reached {final_state:?}");
    assert_eq!(final_state, Step::Done);

    println!("\nCheckpoint log after successful run (cleared by engine):");
    let leftover = backing.load_run(workflow_id).await?;
    if leftover.is_empty() {
        println!(" (empty — engine cleared it on success)");
    } else {
        for row in &leftover {
            println!(" seq={} state={}", row.sequence, row.state);
        }
    }

    println!("\n=== Done ===");
    Ok(())
}