Skip to main content

scheduler/model/
state.rs

1use crate::model::RunSkipReason;
2use chrono::{DateTime, Utc};
3#[cfg(feature = "valkey-store")]
4use serde::{Deserialize, Serialize};
5use std::collections::VecDeque;
6
/// Outcome of a single job run.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RunStatus {
    /// The run finished successfully.
    Success,
    /// The run finished with a failure.
    Failed,
}
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct RunRecord {
15    pub scheduled_at: DateTime<Utc>,
16    pub started_at: DateTime<Utc>,
17    pub finished_at: DateTime<Utc>,
18    pub catch_up: bool,
19    pub status: RunStatus,
20    pub error: Option<String>,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24#[cfg_attr(feature = "valkey-store", derive(Serialize, Deserialize))]
25pub struct JobState {
26    pub job_id: String,
27    pub trigger_count: u32,
28    pub last_run_at: Option<DateTime<Utc>>,
29    pub last_success_at: Option<DateTime<Utc>>,
30    pub next_run_at: Option<DateTime<Utc>>,
31    pub last_error: Option<String>,
32}
33
34impl JobState {
35    pub fn new(job_id: impl Into<String>, next_run_at: Option<DateTime<Utc>>) -> Self {
36        Self {
37            job_id: job_id.into(),
38            trigger_count: 0,
39            last_run_at: None,
40            last_success_at: None,
41            next_run_at,
42            last_error: None,
43        }
44    }
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct SchedulerReport {
49    pub job_id: String,
50    pub state: JobState,
51    pub history: Vec<RunRecord>,
52    pub last_skip_reason: Option<RunSkipReason>,
53}
54
55pub(crate) fn push_history(
56    history: &mut VecDeque<RunRecord>,
57    record: RunRecord,
58    history_limit: usize,
59) {
60    if history_limit == 0 {
61        return;
62    }
63
64    history.push_back(record);
65    while history.len() > history_limit {
66        history.pop_front();
67    }
68}
69
#[cfg(test)]
mod tests {
    use super::{RunRecord, RunStatus, push_history};
    use chrono::{DateTime, TimeDelta, Utc};
    use std::collections::VecDeque;

    /// Builds a successful, non-catch-up record whose three timestamps all
    /// equal `base` plus `second` seconds.
    ///
    /// Taking `base` explicitly keeps records built in separate calls
    /// comparable, which a helper calling `Utc::now()` internally would not.
    fn record_at(base: DateTime<Utc>, second: i64) -> RunRecord {
        let instant = base + TimeDelta::seconds(second);
        RunRecord {
            scheduled_at: instant,
            started_at: instant,
            finished_at: instant,
            catch_up: false,
            status: RunStatus::Success,
            error: None,
        }
    }

    #[test]
    fn push_history_keeps_latest_records_within_limit() {
        let base = Utc::now();
        let mut history = VecDeque::new();
        push_history(&mut history, record_at(base, 1), 2);
        push_history(&mut history, record_at(base, 2), 2);
        push_history(&mut history, record_at(base, 3), 2);

        // Whole-record comparison also pins the order of the survivors.
        assert_eq!(
            history,
            VecDeque::from([record_at(base, 2), record_at(base, 3)])
        );
    }

    #[test]
    fn push_history_trims_history_already_over_limit() {
        let base = Utc::now();
        let mut history: VecDeque<RunRecord> =
            (1..=4).map(|second| record_at(base, second)).collect();
        push_history(&mut history, record_at(base, 5), 2);

        assert_eq!(
            history,
            VecDeque::from([record_at(base, 4), record_at(base, 5)])
        );
    }

    #[test]
    fn push_history_ignores_records_when_limit_is_zero() {
        let mut history = VecDeque::new();
        push_history(&mut history, record_at(Utc::now(), 1), 0);

        assert!(history.is_empty());
    }
}