tokio_easy_timer/job/
async_job.rs1use std::{marker::PhantomData, time::Duration};
2
3use chrono::TimeZone;
4
5use crate::{extensions::Extensions, interval::Interval, scheduler::BoxedJob};
6
7use super::{
8 jobschedule::{JobSchedule, JobScheduleBuilder},
9 AsyncHandler, Job, JobBuilder,
10};
11
12#[derive(Clone)]
13pub struct AsyncJob<Args, F> {
14 pub f: F,
15 pub jobschedules: Vec<JobSchedule>,
16 pub _phantom: PhantomData<Args>,
17}
18
19pub struct AsyncJobBuilder<Args> {
20 jobschedules: Vec<JobSchedule>,
21 builder: JobScheduleBuilder,
22 _phantom: PhantomData<Args>,
23}
24
25impl<Args, F, Tz> Job<Tz> for AsyncJob<Args, F>
26where
27 F: AsyncHandler<Args> + Send + 'static + Copy,
28 Args: Send + 'static + Clone,
29 Tz: TimeZone + Send + Copy + 'static,
30 <Tz as TimeZone>::Offset: Send,
31{
32 fn box_clone(&self) -> Box<(dyn Job<Tz> + Send + 'static)> {
33 Box::new((*self).clone())
34 }
35
36 fn start_schedule(&self, e: Extensions, tz: Tz) {
37 for schedule in self.jobschedules.iter() {
38 let f = self.f.clone();
40 let e = e.clone();
41 let schedule = schedule.clone();
42 tokio::spawn(async move {
43 let now = chrono::Local::now().with_timezone(&tz);
45 let since = schedule.since;
46 let wait_to = tz
47 .ymd(since.0, since.1, since.2)
48 .and_hms(since.3, since.4, since.5);
49 let d = wait_to.timestamp() - now.timestamp();
50 if d > 0 {
51 tokio::time::sleep(Duration::from_secs(d as u64)).await;
52 }
53
54 for next in schedule.schedule.upcoming(tz) {
56 let now = chrono::Local::now().with_timezone(&tz);
58 let d = next.timestamp() - now.timestamp();
59 if d < 0 {
60 continue;
61 }
62 let e = e.clone();
63 tokio::spawn(async move {
65 tokio::time::sleep(Duration::from_secs(d as u64)).await;
67
68 for i in 0..schedule.repeat {
70 if schedule.is_async {
71 let e = e.clone();
72 tokio::spawn(async move {
73 f.call(&e).await;
74 });
75 } else {
76 f.call(&e).await;
77 }
78 if schedule.interval > 0 && i < schedule.repeat - 1 {
79 tokio::time::sleep(Duration::from_secs(schedule.interval)).await;
80 }
81 }
82 });
83
84 tokio::time::sleep(Duration::from_secs(d as u64)).await;
86 }
87 });
88 }
89 }
90}
91
92impl<Args> AsyncJobBuilder<Args>
93where
94 Args: Clone + 'static + Send + Sync,
95{
96 pub fn run<Tz, F>(&mut self, f: F) -> BoxedJob<Tz>
98 where
99 F: AsyncHandler<Args> + Send + 'static + Clone + Copy,
100 Tz: TimeZone + Send + Sync + 'static + Clone + Copy,
101 <Tz as TimeZone>::Offset: Send + Sync,
102 {
103 self.and();
104 let job: AsyncJob<Args, F> = AsyncJob {
105 f: f.to_owned(),
106 jobschedules: self.jobschedules.clone(),
107 _phantom: PhantomData,
108 };
109 let res = Box::new(job);
110 res
111 }
112
113 }
129
130impl<Args> JobBuilder<Args> for AsyncJobBuilder<Args> {
131 fn new() -> Self {
132 Self {
133 _phantom: PhantomData,
134 jobschedules: vec![],
135 builder: JobScheduleBuilder::new(),
136 }
137 }
138
139 fn and(&mut self) -> &mut Self {
140 self.jobschedules.push(self.builder.build());
141 self.builder = JobScheduleBuilder::new();
142 self
143 }
144
145 fn get_mut_cron_builder(&mut self) -> &mut JobScheduleBuilder {
146 &mut self.builder
147 }
148
149 fn get_mut_since(&mut self) -> &mut (i32, u32, u32, u32, u32, u32) {
150 &mut self.builder.since
151 }
152
153 fn repeat_seq(&mut self, n: u32, interval: Interval) -> &mut Self {
154 self.builder.is_async = false;
155 self.builder.repeat = n;
156 self.builder.interval = interval.to_sec();
157 self
158 }
159
160 fn repeat_async(&mut self, n: u32, interval: Interval) -> &mut Self {
161 self.builder.is_async = true;
162 self.builder.repeat = n;
163 self.builder.interval = interval.to_sec();
164 self
165 }
166}