use super::control::{ControlSignal, SchedulerHandle};
use super::coordinated::run_coordinated_scheduler;
use super::legacy::run_legacy_scheduler;
use crate::coordinated_store::{
CoordinatedLeaseConfig, CoordinatedStateStore, NoopCoordinatedStateStore,
};
use crate::error::SchedulerError;
use crate::model::{Job, JobState, SchedulerConfig};
use crate::observer::{LogObserver, NoopObserver, SchedulerEvent, SchedulerObserver};
use crate::store::{StateStore, StoreEvent};
use crate::{ExecutionGuard, InMemoryStateStore, NoopExecutionGuard};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::watch;
/// Storage backend chosen when a `Scheduler` is constructed.
///
/// `Scheduler::run` matches on this value to decide which scheduling loop
/// drives the job.
pub(super) enum SchedulerBackend<S, G, C>
where
S: StateStore,
G: ExecutionGuard,
C: CoordinatedStateStore,
{
/// Built by the `StateStore`-based constructors; `run` passes both handles
/// to `run_legacy_scheduler`.
Legacy {
/// Shared handle to the job state store.
store: Arc<S>,
/// Shared handle to the execution guard.
guard: Arc<G>,
},
/// Built by the coordinated-store constructors; `run` passes the store and
/// a copy of the lease settings to `run_coordinated_scheduler`.
Coordinated {
/// Shared handle to the coordinated state store.
store: Arc<C>,
/// Lease settings supplied to the constructor.
lease_config: CoordinatedLeaseConfig,
},
}
/// Job scheduler, generic over a state store `S`, an execution guard `G`
/// and a coordinated state store `C`.
///
/// `G` and `C` default to their no-op implementations, so a scheduler that
/// only needs a plain state store can be named as `Scheduler<S>`.
pub struct Scheduler<S, G = NoopExecutionGuard, C = NoopCoordinatedStateStore>
where
S: StateStore,
G: ExecutionGuard,
C: CoordinatedStateStore,
{
// Configuration value exactly as handed to the constructor.
pub(super) config: SchedulerConfig,
// Selects between the legacy and the coordinated scheduling loop in `run`.
pub(super) backend: SchedulerBackend<S, G, C>,
// Receives every event passed to `emit`.
pub(super) observer: Arc<dyn SchedulerObserver>,
// Sending half of the control channel; `handle` clones it into each
// `SchedulerHandle`.
pub(super) control: watch::Sender<ControlSignal>,
}
/// Constructors for schedulers that use the no-op execution guard and no
/// coordinated state store.
impl<S> Scheduler<S, NoopExecutionGuard, NoopCoordinatedStateStore>
where
    S: StateStore + Send + Sync + 'static,
{
    /// Creates a scheduler backed by `store` whose events go to [`NoopObserver`].
    pub fn new(config: SchedulerConfig, store: S) -> Self {
        Self::with_observer(config, store, NoopObserver)
    }

    /// Creates a scheduler backed by `store` whose events go to [`LogObserver`].
    pub fn with_log_observer(config: SchedulerConfig, store: S) -> Self {
        Self::with_observer(config, store, LogObserver)
    }

    /// Creates a scheduler backed by `store` whose events go to `observer`.
    ///
    /// The execution guard is always [`NoopExecutionGuard`].
    pub fn with_observer<O>(config: SchedulerConfig, store: S, observer: O) -> Self
    where
        O: SchedulerObserver,
    {
        let guard = NoopExecutionGuard;
        Self::with_observer_and_execution_guard(config, store, observer, guard)
    }
}
/// Constructors for schedulers that pair a state store with an execution guard.
impl<S, G> Scheduler<S, G, NoopCoordinatedStateStore>
where
    S: StateStore + Send + Sync + 'static,
    G: ExecutionGuard + Send + Sync + 'static,
{
    /// Creates a scheduler backed by `store` and `guard` whose events go to
    /// [`NoopObserver`].
    pub fn with_execution_guard(config: SchedulerConfig, store: S, guard: G) -> Self {
        let observer = NoopObserver;
        Self::with_observer_and_execution_guard(config, store, observer, guard)
    }

    /// Creates a scheduler on the legacy backend from all of its parts.
    ///
    /// `store`, `guard` and `observer` are each moved into an [`Arc`], and the
    /// control channel starts out as [`ControlSignal::Running`].
    pub fn with_observer_and_execution_guard<O>(
        config: SchedulerConfig,
        store: S,
        observer: O,
        guard: G,
    ) -> Self
    where
        O: SchedulerObserver,
    {
        // Only the sending half is kept; the initial receiver is dropped here.
        // NOTE(review): tokio's `watch::Sender::send` returns an error while no
        // receiver exists — confirm `SchedulerHandle` signals through a method
        // that tolerates that (or that a receiver is subscribed first).
        let (control, _) = watch::channel(ControlSignal::Running);
        let observer: Arc<dyn SchedulerObserver> = Arc::new(observer);
        let backend = SchedulerBackend::Legacy {
            store: Arc::new(store),
            guard: Arc::new(guard),
        };

        Self {
            config,
            backend,
            observer,
            control,
        }
    }
}
/// Constructors for schedulers on the coordinated backend.
///
/// `S` and `G` are pinned to [`InMemoryStateStore`] and [`NoopExecutionGuard`]
/// only to fill the type parameters; no value of either type is created here.
impl<C> Scheduler<InMemoryStateStore, NoopExecutionGuard, C>
where
    C: CoordinatedStateStore + Send + Sync + 'static,
{
    /// Creates a coordinated scheduler whose events go to [`NoopObserver`].
    pub fn with_coordinated_state_store(
        config: SchedulerConfig,
        store: C,
        lease_config: CoordinatedLeaseConfig,
    ) -> Self {
        let observer = NoopObserver;
        Self::with_observer_and_coordinated_state_store(config, store, observer, lease_config)
    }

    /// Creates a coordinated scheduler from all of its parts.
    ///
    /// `store` and `observer` are each moved into an [`Arc`], `lease_config` is
    /// stored as given, and the control channel starts out as
    /// [`ControlSignal::Running`].
    pub fn with_observer_and_coordinated_state_store<O>(
        config: SchedulerConfig,
        store: C,
        observer: O,
        lease_config: CoordinatedLeaseConfig,
    ) -> Self
    where
        O: SchedulerObserver,
    {
        // Only the sending half is kept; the initial receiver is dropped here.
        // NOTE(review): tokio's `watch::Sender::send` returns an error while no
        // receiver exists — confirm `SchedulerHandle` signals through a method
        // that tolerates that (or that a receiver is subscribed first).
        let (control, _) = watch::channel(ControlSignal::Running);
        let observer: Arc<dyn SchedulerObserver> = Arc::new(observer);
        let backend = SchedulerBackend::Coordinated {
            store: Arc::new(store),
            lease_config,
        };

        Self {
            config,
            backend,
            observer,
            control,
        }
    }
}
impl<S, G, C> Scheduler<S, G, C>
where
S: StateStore + Send + Sync + 'static,
G: ExecutionGuard + Send + Sync + 'static,
C: CoordinatedStateStore + Send + Sync + 'static,
{
pub(super) fn emit(&self, event: SchedulerEvent) {
self.observer.on_event(&event);
}
/// Reports whether persisted `state` for an interval job lacks a next run
/// time even though the job may still trigger.
///
/// Returns `true` only when all of the following hold:
/// * `state.next_run_at` is `None`,
/// * the job's schedule is `Schedule::Interval`,
/// * `job.max_runs` is `None`, or `state.trigger_count` is still below it.
pub(super) fn should_repair_interval_state<D>(&self, job: &Job<D>, state: &JobState) -> bool
where
    D: Send + Sync + 'static,
{
    let lacks_next_run = state.next_run_at.is_none();
    let is_interval = matches!(job.schedule, crate::Schedule::Interval(_));
    let has_runs_left = job
        .max_runs
        .map_or(true, |limit| state.trigger_count < limit);

    lacks_next_run && is_interval && has_runs_left
}
/// Reports whether replaying must wait because runs are still active.
///
/// Returns `true` when `active_count` is non-zero, the job's missed-run
/// policy is `MissedRunPolicy::ReplayAll`, and its overlap policy is anything
/// other than `OverlapPolicy::AllowParallel`.
pub(super) fn should_wait_for_active_replay<D>(
    &self,
    job: &Job<D>,
    active_count: usize,
) -> bool
where
    D: Send + Sync + 'static,
{
    if active_count == 0 {
        return false;
    }

    let replays_all = matches!(job.missed_run_policy, crate::MissedRunPolicy::ReplayAll);
    let allows_parallel = matches!(job.overlap_policy, crate::OverlapPolicy::AllowParallel);
    replays_all && !allows_parallel
}
/// Sleeps until `next_run_at`.
///
/// Returns without sleeping when `next_run_at` is `None`, or when the time
/// remaining cannot be converted to a `std::time::Duration` (chrono rejects
/// negative durations, i.e. an instant that has already passed).
pub(super) async fn sleep_until_next(&self, next_run_at: Option<chrono::DateTime<Utc>>) {
    if let Some(deadline) = next_run_at {
        let remaining = deadline - Utc::now();
        if let Ok(wait) = remaining.to_std() {
            tokio::time::sleep(wait).await;
        }
    }
}
pub(super) async fn emit_store_events_for(
&self,
store: &S,
job_id: &str,
) -> Result<(), SchedulerError> {
let events = store.drain_events().await.map_err(|error| {
let kind = S::classify_error(&error);
SchedulerError::store(error, kind)
})?;
for event in events {
match event {
StoreEvent::Degraded { operation, error } => {
self.emit(SchedulerEvent::StoreDegraded {
job_id: job_id.to_string(),
operation,
error,
});
}
}
}
Ok(())
}
pub(super) async fn load_state_from_legacy(
&self,
store: &S,
job_id: &str,
) -> Result<Option<JobState>, SchedulerError> {
let state = store.load(job_id).await.map_err(|error| {
let kind = S::classify_error(&error);
SchedulerError::store(error, kind)
})?;
self.emit_store_events_for(store, job_id).await?;
Ok(state)
}
pub(super) async fn persist_state_to_legacy(
&self,
store: &S,
state: &JobState,
) -> Result<(), SchedulerError> {
store.save(state).await.map_err(|error| {
let kind = S::classify_error(&error);
SchedulerError::store(error, kind)
})?;
self.emit_store_events_for(store, &state.job_id).await
}
pub(super) async fn delete_state_from_legacy(
&self,
store: &S,
job_id: &str,
) -> Result<(), SchedulerError> {
store.delete(job_id).await.map_err(|error| {
let kind = S::classify_error(&error);
SchedulerError::store(error, kind)
})?;
self.emit_store_events_for(store, job_id).await
}
/// Validates `job` and puts its schedule into canonical form before a run.
///
/// * `Schedule::Interval` with a zero duration is rejected.
/// * `Schedule::AtTimes` has its times sorted in place.
/// * `Schedule::Cron` is left untouched.
///
/// # Errors
///
/// Returns `SchedulerError::invalid_zero_interval()` for a zero interval, and
/// an invalid-job error when the coordinated backend is combined with
/// `OverlapPolicy::AllowParallel`.
fn normalize_job<D>(&self, mut job: Job<D>) -> Result<Job<D>, SchedulerError>
where
    D: Send + Sync + 'static,
{
    match job.schedule {
        crate::Schedule::Interval(ref every) if every.is_zero() => {
            return Err(SchedulerError::invalid_zero_interval());
        }
        crate::Schedule::AtTimes(ref mut times) => times.sort_unstable(),
        crate::Schedule::Interval(_) | crate::Schedule::Cron(_) => {}
    }

    let is_coordinated = matches!(self.backend, SchedulerBackend::Coordinated { .. });
    let wants_parallel = matches!(job.overlap_policy, crate::OverlapPolicy::AllowParallel);
    if is_coordinated && wants_parallel {
        return Err(SchedulerError::invalid_job_with_kind(
            crate::InvalidJobKind::Other,
            "coordinated scheduler does not support OverlapPolicy::AllowParallel",
        ));
    }

    Ok(job)
}
/// Returns a new [`SchedulerHandle`] built from a clone of this scheduler's
/// control-channel sender.
pub fn handle(&self) -> SchedulerHandle {
    let sender = self.control.clone();
    SchedulerHandle::new(sender)
}
/// Runs `job` on the backend this scheduler was constructed with.
///
/// The job is first passed through `normalize_job`. The legacy backend is
/// then driven by `run_legacy_scheduler`, and the coordinated backend by
/// `run_coordinated_scheduler`.
///
/// # Errors
///
/// Returns the validation error from `normalize_job`, or whatever error the
/// selected scheduling loop reports.
pub async fn run<D>(&self, job: Job<D>) -> Result<crate::SchedulerReport, SchedulerError>
where
    D: Send + Sync + 'static,
{
    let prepared = self.normalize_job(job)?;

    match &self.backend {
        SchedulerBackend::Coordinated {
            store,
            lease_config,
        } => {
            // The lease settings are copied out for the coordinated loop.
            let lease = *lease_config;
            run_coordinated_scheduler(self, prepared, store, lease).await
        }
        SchedulerBackend::Legacy { store, guard } => {
            run_legacy_scheduler(self, prepared, store, guard).await
        }
    }
}
}