use std::io::Write as _;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use cano::RedbCheckpointStore;
use cano::prelude::*;
/// Completion threshold: `Cruncher::step` reports the task as done once
/// `cursor + 1` reaches this value, and otherwise asks for another step.
const TARGET: u32 = 10;
/// Workflow identifier. `main` passes it both to `with_workflow_id` when
/// building the workflow and to `resume_from` when resuming.
const WORKFLOW_ID: &str = "stepped-run";
/// States of the workflow built in `main`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum State {
    /// State under which the `Cruncher` stepped task is registered; also the
    /// state a fresh run is started from.
    Crunch,
    /// Exit state, produced by `Cruncher::step` once it reports completion.
    Done,
}
/// Stepped task that counts a cursor up towards `TARGET`, printing a line for
/// every step it executes.
struct Cruncher {
    /// Whether this process was started in resume mode (set by `main`).
    is_resume: bool,
    /// Starts out `true`; `step` swaps it to `false` when it prints the
    /// `RESUMED` banner, so that banner is printed at most once.
    first_call: AtomicBool,
}

impl Cruncher {
    /// Creates a task whose one-shot `first_call` flag is still armed.
    fn new(is_resume: bool) -> Self {
        let first_call = AtomicBool::new(true);
        Self {
            is_resume,
            first_call,
        }
    }
}
// Stepped-task implementation for `Cruncher`. Each call prints the cursor it
// was handed, sleeps briefly, and then either requests another step with the
// incremented cursor or finishes in `State::Done`.
#[task::stepped(state = State)]
impl Cruncher {
    // Executes a single step.
    //
    // `cursor` is `None` when no cursor was supplied, in which case counting
    // starts at 0. Returns `StepOutcome::More(n + 1)` while `n + 1` is below
    // `TARGET`, and `StepOutcome::Done` with `State::Done` otherwise.
    async fn step(
        &self,
        _res: &Resources,
        cursor: Option<u32>,
    ) -> Result<StepOutcome<u32, State>, CanoError> {
        let n = cursor.unwrap_or_default();

        // Short-circuit evaluation matters here: the flag is only swapped when
        // this is a resume-mode process that was actually handed a cursor, so
        // the banner appears on the first such call and never again.
        let announce_resume =
            self.is_resume && cursor.is_some() && self.first_call.swap(false, Ordering::SeqCst);
        if announce_resume {
            println!("RESUMED cursor={n}");
        } else {
            println!("STEP cursor={n}");
        }
        // Best-effort flush so the line is emitted right away; a failed flush
        // is deliberately ignored.
        let _ = std::io::stdout().flush();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let next = n + 1;
        let outcome = if next < TARGET {
            StepOutcome::More(next)
        } else {
            StepOutcome::Done(TaskResult::Single(State::Done))
        };
        Ok(outcome)
    }
}
/// Entry point: `stepped_resume <db_path> <run|resume>`.
///
/// Builds a single-task workflow (`State::Crunch` -> `State::Done`) backed by
/// a `RedbCheckpointStore` created from `db_path`, then either starts it from
/// `State::Crunch` (`run`) or calls `resume_from(WORKFLOW_ID)` (`resume`).
/// The final state is printed on completion.
///
/// Exits with status 2 and a usage message when the argument count is wrong
/// or the mode is neither `run` nor `resume`. Previously any unrecognised
/// mode silently performed a fresh run.
///
/// # Errors
///
/// Propagates any error from creating the checkpoint store or from running /
/// resuming the workflow.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    const USAGE: &str = "usage: stepped_resume <db_path> <run|resume>";

    let args: Vec<String> = std::env::args().collect();
    let [_, db_path, mode] = args.as_slice() else {
        eprintln!("{USAGE}");
        std::process::exit(2);
    };

    // Validate the mode before touching the database so a typo cannot start
    // an unintended fresh run, and so `exit` runs while nothing is open.
    let is_resume = match mode.as_str() {
        "run" => false,
        "resume" => true,
        other => {
            eprintln!("unknown mode: {other}");
            eprintln!("{USAGE}");
            std::process::exit(2);
        }
    };

    let store = Arc::new(RedbCheckpointStore::new(db_path)?);
    let workflow = Workflow::bare()
        .register_stepped(State::Crunch, Cruncher::new(is_resume))
        .add_exit_state(State::Done)
        .with_checkpoint_store(store)
        .with_workflow_id(WORKFLOW_ID);

    if is_resume {
        let result = workflow.resume_from(WORKFLOW_ID).await?;
        println!("RESUME COMPLETE final={result:?}");
    } else {
        let result = workflow.orchestrate(State::Crunch).await?;
        println!("RUN COMPLETE final={result:?}");
    }
    // Best-effort flush so the completion line is emitted before the process
    // exits; a failed flush is deliberately ignored.
    let _ = std::io::stdout().flush();

    Ok(())
}