jact/
job.rs

1use crate::job_scheduler::JobsSchedulerLocked;
2use cron::Schedule;
3use chrono::Utc;
4use std::str::FromStr;
5use tokio::sync::{RwLock, Mutex};
6use tokio::time::{Duration,sleep};
7use std::sync::Arc;
8use uuid::Uuid;
9use std::pin::Pin;
10use std::fmt;
11use std::future::Future;
12use std::error::Error;
13use std::time::Instant;
14use async_trait::async_trait;
15
16
17pub type JobToRunAsync = dyn FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send;
18
19///
20/// A schedulable Job
21pub type JobLocked = Arc<RwLock<JobType>>;
22
/// Errors produced while creating or running a job.
///
/// Every variant carries a human-readable message describing the failure.
#[derive(Debug)]
pub enum JobError {
    /// The job was stopped before it could run again.
    StoppedError(String),
    /// The job could not be constructed from the supplied arguments.
    CreationError(String),
    /// An argument (for example a cron expression) was invalid.
    InvalidArgument(String),
    /// An error returned by the job's own callback, flattened to its message.
    WrappedError(String),
}

impl fmt::Display for JobError {
    /// Writes `" <category> -- <message>"`.
    ///
    /// The leading space is kept for compatibility with the existing output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            JobError::StoppedError(err_message) => {
                write!(f, " Job stopped error -- {}", err_message)
            }
            JobError::CreationError(err_message) => {
                write!(f, " Job creation error -- {}", err_message)
            }
            JobError::InvalidArgument(err_message) => {
                write!(f, " Invalid argument error -- {}", err_message)
            }
            JobError::WrappedError(err_message) => {
                write!(f, " Wrapped error -- {}", err_message)
            }
        }
    }
}

impl Error for JobError {
    /// Always `None`: every variant stores only a message, never an
    /// underlying error value.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        None
    }
}
50
51#[async_trait]
52pub trait LockedJobInterface {
53
54    #[doc(hidden)]
55    fn make_new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
56    where
57        T: 'static,
58        T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
59        Self: Sized;
60
61
62    fn new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
63    where
64        T: 'static,
65        T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
66        Self: Sized;
67
68    #[doc(hidden)]
69    fn make_one_shot_job(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
70    where 
71        Self: Sized;
72
73
74    fn new_one_shot<T>(duration: Duration, run: T) -> Result<Self, JobError>
75        where
76            T: 'static,
77            T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
78            Self: Sized;
79
80    #[doc(hidden)]
81    fn make_new_one_shot_at_an_instant(instant: std::time::Instant, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
82    where 
83        Self: Sized;
84
85    fn new_one_shot_at_instant<T>(instant: std::time::Instant, run: T) -> Result<Self, JobError>
86    where
87        T: 'static,
88        T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
89        Self: Sized;
90
91    #[doc(hidden)]
92    fn make_new_repeated(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
93    where
94        Self: Sized;
95
96    fn new_repeated<T>(duration: Duration, run: T) -> Result<Self, JobError>
97        where
98            T: 'static,
99            T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
100            Self: Sized;
101
102    async fn get_job_id(&self) -> Uuid;
103
104}
105
106#[async_trait]
107pub trait Job {
108    async fn job_id(&self) -> Uuid;
109    async fn run(&mut self, jobs: JobsSchedulerLocked);
110    async fn stop(&mut self) -> ();
111    //fn stopped(&self) -> bool;  // Do I need this? I think not
112}
113
114pub struct JobHandle {
115    pub schedule:       Option<Schedule>,
116    pub run_async:      Arc<Mutex<JobToRunAsync>>,
117    pub time_til_next:  Option<Duration>,
118    pub job_id:         Uuid,
119    pub stopped:        bool,
120}
121
122//Manual implementation to avoid the annoying aync function it's holding
123impl fmt::Debug for JobHandle {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("JobHandle")
126         .field("schedule", &self.schedule)
127         .field("time_til_next", &self.time_til_next)
128         .field("job_id", &self.job_id)
129         .field("stopped", &self.stopped)
130         .finish()
131    }
132}
133
134#[derive(Debug)]
135pub enum JobType {
136    CronJob(JobHandle),
137    OneShot(JobHandle),
138    Repeated(JobHandle)
139}
140
141#[async_trait]
142impl Job for JobLocked
143{
144    async fn job_id(&self) -> Uuid {
145        let read_lock = self.read().await;
146        match *read_lock
147        {
148            JobType::CronJob(ref handle)
149          | JobType::OneShot(ref handle)
150          | JobType::Repeated(ref handle) => { handle.job_id }
151        }
152    }
153
154    async fn run(&mut self, jobs: JobsSchedulerLocked) {
155        let job_copy = self.clone();
156        //let job_scheduler_copy = jobs.clone();
157        tokio::task::spawn(async move {
158
159            loop {
160                //So I can drop the read lock during the sleep
161                let the_duration : Duration;
162                //let uuid : Uuid;
163                {
164                    let read_lock = job_copy.read().await;
165                    match *read_lock
166                    {
167                        JobType::CronJob(ref handle) => 
168                        { 
169                            let next_date_time = handle.schedule.as_ref().ok_or(JobError::InvalidArgument("Schedule is None".to_string()))?;
170                            let next_date_time = next_date_time.upcoming(Utc).next();
171                            if let Some(date_time) = next_date_time 
172                            {
173                                the_duration = (date_time - Utc::now()).to_std().map_err(|err| { JobError::InvalidArgument(format!("{:#?}", err.source())) })?;
174                            }
175                            else
176                            {
177                                the_duration = Duration::from_secs(0) 
178                            }
179                            //uuid = handle.job_id;
180                        },
181                        JobType::OneShot(ref handle)
182                      | JobType::Repeated(ref handle) => 
183                      { 
184                          the_duration = handle.time_til_next.unwrap_or(Duration::from_secs(0)); 
185                          //uuid = handle.job_id;
186                      }
187                    }
188                }
189
190                //println!("Uuid: {} sleeping for {}", uuid, the_duration.as_secs());
191                sleep(the_duration).await;
192
193                let async_func;
194                let the_job_id;
195                //Pick the read lock up and check that we still have work to do
196                {
197                    let read_lock = job_copy.read().await;
198                    match *read_lock
199                    {
200                        JobType::CronJob(ref handle)
201                      | JobType::OneShot(ref handle)
202                      | JobType::Repeated(ref handle) => { 
203                            if handle.stopped 
204                            { 
205                                return Err(JobError::StoppedError("Job was stopped manually!".to_string())); 
206                            }
207                            async_func = handle.run_async.clone();
208                            the_job_id = handle.job_id.clone();
209                      },
210                    };
211                }
212                //Pick up the run_async lock
213                {
214                    //println!("Uuid: {} about to grab write lock and wait", uuid);
215                    let mut write_lock = async_func.lock().await;
216                    let future = (*write_lock)(the_job_id, jobs.clone());
217                    future.await.map_err(|err| { JobError::WrappedError(err.to_string()) })?;
218                }
219
220
221                //Pick up the write lock and set up the next round
222                {
223                    let mut write_lock = job_copy.write().await;
224                    if let JobType::OneShot(ref mut handle) = *write_lock
225                    {
226                        handle.stopped = true;
227                        return Ok(());
228                    }
229                }
230            }
231
232        });
233    }
234
235    async fn stop(&mut self) -> ()
236    {
237        let mut write_lock = self.write().await;
238        match *write_lock
239        {
240            JobType::CronJob(ref mut handle)
241          | JobType::Repeated(ref mut handle)
242          | JobType::OneShot(ref mut handle) => { handle.stopped = true; }
243        };
244    }
245}
246
247
248#[async_trait]
249impl LockedJobInterface for JobLocked {
250
251    /// Create a new async cron job.
252    ///
253    /// ```rust,ignore
254    /// let mut sched = JobScheduler::new();
255    /// // Run at second 0 of the 15th minute of the 6th, 8th, and 10th hour
256    /// // of any day in March and June that is a Friday of the year 2017.
257    /// let job = Job::new("0 15 6,8,10 * Mar,Jun Fri 2017", |_uuid, _lock| Box::pin( async move {
258    ///             println!("{:?} Hi I ran", chrono::Utc::now());
259    ///         }));
260    /// sched.add(job)
261    /// tokio::spawn(sched.start());
262    /// ```
263    fn make_new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
264    where
265        T: 'static,
266        T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
267        Self: Sized,
268    {
269        let schedule: Schedule = Schedule::from_str(schedule).map_err(|err| {JobError::InvalidArgument(err.to_string()) })?;
270        Ok(Arc::new(RwLock::new(JobType::CronJob(JobHandle {
271                                                schedule: Some(schedule),
272                                                run_async: Arc::new(Mutex::new(run)),
273                                                time_til_next: None,
274                                                job_id: Uuid::new_v4(),
275                                                stopped: false }))))
276
277
278    }
279
280   /// Create a new cron job.
281   ///
282   /// ```rust,ignore
283   /// let mut sched = JobScheduler::new();
284   /// // Run at second 0 of the 15th minute of the 6th, 8th, and 10th hour
285   /// // of any day in March and June that is a Friday of the year 2017.
286   /// let job = Job::new_cron_job("0 15 6,8,10 * Mar,Jun Fri 2017", |_uuid, _lock| {
287   ///             println!("{:?} Hi I ran", chrono::Utc::now());
288   ///         });
289   /// sched.add(job)
290   /// tokio::spawn(sched.start());
291   /// ```
292    fn new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
293    where
294        T: 'static,
295        T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
296        Self: Sized,
297    {
298        JobLocked::make_new_cron_job(schedule, run)
299    }
300
301
302    fn make_one_shot_job(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
303    where 
304        Self: Sized,
305    {
306
307        Ok(Arc::new(RwLock::new(JobType::OneShot(JobHandle {
308                                                    schedule: None,
309                                                    run_async,
310                                                    time_til_next: Some(duration),
311                                                    job_id: Uuid::new_v4(),
312                                                    stopped: false }))))
313    }
314
315    /// Create a new async one shot job.
316    ///
317    /// This is checked if it is running only after 500ms in 500ms intervals.
318    /// ```rust,ignore
319    /// let mut sched = JobScheduler::new();
320    /// let job = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| Box::pin(async move {
321    ///            println!("{:?} I'm only run once", chrono::Utc::now());
322    ///        }));
323    /// sched.add(job)
324    /// tokio::spawn(sched.start());
325    /// ```
326    fn new_one_shot<T>(duration: Duration, run: T) -> Result<Self, JobError>
327        where
328            T: 'static,
329            T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
330            Self: Sized,
331    {
332        JobLocked::make_one_shot_job(duration, Arc::new(Mutex::new(run)))
333    }
334
335    fn make_new_one_shot_at_an_instant(instant: std::time::Instant, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
336    where 
337        Self: Sized,
338    {
339        let first = Instant::now();
340        if instant < first
341        {
342            return Err(JobError::CreationError("Instant is in the past".to_string()));
343        }
344        let job_duration = instant - first;
345        Ok(Arc::new(RwLock::new(JobType::OneShot(JobHandle {
346                                                    schedule: None,
347                                                    run_async,
348                                                    time_til_next: Some(job_duration),
349                                                    job_id: Uuid::new_v4(),
350                                                    stopped: false }))))
351    }
352
353    /// Create a new async one shot job that runs at an instant
354    ///
355    /// ```rust,ignore
356    /// // Run after 20 seconds
357    /// let mut sched = JobScheduler::new();
358    /// let instant = std::time::Instant::now().checked_add(std::time::Duration::from_secs(20));
359    /// let job = Job::new_one_shot_at_instant(instant, |_uuid, _lock| Box::pin(async move {println!("I run once after 20 seconds")}) );
360    /// sched.add(job)
361    /// tokio::spawn(sched.start());
362    /// ```
363    fn new_one_shot_at_instant<T>(instant: std::time::Instant, run: T) -> Result<Self, JobError>
364    where
365        T: 'static,
366        T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
367        Self: Sized,
368    {
369        JobLocked::make_new_one_shot_at_an_instant(instant, Arc::new(Mutex::new(run)))
370    }
371
372    fn make_new_repeated(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError> 
373    where 
374        Self: Sized,
375    {
376
377        let job = JobHandle {
378                        schedule: None,
379                        run_async,
380                        time_til_next: Some(duration),
381                        job_id: Uuid::new_v4(),
382                        stopped: false
383        };
384
385        Ok(Arc::new(RwLock::new(JobType::Repeated(job))))
386    }
387
388    /// Create a new async repeated job.
389    ///
390    /// This is checked if it is running only after 500ms in 500ms intervals.
391    /// ```rust,ignore
392    /// let mut sched = JobScheduler::new();
393    /// let job = Job::new_repeated(Duration::from_secs(8), |_uuid, _lock| Box::pin(async move {
394    ///     println!("{:?} I'm repeated every 8 seconds", chrono::Utc::now());
395    /// }));
396    /// sched.add(job)
397    /// tokio::spawn(sched.start());
398    /// ```
399    fn new_repeated<T>(duration: Duration, run: T) -> Result<Self, JobError>
400        where
401            T: 'static,
402            T:  FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
403            Self: Sized,
404    {
405        JobLocked::make_new_repeated(duration, Arc::new(Mutex::new(run)))
406    }
407
408    async fn get_job_id(&self) -> Uuid
409    {
410        self.job_id().await
411    }
412
413}