use cano::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
/// Workflow states used by the demo.
///
/// `Poll` is the state the pollers are registered under; `Done` is the state
/// the pollers transition to when ready and is registered as the exit state.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum Step {
    /// State in which a poller is repeatedly invoked.
    Poll,
    /// Terminal state reached once a poller reports `Ready`.
    Done,
}
/// A poller that fails for a fixed number of initial calls and then succeeds.
struct FlakyPoller {
    /// How many of the first poll calls return an error before the poller
    /// reports `Ready`.
    error_streak: u32,
    /// Shared counter of how many times `poll` has been invoked so far.
    calls: Arc<AtomicU32>,
}

impl FlakyPoller {
    /// Creates a poller whose first `error_streak` polls fail.
    ///
    /// The call counter starts at zero.
    fn new(error_streak: u32) -> Self {
        // `AtomicU32::default()` is zero, so this matches an explicit `new(0)`.
        let calls = Arc::default();
        FlakyPoller {
            error_streak,
            calls,
        }
    }
}
// Poll-task behaviour for `FlakyPoller`: the first `error_streak` calls to
// `poll` return an error, and every later call reports `Ready` with
// `Step::Done`.
#[task::poll(state = Step)]
impl FlakyPoller {
    // Uses cano's minimal task configuration.
    // NOTE(review): presumably this turns off task-level retries so only the
    // poll error policy is exercised — confirm against the cano docs.
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    // Poll errors are handled with `RetryOnError` capped at 3; the demo's
    // banner describes this as "up to 3 consecutive errors tolerated".
    fn on_poll_error(&self) -> PollErrorPolicy {
        PollErrorPolicy::RetryOnError { max_errors: 3 }
    }

    // Fails while the 1-based call number is within `error_streak`, then
    // succeeds. Every call is counted and logged to stdout.
    async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
        // `fetch_add` yields the previous value; add one for a 1-based number.
        let call_number = 1 + self.calls.fetch_add(1, Ordering::SeqCst);
        let streak = self.error_streak;

        // Past the configured streak: report success and finish.
        if call_number > streak {
            println!(
                " poll #{call_number}: Ready <- success after {}/{} errors",
                call_number - 1,
                streak
            );
            return Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)));
        }

        // Still inside the streak: emit a transient failure.
        println!(
            " poll #{call_number}: Err (consecutive errors: {call_number}/{}) <- transient failure",
            streak
        );
        let message = format!("transient error on call {call_number}");
        Err(CanoError::task_execution(message))
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== PollTask RetryOnError Demo ===");
println!("max_errors = 3 (up to 3 consecutive errors tolerated)\n");
println!("-- Scenario A: 3 errors then success (should succeed) --");
{
let poller = FlakyPoller::new(3);
let workflow = Workflow::bare()
.register(Step::Poll, poller)
.add_exit_state(Step::Done);
match workflow.orchestrate(Step::Poll).await {
Ok(state) => println!(" result: Ok({state:?}) -- loop tolerated the streak\n"),
Err(e) => println!(" result: Err({e}) -- unexpected failure\n"),
}
}
println!("-- Scenario B: 4 consecutive errors (should fail) --");
{
let poller = FlakyPoller::new(4);
let workflow = Workflow::bare()
.register(Step::Poll, poller)
.add_exit_state(Step::Done);
match workflow.orchestrate(Step::Poll).await {
Ok(state) => println!(" result: Ok({state:?}) -- unexpected success\n"),
Err(e) => println!(" result: Err(\"{e}\") -- loop aborted after streak > cap\n"),
}
}
println!("-- Scenario C: interleaved errors with Pending resets counter --");
{
use std::sync::atomic::{AtomicU32, Ordering};
struct InterleavedPoller {
calls: Arc<AtomicU32>,
}
#[task::poll(state = Step)]
impl InterleavedPoller {
fn on_poll_error(&self) -> PollErrorPolicy {
PollErrorPolicy::RetryOnError { max_errors: 3 }
}
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
let n = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
match n {
1 | 2 => {
println!(" poll #{n}: Err (streak so far: {n})");
Err(CanoError::task_execution(format!("error #{n}")))
}
3 => {
println!(" poll #{n}: Pending <- resets consecutive-error counter to 0");
Ok(PollOutcome::Pending { delay_ms: 0 })
}
4 | 5 => {
println!(" poll #{n}: Err (streak so far: {})", n - 3);
Err(CanoError::task_execution(format!("error #{n}")))
}
_ => {
println!(" poll #{n}: Ready");
Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)))
}
}
}
}
let workflow = Workflow::bare()
.register(
Step::Poll,
InterleavedPoller {
calls: Arc::new(AtomicU32::new(0)),
},
)
.add_exit_state(Step::Done);
match workflow.orchestrate(Step::Poll).await {
Ok(state) => println!(" result: Ok({state:?}) -- Pending reset the counter\n"),
Err(e) => println!(" result: Err({e}) -- unexpected failure\n"),
}
}
println!("=== Done ===");
Ok(())
}