#![cfg(feature = "scheduler")]
use cano::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum FlowState {
Start,
Done,
}
#[derive(Clone)]
struct FlakyJob {
runs: Arc<AtomicU32>,
fail_count: u32,
}
#[node(state = FlowState)]
impl FlakyJob {
type PrepResult = ();
type ExecResult = bool;
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
Ok(())
}
async fn exec(&self, _prep: Self::PrepResult) -> Self::ExecResult {
let n = self.runs.fetch_add(1, Ordering::SeqCst) + 1;
let succeeded = n > self.fail_count;
println!(
" [{}] run #{n} {}",
chrono::Utc::now().format("%H:%M:%S%.3f"),
if succeeded { "OK" } else { "FAIL" }
);
succeeded
}
async fn post(&self, _res: &Resources, ok: Self::ExecResult) -> Result<FlowState, CanoError> {
if ok {
Ok(FlowState::Done)
} else {
Err(CanoError::node_execution("flaky job failure"))
}
}
}
#[tokio::main]
async fn main() -> CanoResult<()> {
println!("⏰ Scheduler Backoff Example");
println!("============================");
let mut scheduler: Scheduler<FlowState> = Scheduler::new();
let flaky = FlakyJob {
runs: Arc::new(AtomicU32::new(0)),
fail_count: 4,
};
let workflow = Workflow::bare()
.register(FlowState::Start, flaky.clone())
.add_exit_state(FlowState::Done);
scheduler.every(
"flaky",
workflow,
FlowState::Start,
Duration::from_millis(200),
)?;
scheduler.set_backoff(
"flaky",
BackoffPolicy {
initial: Duration::from_millis(300),
multiplier: 2.0,
max_delay: Duration::from_secs(2),
jitter: 0.0,
streak_limit: Some(3),
},
)?;
let running = scheduler.start().await?;
tokio::time::sleep(Duration::from_millis(1500)).await;
let snap = running.status("flaky").await.expect("flow exists");
println!("\nAfter ~1.5s — status: {:?}", snap.status);
println!(
" run_count = {}, streak = {}",
snap.run_count, snap.failure_streak
);
if matches!(snap.status, Status::Tripped { .. }) {
println!("\nFlow tripped — calling reset_flow to give it another chance...");
running.reset_flow("flaky").await?;
}
tokio::time::sleep(Duration::from_millis(1500)).await;
let snap = running.status("flaky").await.expect("flow exists");
println!("\nAfter reset + ~1.5s — status: {:?}", snap.status);
println!(
" run_count = {}, streak = {}",
snap.run_count, snap.failure_streak
);
running.stop().await?;
Ok(())
}