cloudiful-scheduler 0.4.2

Single-job async scheduling library for background work with optional Valkey-backed state.
Documentation
use crate::model::{
    JobState, RunContext, RunRecord, RunStatus, TaskContext, TaskHandler, push_history,
};
use crate::observer::{SchedulerEvent, SchedulerObserver};
use crate::scheduler::control::{ControlSignal, StopSignal};
use crate::scheduler::trigger::PendingTrigger;
use crate::{ExecutionGuard, ExecutionGuardRenewal, ExecutionLease};
use chrono::Utc;
use chrono_tz::Tz;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio::time::{Duration, Instant, interval_at};

/// Outcome of one finished task run, as produced by the future spawned in
/// `spawn_legacy_trigger`.
#[derive(Debug)]
pub(crate) struct CompletedRun {
    /// Schedule time, start/finish timestamps, catch-up flag, status and error of the run.
    pub(crate) record: RunRecord,
    /// `trigger_count` carried over from the `PendingTrigger` that started the run.
    pub(crate) trigger_count: u32,
}

impl CompletedRun {
    pub(crate) fn apply_to(
        self,
        state: &mut JobState,
        history: &mut VecDeque<RunRecord>,
        history_limit: usize,
    ) -> RunRecord {
        state.last_run_at = Some(self.record.started_at);
        match self.record.status {
            RunStatus::Success => {
                state.last_success_at = Some(self.record.finished_at);
                state.last_error = None;
            }
            RunStatus::Failed => {
                state.last_error = self.record.error.clone();
            }
        }

        push_history(history, self.record.clone(), history_limit);
        self.record
    }
}

/// Spawns one run of `task` for `trigger` onto `active`, keeping `lease` renewed while it runs.
///
/// The spawned future:
///
/// 1. Builds the `TaskContext` (job id, scheduled time, catch-up flag, timezone, `deps`) and
///    starts the task.
/// 2. If `guard.renew_interval(&lease)` yields a schedule, calls `guard.renew(&lease)` on every
///    tick while the task is still running and reports `ExecutionGuardRenewed` on success.
/// 3. On the first `Lost` answer or renewal error, reports `ExecutionGuardLost` (preceded by
///    `ExecutionGuardRenewFailed` for errors), publishes `StopSignal::Shutdown` on `control`
///    and stops renewing. The task itself is not cancelled; it is still awaited to completion.
/// 4. Calls `guard.release(&lease)` and reports `ExecutionGuardReleased` or
///    `ExecutionGuardReleaseFailed`.
/// 5. Resolves to a `CompletedRun` carrying the run record and the trigger's `trigger_count`.
pub(crate) fn spawn_legacy_trigger<D, G>(
    active: &mut JoinSet<CompletedRun>,
    task: TaskHandler<D>,
    deps: Arc<D>,
    job_id: String,
    timezone: Tz,
    trigger: PendingTrigger,
    guard: Arc<G>,
    observer: Arc<dyn SchedulerObserver>,
    control: watch::Sender<ControlSignal>,
    lease: ExecutionLease,
) where
    G: ExecutionGuard + Send + Sync + 'static,
    D: Send + Sync + 'static,
{
    active.spawn(async move {
        let started_at = Utc::now();
        let mut renewal_count = 0u32;
        let mut failed_renewal_count = 0u32;
        let task_future = task(TaskContext {
            run: RunContext {
                job_id: job_id.clone(),
                scheduled_at: trigger.scheduled_at,
                catch_up: trigger.catch_up,
                timezone,
            },
            deps,
        });
        tokio::pin!(task_future);
        let renew_every = guard.renew_interval(&lease);
        let mut renewal = renewal_schedule(renew_every);

        let result = loop {
            if let Some(ticker) = renewal.as_mut() {
                // `ticker` borrows `renewal`, so the schedule cannot be cleared from inside the
                // `select!`; the flag defers that until the borrow has ended.
                let mut stop_renewal = false;
                let outcome = tokio::select! {
                    result = &mut task_future => Some(result),
                    _ = ticker.tick() => {
                        let renew_result = guard.renew(&lease).await;
                        // `true` when the lease can no longer be trusted: either the guard
                        // said so explicitly or the renewal attempt itself failed.
                        let lease_lost = match renew_result {
                            Ok(ExecutionGuardRenewal::Renewed) => {
                                renewal_count += 1;
                                observer.on_event(&SchedulerEvent::ExecutionGuardRenewed {
                                    job_id: job_id.clone(),
                                    resource_id: lease.resource_id.clone(),
                                    scope: lease.scope,
                                    lease_key: lease.lease_key.clone(),
                                    scheduled_at: lease.scheduled_at,
                                    catch_up: trigger.catch_up,
                                    trigger_count: trigger.trigger_count,
                                    renewal_count,
                                });
                                false
                            }
                            Ok(ExecutionGuardRenewal::Lost) => true,
                            Err(error) => {
                                failed_renewal_count += 1;
                                observer.on_event(&SchedulerEvent::ExecutionGuardRenewFailed {
                                    job_id: job_id.clone(),
                                    resource_id: lease.resource_id.clone(),
                                    scope: lease.scope,
                                    lease_key: lease.lease_key.clone(),
                                    scheduled_at: lease.scheduled_at,
                                    catch_up: trigger.catch_up,
                                    trigger_count: trigger.trigger_count,
                                    renewal_count,
                                    failed_renewal_count,
                                    error: error.to_string(),
                                });
                                true
                            }
                        };

                        if lease_lost {
                            // Runs at most once per task: renewal is switched off right after,
                            // so this branch of the `select!` is never polled again.
                            observer.on_event(&SchedulerEvent::ExecutionGuardLost {
                                job_id: job_id.clone(),
                                resource_id: lease.resource_id.clone(),
                                scope: lease.scope,
                                lease_key: lease.lease_key.clone(),
                                scheduled_at: lease.scheduled_at,
                                catch_up: trigger.catch_up,
                                trigger_count: trigger.trigger_count,
                                renewal_count,
                                failed_renewal_count,
                            });
                            let mut next = *control.borrow();
                            next.stop_signal = Some(StopSignal::Shutdown);
                            // Best effort: `send` only fails when every receiver is gone.
                            let _ = control.send(next);
                            stop_renewal = true;
                        }
                        None
                    }
                };

                if stop_renewal {
                    renewal = None;
                }

                if let Some(result) = outcome {
                    break result;
                }
            } else {
                // No renewal schedule (never had one, or it was stopped): just finish the task.
                break task_future.await;
            }
        };
        let finished_at = Utc::now();

        let (status, error) = match result {
            Ok(()) => (RunStatus::Success, None),
            Err(message) => (RunStatus::Failed, Some(message)),
        };

        // The lease is released whatever the task outcome; a failed release is only reported.
        match guard.release(&lease).await {
            Ok(_) => {
                observer.on_event(&SchedulerEvent::ExecutionGuardReleased {
                    job_id: job_id.clone(),
                    resource_id: lease.resource_id.clone(),
                    scope: lease.scope,
                    lease_key: lease.lease_key.clone(),
                    scheduled_at: lease.scheduled_at,
                    catch_up: trigger.catch_up,
                    trigger_count: trigger.trigger_count,
                });
            }
            Err(error) => {
                observer.on_event(&SchedulerEvent::ExecutionGuardReleaseFailed {
                    job_id: job_id.clone(),
                    resource_id: lease.resource_id.clone(),
                    scope: lease.scope,
                    lease_key: lease.lease_key.clone(),
                    scheduled_at: lease.scheduled_at,
                    catch_up: trigger.catch_up,
                    trigger_count: trigger.trigger_count,
                    error: error.to_string(),
                });
            }
        }

        CompletedRun {
            record: RunRecord {
                scheduled_at: trigger.scheduled_at,
                started_at,
                finished_at,
                catch_up: trigger.catch_up,
                status,
                error,
            },
            trigger_count: trigger.trigger_count,
        }
    });
}

/// Builds the ticker that drives lease renewal, or `None` when no renewal should happen.
///
/// The first tick fires one full `renew_interval` from now (not immediately), and subsequent
/// ticks follow at the same period.
///
/// Returns `None` when:
/// * `renew_interval` is `None`;
/// * the interval is zero — `interval_at` panics on a zero period, which would abort the run
///   task before it could release its lease;
/// * `now + interval` is not representable — such a tick could never fire anyway.
pub(crate) fn renewal_schedule(renew_interval: Option<Duration>) -> Option<tokio::time::Interval> {
    let period = renew_interval.filter(|period| !period.is_zero())?;
    let first_tick = Instant::now().checked_add(period)?;
    Some(interval_at(first_tick, period))
}