mod backoff;
pub use backoff::BackoffPolicy;
use crate::error::CanoResult;
use crate::workflow::Workflow;
use chrono::{DateTime, Utc};
use cron::Schedule as CronSchedule;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::{RwLock, oneshot};
use tokio::time::Duration;
/// Control messages for the scheduler.
///
/// `Trigger` and `Reset` each name a flow by `id` and carry a one-shot sender
/// for a `CanoResult<()>` reply; `Stop` carries nothing.
// NOTE(review): the code that sends and handles these commands is not in this
// part of the file (presumably the `running` / `loops` submodules) — confirm there.
enum SchedulerCommand {
/// Stop request; no payload and no reply channel.
Stop,
/// Request aimed at the flow named `id` — presumably "run it now"; confirm in the handler.
Trigger {
/// Identifier of the target flow.
id: Arc<str>,
/// One-shot channel for the `CanoResult<()>` reply.
response: oneshot::Sender<CanoResult<()>>,
},
/// Request aimed at the flow named `id` — presumably clears its failure/backoff state; confirm in the handler.
Reset {
/// Identifier of the target flow.
id: Arc<str>,
/// One-shot channel for the `CanoResult<()>` reply.
response: oneshot::Sender<CanoResult<()>>,
},
}
/// Public description of when a flow should run.
///
/// The crate-private `ParsedSchedule` mirrors these variants, with the cron
/// variant holding a parsed `cron::Schedule` instead of a string.
#[derive(Debug, Clone)]
pub enum Schedule {
/// Schedule expressed as a `Duration` — presumably a fixed repeat interval; TODO confirm in the loop code.
Every(Duration),
/// Schedule expressed as a string — presumably a cron expression understood by the `cron` crate; TODO confirm where it is parsed.
Cron(String),
/// No timing payload — presumably the flow runs only when explicitly triggered; confirm against the scheduler API.
Manual,
}
/// Status of a scheduled flow, as stored in `FlowInfo::status`.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate need
/// a wildcard arm.
// NOTE(review): the transitions between these states are driven by code outside
// this part of the file; the per-variant notes below are hedged accordingly.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Status {
/// Presumably: not currently running — confirm.
Idle,
/// Presumably: a run is in progress — confirm.
Running,
/// Presumably: the most recent run finished successfully — confirm.
Completed,
/// Backoff state carrying a deadline, a streak counter and an error message.
Backoff {
/// UTC timestamp — presumably the moment the backoff window ends; confirm.
until: DateTime<Utc>,
/// Streak counter — presumably consecutive failures so far; confirm.
streak: u32,
/// Text of an error — presumably from the most recent failed run; confirm.
last_error: Arc<str>,
},
/// Like `Backoff` but with no deadline — presumably the flow stays halted until reset; confirm.
Tripped {
/// Streak counter — presumably consecutive failures so far; confirm.
streak: u32,
/// Text of an error — presumably from the most recent failed run; confirm.
last_error: Arc<str>,
},
}
/// Public, cloneable record describing one flow: its id, status and counters.
///
/// `FlowData` keeps one of these behind `Arc<RwLock<..>>` for each flow.
#[derive(Debug, Clone)]
pub struct FlowInfo {
/// Identifier of the flow.
pub id: Arc<str>,
/// Current [`Status`] of the flow.
pub status: Status,
/// Run counter — presumably the number of runs so far; confirm whether failed runs are included.
pub run_count: u64,
/// UTC timestamp of the last run, or `None` — presumably `None` means "never run"; confirm.
pub last_run: Option<DateTime<Utc>>,
/// Failure streak counter — presumably consecutive failures; confirm when it is reset.
pub failure_streak: u32,
/// Optional UTC timestamp — presumably the earliest time the flow may run again; confirm.
pub next_eligible: Option<DateTime<Utc>>,
}
/// Crate-private counterpart of [`Schedule`] in which the cron variant holds a
/// parsed `cron::Schedule` (imported here as `CronSchedule`) rather than a string.
#[derive(Clone)]
enum ParsedSchedule {
/// Same payload as `Schedule::Every`.
Every(Duration),
/// Parsed cron schedule. Boxed — presumably to keep this enum small; confirm.
Cron(Box<CronSchedule>),
/// Same as `Schedule::Manual`: no timing payload.
Manual,
}
/// Crate-private bundle of what is stored for one flow: the workflow, its
/// starting state, its parsed schedule, its shared [`FlowInfo`] and its backoff policy.
///
/// `workflow`, `info` and `policy` are held in `Arc`s, so clones of a
/// `FlowData` share them (see the `Clone` impl in this file).
struct FlowData<TState, TResourceKey>
where
TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
TResourceKey: Hash + Eq + Send + Sync + 'static,
{
/// The workflow, shared by reference count.
workflow: Arc<Workflow<TState, TResourceKey>>,
/// State value kept alongside the workflow — presumably the state each run starts from; confirm.
initial_state: TState,
/// Parsed form of the flow's schedule.
schedule: ParsedSchedule,
/// Shared [`FlowInfo`] for this flow, behind an async `RwLock`.
info: Arc<RwLock<FlowInfo>>,
/// Backoff policy for this flow, shared by reference count.
policy: Arc<BackoffPolicy>,
}
/// Manual `Clone` for [`FlowData`].
///
/// Written by hand rather than derived: `#[derive(Clone)]` would add a
/// `TResourceKey: Clone` bound, which the bounds on this impl do not require.
///
/// The `Arc`-held fields (`workflow`, `info`, `policy`) are shared with the
/// original through a reference-count bump; `initial_state` and `schedule`
/// are cloned by value.
impl<TState, TResourceKey> Clone for FlowData<TState, TResourceKey>
where
    TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        Self {
            workflow: Arc::clone(&self.workflow),
            initial_state: self.initial_state.clone(),
            schedule: self.schedule.clone(),
            info: Arc::clone(&self.info),
            // Spelled `Arc::clone` like the other shared fields so the cheap
            // pointer copy is not mistaken for a deep clone of the policy.
            policy: Arc::clone(&self.policy),
        }
    }
}
mod builder;
mod loops;
mod running;
#[cfg(test)]
mod test_support;
pub use builder::Scheduler;
pub use running::RunningScheduler;
/// Scheduler metrics tests; compiled only for test builds with the `metrics` feature.
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
    use crate::metrics::test_support::*;
    use crate::prelude::*;

    /// Id of the single flow registered by the test; also the value of the
    /// `flow` label on the metrics it is expected to produce.
    const FLOW_ID: &str = "flow_a";

    /// Two-state machine: `Start` is the entry state, `Done` the exit state.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    enum St {
        Start,
        Done,
    }

    /// Task that always succeeds and moves straight to `St::Done`.
    struct Ok1;

    #[crate::task]
    impl Task<St> for Ok1 {
        async fn run_bare(&self) -> Result<TaskResult<St>, CanoError> {
            Ok(TaskResult::Single(St::Done))
        }
    }

    /// Triggers a manual flow once and checks the metrics captured by
    /// `run_with_recorder`.
    #[test]
    fn manual_trigger_records_a_completed_flow_run() {
        let ((), recorded) = run_with_recorder(|| async {
            let workflow = Workflow::bare()
                .register(St::Start, Ok1)
                .add_exit_state(St::Done);

            let mut scheduler = Scheduler::<St>::new();
            scheduler.manual(FLOW_ID, workflow, St::Start).unwrap();

            let handle = scheduler.start().await.unwrap();
            handle.trigger(FLOW_ID).await.unwrap();

            // Yield, then wait briefly before stopping — presumably so the
            // triggered run can finish and record its metrics first.
            tokio::task::yield_now().await;
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            handle.stop().await.unwrap();
        });

        // Exactly one run counted with outcome "completed".
        let completed_runs = counter(
            &recorded,
            "cano_scheduler_flow_runs_total",
            &[("flow", FLOW_ID), ("outcome", "completed")],
        );
        assert_eq!(completed_runs, 1);

        // Exactly one duration sample for the flow.
        let duration_samples = histogram_count(
            &recorded,
            "cano_scheduler_flow_duration_seconds",
            &[("flow", FLOW_ID)],
        );
        assert_eq!(duration_samples, 1);

        // The active-flows gauge reads zero once the scheduler has stopped.
        assert_eq!(gauge(&recorded, "cano_scheduler_active_flows", &[]), 0.0);

        // No backoff counter was recorded for the flow.
        let backoffs = counter_opt(
            &recorded,
            "cano_scheduler_flow_backoff_total",
            &[("flow", FLOW_ID)],
        );
        assert_eq!(backoffs, None);
    }
}