1use crate::job_scheduler::JobsSchedulerLocked;
2use cron::Schedule;
3use chrono::Utc;
4use std::str::FromStr;
5use tokio::sync::{RwLock, Mutex};
6use tokio::time::{Duration,sleep};
7use std::sync::Arc;
8use uuid::Uuid;
9use std::pin::Pin;
10use std::fmt;
11use std::future::Future;
12use std::error::Error;
13use std::time::Instant;
14use async_trait::async_trait;
15
16
/// The unsized callback type a job executes.
///
/// The callback receives the job's [`Uuid`] and a [`JobsSchedulerLocked`], and returns a boxed,
/// pinned, `Send` future that resolves to `Result<(), JobError>`. Because the callback is
/// `FnMut`, the handles in this file store it behind `Arc<Mutex<..>>`.
pub type JobToRunAsync = dyn FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send;
18
/// A shareable job: a [`JobType`] behind an async read/write lock inside an `Arc`.
///
/// Cloning a `JobLocked` clones the `Arc`, so every clone refers to the same job state.
pub type JobLocked = Arc<RwLock<JobType>>;
22
/// Errors produced while creating or running a job.
///
/// Every variant carries a human-readable message describing what went wrong.
#[derive(Debug)]
pub enum JobError {
    /// The job was flagged as stopped when it was about to run.
    StoppedError(String),
    /// A job could not be constructed from the supplied arguments.
    CreationError(String),
    /// An argument (for example a cron expression) was invalid.
    InvalidArgument(String),
    /// An error returned by the job's own callback, captured as text.
    WrappedError(String),
}

impl fmt::Display for JobError {
    /// Formats the error as `" <kind> -- <message>"`.
    ///
    /// The leading space is part of the established message format and is kept on purpose so
    /// existing log output keeps its shape.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (label, detail) = match self {
            JobError::StoppedError(msg) => ("Job stopped error", msg),
            JobError::CreationError(msg) => ("Job creation error", msg),
            JobError::InvalidArgument(msg) => ("Invalid argument error", msg),
            JobError::WrappedError(msg) => ("Wrapped error", msg),
        };
        write!(f, " {} -- {}", label, detail)
    }
}

/// `JobError` stores only messages, never an underlying error value, so the default
/// `source()` (which returns `None`) is exactly what is needed.
impl Error for JobError {}
50
51#[async_trait]
52pub trait LockedJobInterface {
53
54 #[doc(hidden)]
55 fn make_new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
56 where
57 T: 'static,
58 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
59 Self: Sized;
60
61
62 fn new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
63 where
64 T: 'static,
65 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
66 Self: Sized;
67
68 #[doc(hidden)]
69 fn make_one_shot_job(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
70 where
71 Self: Sized;
72
73
74 fn new_one_shot<T>(duration: Duration, run: T) -> Result<Self, JobError>
75 where
76 T: 'static,
77 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
78 Self: Sized;
79
80 #[doc(hidden)]
81 fn make_new_one_shot_at_an_instant(instant: std::time::Instant, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
82 where
83 Self: Sized;
84
85 fn new_one_shot_at_instant<T>(instant: std::time::Instant, run: T) -> Result<Self, JobError>
86 where
87 T: 'static,
88 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
89 Self: Sized;
90
91 #[doc(hidden)]
92 fn make_new_repeated(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
93 where
94 Self: Sized;
95
96 fn new_repeated<T>(duration: Duration, run: T) -> Result<Self, JobError>
97 where
98 T: 'static,
99 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
100 Self: Sized;
101
102 async fn get_job_id(&self) -> Uuid;
103
104}
105
106#[async_trait]
107pub trait Job {
108 async fn job_id(&self) -> Uuid;
109 async fn run(&mut self, jobs: JobsSchedulerLocked);
110 async fn stop(&mut self) -> ();
111 }
113
/// The state behind every kind of job: when to run, what to run, and whether it was stopped.
pub struct JobHandle {
    /// Parsed cron schedule; the constructors in this file set it to `Some` for cron jobs and
    /// `None` for one-shot and repeated jobs.
    pub schedule: Option<Schedule>,
    /// The callback to execute, shared and guarded by an async mutex because it is `FnMut`.
    pub run_async: Arc<Mutex<JobToRunAsync>>,
    /// Delay before a run for one-shot and repeated jobs; `None` for cron jobs.
    pub time_til_next: Option<Duration>,
    /// Unique id of the job (a random v4 UUID when built by the constructors in this file).
    pub job_id: Uuid,
    /// Set to `true` by `stop()` and after a one-shot job has run; checked before each run.
    pub stopped: bool,
}
121
122impl fmt::Debug for JobHandle {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("JobHandle")
126 .field("schedule", &self.schedule)
127 .field("time_til_next", &self.time_til_next)
128 .field("job_id", &self.job_id)
129 .field("stopped", &self.stopped)
130 .finish()
131 }
132}
133
/// The kind of a job; every variant wraps the [`JobHandle`] holding the job's state.
#[derive(Debug)]
pub enum JobType {
    /// Runs at the times produced by the handle's cron schedule.
    CronJob(JobHandle),
    /// Runs once after the handle's `time_til_next` delay and is then marked as stopped.
    OneShot(JobHandle),
    /// Runs repeatedly, waiting the handle's `time_til_next` delay before each run.
    Repeated(JobHandle)
}
140
141#[async_trait]
142impl Job for JobLocked
143{
144 async fn job_id(&self) -> Uuid {
145 let read_lock = self.read().await;
146 match *read_lock
147 {
148 JobType::CronJob(ref handle)
149 | JobType::OneShot(ref handle)
150 | JobType::Repeated(ref handle) => { handle.job_id }
151 }
152 }
153
154 async fn run(&mut self, jobs: JobsSchedulerLocked) {
155 let job_copy = self.clone();
156 tokio::task::spawn(async move {
158
159 loop {
160 let the_duration : Duration;
162 {
164 let read_lock = job_copy.read().await;
165 match *read_lock
166 {
167 JobType::CronJob(ref handle) =>
168 {
169 let next_date_time = handle.schedule.as_ref().ok_or(JobError::InvalidArgument("Schedule is None".to_string()))?;
170 let next_date_time = next_date_time.upcoming(Utc).next();
171 if let Some(date_time) = next_date_time
172 {
173 the_duration = (date_time - Utc::now()).to_std().map_err(|err| { JobError::InvalidArgument(format!("{:#?}", err.source())) })?;
174 }
175 else
176 {
177 the_duration = Duration::from_secs(0)
178 }
179 },
181 JobType::OneShot(ref handle)
182 | JobType::Repeated(ref handle) =>
183 {
184 the_duration = handle.time_til_next.unwrap_or(Duration::from_secs(0));
185 }
187 }
188 }
189
190 sleep(the_duration).await;
192
193 let async_func;
194 let the_job_id;
195 {
197 let read_lock = job_copy.read().await;
198 match *read_lock
199 {
200 JobType::CronJob(ref handle)
201 | JobType::OneShot(ref handle)
202 | JobType::Repeated(ref handle) => {
203 if handle.stopped
204 {
205 return Err(JobError::StoppedError("Job was stopped manually!".to_string()));
206 }
207 async_func = handle.run_async.clone();
208 the_job_id = handle.job_id.clone();
209 },
210 };
211 }
212 {
214 let mut write_lock = async_func.lock().await;
216 let future = (*write_lock)(the_job_id, jobs.clone());
217 future.await.map_err(|err| { JobError::WrappedError(err.to_string()) })?;
218 }
219
220
221 {
223 let mut write_lock = job_copy.write().await;
224 if let JobType::OneShot(ref mut handle) = *write_lock
225 {
226 handle.stopped = true;
227 return Ok(());
228 }
229 }
230 }
231
232 });
233 }
234
235 async fn stop(&mut self) -> ()
236 {
237 let mut write_lock = self.write().await;
238 match *write_lock
239 {
240 JobType::CronJob(ref mut handle)
241 | JobType::Repeated(ref mut handle)
242 | JobType::OneShot(ref mut handle) => { handle.stopped = true; }
243 };
244 }
245}
246
247
248#[async_trait]
249impl LockedJobInterface for JobLocked {
250
251 fn make_new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
264 where
265 T: 'static,
266 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
267 Self: Sized,
268 {
269 let schedule: Schedule = Schedule::from_str(schedule).map_err(|err| {JobError::InvalidArgument(err.to_string()) })?;
270 Ok(Arc::new(RwLock::new(JobType::CronJob(JobHandle {
271 schedule: Some(schedule),
272 run_async: Arc::new(Mutex::new(run)),
273 time_til_next: None,
274 job_id: Uuid::new_v4(),
275 stopped: false }))))
276
277
278 }
279
280 fn new_cron_job<T>(schedule: &str, run: T) -> Result<Self, JobError>
293 where
294 T: 'static,
295 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
296 Self: Sized,
297 {
298 JobLocked::make_new_cron_job(schedule, run)
299 }
300
301
302 fn make_one_shot_job(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
303 where
304 Self: Sized,
305 {
306
307 Ok(Arc::new(RwLock::new(JobType::OneShot(JobHandle {
308 schedule: None,
309 run_async,
310 time_til_next: Some(duration),
311 job_id: Uuid::new_v4(),
312 stopped: false }))))
313 }
314
315 fn new_one_shot<T>(duration: Duration, run: T) -> Result<Self, JobError>
327 where
328 T: 'static,
329 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
330 Self: Sized,
331 {
332 JobLocked::make_one_shot_job(duration, Arc::new(Mutex::new(run)))
333 }
334
335 fn make_new_one_shot_at_an_instant(instant: std::time::Instant, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
336 where
337 Self: Sized,
338 {
339 let first = Instant::now();
340 if instant < first
341 {
342 return Err(JobError::CreationError("Instant is in the past".to_string()));
343 }
344 let job_duration = instant - first;
345 Ok(Arc::new(RwLock::new(JobType::OneShot(JobHandle {
346 schedule: None,
347 run_async,
348 time_til_next: Some(job_duration),
349 job_id: Uuid::new_v4(),
350 stopped: false }))))
351 }
352
353 fn new_one_shot_at_instant<T>(instant: std::time::Instant, run: T) -> Result<Self, JobError>
364 where
365 T: 'static,
366 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
367 Self: Sized,
368 {
369 JobLocked::make_new_one_shot_at_an_instant(instant, Arc::new(Mutex::new(run)))
370 }
371
372 fn make_new_repeated(duration: Duration, run_async: Arc<Mutex<JobToRunAsync>>) -> Result<Self, JobError>
373 where
374 Self: Sized,
375 {
376
377 let job = JobHandle {
378 schedule: None,
379 run_async,
380 time_til_next: Some(duration),
381 job_id: Uuid::new_v4(),
382 stopped: false
383 };
384
385 Ok(Arc::new(RwLock::new(JobType::Repeated(job))))
386 }
387
388 fn new_repeated<T>(duration: Duration, run: T) -> Result<Self, JobError>
400 where
401 T: 'static,
402 T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send,
403 Self: Sized,
404 {
405 JobLocked::make_new_repeated(duration, Arc::new(Mutex::new(run)))
406 }
407
408 async fn get_job_id(&self) -> Uuid
409 {
410 self.job_id().await
411 }
412
413}