tokio_easy_timer/job/
async_job.rs

1use std::{marker::PhantomData, time::Duration};
2
3use chrono::TimeZone;
4
5use crate::{extensions::Extensions, interval::Interval, scheduler::BoxedJob};
6
7use super::{
8    jobschedule::{JobSchedule, JobScheduleBuilder},
9    AsyncHandler, Job, JobBuilder,
10};
11
12#[derive(Clone)]
13pub struct AsyncJob<Args, F> {
14    pub f: F,
15    pub jobschedules: Vec<JobSchedule>,
16    pub _phantom: PhantomData<Args>,
17}
18
19pub struct AsyncJobBuilder<Args> {
20    jobschedules: Vec<JobSchedule>,
21    builder: JobScheduleBuilder,
22    _phantom: PhantomData<Args>,
23}
24
25impl<Args, F, Tz> Job<Tz> for AsyncJob<Args, F>
26where
27    F: AsyncHandler<Args> + Send + 'static + Copy,
28    Args: Send + 'static + Clone,
29    Tz: TimeZone + Send + Copy + 'static,
30    <Tz as TimeZone>::Offset: Send,
31{
32    fn box_clone(&self) -> Box<(dyn Job<Tz> + Send + 'static)> {
33        Box::new((*self).clone())
34    }
35
36    fn start_schedule(&self, e: Extensions, tz: Tz) {
37        for schedule in self.jobschedules.iter() {
38            // spawn a task for every corn schedule
39            let f = self.f.clone();
40            let e = e.clone();
41            let schedule = schedule.clone();
42            tokio::spawn(async move {
43                // delay
44                let now = chrono::Local::now().with_timezone(&tz);
45                let since = schedule.since;
46                let wait_to = tz
47                    .ymd(since.0, since.1, since.2)
48                    .and_hms(since.3, since.4, since.5);
49                let d = wait_to.timestamp() - now.timestamp();
50                if d > 0 {
51                    tokio::time::sleep(Duration::from_secs(d as u64)).await;
52                }
53
54                // run jobs
55                for next in schedule.schedule.upcoming(tz) {
56                    // Calculates the time left until the next task run
57                    let now = chrono::Local::now().with_timezone(&tz);
58                    let d = next.timestamp() - now.timestamp();
59                    if d < 0 {
60                        continue;
61                    }
62                    let e = e.clone();
63                    // prepare a task for the next job
64                    tokio::spawn(async move {
65                        // Wait until the next job runs
66                        tokio::time::sleep(Duration::from_secs(d as u64)).await;
67
68                        // Handle repeat
69                        for i in 0..schedule.repeat {
70                            if schedule.is_async {
71                                let e = e.clone();
72                                tokio::spawn(async move {
73                                    f.call(&e).await;
74                                });
75                            } else {
76                                f.call(&e).await;
77                            }
78                            if schedule.interval > 0 && i < schedule.repeat - 1 {
79                                tokio::time::sleep(Duration::from_secs(schedule.interval)).await;
80                            }
81                        }
82                    });
83
84                    // wait until this task run
85                    tokio::time::sleep(Duration::from_secs(d as u64)).await;
86                }
87            });
88        }
89    }
90}
91
92impl<Args> AsyncJobBuilder<Args>
93where
94    Args: Clone + 'static + Send + Sync,
95{
96    /// Constructs a new async job
97    pub fn run<Tz, F>(&mut self, f: F) -> BoxedJob<Tz>
98    where
99        F: AsyncHandler<Args> + Send + 'static + Clone + Copy,
100        Tz: TimeZone + Send + Sync + 'static + Clone + Copy,
101        <Tz as TimeZone>::Offset: Send + Sync,
102    {
103        self.and();
104        let job: AsyncJob<Args, F> = AsyncJob {
105            f: f.to_owned(),
106            jobschedules: self.jobschedules.clone(),
107            _phantom: PhantomData,
108        };
109        let res = Box::new(job);
110        res
111    }
112
113    // / Constructs a new async job
114    // pub fn build<Tz, F>(&mut self, f: F) -> AsyncJob<Args, F>
115    // where
116    //     F: AsyncHandler<Args> + Send + 'static + Clone + Copy,
117    //     Tz: TimeZone + Send + Sync + 'static + Clone + Copy,
118    //     <Tz as TimeZone>::Offset: Send + Sync,
119    // {
120    //     self.and();
121    //     let job: AsyncJob<Args, F> = AsyncJob {
122    //         f: f.to_owned(),
123    //         jobschedules: self.jobschedules.clone(),
124    //         _phantom: PhantomData,
125    //     };
126    //     job
127    // }
128}
129
130impl<Args> JobBuilder<Args> for AsyncJobBuilder<Args> {
131    fn new() -> Self {
132        Self {
133            _phantom: PhantomData,
134            jobschedules: vec![],
135            builder: JobScheduleBuilder::new(),
136        }
137    }
138
139    fn and(&mut self) -> &mut Self {
140        self.jobschedules.push(self.builder.build());
141        self.builder = JobScheduleBuilder::new();
142        self
143    }
144
145    fn get_mut_cron_builder(&mut self) -> &mut JobScheduleBuilder {
146        &mut self.builder
147    }
148
149    fn get_mut_since(&mut self) -> &mut (i32, u32, u32, u32, u32, u32) {
150        &mut self.builder.since
151    }
152
153    fn repeat_seq(&mut self, n: u32, interval: Interval) -> &mut Self {
154        self.builder.is_async = false;
155        self.builder.repeat = n;
156        self.builder.interval = interval.to_sec();
157        self
158    }
159
160    fn repeat_async(&mut self, n: u32, interval: Interval) -> &mut Self {
161        self.builder.is_async = true;
162        self.builder.repeat = n;
163        self.builder.interval = interval.to_sec();
164        self
165    }
166}