use crate::error::CanoError;
use crate::resource::Resources;
use crate::task::{TaskConfig, TaskResult};
use std::borrow::Cow;
use std::fmt;
use std::hash::Hash;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollOutcome<TState> {
Ready(TaskResult<TState>),
Pending {
delay_ms: u64,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PollErrorPolicy {
#[default]
FailFast,
RetryOnError {
max_errors: u32,
},
}
#[crate::task::poll]
pub trait PollTask<TState, TResourceKey = Cow<'static, str>>: Send + Sync
where
TState: Clone + fmt::Debug + Send + Sync + 'static,
TResourceKey: Hash + Eq + Send + Sync + 'static,
{
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
fn name(&self) -> Cow<'static, str> {
Cow::Borrowed(std::any::type_name::<Self>())
}
fn on_poll_error(&self) -> PollErrorPolicy {
PollErrorPolicy::FailFast
}
async fn poll(&self, res: &Resources<TResourceKey>) -> Result<PollOutcome<TState>, CanoError>;
}
pub async fn run_poll_loop<P, S, K>(p: &P, res: &Resources<K>) -> Result<TaskResult<S>, CanoError>
where
P: PollTask<S, K> + ?Sized,
S: Clone + fmt::Debug + Send + Sync + 'static,
K: Hash + Eq + Send + Sync + 'static,
{
let policy = p.on_poll_error();
let mut consecutive_errors: u32 = 0;
loop {
match p.poll(res).await {
Ok(PollOutcome::Ready(result)) => {
#[cfg(feature = "metrics")]
crate::metrics::poll_iteration(true);
return Ok(result);
}
Ok(PollOutcome::Pending { delay_ms }) => {
#[cfg(feature = "metrics")]
crate::metrics::poll_iteration(false);
consecutive_errors = 0;
if delay_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
}
Err(e) => match &policy {
PollErrorPolicy::FailFast => return Err(e),
PollErrorPolicy::RetryOnError { max_errors } => {
consecutive_errors += 1;
if consecutive_errors > *max_errors {
return Err(e);
}
}
},
}
}
}
pub type DynPollTask<TState, TResourceKey = Cow<'static, str>> =
dyn PollTask<TState, TResourceKey> + Send + Sync;
pub type PollTaskObject<TState, TResourceKey = Cow<'static, str>> =
std::sync::Arc<DynPollTask<TState, TResourceKey>>;
#[cfg(test)]
mod tests {
use super::*;
use crate::resource::Resources;
use crate::task;
use crate::task::Task;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
Wait,
Done,
Next,
}
struct ImmediatePoller;
#[task::poll]
impl PollTask<Step> for ImmediatePoller {
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)))
}
}
#[tokio::test]
async fn test_poll_task_immediate_via_poll() {
let poller = ImmediatePoller;
let res = Resources::new();
let result = PollTask::poll(&poller, &res).await.unwrap();
assert_eq!(result, PollOutcome::Ready(TaskResult::Single(Step::Done)));
}
#[tokio::test]
async fn test_poll_task_immediate_via_task_run() {
let poller = ImmediatePoller;
let res = Resources::new();
let result = Task::run(&poller, &res).await.unwrap();
assert_eq!(result, TaskResult::Single(Step::Done));
}
struct CountingPoller {
target: u32,
count: AtomicU32,
}
impl CountingPoller {
fn new(target: u32) -> Self {
Self {
target,
count: AtomicU32::new(0),
}
}
}
#[task::poll]
impl PollTask<Step> for CountingPoller {
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
let n = self.count.fetch_add(1, Ordering::Relaxed) + 1;
if n >= self.target {
Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)))
} else {
Ok(PollOutcome::Pending { delay_ms: 0 })
}
}
}
#[tokio::test]
async fn test_poll_task_multiple_polls() {
let poller = CountingPoller::new(3);
let res = Resources::new();
let result = Task::run(&poller, &res).await.unwrap();
assert_eq!(result, TaskResult::Single(Step::Done));
assert_eq!(poller.count.load(Ordering::Relaxed), 3);
}
struct ErrorPoller;
#[task::poll]
impl PollTask<Step> for ErrorPoller {
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
Err(CanoError::task_execution("poll failed"))
}
}
#[tokio::test]
async fn test_poll_task_error_propagates() {
let poller = ErrorPoller;
let res = Resources::new();
let err = Task::run(&poller, &res).await.unwrap_err();
assert!(matches!(err, CanoError::TaskExecution(_)));
}
struct SplitPoller;
#[task::poll]
impl PollTask<Step> for SplitPoller {
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
Ok(PollOutcome::Ready(TaskResult::Split(vec![
Step::Wait,
Step::Next,
])))
}
}
#[tokio::test]
async fn test_poll_task_split() {
let poller = SplitPoller;
let res = Resources::new();
let result = Task::run(&poller, &res).await.unwrap();
assert_eq!(result, TaskResult::Split(vec![Step::Wait, Step::Next]));
}
struct CustomPoller;
#[task::poll]
impl PollTask<Step> for CustomPoller {
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
fn name(&self) -> Cow<'static, str> {
Cow::Borrowed("my-custom-poller")
}
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)))
}
}
#[test]
fn test_poll_task_config_override() {
let poller = CustomPoller;
assert_eq!(
PollTask::<Step>::config(&poller).retry_mode.max_attempts(),
1
);
}
#[test]
fn test_poll_task_name_override() {
let poller = CustomPoller;
assert_eq!(PollTask::<Step>::name(&poller), "my-custom-poller");
}
#[test]
fn test_companion_task_forwards_config_and_name() {
let poller = CustomPoller;
assert_eq!(Task::config(&poller).retry_mode.max_attempts(), 1);
assert_eq!(Task::name(&poller), "my-custom-poller");
}
struct DefaultConfigPoller;
#[task::poll]
impl PollTask<Step> for DefaultConfigPoller {
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)))
}
}
#[test]
fn test_poll_task_default_config_is_minimal() {
let poller = DefaultConfigPoller;
assert_eq!(
PollTask::<Step>::config(&poller).retry_mode.max_attempts(),
1,
"PollTask default config must be minimal (no retries)"
);
}
#[test]
fn test_poll_task_default_name_contains_type_name() {
let poller = DefaultConfigPoller;
let name = PollTask::<Step>::name(&poller);
assert!(
name.contains("DefaultConfigPoller"),
"default name should contain the type name, got: {name}",
);
}
#[tokio::test]
async fn test_poll_task_as_dyn_task() {
let poller: Arc<dyn Task<Step>> = Arc::new(ImmediatePoller);
let res = Resources::new();
let result = Task::run(poller.as_ref(), &res).await.unwrap();
assert_eq!(result, TaskResult::Single(Step::Done));
}
#[tokio::test]
async fn test_poll_task_in_workflow() {
use crate::workflow::Workflow;
struct NextTask;
#[task]
impl Task<Step> for NextTask {
async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
Ok(TaskResult::Single(Step::Done))
}
}
let poller = CountingPoller::new(2);
let workflow = Workflow::bare()
.register(Step::Wait, poller)
.register(Step::Next, NextTask)
.add_exit_state(Step::Done);
let result = workflow.orchestrate(Step::Wait).await.unwrap();
assert_eq!(result, Step::Done);
}
#[tokio::test]
async fn test_run_poll_loop_dyn_dispatch() {
let poller: &dyn PollTask<Step> = &ImmediatePoller;
let res = Resources::new();
let result = run_poll_loop(poller, &res).await.unwrap();
assert_eq!(result, TaskResult::Single(Step::Done));
}
struct ErrorThenReadyPoller {
errors_before_ready: u32,
count: AtomicU32,
}
impl ErrorThenReadyPoller {
fn new(errors_before_ready: u32) -> Self {
Self {
errors_before_ready,
count: AtomicU32::new(0),
}
}
}
#[task::poll]
impl PollTask<Step> for ErrorThenReadyPoller {
fn on_poll_error(&self) -> PollErrorPolicy {
PollErrorPolicy::RetryOnError { max_errors: 2 }
}
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
let n = self.count.fetch_add(1, Ordering::Relaxed);
if n < self.errors_before_ready {
Err(CanoError::task_execution("transient"))
} else {
Ok(PollOutcome::Ready(TaskResult::Single(Step::Done)))
}
}
}
#[tokio::test]
async fn test_retry_on_error_within_budget_succeeds() {
let poller = ErrorThenReadyPoller::new(2);
let res = Resources::new();
let result = Task::run(&poller, &res).await.unwrap();
assert_eq!(result, TaskResult::Single(Step::Done));
assert_eq!(poller.count.load(Ordering::Relaxed), 3); }
#[tokio::test]
async fn test_retry_on_error_exceeds_budget_fails() {
let poller = ErrorThenReadyPoller::new(3);
let res = Resources::new();
let err = Task::run(&poller, &res).await.unwrap_err();
assert!(matches!(err, CanoError::TaskExecution(_)));
assert_eq!(poller.count.load(Ordering::Relaxed), 3);
}
struct ErrorPendingErrorReadyPoller {
count: AtomicU32,
}
impl ErrorPendingErrorReadyPoller {
fn new() -> Self {
Self {
count: AtomicU32::new(0),
}
}
}
#[task::poll]
impl PollTask<Step> for ErrorPendingErrorReadyPoller {
fn on_poll_error(&self) -> PollErrorPolicy {
PollErrorPolicy::RetryOnError { max_errors: 1 }
}
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
match self.count.fetch_add(1, Ordering::Relaxed) {
0 => Err(CanoError::task_execution("first error")),
1 => Ok(PollOutcome::Pending { delay_ms: 0 }),
2 => Err(CanoError::task_execution("second error after reset")),
_ => Ok(PollOutcome::Ready(TaskResult::Single(Step::Done))),
}
}
}
#[tokio::test]
async fn test_retry_on_error_pending_resets_counter() {
let poller = ErrorPendingErrorReadyPoller::new();
let res = Resources::new();
let result = Task::run(&poller, &res).await.unwrap();
assert_eq!(result, TaskResult::Single(Step::Done));
assert_eq!(poller.count.load(Ordering::Relaxed), 4);
}
#[tokio::test]
async fn test_default_on_poll_error_is_fail_fast() {
let poller = ErrorPoller;
let res = Resources::new();
let err = Task::run(&poller, &res).await.unwrap_err();
assert!(matches!(err, CanoError::TaskExecution(_)));
assert_eq!(
PollTask::<Step>::on_poll_error(&poller),
PollErrorPolicy::FailFast
);
}
struct InfinitePendingPoller;
#[task::poll]
impl PollTask<Step> for InfinitePendingPoller {
fn config(&self) -> TaskConfig {
TaskConfig::minimal().with_attempt_timeout(std::time::Duration::from_millis(20))
}
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<Step>, CanoError> {
Ok(PollOutcome::Pending { delay_ms: 1 })
}
}
#[tokio::test]
async fn test_attempt_timeout_bounds_infinite_pending_loop() {
use crate::workflow::Workflow;
let workflow = Workflow::bare()
.register(Step::Wait, InfinitePendingPoller)
.add_exit_state(Step::Done);
let start = std::time::Instant::now();
let err = workflow.orchestrate(Step::Wait).await.unwrap_err();
let elapsed = start.elapsed();
assert!(
matches!(err.inner(), CanoError::Timeout(_)),
"expected CanoError::Timeout, got: {err:?}"
);
assert!(
elapsed < std::time::Duration::from_millis(500),
"timeout took too long: {elapsed:?}"
);
}
}
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
use super::*;
use crate::metrics::test_support::*;
use crate::task::Task;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum St {
Done,
}
struct TwicePendingPoller {
count: Arc<AtomicU32>,
}
#[crate::task::poll]
impl PollTask<St> for TwicePendingPoller {
async fn poll(&self, _res: &Resources) -> Result<PollOutcome<St>, CanoError> {
let n = self.count.fetch_add(1, Ordering::Relaxed);
if n < 2 {
Ok(PollOutcome::Pending { delay_ms: 0 })
} else {
Ok(PollOutcome::Ready(TaskResult::Single(St::Done)))
}
}
}
#[test]
fn poll_iterations_counted_pending_twice_ready_once() {
let (result, rows) = run_with_recorder(|| async {
let poller = TwicePendingPoller {
count: Arc::new(AtomicU32::new(0)),
};
let res = crate::resource::Resources::new();
run_poll_loop(&poller, &res).await
});
assert!(result.is_ok(), "poll loop should succeed: {result:?}");
assert_eq!(
counter(
&rows,
"cano_poll_iterations_total",
&[("outcome", "pending")]
),
2,
"expected 2 pending iterations"
);
assert_eq!(
counter(&rows, "cano_poll_iterations_total", &[("outcome", "ready")]),
1,
"expected 1 ready iteration"
);
}
}