mod backoff;
pub use backoff::BackoffPolicy;
use crate::error::{CanoError, CanoResult};
use crate::workflow::Workflow;
use chrono::{DateTime, Utc};
use cron::Schedule as CronSchedule;
use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::Hash;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::{Notify, RwLock, mpsc, oneshot, watch};
use tokio::task::JoinHandle;
use tokio::time::{Duration, sleep};
#[cfg(feature = "tracing")]
use tracing::Instrument;
/// Control messages consumed by the scheduler's driver task.
///
/// Commands travel over a bounded mpsc channel; the request-style variants
/// carry a oneshot sender so the caller can await the driver's verdict.
enum SchedulerCommand {
    /// Begin graceful shutdown of the driver task and all flow loops.
    Stop,
    /// Run the flow with the given id once, immediately.
    Trigger {
        id: String,
        // Receives Ok(()) once the run is dispatched, or an error if the
        // flow is unknown, already running, or tripped.
        response: oneshot::Sender<CanoResult<()>>,
    },
    /// Clear a flow's failure streak and backoff window so it can run again.
    Reset {
        id: String,
        response: oneshot::Sender<CanoResult<()>>,
    },
}
/// User-facing description of when a flow should run.
///
/// NOTE(review): this public enum is not referenced anywhere in the visible
/// part of this file (scheduling is driven by the internal `ParsedSchedule`);
/// presumably it is consumed by callers in other modules — confirm before
/// changing or removing it.
#[derive(Debug, Clone)]
pub enum Schedule {
    /// Run repeatedly with a fixed interval between runs.
    Every(Duration),
    /// Run according to a cron expression (parsed with the `cron` crate).
    Cron(String),
    /// Run only when explicitly triggered.
    Manual,
}
/// Observable lifecycle state of a scheduled flow.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Status {
    /// Registered but not yet run (or freshly reset).
    Idle,
    /// A run is currently in flight.
    Running,
    /// The most recent run finished successfully.
    Completed,
    /// The most recent run failed and no backoff policy is attached.
    Failed(String),
    /// The most recent run failed; further runs are suppressed until `until`
    /// according to the attached backoff policy.
    Backoff {
        until: DateTime<Utc>,
        streak: u32,
        last_error: String,
    },
    /// The policy's trip threshold was reached; the flow will not run again
    /// until it is explicitly reset.
    Tripped {
        streak: u32,
        last_error: String,
    },
}
/// Point-in-time snapshot of a flow's bookkeeping, as returned by
/// `RunningScheduler::status` and `RunningScheduler::list`.
#[derive(Debug, Clone)]
pub struct FlowInfo {
    /// Registration id of the flow.
    pub id: String,
    /// Current lifecycle state.
    pub status: Status,
    /// Number of runs started (incremented when a run is reserved).
    pub run_count: u64,
    /// Start time of the most recent run, if any.
    pub last_run: Option<DateTime<Utc>>,
    /// Consecutive failures recorded under an attached backoff policy.
    pub failure_streak: u32,
    /// Earliest time the next run may start while backing off, if any.
    pub next_eligible: Option<DateTime<Utc>>,
}
/// Internal, validated form of a flow's schedule.
#[derive(Clone)]
enum ParsedSchedule {
    /// Fixed interval between runs.
    Every(Duration),
    /// Pre-parsed cron schedule (boxed, presumably to keep the enum small).
    Cron(Box<CronSchedule>),
    /// Runs only via an explicit trigger command.
    Manual,
}
/// Everything the scheduler holds for one registered flow.
struct FlowData<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// The workflow to execute; shared with spawned loop tasks.
    workflow: Arc<Workflow<TState, TResourceKey>>,
    /// State every run starts from.
    initial_state: TState,
    /// Validated schedule driving when runs happen.
    schedule: ParsedSchedule,
    /// Shared, lock-protected bookkeeping (also exposed via status/list).
    info: Arc<RwLock<FlowInfo>>,
    /// Optional failure backoff / circuit-breaker policy.
    policy: Option<Arc<BackoffPolicy>>,
}
/// Manual `Clone` so that only `TState: Clone` (already bounded) is required;
/// `#[derive(Clone)]` would additionally demand `TResourceKey: Clone`.
impl<TState, TResourceKey> Clone for FlowData<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        // Shared handles are reference-bumped; only the initial state is a
        // deep clone.
        let workflow = Arc::clone(&self.workflow);
        let info = Arc::clone(&self.info);
        Self {
            workflow,
            initial_state: self.initial_state.clone(),
            schedule: self.schedule.clone(),
            info,
            policy: self.policy.clone(),
        }
    }
}
/// Builder for a set of scheduled workflows; consumed by `Scheduler::start`,
/// which validates all flows, sets up their resources, and spawns the loops.
pub struct Scheduler<TState, TResourceKey = Cow<'static, str>>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Registered flows keyed by id.
    workflows: HashMap<String, FlowData<TState, TResourceKey>>,
    /// Ids in registration order (resource setup order; teardown reverses it).
    flow_order: Vec<String>,
}
impl<TState, TResourceKey> Scheduler<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Create an empty scheduler with no registered flows.
    pub fn new() -> Self {
        Self {
            workflows: HashMap::new(),
            flow_order: Vec::new(),
        }
    }

    /// Register `workflow` under `id`, to run repeatedly every `interval`.
    ///
    /// # Errors
    /// Returns a configuration error if `id` is already registered.
    pub fn every(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
        interval: Duration,
    ) -> CanoResult<()> {
        self.add_flow_internal(id, workflow, initial_state, ParsedSchedule::Every(interval))
    }

    /// Convenience wrapper over [`Self::every`] taking whole seconds.
    pub fn every_seconds(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
        seconds: u64,
    ) -> CanoResult<()> {
        self.every(id, workflow, initial_state, Duration::from_secs(seconds))
    }

    /// Convenience wrapper over [`Self::every`] taking whole minutes.
    pub fn every_minutes(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
        minutes: u64,
    ) -> CanoResult<()> {
        // saturating_mul: a pathological `minutes` value clamps to a huge
        // interval instead of panicking (debug) or wrapping (release).
        self.every(
            id,
            workflow,
            initial_state,
            Duration::from_secs(minutes.saturating_mul(60)),
        )
    }

    /// Convenience wrapper over [`Self::every`] taking whole hours.
    pub fn every_hours(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
        hours: u64,
    ) -> CanoResult<()> {
        // See every_minutes: saturate rather than overflow.
        self.every(
            id,
            workflow,
            initial_state,
            Duration::from_secs(hours.saturating_mul(3600)),
        )
    }

    /// Register `workflow` under `id`, to run on the cron expression `expr`.
    ///
    /// # Errors
    /// Returns a configuration error if `expr` does not parse or `id` is
    /// already registered.
    pub fn cron(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
        expr: &str,
    ) -> CanoResult<()> {
        // Parse eagerly so a bad expression is rejected at registration time
        // rather than when the scheduler starts.
        let parsed = CronSchedule::from_str(expr)
            .map_err(|e| CanoError::Configuration(format!("Invalid cron expression: {e}")))?;
        self.add_flow_internal(
            id,
            workflow,
            initial_state,
            ParsedSchedule::Cron(Box::new(parsed)),
        )
    }

    /// Register `workflow` under `id`, to run only via explicit trigger().
    pub fn manual(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
    ) -> CanoResult<()> {
        self.add_flow_internal(id, workflow, initial_state, ParsedSchedule::Manual)
    }

    /// Shared registration path: rejects duplicate ids, initializes the
    /// flow's bookkeeping, and records registration order.
    fn add_flow_internal(
        &mut self,
        id: &str,
        workflow: Workflow<TState, TResourceKey>,
        initial_state: TState,
        schedule: ParsedSchedule,
    ) -> CanoResult<()> {
        if self.workflows.contains_key(id) {
            return Err(CanoError::Configuration(format!(
                "Flow '{}' already exists",
                id
            )));
        }
        let info = Arc::new(RwLock::new(FlowInfo {
            id: id.to_string(),
            status: Status::Idle,
            run_count: 0,
            last_run: None,
            failure_streak: 0,
            next_eligible: None,
        }));
        self.workflows.insert(
            id.to_string(),
            FlowData {
                workflow: Arc::new(workflow),
                initial_state,
                schedule,
                info,
                policy: None,
            },
        );
        self.flow_order.push(id.to_string());
        Ok(())
    }

    /// Number of registered flows.
    pub fn len(&self) -> usize {
        self.flow_order.len()
    }

    /// True if no flows are registered.
    pub fn is_empty(&self) -> bool {
        self.flow_order.is_empty()
    }

    /// True if a flow with the given id is registered.
    pub fn contains(&self, id: &str) -> bool {
        self.workflows.contains_key(id)
    }

    /// Attach (or replace) the backoff policy of an existing flow.
    ///
    /// # Errors
    /// Returns a configuration error if `id` is not registered.
    pub fn set_backoff(&mut self, id: &str, policy: BackoffPolicy) -> CanoResult<()> {
        let flow = self.workflows.get_mut(id).ok_or_else(|| {
            CanoError::Configuration(format!("Flow '{id}' not found — cannot set backoff"))
        })?;
        flow.policy = Some(Arc::new(policy));
        Ok(())
    }

    /// Validate every flow, set up resources in registration order, spawn
    /// the per-flow loop tasks plus the driver task, and hand back a
    /// cloneable [`RunningScheduler`] handle.
    ///
    /// # Errors
    /// Fails if any workflow fails validation, or if resource setup fails
    /// (in which case already-set-up flows are torn down in reverse order).
    pub async fn start(self) -> CanoResult<RunningScheduler<TState, TResourceKey>> {
        let Self {
            workflows,
            flow_order,
        } = self;
        // Borrow flows in registration order for validation and setup.
        let ordered: Vec<&FlowData<TState, TResourceKey>> = flow_order
            .iter()
            .filter_map(|id| workflows.get(id))
            .collect();
        // Validate everything up front so nothing is set up on failure.
        for flow in ordered.iter() {
            flow.workflow
                .validate()
                .and_then(|_| flow.workflow.validate_initial_state(&flow.initial_state))?;
        }
        // Set up resources flow by flow; on failure, tear down the flows
        // that were already set up, in reverse order, before bailing out.
        // (Partial setup within the failing flow itself is assumed to be
        // handled by setup_all — TODO confirm.)
        for (idx, flow) in ordered.iter().enumerate() {
            if let Err(e) = flow.workflow.resources.setup_all().await {
                for prior in ordered[..idx].iter().rev() {
                    let len = prior.workflow.resources.lifecycle_len();
                    prior.workflow.resources.teardown_range(0..len).await;
                }
                return Err(e);
            }
        }
        drop(ordered);
        let (command_tx, command_rx) = mpsc::channel::<SchedulerCommand>(64);
        let stop_notify = Arc::new(Notify::new());
        let running = Arc::new(RwLock::new(true));
        let scheduler_tasks: Arc<RwLock<Vec<JoinHandle<()>>>> = Arc::new(RwLock::new(Vec::new()));
        // Read-only view of per-flow info for the returned handle.
        let mut flows_view: HashMap<String, Arc<RwLock<FlowInfo>>> =
            HashMap::with_capacity(workflows.len());
        for (id, fd) in &workflows {
            flows_view.insert(id.clone(), Arc::clone(&fd.info));
        }
        let flows_view = Arc::new(flows_view);
        let flow_order_view = Arc::new(flow_order.clone());
        {
            // Spawn one loop task per timed flow; manual flows get no loop
            // and run only through trigger().
            let mut tasks = scheduler_tasks.write().await;
            for id in flow_order.iter() {
                let Some(fd) = workflows.get(id) else {
                    continue;
                };
                let workflow = Arc::clone(&fd.workflow);
                let initial_state = fd.initial_state.clone();
                let info = Arc::clone(&fd.info);
                let policy = fd.policy.clone();
                let running_clone = Arc::clone(&running);
                let notify_clone = Arc::clone(&stop_notify);
                match &fd.schedule {
                    ParsedSchedule::Every(interval) => {
                        let interval = *interval;
                        let handle = tokio::spawn(spawn_every_loop(
                            workflow,
                            initial_state,
                            info,
                            policy,
                            running_clone,
                            notify_clone,
                            interval,
                        ));
                        tasks.push(handle);
                    }
                    ParsedSchedule::Cron(cron_schedule) => {
                        let cron_schedule = cron_schedule.clone();
                        let handle = tokio::spawn(spawn_cron_loop(
                            workflow,
                            initial_state,
                            info,
                            policy,
                            running_clone,
                            notify_clone,
                            cron_schedule,
                        ));
                        tasks.push(handle);
                    }
                    // Manual flows are driven solely by trigger().
                    ParsedSchedule::Manual => {}
                }
            }
        }
        let (result_tx, result_rx) = watch::channel::<Option<CanoResult<()>>>(None);
        let driver_handle = tokio::spawn(driver_task(
            command_rx,
            workflows,
            flow_order,
            Arc::clone(&running),
            Arc::clone(&stop_notify),
            Arc::clone(&scheduler_tasks),
            result_tx,
        ));
        Ok(RunningScheduler {
            command_tx,
            flows: flows_view,
            flow_order: flow_order_view,
            result_rx,
            scheduler_tasks,
            driver_handle: Arc::new(driver_handle),
            liveness: Arc::new(()),
            _marker: std::marker::PhantomData,
        })
    }
}
impl<TState, TResourceKey> Default for Scheduler<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Equivalent to [`Scheduler::new`]: an empty scheduler.
    fn default() -> Self {
        Self::new()
    }
}
/// Cloneable handle to a started scheduler. All clones share the same
/// underlying state; the last clone to drop aborts outstanding tasks if
/// `stop()` was never awaited (see the `Drop` impl).
#[derive(Clone)]
pub struct RunningScheduler<TState, TResourceKey = Cow<'static, str>>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Commands to the driver task (trigger / reset / stop).
    command_tx: mpsc::Sender<SchedulerCommand>,
    /// Per-flow info, shared with the loop tasks.
    flows: Arc<HashMap<String, Arc<RwLock<FlowInfo>>>>,
    /// Flow ids in registration order, for ordered listings.
    flow_order: Arc<Vec<String>>,
    /// Receives the driver's final result once shutdown completes.
    result_rx: watch::Receiver<Option<CanoResult<()>>>,
    /// Handles of spawned flow tasks (schedule loops and manual triggers).
    scheduler_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
    /// Handle of the driver task itself.
    driver_handle: Arc<JoinHandle<()>>,
    /// Counts live handle clones so only the last drop performs cleanup.
    liveness: Arc<()>,
    /// Ties the generic parameters to the type without storing them.
    _marker: std::marker::PhantomData<fn() -> (TState, TResourceKey)>,
}
impl<TState, TResourceKey> RunningScheduler<TState, TResourceKey>
where
TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
TResourceKey: Hash + Eq + Send + Sync + 'static,
{
pub async fn stop(&self) -> CanoResult<()> {
let _ = self.command_tx.send(SchedulerCommand::Stop).await;
self.wait().await
}
pub async fn wait(&self) -> CanoResult<()> {
let mut rx = self.result_rx.clone();
loop {
if let Some(result) = rx.borrow().clone() {
return result;
}
if rx.changed().await.is_err() {
return Err(CanoError::Workflow(
"Scheduler driver task terminated unexpectedly without publishing a result"
.to_string(),
));
}
}
}
pub async fn trigger(&self, id: &str) -> CanoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.try_send(SchedulerCommand::Trigger {
id: id.to_string(),
response: response_tx,
})
.map_err(|e| match e {
mpsc::error::TrySendError::Closed(_) => CanoError::Workflow(
"Scheduler not running — call start() before trigger()".to_string(),
),
mpsc::error::TrySendError::Full(_) => {
CanoError::Workflow("Scheduler command queue full".to_string())
}
})?;
response_rx.await.map_err(|_| {
CanoError::Workflow("Scheduler stopped before trigger was processed".to_string())
})?
}
pub async fn reset_flow(&self, id: &str) -> CanoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.try_send(SchedulerCommand::Reset {
id: id.to_string(),
response: response_tx,
})
.map_err(|e| match e {
mpsc::error::TrySendError::Closed(_) => CanoError::Workflow(
"Scheduler not running — call start() before reset_flow()".to_string(),
),
mpsc::error::TrySendError::Full(_) => {
CanoError::Workflow("Scheduler command queue full".to_string())
}
})?;
response_rx.await.map_err(|_| {
CanoError::Workflow("Scheduler stopped before reset was processed".to_string())
})?
}
pub async fn status(&self, id: &str) -> Option<FlowInfo> {
let info = self.flows.get(id)?;
Some(info.read().await.clone())
}
pub async fn list(&self) -> Vec<FlowInfo> {
let mut results = Vec::with_capacity(self.flow_order.len());
for id in self.flow_order.iter() {
if let Some(info) = self.flows.get(id) {
results.push(info.read().await.clone());
}
}
results
}
pub async fn has_running_flows(&self) -> bool {
for info in self.flows.values() {
if info.read().await.status == Status::Running {
return true;
}
}
false
}
}
impl<TState, TResourceKey> std::fmt::Debug for RunningScheduler<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Deliberately shallow debug output: the flow ids plus whether the
    /// driver task has already finished.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let driver_finished = self.driver_handle.is_finished();
        let mut dbg = f.debug_struct("RunningScheduler");
        dbg.field("flows", &self.flow_order);
        dbg.field("driver_finished", &driver_finished);
        dbg.finish_non_exhaustive()
    }
}
impl<TState, TResourceKey> Drop for RunningScheduler<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Last-resort cleanup for handles dropped without `stop()`.
    fn drop(&mut self) {
        // Only the final live clone cleans up, and only when the driver is
        // still running (a finished driver means shutdown already happened).
        let last_handle = Arc::strong_count(&self.liveness) == 1;
        if !last_handle || self.driver_handle.is_finished() {
            return;
        }
        self.driver_handle.abort();
        // Best effort: abort whatever flow tasks we can reach without
        // blocking. try_write may fail if the lock is held elsewhere.
        if let Ok(handles) = self.scheduler_tasks.try_write() {
            let n = handles.len();
            for h in handles.iter() {
                h.abort();
            }
            #[cfg(feature = "tracing")]
            tracing::warn!(
                aborted = n,
                "RunningScheduler dropped without stop() — aborted spawned tasks"
            );
            #[cfg(not(feature = "tracing"))]
            let _ = n;
        }
    }
}
/// Task body for a fixed-interval flow: runs once immediately when eligible,
/// then loops sleeping `interval` (stretched to cover any backoff window)
/// between runs until the scheduler stops.
async fn spawn_every_loop<TState, TResourceKey>(
    workflow: Arc<Workflow<TState, TResourceKey>>,
    initial_state: TState,
    info: Arc<RwLock<FlowInfo>>,
    policy: Option<Arc<BackoffPolicy>>,
    running: Arc<RwLock<bool>>,
    stop_notify: Arc<Notify>,
    interval: Duration,
) where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    // Bail out if the scheduler stopped before this task got to run.
    if !*running.read().await {
        return;
    }
    // First run happens immediately (no initial sleep) when eligible.
    if dispatchable_now(&info).await {
        execute_flow(
            Arc::clone(&workflow),
            initial_state.clone(),
            Arc::clone(&info),
            policy.as_ref(),
        )
        .await;
    }
    loop {
        // Sleep for the interval, extended by any backoff window;
        // stop_notify wakes us early on shutdown.
        let wait = wait_until_eligible(&info, interval).await;
        tokio::select! {
            _ = sleep(wait) => {}
            _ = stop_notify.notified() => break,
        }
        // Re-check the flag: the notification and the flag flip are
        // separate operations on the driver side.
        if !*running.read().await {
            break;
        }
        // Skip this tick if the flow is still running or has tripped.
        if !dispatchable_now(&info).await {
            continue;
        }
        execute_flow(
            Arc::clone(&workflow),
            initial_state.clone(),
            Arc::clone(&info),
            policy.as_ref(),
        )
        .await;
    }
}
/// Task body for a cron-scheduled flow: sleeps until each upcoming cron
/// occurrence, skipping occurrences that fall inside a backoff window,
/// until the scheduler stops or the schedule is exhausted.
async fn spawn_cron_loop<TState, TResourceKey>(
    workflow: Arc<Workflow<TState, TResourceKey>>,
    initial_state: TState,
    info: Arc<RwLock<FlowInfo>>,
    policy: Option<Arc<BackoffPolicy>>,
    running: Arc<RwLock<bool>>,
    stop_notify: Arc<Notify>,
    schedule: Box<CronSchedule>,
) where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    loop {
        if !*running.read().await {
            break;
        }
        let now = Utc::now();
        // No further occurrence means the schedule is exhausted: end the task.
        let Some(next) = schedule.after(&now).next() else {
            break;
        };
        // An occurrence already in the past clamps to a zero-length sleep.
        let wait_duration = (next - now).to_std().unwrap_or(Duration::from_secs(0));
        tokio::select! {
            _ = sleep(wait_duration) => {}
            _ = stop_notify.notified() => break,
        }
        if !*running.read().await {
            break;
        }
        // Unlike interval flows, a cron tick inside a backoff window is
        // skipped outright rather than delayed.
        let info_snapshot = info.read().await;
        if let Some(eligible) = info_snapshot.next_eligible
            && Utc::now() < eligible
        {
            #[cfg(feature = "tracing")]
            tracing::debug!(
                flow_id = %info_snapshot.id,
                next_eligible = %eligible,
                "cron tick suppressed by backoff window"
            );
            drop(info_snapshot);
            continue;
        }
        drop(info_snapshot);
        // Also skip if the flow is mid-run or tripped.
        if !dispatchable_now(&info).await {
            continue;
        }
        execute_flow(
            Arc::clone(&workflow),
            initial_state.clone(),
            Arc::clone(&info),
            policy.as_ref(),
        )
        .await;
    }
}
/// Central command loop for a started scheduler.
///
/// Processes Trigger/Reset commands until a Stop arrives (or all senders are
/// dropped), then performs the shutdown sequence: flip the running flag,
/// wake sleeping loop tasks, join all spawned tasks, wait (bounded at 30s)
/// for any flow still marked Running, tear resources down in reverse
/// registration order, and publish the final result.
async fn driver_task<TState, TResourceKey>(
    mut rx: mpsc::Receiver<SchedulerCommand>,
    workflows: HashMap<String, FlowData<TState, TResourceKey>>,
    flow_order: Vec<String>,
    running: Arc<RwLock<bool>>,
    stop_notify: Arc<Notify>,
    scheduler_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
    result_tx: watch::Sender<Option<CanoResult<()>>>,
) where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    while let Some(cmd) = rx.recv().await {
        match cmd {
            SchedulerCommand::Stop => {
                // Closing the channel makes later trigger()/reset_flow()
                // calls fail fast with the "not running" error.
                rx.close();
                break;
            }
            SchedulerCommand::Trigger { id, response } => {
                let outcome = if let Some(flow) = workflows.get(&id) {
                    match reserve_flow(Arc::clone(&flow.info)).await {
                        ReserveOutcome::Reserved => {
                            // Run in a fresh task so the driver stays
                            // responsive while the flow executes.
                            let workflow = Arc::clone(&flow.workflow);
                            let initial_state = flow.initial_state.clone();
                            let info = Arc::clone(&flow.info);
                            let policy = flow.policy.clone();
                            let handle = tokio::spawn(async move {
                                execute_reserved_flow(
                                    workflow,
                                    initial_state,
                                    info,
                                    policy.as_ref(),
                                )
                                .await;
                            });
                            // Reap finished handles so the vec stays bounded
                            // under repeated triggers.
                            let mut tasks = scheduler_tasks.write().await;
                            tasks.retain(|h| !h.is_finished());
                            tasks.push(handle);
                            Ok(())
                        }
                        ReserveOutcome::AlreadyRunning => Err(CanoError::Workflow(format!(
                            "Flow '{id}' is already running"
                        ))),
                        ReserveOutcome::Tripped => Err(CanoError::Workflow(format!(
                            "Flow '{id}' is tripped — call reset_flow before triggering"
                        ))),
                    }
                } else {
                    Err(CanoError::Workflow(format!(
                        "No workflow registered with id '{id}'"
                    )))
                };
                // The caller may have gone away; a dropped receiver is fine.
                let _ = response.send(outcome);
            }
            SchedulerCommand::Reset { id, response } => {
                let outcome = if let Some(flow) = workflows.get(&id) {
                    let mut info_guard = flow.info.write().await;
                    info_guard.failure_streak = 0;
                    info_guard.next_eligible = None;
                    // Leave an in-flight run marked Running; any other
                    // state goes back to Idle.
                    if !matches!(info_guard.status, Status::Running) {
                        info_guard.status = Status::Idle;
                    }
                    Ok(())
                } else {
                    Err(CanoError::Workflow(format!(
                        "No workflow registered with id '{id}'"
                    )))
                };
                let _ = response.send(outcome);
            }
        }
    }
    // --- Shutdown sequence ---
    // Flip the flag first, then wake every sleeping loop so it observes it.
    *running.write().await = false;
    stop_notify.notify_waiters();
    {
        // Join all spawned tasks (schedule loops and manual-trigger runs).
        let mut tasks = scheduler_tasks.write().await;
        while let Some(handle) = tasks.pop() {
            let _ = handle.await;
        }
    }
    // Poll for flows still marked Running, with a hard 30-second timeout.
    let timeout = Duration::from_secs(30);
    let start_time = tokio::time::Instant::now();
    let mut result: CanoResult<()> = Ok(());
    'wait: loop {
        let mut any_running = false;
        for fd in workflows.values() {
            if fd.info.read().await.status == Status::Running {
                any_running = true;
                break;
            }
        }
        if !any_running {
            break 'wait;
        }
        if start_time.elapsed() >= timeout {
            result = Err(CanoError::Workflow(
                "Timeout waiting for workflows to complete".to_string(),
            ));
            break 'wait;
        }
        sleep(Duration::from_millis(100)).await;
    }
    // Tear down resources in reverse registration order (mirror of setup).
    for id in flow_order.iter().rev() {
        if let Some(flow) = workflows.get(id) {
            let len = flow.workflow.resources.lifecycle_len();
            flow.workflow.resources.teardown_range(0..len).await;
        }
    }
    let _ = result_tx.send(Some(result));
}
/// Scheduler-loop entry point: atomically reserve the flow, then run it.
/// Silently does nothing when the flow is already running or tripped.
async fn execute_flow<TState, TResourceKey>(
    workflow: Arc<Workflow<TState, TResourceKey>>,
    initial_state: TState,
    info: Arc<RwLock<FlowInfo>>,
    policy: Option<&Arc<BackoffPolicy>>,
) where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    match reserve_flow(Arc::clone(&info)).await {
        ReserveOutcome::Reserved => {
            execute_reserved_flow(workflow, initial_state, info, policy).await;
        }
        // Another task beat us to it, or the breaker is open: skip this run.
        ReserveOutcome::AlreadyRunning | ReserveOutcome::Tripped => {}
    }
}
/// Result of attempting to claim a flow for execution via `reserve_flow`.
enum ReserveOutcome {
    /// The flow was marked Running; the caller must now execute it.
    Reserved,
    /// A run is already in flight.
    AlreadyRunning,
    /// The circuit breaker is open; the flow needs a reset first.
    Tripped,
}
/// Atomically claim a flow for execution.
///
/// Under a single write lock: refuses if the flow is already running or
/// tripped; otherwise flips the status to Running, stamps `last_run`, and
/// bumps `run_count`.
async fn reserve_flow(info: Arc<RwLock<FlowInfo>>) -> ReserveOutcome {
    let mut guard = info.write().await;
    if matches!(guard.status, Status::Running) {
        return ReserveOutcome::AlreadyRunning;
    }
    if matches!(guard.status, Status::Tripped { .. }) {
        return ReserveOutcome::Tripped;
    }
    guard.status = Status::Running;
    guard.last_run = Some(Utc::now());
    guard.run_count += 1;
    ReserveOutcome::Reserved
}
/// Run a flow that has already been reserved (its status is Running) and
/// record the outcome on its info. Must only be called after a successful
/// `reserve_flow`.
async fn execute_reserved_flow<TState, TResourceKey>(
    workflow: Arc<Workflow<TState, TResourceKey>>,
    initial_state: TState,
    info: Arc<RwLock<FlowInfo>>,
    policy: Option<&Arc<BackoffPolicy>>,
) where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    // With the `tracing` feature the run is wrapped in a span; the two
    // branches are otherwise identical.
    #[cfg(feature = "tracing")]
    let result = workflow
        .execute_workflow(initial_state)
        .instrument(tracing::info_span!("execute_flow"))
        .await;
    #[cfg(not(feature = "tracing"))]
    let result = workflow.execute_workflow(initial_state).await;
    // Discard the final state; only success/failure matters for bookkeeping.
    apply_outcome(&info, result.map(|_| ()), policy.map(|p| p.as_ref())).await;
}
/// Record a run's result on the flow's info: success clears all failure
/// state; failure either marks Failed (no policy attached) or advances the
/// backoff / circuit-breaker state machine (policy attached).
async fn apply_outcome(
    info: &Arc<RwLock<FlowInfo>>,
    result: Result<(), CanoError>,
    policy: Option<&BackoffPolicy>,
) {
    let mut info_guard = info.write().await;
    match result {
        Ok(_) => {
            // Success resets the streak and clears any backoff window.
            info_guard.status = Status::Completed;
            info_guard.failure_streak = 0;
            info_guard.next_eligible = None;
        }
        Err(e) => {
            let err_str = e.to_string();
            match policy {
                None => {
                    // Without a policy the streak is never incremented, so
                    // it must still be zero on this path.
                    debug_assert!(
                        info_guard.failure_streak == 0,
                        "Status::Failed path should never see a non-zero failure_streak (no policy is attached)"
                    );
                    info_guard.status = Status::Failed(err_str);
                }
                Some(p) => {
                    let new_streak = info_guard.failure_streak.saturating_add(1);
                    info_guard.failure_streak = new_streak;
                    if p.is_tripped(new_streak) {
                        // Breaker open: no next_eligible — only reset_flow
                        // can revive the flow.
                        info_guard.next_eligible = None;
                        info_guard.status = Status::Tripped {
                            streak: new_streak,
                            last_error: err_str,
                        };
                    } else {
                        // Schedule the earliest next attempt per the policy;
                        // an unconvertible delay degrades to "now".
                        let delay = p.compute_delay(new_streak);
                        let until = Utc::now()
                            + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
                        info_guard.next_eligible = Some(until);
                        info_guard.status = Status::Backoff {
                            until,
                            streak: new_streak,
                            last_error: err_str,
                        };
                    }
                }
            }
        }
    }
}
/// Whether the flow may be dispatched right now: anything except an
/// in-flight run or an open circuit breaker is dispatchable.
async fn dispatchable_now(info: &Arc<RwLock<FlowInfo>>) -> bool {
    let guard = info.read().await;
    match guard.status {
        Status::Running | Status::Tripped { .. } => false,
        _ => true,
    }
}
/// Sleep budget for an interval flow's next tick: at least `interval`, and
/// stretched further when a backoff window extends beyond it.
async fn wait_until_eligible(info: &Arc<RwLock<FlowInfo>>, interval: Duration) -> Duration {
    // next_eligible is Copy; take it and release the read lock immediately.
    let deadline = info.read().await.next_eligible;
    let now = Utc::now();
    match deadline {
        Some(eligible) if eligible > now => {
            // Negative/unrepresentable remainders degrade to zero.
            let remaining = (eligible - now).to_std().unwrap_or(Duration::from_secs(0));
            interval.max(remaining)
        }
        _ => interval,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::node::Node;
use crate::resource::Resources;
use cano_macros::node;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::time::sleep;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TestState {
Start,
Complete,
Error,
}
#[derive(Clone)]
struct TestNode {
execution_count: Arc<AtomicU32>,
should_fail: bool,
}
impl TestNode {
fn new() -> Self {
Self {
execution_count: Arc::new(AtomicU32::new(0)),
should_fail: false,
}
}
fn new_failing() -> Self {
Self {
execution_count: Arc::new(AtomicU32::new(0)),
should_fail: true,
}
}
}
#[node]
impl Node<TestState> for TestNode {
type PrepResult = ();
type ExecResult = ();
async fn prep(&self, _res: &Resources) -> CanoResult<()> {
Ok(())
}
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
self.execution_count.fetch_add(1, Ordering::Relaxed);
}
async fn post(
&self,
_res: &Resources,
_exec_res: Self::ExecResult,
) -> CanoResult<TestState> {
if self.should_fail {
Err(CanoError::NodeExecution("Test failure".to_string()))
} else {
Ok(TestState::Complete)
}
}
}
fn create_test_workflow() -> Workflow<TestState> {
Workflow::bare()
.register(TestState::Start, TestNode::new())
.add_exit_state(TestState::Complete)
.add_exit_state(TestState::Error)
}
fn create_failing_workflow() -> Workflow<TestState> {
Workflow::bare()
.register(TestState::Start, TestNode::new_failing())
.add_exit_state(TestState::Complete)
.add_exit_state(TestState::Error)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_scheduler_creation() {
let scheduler: Scheduler<TestState> = Scheduler::new();
assert!(scheduler.is_empty());
assert_eq!(scheduler.len(), 0);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_every_seconds() {
let mut scheduler = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.every_seconds("test_task", workflow, TestState::Start, 5)
.unwrap();
assert_eq!(scheduler.len(), 1);
assert!(scheduler.contains("test_task"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_every_minutes() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.every_minutes("test_task", workflow, TestState::Start, 2)
.unwrap();
assert!(scheduler.contains("test_task"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_every_hours() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.every_hours("test_task", workflow, TestState::Start, 1)
.unwrap();
assert!(scheduler.contains("test_task"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_every_duration() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.every(
"test_task",
workflow,
TestState::Start,
Duration::from_millis(100),
)
.unwrap();
assert!(scheduler.contains("test_task"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_cron() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.cron("test_task", workflow, TestState::Start, "0 */5 * * * *")
.unwrap();
assert!(scheduler.contains("test_task"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_cron_invalid() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
let err = scheduler
.cron("test_task", workflow, TestState::Start, "invalid cron")
.unwrap_err();
assert!(matches!(err, CanoError::Configuration(_)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_add_workflow_manual() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.manual("test_task", workflow, TestState::Start)
.unwrap();
assert!(scheduler.contains("test_task"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_duplicate_workflow_id() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow1 = create_test_workflow();
let workflow2 = create_test_workflow();
scheduler
.every_seconds("test_task", workflow1, TestState::Start, 5)
.unwrap();
let err = scheduler
.every_seconds("test_task", workflow2, TestState::Start, 10)
.unwrap_err();
assert!(matches!(err, CanoError::Configuration(_)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_start_rejects_misconfigured_workflow() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let bad_workflow: Workflow<TestState> =
Workflow::bare().register(TestState::Start, TestNode::new());
scheduler
.every_seconds("bad", bad_workflow, TestState::Start, 60)
.unwrap();
let err = scheduler.start().await.expect_err("start should reject");
assert!(matches!(err, CanoError::Configuration(_)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_start_rejects_unregistered_initial_state() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow: Workflow<TestState> = Workflow::bare()
.register(TestState::Start, TestNode::new())
.add_exit_state(TestState::Complete);
scheduler
.every_seconds("bad_init", workflow, TestState::Error, 60)
.unwrap();
let err = scheduler.start().await.expect_err("start should reject");
assert!(matches!(err, CanoError::Configuration(_)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_stop_interrupts_long_interval_sleep() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.every(
"long",
workflow,
TestState::Start,
Duration::from_secs(3600),
)
.unwrap();
let running = scheduler.start().await.expect("start should succeed");
sleep(Duration::from_millis(100)).await;
let stop_started = tokio::time::Instant::now();
tokio::time::timeout(Duration::from_secs(5), running.stop())
.await
.expect("stop should return shortly")
.expect("stop should not error");
let elapsed = stop_started.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"stop took too long: {:?}",
elapsed
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_stop_interrupts_cron_sleep() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.cron("daily", workflow, TestState::Start, "0 0 0 * * *")
.unwrap();
let running = scheduler.start().await.expect("start should succeed");
sleep(Duration::from_millis(100)).await;
let stop_started = tokio::time::Instant::now();
tokio::time::timeout(Duration::from_secs(5), running.stop())
.await
.expect("stop should return shortly")
.expect("stop should not error");
let elapsed = stop_started.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"stop took too long: {:?}",
elapsed
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_manual_trigger() {
let timeout = Duration::from_secs(2);
let result = tokio::time::timeout(timeout, async {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.manual("test_task", workflow, TestState::Start)
.unwrap();
let running = scheduler.start().await.unwrap();
running.trigger("test_task").await.unwrap();
sleep(Duration::from_millis(100)).await;
let status = running.status("test_task").await;
assert!(status.is_some());
running.stop().await.unwrap();
})
.await;
assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_clones_share_state() {
let timeout = Duration::from_secs(2);
let result = tokio::time::timeout(timeout, async {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
scheduler
.manual("shared", create_test_workflow(), TestState::Start)
.unwrap();
let running = scheduler.start().await.unwrap();
let other = running.clone();
other.trigger("shared").await.unwrap();
sleep(Duration::from_millis(100)).await;
assert_eq!(running.status("shared").await.unwrap().run_count, 1);
running.stop().await.unwrap();
})
.await;
assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_status_check_post_start() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = create_test_workflow();
scheduler
.manual("test_task", workflow, TestState::Start)
.unwrap();
let running = scheduler.start().await.unwrap();
let status = running.status("test_task").await.expect("must exist");
assert_eq!(status.id, "test_task");
assert_eq!(status.status, Status::Idle);
assert_eq!(status.run_count, 0);
assert!(status.last_run.is_none());
running.stop().await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_list_workflows() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
scheduler
.manual("task1", create_test_workflow(), TestState::Start)
.unwrap();
scheduler
.manual("task2", create_test_workflow(), TestState::Start)
.unwrap();
scheduler
.manual("task3", create_test_workflow(), TestState::Start)
.unwrap();
assert_eq!(scheduler.len(), 3);
assert!(scheduler.contains("task1"));
assert!(scheduler.contains("task2"));
assert!(scheduler.contains("task3"));
let running = scheduler.start().await.unwrap();
let flows = running.list().await;
assert_eq!(flows.len(), 3);
let ids: Vec<String> = flows.iter().map(|f| f.id.clone()).collect();
assert!(ids.contains(&"task1".to_string()));
assert!(ids.contains(&"task2".to_string()));
assert!(ids.contains(&"task3".to_string()));
running.stop().await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_nonexistent_status() {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
scheduler
.manual("known", create_test_workflow(), TestState::Start)
.unwrap();
let running = scheduler.start().await.unwrap();
assert!(running.status("nonexistent").await.is_none());
running.stop().await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_trigger_unknown_workflow_errors() {
let timeout = Duration::from_secs(5);
let result = tokio::time::timeout(timeout, async {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
scheduler
.manual("known", create_test_workflow(), TestState::Start)
.unwrap();
let running = scheduler.start().await.unwrap();
let err = running
.trigger("missing")
.await
.expect_err("unknown workflow id must error");
assert!(
err.to_string().contains("No workflow registered"),
"expected unknown workflow error, got: {err}"
);
running.stop().await.unwrap();
})
.await;
assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_manual_trigger_rejects_overlap() {
#[derive(Clone)]
struct SlowNode;
#[node]
impl Node<TestState> for SlowNode {
type PrepResult = ();
type ExecResult = ();
async fn prep(&self, _res: &Resources) -> CanoResult<()> {
Ok(())
}
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
sleep(Duration::from_millis(300)).await;
}
async fn post(
&self,
_res: &Resources,
_exec_res: Self::ExecResult,
) -> CanoResult<TestState> {
Ok(TestState::Complete)
}
}
let timeout = Duration::from_secs(5);
let result = tokio::time::timeout(timeout, async {
let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
let workflow = Workflow::bare()
.register(TestState::Start, SlowNode)
.add_exit_state(TestState::Complete)
.add_exit_state(TestState::Error);
scheduler
.manual("slow", workflow, TestState::Start)
.unwrap();
let running = scheduler.start().await.unwrap();
running.trigger("slow").await.unwrap();
let err = running
.trigger("slow")
.await
.expect_err("overlapping manual trigger must be rejected");
assert!(
err.to_string().contains("already running"),
"expected overlap error, got: {err}"
);
let status = running.status("slow").await.unwrap();
assert_eq!(status.run_count, 1);
running.stop().await.unwrap();
})
.await;
assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_trigger_reaps_finished_handles() {
    // Repeated triggers must not accumulate finished JoinHandles in
    // `scheduler_tasks`; completed handles are expected to be reaped.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        scheduler
            .manual("manual_task", create_test_workflow(), TestState::Start)
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // No triggers yet, so no per-run tasks should be tracked.
        assert_eq!(running.scheduler_tasks.read().await.len(), 0);
        // 20 sequential runs; the 30ms gap lets each short workflow finish.
        for _ in 0..20 {
            running.trigger("manual_task").await.unwrap();
            sleep(Duration::from_millis(30)).await;
        }
        // With reaping, at most the most recent handle may still be tracked.
        let in_flight = running.scheduler_tasks.read().await.len();
        assert!(
            in_flight <= 1,
            "expected reaping to bound in-flight handles to <=1, got {in_flight}"
        );
        let status = running.status("manual_task").await.unwrap();
        assert_eq!(status.run_count, 20);
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_post_shutdown_calls_report_not_running() {
    // After shutdown (stop() called twice to confirm idempotence), both
    // trigger() and reset_flow() must report the scheduler as not running.
    let outcome = tokio::time::timeout(Duration::from_secs(5), async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        scheduler
            .manual("manual_task", create_test_workflow(), TestState::Start)
            .unwrap();
        let running = scheduler.start().await.unwrap();
        running.stop().await.unwrap();
        // Second stop must also succeed (idempotent shutdown).
        running.stop().await.unwrap();
        let trigger_err = running.trigger("manual_task").await.unwrap_err();
        let reset_err = running.reset_flow("manual_task").await.unwrap_err();
        for err in [trigger_err, reset_err] {
            assert!(
                err.to_string().contains("Scheduler not running"),
                "expected not-running error after shutdown, got: {err}"
            );
        }
    })
    .await;
    assert!(outcome.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_trigger_during_graceful_shutdown_window_reports_not_running() {
    // While stop() is parked waiting for an in-flight workflow to drain, new
    // triggers must already be refused with a "not running" error.
    #[derive(Clone)]
    struct SlowNode;
    #[node]
    impl Node<TestState> for SlowNode {
        type PrepResult = ();
        type ExecResult = ();
        async fn prep(&self, _res: &Resources) -> CanoResult<()> {
            Ok(())
        }
        // 400ms run keeps the flow busy across the stop() window below.
        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
            sleep(Duration::from_millis(400)).await;
        }
        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> CanoResult<TestState> {
            Ok(TestState::Complete)
        }
    }
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let slow_workflow = Workflow::bare()
            .register(TestState::Start, SlowNode)
            .add_exit_state(TestState::Complete)
            .add_exit_state(TestState::Error);
        scheduler
            .manual("slow_task", slow_workflow, TestState::Start)
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Keep a cloned handle so we can probe while stop() consumes `running`.
        let probe = running.clone();
        probe.trigger("slow_task").await.unwrap();
        // Give the dispatched run a moment to actually enter Running.
        sleep(Duration::from_millis(50)).await;
        assert!(
            probe.has_running_flows().await,
            "slow workflow should be Running before stop()"
        );
        // Start graceful shutdown on a separate task; it should block on the
        // in-flight 400ms run rather than return immediately.
        let stop_handle = tokio::spawn(async move { running.stop().await });
        sleep(Duration::from_millis(50)).await;
        assert!(
            !stop_handle.is_finished(),
            "stop() must still be parked while the slow workflow is in flight"
        );
        // Inside the shutdown window, new triggers are already refused.
        let err = probe.trigger("slow_task").await.unwrap_err();
        assert!(
            err.to_string().contains("Scheduler not running"),
            "expected not-running during shutdown window, got: {err}"
        );
        let stop_result = stop_handle.await.expect("stop task should not panic");
        stop_result.expect("stop should succeed once slow workflow finishes");
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_failed_workflow_registration() {
    // A workflow that will fail at run time must still register successfully.
    let outcome = tokio::time::timeout(Duration::from_secs(2), async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        scheduler
            .manual("failing_task", create_failing_workflow(), TestState::Start)
            .unwrap();
        assert!(scheduler.contains("failing_task"));
    })
    .await;
    assert!(outcome.is_ok(), "Test timed out");
}
/// Test node that fails until a configured attempt number is reached.
#[derive(Clone)]
struct FlakyNode {
    // Shared execution counter; clones of the node observe the same count.
    attempts: Arc<AtomicU32>,
    // 1-based attempt number at which exec starts reporting success;
    // u32::MAX means "never succeed".
    succeed_after: u32,
}
impl FlakyNode {
    /// Node that never reports success (for streak/trip tests).
    fn always_failing() -> Self {
        // u32::MAX as the threshold means success is unreachable.
        Self::succeed_on_attempt(u32::MAX)
    }
    /// Node that fails attempts 1..n and succeeds from attempt `n` onward.
    fn succeed_on_attempt(n: u32) -> Self {
        Self {
            // Arc<AtomicU32>::default() starts the shared counter at zero.
            attempts: Arc::default(),
            succeed_after: n,
        }
    }
}
#[node]
impl Node<TestState> for FlakyNode {
    type PrepResult = ();
    type ExecResult = bool;
    /// Minimal task config so each scheduler dispatch is a single attempt.
    fn config(&self) -> crate::task::TaskConfig {
        crate::task::TaskConfig::minimal()
    }
    async fn prep(&self, _res: &Resources) -> CanoResult<()> {
        Ok(())
    }
    /// Bump the shared counter and report whether this attempt should succeed.
    async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
        let current = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
        current >= self.succeed_after
    }
    /// Map the exec verdict to Complete on success, an error otherwise.
    async fn post(&self, _res: &Resources, ok: Self::ExecResult) -> CanoResult<TestState> {
        match ok {
            true => Ok(TestState::Complete),
            false => Err(CanoError::NodeExecution("flaky".to_string())),
        }
    }
}
/// Single-node workflow around `node`, exiting on Complete or Error.
fn flaky_workflow(node: FlakyNode) -> Workflow<TestState> {
    let workflow = Workflow::bare().register(TestState::Start, node);
    workflow
        .add_exit_state(TestState::Complete)
        .add_exit_state(TestState::Error)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_backoff_progression_extends_interval() {
    // With an always-failing flow, the backoff policy (150ms doubling to a
    // 600ms cap) must throttle dispatch well below the 50ms base interval.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .every(
                "flaky",
                workflow,
                TestState::Start,
                Duration::from_millis(50),
            )
            .unwrap();
        scheduler
            .set_backoff(
                "flaky",
                BackoffPolicy {
                    initial: Duration::from_millis(150),
                    multiplier: 2.0,
                    max_delay: Duration::from_millis(600),
                    // No jitter so the run-count window below stays predictable.
                    jitter: 0.0,
                    streak_limit: None,
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        sleep(Duration::from_millis(1300)).await;
        let runs_observed = running.status("flaky").await.unwrap().run_count;
        running.stop().await.unwrap();
        // Without backoff ~26 runs would fit in 1.3s at a 50ms interval; the
        // growing delays (150/300/600ms) should cap it around 3-8. The range
        // is deliberately loose to absorb scheduler timing noise.
        assert!(
            (3..=8).contains(&runs_observed),
            "expected backoff to throttle to ~3-8 runs in 1.3s, got {runs_observed}"
        );
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_trip_on_streak_limit_stops_dispatch() {
    // Hitting streak_limit must move the flow to Tripped and stop scheduled
    // dispatch entirely until an explicit reset.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .every(
                "trippy",
                workflow,
                TestState::Start,
                Duration::from_millis(40),
            )
            .unwrap();
        scheduler
            .set_backoff(
                "trippy",
                BackoffPolicy {
                    initial: Duration::from_millis(20),
                    multiplier: 1.0,
                    max_delay: Duration::from_millis(40),
                    jitter: 0.0,
                    // Trip after 3 consecutive failures.
                    streak_limit: Some(3),
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // 500ms comfortably covers three failing runs at the 40ms interval.
        sleep(Duration::from_millis(500)).await;
        let snap_before = running.status("trippy").await.unwrap();
        assert!(
            matches!(snap_before.status, Status::Tripped { streak: 3, .. }),
            "expected Tripped(streak=3), got: {:?}",
            snap_before.status
        );
        assert_eq!(snap_before.run_count, 3);
        // After tripping, further interval ticks must not dispatch new runs.
        sleep(Duration::from_millis(300)).await;
        let snap_after = running.status("trippy").await.unwrap();
        assert_eq!(
            snap_after.run_count, 3,
            "tripped flow must stop dispatching"
        );
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_reset_flow_clears_trip_and_resumes() {
    // reset_flow on a tripped flow must zero the streak, clear next_eligible,
    // and let scheduled dispatch resume until the node eventually succeeds.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        // Fails on attempts 1-4, succeeds on the 5th execution.
        let workflow = flaky_workflow(FlakyNode::succeed_on_attempt(5));
        scheduler
            .every(
                "reset_me",
                workflow,
                TestState::Start,
                Duration::from_millis(40),
            )
            .unwrap();
        scheduler
            .set_backoff(
                "reset_me",
                BackoffPolicy {
                    initial: Duration::from_millis(20),
                    multiplier: 1.0,
                    max_delay: Duration::from_millis(20),
                    jitter: 0.0,
                    // Trips after 2 failures, i.e. well before attempt 5.
                    streak_limit: Some(2),
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        sleep(Duration::from_millis(300)).await;
        let snap = running.status("reset_me").await.unwrap();
        assert!(
            matches!(snap.status, Status::Tripped { .. }),
            "expected trip, got: {:?}",
            snap.status
        );
        assert_eq!(snap.run_count, 2);
        // First reset: streak and eligibility window clear immediately.
        running.reset_flow("reset_me").await.unwrap();
        let snap_after_reset = running.status("reset_me").await.unwrap();
        assert_eq!(snap_after_reset.failure_streak, 0);
        assert!(snap_after_reset.next_eligible.is_none());
        // Attempts 3-4 still fail, so the flow trips again; reset once more to
        // let the succeeding 5th attempt run.
        sleep(Duration::from_millis(200)).await;
        running.reset_flow("reset_me").await.unwrap();
        sleep(Duration::from_millis(200)).await;
        let snap_done = running.status("reset_me").await.unwrap();
        assert_eq!(snap_done.status, Status::Completed);
        assert_eq!(snap_done.failure_streak, 0);
        assert!(snap_done.next_eligible.is_none());
        assert!(snap_done.run_count >= 5);
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_streak_resets_on_success() {
    // After a node recovers (success on attempt 3), the failure streak must be
    // back at zero and the flow must report Completed.
    let outcome = tokio::time::timeout(Duration::from_secs(5), async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let policy = BackoffPolicy {
            initial: Duration::from_millis(30),
            multiplier: 1.0,
            max_delay: Duration::from_millis(30),
            jitter: 0.0,
            streak_limit: None,
        };
        scheduler
            .every(
                "recover",
                flaky_workflow(FlakyNode::succeed_on_attempt(3)),
                TestState::Start,
                Duration::from_millis(30),
            )
            .unwrap();
        scheduler.set_backoff("recover", policy).unwrap();
        let running = scheduler.start().await.unwrap();
        // 400ms covers the two failures (with 30ms backoffs) plus the success.
        sleep(Duration::from_millis(400)).await;
        let snap = running.status("recover").await.unwrap();
        assert_eq!(
            snap.failure_streak, 0,
            "streak must reset on success, got {snap:?}"
        );
        assert_eq!(snap.status, Status::Completed);
        running.stop().await.unwrap();
    })
    .await;
    assert!(outcome.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_manual_trigger_blocked_when_tripped() {
    // Manual triggers must be refused while a flow is Tripped; reset_flow
    // restores triggerability.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .manual("manual_flaky", workflow, TestState::Start)
            .unwrap();
        scheduler
            .set_backoff(
                "manual_flaky",
                BackoffPolicy {
                    initial: Duration::from_millis(10),
                    multiplier: 1.0,
                    max_delay: Duration::from_millis(10),
                    jitter: 0.0,
                    // Two failed manual runs are enough to trip.
                    streak_limit: Some(2),
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Two failing manual runs; the 40ms sleeps let each run finish.
        running.trigger("manual_flaky").await.unwrap();
        sleep(Duration::from_millis(40)).await;
        running.trigger("manual_flaky").await.unwrap();
        sleep(Duration::from_millis(40)).await;
        let snap = running.status("manual_flaky").await.unwrap();
        assert!(
            matches!(snap.status, Status::Tripped { .. }),
            "expected trip, got: {:?}",
            snap.status
        );
        let err = running
            .trigger("manual_flaky")
            .await
            .expect_err("trigger on tripped flow must fail");
        assert!(
            err.to_string().contains("tripped"),
            "expected tripped error, got: {err}"
        );
        // Reset clears the trip, after which manual triggering works again.
        running.reset_flow("manual_flaky").await.unwrap();
        running
            .trigger("manual_flaky")
            .await
            .expect("trigger after reset must succeed");
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_reset_flow_unknown_id_errors() {
    // reset_flow on an unregistered id must fail with a descriptive error.
    let outcome = tokio::time::timeout(Duration::from_secs(2), async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        scheduler
            .manual("known", create_test_workflow(), TestState::Start)
            .unwrap();
        let running = scheduler.start().await.unwrap();
        let err = running.reset_flow("missing").await.unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("No workflow registered"),
            "expected unknown error, got: {err}"
        );
        running.stop().await.unwrap();
    })
    .await;
    assert!(outcome.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_set_backoff_unknown_id_errors() {
    // Configuring backoff for an id that was never registered must be
    // rejected as a Configuration error; no scheduler start is needed.
    let mut scheduler = Scheduler::<TestState>::new();
    let err = scheduler
        .set_backoff("missing", BackoffPolicy::default())
        .expect_err("set_backoff on unknown id must fail");
    assert!(
        matches!(err, CanoError::Configuration(_)),
        "expected Configuration error, got: {err}"
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn test_stop_interrupts_backoff_sleep() {
    // stop() must not wait out a pending backoff delay: with a 60s backoff in
    // progress, shutdown still has to complete promptly.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .every(
                "long_backoff",
                workflow,
                TestState::Start,
                Duration::from_millis(20),
            )
            .unwrap();
        scheduler
            .set_backoff(
                "long_backoff",
                BackoffPolicy {
                    // Deliberately much longer than the whole test timeout.
                    initial: Duration::from_secs(60),
                    multiplier: 1.0,
                    max_delay: Duration::from_secs(60),
                    jitter: 0.0,
                    streak_limit: None,
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Let the first run fail and enter the 60s backoff window.
        sleep(Duration::from_millis(150)).await;
        assert!(matches!(
            running.status("long_backoff").await.unwrap().status,
            Status::Backoff { .. }
        ));
        let stop_started = tokio::time::Instant::now();
        running.stop().await.unwrap();
        let elapsed = stop_started.elapsed();
        // 2s bound: generous for shutdown, far below the 60s backoff.
        assert!(
            elapsed < Duration::from_secs(2),
            "stop took too long during backoff: {:?}",
            elapsed
        );
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_no_policy_preserves_failed_status() {
    // Without a backoff policy, failures keep the legacy behavior: plain
    // Failed status, no streak tracking, no eligibility window.
    let timeout = Duration::from_secs(3);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .every(
                "legacy",
                workflow,
                TestState::Start,
                Duration::from_millis(40),
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Poll until at least 3 runs finished and the flow is between runs,
        // so the snapshot reflects a settled post-failure state.
        let snap = loop {
            sleep(Duration::from_millis(50)).await;
            let s = running.status("legacy").await.unwrap();
            if !matches!(s.status, Status::Running) && s.run_count >= 3 {
                break s;
            }
        };
        assert!(
            matches!(snap.status, Status::Failed(_)),
            "no-policy flow must report Failed, got: {:?}",
            snap.status
        );
        assert_eq!(snap.failure_streak, 0, "no streak tracking without policy");
        assert!(snap.next_eligible.is_none());
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_trigger_overrides_backoff_window() {
    // A manual trigger must be allowed to run inside an open backoff window,
    // and a successful override run clears the streak and next_eligible.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        // Fails once, then succeeds on the second execution.
        let workflow = flaky_workflow(FlakyNode::succeed_on_attempt(2));
        scheduler
            .manual("flow", workflow, TestState::Start)
            .unwrap();
        scheduler
            .set_backoff(
                "flow",
                BackoffPolicy {
                    // 60s backoff: only a manual override could produce a
                    // second attempt within this test's lifetime.
                    initial: Duration::from_secs(60),
                    multiplier: 1.0,
                    max_delay: Duration::from_secs(60),
                    jitter: 0.0,
                    streak_limit: None,
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        running.trigger("flow").await.unwrap();
        // Wait for the first (failing) run to land in Backoff.
        let snap = loop {
            sleep(Duration::from_millis(20)).await;
            let s = running.status("flow").await.unwrap();
            if matches!(s.status, Status::Backoff { .. }) {
                break s;
            }
        };
        assert_eq!(snap.run_count, 1);
        // Trigger inside the 60s window: it must dispatch anyway.
        running.trigger("flow").await.unwrap();
        let snap = loop {
            sleep(Duration::from_millis(20)).await;
            let s = running.status("flow").await.unwrap();
            if s.run_count == 2 && !matches!(s.status, Status::Running) {
                break s;
            }
        };
        assert!(
            matches!(snap.status, Status::Completed),
            "override run should succeed and clear backoff, got: {:?}",
            snap.status
        );
        assert_eq!(snap.failure_streak, 0);
        assert!(snap.next_eligible.is_none());
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_trip_reset_trip_roundtrip_in_one_session() {
    // trip -> reset -> trip within one scheduler session: the second trip must
    // behave exactly like the first (fresh streak up to the limit).
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .every(
                "flow",
                workflow,
                TestState::Start,
                Duration::from_millis(20),
            )
            .unwrap();
        scheduler
            .set_backoff(
                "flow",
                BackoffPolicy {
                    initial: Duration::from_millis(20),
                    multiplier: 1.0,
                    max_delay: Duration::from_millis(20),
                    jitter: 0.0,
                    streak_limit: Some(2),
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Poll until the first trip.
        let snap = loop {
            sleep(Duration::from_millis(20)).await;
            let s = running.status("flow").await.unwrap();
            if matches!(s.status, Status::Tripped { .. }) {
                break s;
            }
        };
        let runs_at_first_trip = snap.run_count;
        assert_eq!(snap.failure_streak, 2, "tripped at streak limit");
        // Reset: streak clears and the flow may immediately be redispatched,
        // so any of Idle/Backoff/Running is a valid transient status here.
        running.reset_flow("flow").await.unwrap();
        let snap = running.status("flow").await.unwrap();
        assert_eq!(snap.failure_streak, 0);
        assert!(matches!(
            snap.status,
            Status::Idle | Status::Backoff { .. } | Status::Running
        ));
        // Poll until a second trip with additional runs beyond the first.
        let snap = loop {
            sleep(Duration::from_millis(20)).await;
            let s = running.status("flow").await.unwrap();
            if matches!(s.status, Status::Tripped { .. }) && s.run_count > runs_at_first_trip {
                break s;
            }
        };
        assert_eq!(snap.failure_streak, 2, "second trip also at streak limit");
        assert!(snap.run_count >= runs_at_first_trip + 2);
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_cron_tick_suppressed_in_backoff_window() {
    // Cron ticks that fire while a flow sits in a backoff window must be
    // skipped, not queued or dispatched.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        // Six-field cron expression: fires every second.
        scheduler
            .cron("flow", workflow, TestState::Start, "* * * * * *")
            .unwrap();
        scheduler
            .set_backoff(
                "flow",
                BackoffPolicy {
                    // 60s window dwarfs the per-second cron cadence.
                    initial: Duration::from_secs(60),
                    multiplier: 1.0,
                    max_delay: Duration::from_secs(60),
                    jitter: 0.0,
                    streak_limit: None,
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Wait for the first cron-dispatched run to fail into Backoff.
        let snap = loop {
            sleep(Duration::from_millis(50)).await;
            let s = running.status("flow").await.unwrap();
            if matches!(s.status, Status::Backoff { .. }) {
                break s;
            }
        };
        assert_eq!(snap.run_count, 1);
        // ~3 more cron ticks elapse; none may dispatch inside the window.
        sleep(Duration::from_millis(3200)).await;
        let snap = running.status("flow").await.unwrap();
        assert_eq!(
            snap.run_count, 1,
            "cron ticks must not dispatch inside backoff window"
        );
        assert!(matches!(snap.status, Status::Backoff { .. }));
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_reset_flow_during_inflight_run_documents_race() {
    // Documents the reset semantics relative to runs: reset zeroes the streak,
    // and the next failure after a reset starts a fresh streak at 1.
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let mut scheduler: Scheduler<TestState> = Scheduler::<TestState>::new();
        let workflow = flaky_workflow(FlakyNode::always_failing());
        scheduler
            .manual("flow", workflow, TestState::Start)
            .unwrap();
        scheduler
            .set_backoff(
                "flow",
                BackoffPolicy {
                    initial: Duration::from_millis(20),
                    multiplier: 1.0,
                    max_delay: Duration::from_millis(20),
                    jitter: 0.0,
                    streak_limit: None,
                },
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // Accumulate three failures, waiting for each run to settle into
        // Backoff before triggering the next.
        for _ in 0..3 {
            running.trigger("flow").await.unwrap();
            loop {
                sleep(Duration::from_millis(15)).await;
                let s = running.status("flow").await.unwrap();
                if matches!(s.status, Status::Backoff { .. }) {
                    break;
                }
            }
        }
        assert_eq!(running.status("flow").await.unwrap().failure_streak, 3);
        running.reset_flow("flow").await.unwrap();
        assert_eq!(running.status("flow").await.unwrap().failure_streak, 0);
        // One more failing run after the reset.
        running.trigger("flow").await.unwrap();
        let snap = loop {
            sleep(Duration::from_millis(15)).await;
            let s = running.status("flow").await.unwrap();
            if matches!(s.status, Status::Backoff { .. }) {
                break s;
            }
        };
        assert_eq!(
            snap.failure_streak, 1,
            "reset clears streak — next failure starts a fresh streak"
        );
        running.stop().await.unwrap();
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_drop_aborts_loops_when_no_stop() {
    // Dropping the running scheduler handle without calling stop() must still
    // halt the spawned scheduling loops (no orphaned tickers).
    let timeout = Duration::from_secs(5);
    let result = tokio::time::timeout(timeout, async {
        let node = TestNode::new();
        // Keep a handle on the node's execution counter to observe ticks.
        let count = Arc::clone(&node.execution_count);
        let workflow = Workflow::bare()
            .register(TestState::Start, node)
            .add_exit_state(TestState::Complete)
            .add_exit_state(TestState::Error);
        let mut scheduler: Scheduler<TestState> = Scheduler::new();
        scheduler
            .every(
                "ticker",
                workflow,
                TestState::Start,
                Duration::from_millis(20),
            )
            .unwrap();
        let running = scheduler.start().await.unwrap();
        // 120ms at a 20ms interval: expect several executions.
        sleep(Duration::from_millis(120)).await;
        let observed = count.load(Ordering::SeqCst);
        assert!(
            observed >= 2,
            "loop should have ticked at least twice, got {observed}"
        );
        drop(running);
        // Short grace period for in-flight work, then sample twice: the count
        // must not move once the handle is gone.
        sleep(Duration::from_millis(50)).await;
        let final_count = count.load(Ordering::SeqCst);
        sleep(Duration::from_millis(120)).await;
        let later_count = count.load(Ordering::SeqCst);
        assert_eq!(
            final_count, later_count,
            "spawned loops must stop after RunningScheduler is dropped (was {final_count}, became {later_count})"
        );
    })
    .await;
    assert!(result.is_ok(), "Test timed out");
}
}