use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use cano::RedbCheckpointStore;
use cano::prelude::*;
/// States of the demo workflow.
///
/// `main` registers one task per non-terminal state (`Start`, `Process`,
/// `Finalize`) and declares `Done` as the exit state.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Step {
    /// Entry state passed to `orchestrate`.
    Start,
    /// State whose task fails on its first attempt.
    Process,
    /// Last working state before the workflow exits.
    Finalize,
    /// Exit state; reaching it ends the workflow.
    Done,
}
/// Shared attempt counter used to make `ProcessTask` fail exactly once.
///
/// `Default` yields a counter of zero; `ProcessTask::run` increments it on
/// every invocation and fails when the incremented value is `1`.
#[derive(Default)]
struct CrashOnce {
    /// Number of times `ProcessTask::run` has been entered so far.
    attempts: AtomicU32,
}
// Lets `CrashOnce` be stored in `Resources` (it is inserted under the key
// "crash" in `main`). The impl body is empty, so only the trait's defaults
// apply.
// NOTE(review): presumably those defaults are no-op lifecycle hooks — confirm
// against the cano `Resource` documentation.
#[cano::resource]
impl Resource for CrashOnce {}
/// Stateless observer that prints checkpoint and resume events to stdout.
struct Watcher;

impl WorkflowObserver for Watcher {
    /// Prints the workflow id and the sequence number the run resumes from.
    fn on_resume(&self, workflow_id: &str, seq: u64) {
        println!(" ↺ resuming workflow_id={workflow_id} from_sequence={seq}");
    }

    /// Prints the workflow id and sequence number of a reported checkpoint.
    fn on_checkpoint(&self, workflow_id: &str, seq: u64) {
        println!(" ✓ checkpoint workflow_id={workflow_id} sequence={seq}");
    }
}
/// Task registered for `Step::Start`; carries no state of its own.
#[derive(Clone)]
struct StartTask;
/// Task registered for `Step::Process`; carries no state of its own (the
/// attempt counter it uses lives in the `CrashOnce` resource).
#[derive(Clone)]
struct ProcessTask;
/// Task registered for `Step::Finalize`; carries no state of its own.
#[derive(Clone)]
struct FinalizeTask;
// Task body for `Step::Start`: logs a line and hands off to `Step::Process`.
#[task(state = Step)]
impl StartTask {
    // Takes no resources; always succeeds with a single next state.
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!(" start");
        let next = Step::Process;
        Ok(TaskResult::Single(next))
    }
}
// Task body for `Step::Process`: fails on the first attempt recorded in the
// shared `CrashOnce` resource and succeeds on every later one.
#[task(state = Step)]
impl ProcessTask {
    // NOTE(review): `TaskConfig::minimal()` presumably turns off retries so the
    // simulated failure reaches the caller instead of being retried in place —
    // confirm against the cano documentation.
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    // Looks up the "crash" resource, bumps its counter, and decides the outcome
    // from the 1-based attempt number.
    async fn run(&self, res: &Resources) -> Result<TaskResult<Step>, CanoError> {
        let crash = res.get::<CrashOnce, _>("crash")?;
        // `fetch_add` returns the previous value; `+ 1` makes it 1-based.
        let n = crash.attempts.fetch_add(1, Ordering::SeqCst) + 1;

        if n != 1 {
            println!(" process: attempt {n} — ok");
            return Ok(TaskResult::Single(Step::Finalize));
        }

        println!(" process: attempt {n} — crash!");
        Err(CanoError::task_execution("simulated crash in ProcessTask"))
    }
}
// Task body for `Step::Finalize`: logs a line and moves to the exit state.
#[task(state = Step)]
impl FinalizeTask {
    // Takes no resources; always succeeds with `Step::Done`.
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!(" finalize");
        let next = Step::Done;
        Ok(TaskResult::Single(next))
    }
}
/// Runs the crash-recovery demo end to end.
///
/// 1. Builds a workflow backed by a redb checkpoint store in a temp directory.
/// 2. Orchestrates from `Step::Start`; `ProcessTask` fails on its first attempt.
/// 3. Prints the checkpoint rows still stored for the run.
/// 4. Resumes the same run id and asserts that it reaches `Step::Done` and that
///    no checkpoint rows remain for the run afterwards.
///
/// # Errors
///
/// Propagates any error from creating the temp directory, opening the store,
/// loading checkpoint rows, or resuming the workflow.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let dir = tempfile::tempdir()?;
    let db_path = dir.path().join("recovery.redb");
    let store = Arc::new(RedbCheckpointStore::new(db_path)?);
    let run_id = "demo-run-42";

    // The same workflow instance (and therefore the same `CrashOnce` counter)
    // is used for both runs, so the second pass through `Process` succeeds.
    let resources = Resources::new().insert("crash", CrashOnce::default());
    let workflow = Workflow::new(resources)
        .register(Step::Start, StartTask)
        .register(Step::Process, ProcessTask)
        .register(Step::Finalize, FinalizeTask)
        .add_exit_state(Step::Done)
        .with_checkpoint_store(store.clone())
        .with_workflow_id(run_id)
        .with_observer(Arc::new(Watcher));

    println!("── run 1: orchestrate (Process will crash) ──");
    // A failure here is expected; report it and carry on to the resume step.
    match workflow.orchestrate(Step::Start).await {
        Ok(_) => {}
        Err(e) => println!(" stopped: {e}"),
    }

    println!("checkpoint log after run 1 (the crash left it intact):");
    let rows = store.load_run(run_id).await?;
    for row in rows {
        println!(" #{:<2} {:<9} {}", row.sequence, row.state, row.task_id);
    }

    println!("\n── run 2: resume_from ──");
    let final_state = workflow.resume_from(run_id).await?;
    println!(" reached {final_state:?} — checkpoint log cleared on success");
    assert_eq!(final_state, Step::Done);

    let leftover = store.load_run(run_id).await?;
    assert!(leftover.is_empty());

    Ok(())
}