cloudiful-scheduler 0.4.1

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use super::control::{CommandDisposition, SchedulerMode, StopSignal};
use super::overlap::{OverlapAction, dispatch_trigger, take_queued_if_idle};
use super::trigger::{PendingTrigger, TriggerDecision, next_trigger};
use crate::error::SchedulerError;
use crate::model::{Job, JobState, RunRecord, SchedulerReport};
use crate::observer::{PauseScope, SchedulerEvent, SchedulerStopReason};
use crate::scheduler::trigger_math::next_run_is_in_future;
use chrono::Utc;
use std::collections::VecDeque;
use tokio::task::JoinSet;

/// Storage/coordination strategy plugged into [`run_scheduler`].
///
/// The scheduler loop owns the control flow; an implementation of this trait owns the
/// `Runtime` value (from which the current [`JobState`] is read) and decides how
/// triggers are started, how finished runs are applied, and how pause/resume is
/// carried out.
///
/// Type parameters `S`, `G` and `C` are the state store, execution guard and
/// coordinated state store of the owning `Scheduler`; `D` is the job's dependency type.
pub(super) trait SchedulerRuntimeBackend<
    S: crate::StateStore + Send + Sync + 'static,
    G: crate::ExecutionGuard + Send + Sync + 'static,
    C: crate::CoordinatedStateStore + Send + Sync + 'static,
    D: Send + Sync + 'static,
>
{
    /// Backend-owned runtime value threaded through every call of the loop.
    type Runtime: Send + 'static;
    /// Output of a spawned run, collected from the loop's `JoinSet`.
    type Completed: Send + 'static;

    /// Builds the runtime before the scheduler loop starts.
    fn initialize<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
    ) -> impl std::future::Future<Output = Result<Self::Runtime, SchedulerError>> + Send + 'run;

    /// Consumes the runtime and returns the one to use next; the loop calls this at the
    /// top of every iteration.
    fn refresh_runtime<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: Self::Runtime,
    ) -> impl std::future::Future<Output = Result<Self::Runtime, SchedulerError>> + Send + 'run;

    /// Borrows the job state held by `runtime`.
    fn state<'rt>(&self, runtime: &'rt Self::Runtime) -> &'rt JobState;

    /// Reports whether the runtime is currently paused.
    fn is_paused(&self, runtime: &Self::Runtime) -> bool;

    /// Scope attached to the paused/resumed events emitted by the loop.
    fn pause_scope(&self) -> PauseScope;

    /// Stores `state` for the job.
    ///
    /// The loop treats `Ok(true)` as "the save took effect" (it restarts the iteration
    /// or emits the skip event); the exact meaning of `Ok(false)` is up to the
    /// implementation — presumably the state was not persisted; verify against impls.
    fn save_state<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
        state: &'run JobState,
    ) -> impl std::future::Future<Output = Result<bool, SchedulerError>> + Send + 'run;

    /// Handles a previously queued trigger once no run is active.
    ///
    /// `Ok(true)` makes the loop count one more active run in `active`.
    fn handle_queued_trigger<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
        trigger: PendingTrigger,
        active: &'run mut JoinSet<Self::Completed>,
    ) -> impl std::future::Future<Output = Result<bool, SchedulerError>> + Send + 'run;

    /// Gives the backend a chance to take over an in-flight run.
    ///
    /// `Ok(true)` makes the loop count one more active run in `active`.
    fn try_reclaim_inflight<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
        active: &'run mut JoinSet<Self::Completed>,
    ) -> impl std::future::Future<Output = Result<bool, SchedulerError>> + Send + 'run;

    /// Handles a trigger that became due, together with the overlap decision made for it.
    ///
    /// `candidate_state` is the state already advanced past this trigger. For a spawn
    /// decision the loop counts a new active run when this returns `Ok(true)`; for the
    /// other decisions the returned flag is ignored.
    fn handle_due_trigger<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
        candidate_state: JobState,
        trigger: PendingTrigger,
        overlap_action: OverlapAction,
        active: &'run mut JoinSet<Self::Completed>,
    ) -> impl std::future::Future<Output = Result<bool, SchedulerError>> + Send + 'run;

    /// Applies the outcome of a finished run; `history` is the run-record buffer that
    /// ends up in the final report.
    fn apply_completed<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        runtime: &'run mut Self::Runtime,
        history: &'run mut VecDeque<RunRecord>,
        completed: Self::Completed,
    ) -> impl std::future::Future<Output = Result<(), SchedulerError>> + Send + 'run;

    /// Removes the job's state; called when the job is terminal and the configured
    /// terminal-state policy is `Delete`.
    fn delete_terminal_state<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
    ) -> impl std::future::Future<Output = Result<(), SchedulerError>> + Send + 'run;

    /// Applies a pause command; `Ok(true)` makes the loop emit a paused event.
    fn pause<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
    ) -> impl std::future::Future<Output = Result<bool, SchedulerError>> + Send + 'run;

    /// Applies a resume command; `Ok(true)` makes the loop emit a resumed event.
    fn resume<'run>(
        &'run self,
        scheduler: &'run super::engine::Scheduler<S, G, C>,
        job: &'run Job<D>,
        runtime: &'run mut Self::Runtime,
    ) -> impl std::future::Future<Output = Result<bool, SchedulerError>> + Send + 'run;
}

/// Drives one job's scheduling loop until it stops and returns the final report.
///
/// Each iteration goes through these phases:
///
/// 1. Refresh the backend runtime and snapshot the control channel.
/// 2. If a stop signal is set and no run is active, emit `SchedulerStopped` and exit.
/// 3. If a new mode command arrived (`mode_command_seq` changed), apply it through the
///    backend (or just observe it) and emit a paused/resumed event when it changed
///    something.
/// 4. When not stopping and not paused: start a queued trigger if idle, otherwise try
///    to reclaim an in-flight run, otherwise evaluate the next due trigger, honouring
///    skip reasons and the job's overlap policy.
/// 5. If the job has no `next_run_at`, nothing active and nothing queued, treat it as
///    terminal (optionally deleting its state) and exit.
/// 6. Otherwise wait for whichever comes first: a run finishing, a control change,
///    the next run time, or the 50 ms poll used while paused.
///
/// After the loop exits, every run still in the `JoinSet` is awaited and applied so
/// the report reflects all completed work.
///
/// # Errors
///
/// Propagates any [`SchedulerError`] returned by the backend or by `next_trigger`, and
/// converts a failed task join via `SchedulerError::task_join`.
pub(super) async fn run_scheduler<S, G, C, D, B>(
    scheduler: &super::engine::Scheduler<S, G, C>,
    job: Job<D>,
    backend: &B,
) -> Result<SchedulerReport, SchedulerError>
where
    S: crate::StateStore + Send + Sync + 'static,
    G: crate::ExecutionGuard + Send + Sync + 'static,
    C: crate::CoordinatedStateStore + Send + Sync + 'static,
    D: Send + Sync + 'static,
    B: SchedulerRuntimeBackend<S, G, C, D>,
{
    let mut runtime = backend.initialize(scheduler, &job).await?;
    let mut history = VecDeque::new();
    let mut last_skip_reason = None;
    let mut active = JoinSet::new();
    // Number of runs currently held in `active`; kept separately so the overlap
    // helpers and the `select!` guards can consult it cheaply.
    let mut active_count = 0usize;
    let mut queued_trigger = None;

    // Clear any stop signal left over from an earlier run before subscribing.
    // NOTE(review): if `scheduler.control` is a tokio watch sender, `send` fails (and
    // leaves the value untouched) when no receiver is alive — confirm one always is.
    let mut initial_control = *scheduler.control.borrow();
    initial_control.stop_signal = None;
    let _ = scheduler.control.send(initial_control);
    let mut control_rx = scheduler.control.subscribe();
    let mut last_seen_mode_command = 0u64;

    loop {
        runtime = backend.refresh_runtime(scheduler, &job, runtime).await?;
        let control = *control_rx.borrow();

        // A stop request only takes effect once every active run has finished.
        if let Some(stop_signal) = control.stop_signal && active_count == 0 {
            scheduler.emit(SchedulerEvent::SchedulerStopped {
                job_id: job.job_id.clone(),
                trigger_count: backend.state(&runtime).trigger_count,
                reason: match stop_signal {
                    StopSignal::Cancel => SchedulerStopReason::Cancelled,
                    StopSignal::Shutdown => SchedulerStopReason::Shutdown,
                },
            });
            break;
        }

        // Handle each pause/resume command exactly once, keyed by its sequence number.
        if control.mode_command_seq != last_seen_mode_command {
            let changed = match control.command_disposition {
                CommandDisposition::Apply => match control.desired_mode {
                    SchedulerMode::Running => backend.resume(scheduler, &job, &mut runtime).await?,
                    SchedulerMode::Paused => backend.pause(scheduler, &job, &mut runtime).await?,
                },
                CommandDisposition::ObserveOnly { changed } => changed,
            };
            last_seen_mode_command = control.mode_command_seq;
            if changed {
                match control.desired_mode {
                    SchedulerMode::Paused => scheduler.emit(SchedulerEvent::SchedulerPaused {
                        job_id: job.job_id.clone(),
                        trigger_count: backend.state(&runtime).trigger_count,
                        scope: backend.pause_scope(),
                    }),
                    SchedulerMode::Running => scheduler.emit(SchedulerEvent::SchedulerResumed {
                        job_id: job.job_id.clone(),
                        trigger_count: backend.state(&runtime).trigger_count,
                        scope: backend.pause_scope(),
                    }),
                }
            }
        }

        if control.stop_signal.is_none() && !backend.is_paused(&runtime) {
            // 1) A trigger queued by the overlap policy runs as soon as we are idle.
            if let Some(trigger) = take_queued_if_idle(active_count, &mut queued_trigger) {
                let now = Utc::now();
                if let Some(reason) = job.skip_reason_at(now, scheduler.config.timezone) {
                    last_skip_reason = Some(reason);
                    scheduler.emit(SchedulerEvent::RunSkipped {
                        job_id: job.job_id.clone(),
                        scheduled_at: trigger.scheduled_at,
                        catch_up: trigger.catch_up,
                        trigger_count: trigger.trigger_count,
                        reason,
                    });
                    continue;
                }

                if backend
                    .handle_queued_trigger(scheduler, &job, &mut runtime, trigger, &mut active)
                    .await?
                {
                    active_count += 1;
                }
                continue;
            }

            // 2) Let the backend take over a run that is already in flight.
            if backend
                .try_reclaim_inflight(scheduler, &job, &mut runtime, &mut active)
                .await?
            {
                active_count += 1;
                continue;
            }

            // 3) Evaluate the schedule on a copy of the state; the backend decides
            //    whether the advanced copy is actually committed.
            let now = Utc::now();
            if !scheduler.should_wait_for_active_replay(&job, active_count) {
                let mut candidate_state = backend.state(&runtime).clone();
                match next_trigger(&job, &mut candidate_state, now, scheduler.config.timezone)? {
                    TriggerDecision::Idle => {}
                    TriggerDecision::StateAdvanced => {
                        if backend
                            .save_state(scheduler, &job, &mut runtime, &candidate_state)
                            .await?
                        {
                            continue;
                        }
                    }
                    TriggerDecision::Trigger(trigger) => {
                        let now = Utc::now();
                        if let Some(reason) = job.skip_reason_at(now, scheduler.config.timezone) {
                            // Report the skip only when the advanced state was saved.
                            if backend
                                .save_state(scheduler, &job, &mut runtime, &candidate_state)
                                .await?
                            {
                                last_skip_reason = Some(reason);
                                scheduler.emit(SchedulerEvent::RunSkipped {
                                    job_id: job.job_id.clone(),
                                    scheduled_at: trigger.scheduled_at,
                                    catch_up: trigger.catch_up,
                                    trigger_count: trigger.trigger_count,
                                    reason,
                                });
                            }
                            continue;
                        }

                        match dispatch_trigger(
                            job.overlap_policy,
                            active_count,
                            &mut queued_trigger,
                            trigger,
                        ) {
                            OverlapAction::Spawn(trigger) => {
                                if backend
                                    .handle_due_trigger(
                                        scheduler,
                                        &job,
                                        &mut runtime,
                                        candidate_state,
                                        trigger,
                                        OverlapAction::Spawn(trigger),
                                        &mut active,
                                    )
                                    .await?
                                {
                                    active_count += 1;
                                }
                                continue;
                            }
                            // Forward the decision that was actually made, so a dropped
                            // trigger is not reported to the backend as a queue update.
                            action @ (OverlapAction::QueueUpdated | OverlapAction::Dropped) => {
                                // Nothing is spawned for these decisions, so the
                                // returned flag is intentionally ignored.
                                let _ = backend
                                    .handle_due_trigger(
                                        scheduler,
                                        &job,
                                        &mut runtime,
                                        candidate_state,
                                        trigger,
                                        action,
                                        &mut active,
                                    )
                                    .await?;
                                continue;
                            }
                        }
                    }
                }
            }
        }

        // Terminal: nothing scheduled, nothing running, nothing queued.
        let next_run_at = backend.state(&runtime).next_run_at;
        if !backend.is_paused(&runtime)
            && next_run_at.is_none()
            && active_count == 0
            && queued_trigger.is_none()
        {
            if matches!(
                scheduler.config.terminal_state_policy,
                crate::model::TerminalStatePolicy::Delete
            ) {
                backend
                    .delete_terminal_state(scheduler, &job, &mut runtime)
                    .await?;
                scheduler.emit(SchedulerEvent::TerminalStateDeleted {
                    job_id: job.job_id.clone(),
                    trigger_count: backend.state(&runtime).trigger_count,
                });
            }
            scheduler.emit(SchedulerEvent::SchedulerStopped {
                job_id: job.job_id.clone(),
                trigger_count: backend.state(&runtime).trigger_count,
                reason: SchedulerStopReason::Terminal,
            });
            break;
        }

        // Work out the wait guards from one fresh control snapshot so the watch
        // channel is borrowed once and released before any await.
        let paused = backend.is_paused(&runtime);
        let sleep_enabled = {
            let current = *control_rx.borrow();
            current.stop_signal.is_none()
                && matches!(current.desired_mode, SchedulerMode::Running)
                && !paused
                && queued_trigger.is_none()
                && next_run_is_in_future(next_run_at)
        };

        tokio::select! {
            maybe_result = active.join_next(), if active_count > 0 => {
                if let Some(result) = maybe_result {
                    active_count -= 1;
                    let completed = result.map_err(SchedulerError::task_join)?;
                    backend.apply_completed(
                        scheduler,
                        &mut runtime,
                        &mut history,
                        completed,
                    ).await?;
                }
            }
            changed = control_rx.changed() => {
                // The sender side is gone: no further commands can arrive.
                if changed.is_err() {
                    scheduler.emit(SchedulerEvent::SchedulerStopped {
                        job_id: job.job_id.clone(),
                        trigger_count: backend.state(&runtime).trigger_count,
                        reason: SchedulerStopReason::ChannelClosed,
                    });
                    break;
                }
            }
            _ = scheduler.sleep_until_next(next_run_at), if sleep_enabled => {}
            // While paused, wake periodically so the runtime is refreshed at the
            // top of the loop.
            _ = tokio::time::sleep(std::time::Duration::from_millis(50)), if paused => {}
        }
    }

    // Drain runs that were still active when the loop exited.
    while let Some(result) = active.join_next().await {
        let completed = result.map_err(SchedulerError::task_join)?;
        backend
            .apply_completed(scheduler, &mut runtime, &mut history, completed)
            .await?;
    }

    Ok(SchedulerReport {
        job_id: job.job_id.clone(),
        state: backend.state(&runtime).clone(),
        history: history.into_iter().collect(),
        last_skip_reason,
    })
}