// rucron/job.rs

use chrono::{DateTime, Datelike, Duration, Local, TimeZone, Weekday};
use std::{fmt, sync::atomic::Ordering};

use crate::{metric::Metric, METRIC_STORAGE};

/// Time unit in which a job's call interval is expressed.
///
/// `Debug` is implemented by hand elsewhere in this file, so it is not derived here.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum TimeUnit {
    /// One second.
    Second,
    /// One minute.
    Minute,
    /// One hour.
    Hour,
    /// One day.
    Day,
    /// One week.
    Week,
}

15impl TimeUnit {
16    #[inline(always)]
17    fn granularity(&self) -> Duration {
18        match self {
19            Self::Second => Duration::seconds(1),
20            Self::Minute => Duration::minutes(1),
21            Self::Hour => Duration::hours(1),
22            Self::Day => Duration::days(1),
23            Self::Week => Duration::weeks(1),
24        }
25    }
26    #[inline(always)]
27    fn number_from_zero(&self) -> i8 {
28        match self {
29            Self::Second => 0,
30            Self::Minute => 1,
31            Self::Hour => 2,
32            Self::Day => 3,
33            Self::Week => 4,
34        }
35    }
36}
37
38impl fmt::Debug for TimeUnit {
39    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
40        match self {
41            Self::Second => write!(fmt, "Second"),
42            Self::Minute => write!(fmt, "Minute"),
43            Self::Hour => write!(fmt, "Hour"),
44            Self::Day => write!(fmt, "Day"),
45            Self::Week => write!(fmt, "Week"),
46        }
47    }
48}
49
50/// A periodic job used by Scheduler.
51pub struct Job {
52    /// Pause `interval * unit` seconds between runs
53    call_interval: i32,
54    /// Datetime of next run.
55    next_run: DateTime<Local>,
56    /// Datetime of last run.
57    last_run: DateTime<Local>,
58    /// Time unit.
59    time_unit: Option<TimeUnit>,
60    /// Wheather run at specific time
61    is_at: bool,
62    /// Specific day of the week to start on.
63    weekday: Option<Weekday>,
64    /// Specific time of the day to start on.
65    at_time: Option<Duration>,
66    /// Wheather run at job immediately.
67    is_immediately_run: bool,
68    /// Job name or function name.
69    job_name: String,
70    /// A distributed locker,
71    locker: bool,
72    /// This function returns a `Duration` which is equal to `next_time` minus `last_time`.
73    interval_fn: Option<fn(&Metric, &DateTime<Local>) -> Duration>,
74    /// Numbers of thread to run a job parallely
75    n_threads: u8,
76}
77
78impl Job {
79    pub(crate) fn new(call_interval: i32, is_at: bool) -> Job {
80        let now = Local::now();
81        Job {
82            call_interval,
83            next_run: now,
84            last_run: now,
85            time_unit: None,
86            weekday: None,
87            is_immediately_run: false,
88            at_time: None,
89            is_at,
90            job_name: "".into(),
91            locker: false,
92            interval_fn: None,
93            n_threads: 1,
94        }
95    }
96
97    #[inline(always)]
98    pub(crate) fn get_job_name(&self) -> String {
99        self.job_name.clone()
100    }
101
102    #[inline(always)]
103    pub(crate) fn set_name(&mut self, name: String) {
104        self.job_name = name;
105    }
106
107    #[inline(always)]
108    pub(crate) fn need_locker(&mut self) {
109        self.locker = true;
110    }
111
112    #[inline(always)]
113    pub(crate) fn threads(&mut self, n: u8) {
114        self.n_threads = n;
115    }
116
117    #[inline(always)]
118    pub(crate) fn is_need_lock(&self) -> bool {
119        self.locker
120    }
121
122    #[inline(always)]
123    pub(crate) fn n_threads(&self) -> u8 {
124        self.n_threads
125    }
126
127    #[inline(always)]
128    pub(crate) fn set_unit(&mut self, unit: TimeUnit) {
129        self.time_unit = Some(unit);
130    }
131
132    #[inline(always)]
133    pub(crate) fn set_interval_fn(&mut self, f: fn(&Metric, &DateTime<Local>) -> Duration) {
134        self.interval_fn = Some(f);
135    }
136
137    #[inline(always)]
138    pub(crate) fn has_interval_fn(&self) -> bool {
139        self.interval_fn.is_some()
140    }
141
142    #[inline(always)]
143    pub(crate) fn get_unit(&self) -> Option<&TimeUnit> {
144        self.time_unit.as_ref()
145    }
146
147    #[inline(always)]
148    pub(crate) fn is_at(&self) -> bool {
149        self.is_at
150    }
151
152    #[inline(always)]
153    pub(crate) fn get_at_time(&self) -> Option<Duration> {
154        self.at_time
155    }
156
157    #[inline(always)]
158    pub(crate) fn immediately_run(&mut self) {
159        self.is_immediately_run = true
160    }
161
162    #[inline(always)]
163    pub(crate) fn get_immediately_run(&self) -> bool {
164        self.is_immediately_run
165    }
166
167    #[inline(always)]
168    pub(crate) fn runnable(&self) -> bool {
169        self.next_run.le(&Local::now())
170    }
171
172    #[inline(always)]
173    pub(crate) fn set_at_time(&mut self, h: i64, m: i64, s: i64) {
174        self.at_time = Some(Duration::hours(h) + Duration::minutes(m) + Duration::seconds(s));
175    }
176
177    #[inline(always)]
178    pub(crate) fn set_weekday(&mut self, w: i64) {
179        self.weekday = match w {
180            1 => Some(Weekday::Mon),
181            2 => Some(Weekday::Tue),
182            3 => Some(Weekday::Wed),
183            4 => Some(Weekday::Thu),
184            5 => Some(Weekday::Fri),
185            6 => Some(Weekday::Sat),
186            7 => Some(Weekday::Sun),
187            _ => None,
188        };
189    }
190
191    #[inline(always)]
192    pub(crate) fn get_next_run(&self) -> DateTime<Local> {
193        self.next_run
194    }
195
196    #[inline(always)]
197    pub(crate) fn get_last_run(&self) -> DateTime<Local> {
198        self.last_run
199    }
200
201    #[inline(always)]
202    pub(crate) fn get_weekday(&self) -> Option<Weekday> {
203        self.weekday
204    }
205
206    #[inline(always)]
207    pub(crate) fn get_time_unit(&self) -> Option<i8> {
208        self.time_unit
209            .as_ref()
210            .map_or(None, |t| Some(t.number_from_zero()))
211    }
212
213    /// Compute the time when the job should run next time.
214    pub(crate) fn schedule_run_time(&mut self) {
215        let granularity = self.cmp_time_granularity();
216        let mut next_run = match self.time_unit {
217            Some(TimeUnit::Second) | Some(TimeUnit::Minute) | Some(TimeUnit::Hour) => self.last_run,
218            Some(TimeUnit::Day) => {
219                let mut midnight = Local
220                    .with_ymd_and_hms(
221                        self.last_run.year(),
222                        self.last_run.month(),
223                        self.last_run.day(),
224                        0,
225                        0,
226                        0,
227                    )
228                    .unwrap();
229                midnight = midnight + self.at_time.unwrap_or(Duration::zero());
230                midnight
231            }
232            Some(TimeUnit::Week) => {
233                let mut midnight = Local
234                    .with_ymd_and_hms(
235                        self.last_run.year(),
236                        self.last_run.month(),
237                        self.last_run.day(),
238                        0,
239                        0,
240                        0,
241                    )
242                    .unwrap();
243                midnight = midnight + self.at_time.unwrap_or(Duration::zero());
244                println!("name: {}, weekday: {:?}", self.job_name, self.weekday);
245                let deviation: i32 = self.weekday.unwrap().number_from_sunday() as i32
246                    - midnight.weekday().number_from_sunday() as i32;
247                if deviation != 0 {
248                    midnight = midnight + Duration::days(1) * deviation;
249                }
250                midnight
251            }
252            None => self.last_run + granularity, // todo: handle this condition
253        };
254        let now = Local::now();
255        while (next_run.le(&now) || next_run.le(&self.last_run)) && self.interval_fn.is_none() {
256            next_run = next_run + granularity;
257        }
258        if next_run.gt(&self.next_run) {
259            self.last_run = self.next_run;
260            self.next_run = next_run;
261        }
262        METRIC_STORAGE
263            .get(&self.job_name)
264            .unwrap()
265            .n_scheduled
266            .fetch_add(1, Ordering::SeqCst);
267    }
268
269    #[inline(always)]
270    fn cmp_time_granularity(&self) -> Duration {
271        self.time_unit.as_ref().map_or_else(
272            || {
273                let f = self.interval_fn.expect(
274                    "Please set time unit or provide interval_fn in [cmp_time_granularity].",
275                );
276                f(&METRIC_STORAGE.get(&self.job_name).unwrap(), &self.last_run)
277            },
278            |tu| tu.granularity() * self.call_interval,
279        )
280    }
281}
282
283impl fmt::Debug for Job {
284    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
285        fmt.debug_struct("Job")
286            .field("job_name", &self.job_name)
287            .field("is_immediately_run", &self.is_immediately_run)
288            .field("at_time", &self.at_time)
289            .field("weekday", &self.weekday)
290            .field("is_at", &self.is_at)
291            .field("time_unit", &self.time_unit)
292            .field("last_run", &self.last_run)
293            .field("next_run", &self.next_run)
294            .field("call_interval", &self.call_interval)
295            .field("locker", &self.locker)
296            .field("n_threads", &self.n_threads)
297            .finish()
298    }
299}