use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use cano::prelude::*;
/// States of the example workflow.
///
/// `main` registers a task for `AwaitJob` and `Process`, and declares `Done`
/// as the exit state.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum Step {
    /// Waiting for the background job to finish (handled by the `AwaitJob` task).
    AwaitJob,
    /// Handling the finished job's results (handled by the `Process` task).
    Process,
    /// Exit state: no task is registered for it.
    Done,
}
/// A unit of background work whose progress is tracked as a tick count.
struct Job {
    /// Ticks completed so far. The counter is shared: `Job::new` hands a
    /// second handle to the caller, who increments it.
    ticks_done: Arc<AtomicU32>,
    /// Tick count at which the job is considered complete.
    ticks_total: u32,
}

impl Job {
    /// Builds a job that needs `ticks_total` ticks, with its counter at zero.
    ///
    /// Returns the job together with a second handle to the same counter, so
    /// the caller can advance the progress that [`Job::done`] reports.
    fn new(ticks_total: u32) -> (Self, Arc<AtomicU32>) {
        let ticks_done = Arc::new(AtomicU32::new(0));
        let handle = Arc::clone(&ticks_done);
        (
            Self {
                ticks_done,
                ticks_total,
            },
            handle,
        )
    }

    /// Returns the number of ticks completed so far.
    ///
    /// Uses an `Acquire` load of the shared counter.
    fn done(&self) -> u32 {
        self.ticks_done.load(Ordering::Acquire)
    }
}
// Implements cano's `Resource` trait for `Job`. The impl body is empty, so no
// trait items are overridden here.
// NOTE(review): presumably this is what allows a `Job` to be inserted into
// `Resources` and fetched with `res.get::<Job, _>(..)` — confirm against the
// cano documentation for `Resource` and the `#[resource]` attribute.
#[resource]
impl Resource for Job {}
/// Poll-style task that waits for the [`Job`] resource to reach its tick total.
struct AwaitJob;

#[task::poll(state = Step)]
impl AwaitJob {
    /// Checks the progress of the job stored under the `"job"` resource key.
    ///
    /// # Returns
    /// * `PollOutcome::Ready` with a transition to [`Step::Process`] once the
    ///   completed tick count is at least the job's total.
    /// * `PollOutcome::Pending` otherwise, with `delay_ms` set to 2 when at
    ///   most two ticks remain and to 5 when more remain.
    ///
    /// # Errors
    /// Propagates the error from the resource lookup.
    async fn poll(&self, res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
        let job = res.get::<Job, _>("job")?;
        let (done, total) = (job.done(), job.ticks_total);

        // Guard clause: nothing left to wait for.
        if done >= total {
            println!("await job : complete ({done}/{total} ticks)");
            return Ok(PollOutcome::Ready(TaskResult::Single(Step::Process)));
        }

        // `done < total` here, so the subtraction cannot underflow.
        // Use the shorter delay when the job is close to finishing.
        let delay_ms = match total - done {
            0..=2 => 2,
            _ => 5,
        };
        println!("await job : progress {done}/{total} ticks, next poll in {delay_ms}ms");
        Ok(PollOutcome::Pending { delay_ms })
    }
}
/// Task registered for [`Step::Process`]; it runs after the job has completed.
struct Process;

#[task(state = Step)]
impl Process {
    /// Logs that the completed job's results are being handled, then
    /// transitions the workflow to [`Step::Done`].
    ///
    /// Takes no resources and always returns `Ok`.
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        println!("process : handling completed job results");
        let next = TaskResult::Single(Step::Done);
        Ok(next)
    }
}
/// Entry point of the poll-task example.
///
/// Spawns a background task that bumps the job's shared counter once per tick,
/// then runs a workflow that starts in [`Step::AwaitJob`] and is expected to
/// finish in [`Step::Done`].
///
/// # Errors
/// Propagates any error returned by `Workflow::orchestrate`.
#[tokio::main]
async fn main() -> CanoResult<()> {
    const TICKS: u32 = 6;
    const TICK_PERIOD: Duration = Duration::from_millis(10);

    let (job, progress) = Job::new(TICKS);

    // Background worker: sleep one period, then record one completed tick.
    // The join handle is dropped, so the task runs detached.
    let worker = async move {
        for _ in 0..TICKS {
            tokio::time::sleep(TICK_PERIOD).await;
            progress.fetch_add(1, Ordering::Release);
        }
    };
    tokio::spawn(worker);

    println!("=== poll_task example ===");
    println!("background job: {TICKS} ticks at ~10 ms each\n");

    // The job is stored under the "job" key, the same key `AwaitJob::poll` looks up.
    let workflow = Workflow::new(Resources::new().insert("job", job))
        .register(Step::AwaitJob, AwaitJob)
        .register(Step::Process, Process)
        .add_exit_state(Step::Done);

    let result = workflow.orchestrate(Step::AwaitJob).await?;
    assert_eq!(result, Step::Done);
    println!("\ncompleted at {result:?}");

    Ok(())
}