rocketmq_rust/schedule/
trigger.rs1use std::str::FromStr;
16use std::time::Duration;
17use std::time::SystemTime;
18use std::time::UNIX_EPOCH;
19
20use chrono::DateTime;
21use chrono::Utc;
22use cron::Schedule;
23
24use crate::schedule::SchedulerError;
25
/// Decides when a scheduled task is due to run.
///
/// Implementors must be `Send + Sync`, as required by the trait bound.
pub trait Trigger: Send + Sync {
    /// Returns the next execution time relative to `after`, or `None` when the trigger
    /// has nothing further to schedule.
    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime>;

    /// Reports whether the trigger still has an execution pending relative to `after`.
    fn has_next(&self, after: SystemTime) -> bool;

    /// Returns a human-readable summary of the trigger.
    fn description(&self) -> String;

    /// Reports whether the trigger is due at `now`.
    ///
    /// The default implementation queries [`Trigger::next_execution_time`] with `now` and
    /// answers `true` only when the returned time is not later than `now`. An
    /// implementation that only ever reports times strictly later than its argument
    /// will therefore never be due through this default and should override it.
    fn should_trigger_now(&self, now: SystemTime) -> bool {
        matches!(self.next_execution_time(now), Some(due) if due <= now)
    }
}
46
47#[derive(Debug, Clone)]
49pub struct CronTrigger {
50 schedule: Schedule,
51 expression: String,
52}
53
54impl CronTrigger {
55 pub fn new(expression: impl Into<String>) -> Result<Self, SchedulerError> {
56 let expression = expression.into();
57 let schedule = Schedule::from_str(&expression)
58 .map_err(|e| SchedulerError::TriggerError(format!("Invalid cron expression: {e}")))?;
59
60 Ok(Self { schedule, expression })
61 }
62
63 pub fn every_minute() -> Result<Self, SchedulerError> {
65 Self::new("0 * * * * *")
66 }
67
68 pub fn hourly() -> Result<Self, SchedulerError> {
70 Self::new("0 0 * * * *")
71 }
72
73 pub fn daily() -> Result<Self, SchedulerError> {
75 Self::new("0 0 0 * * *")
76 }
77
78 pub fn weekly() -> Result<Self, SchedulerError> {
80 Self::new("0 0 0 * * SUN")
81 }
82
83 pub fn monthly() -> Result<Self, SchedulerError> {
85 Self::new("0 0 0 1 * *")
86 }
87}
88
89impl Trigger for CronTrigger {
90 fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
91 let after_datetime = system_time_to_datetime(after);
92 self.schedule.after(&after_datetime).next().map(datetime_to_system_time)
93 }
94
95 fn has_next(&self, after: SystemTime) -> bool {
96 self.next_execution_time(after).is_some()
97 }
98
99 fn description(&self) -> String {
100 format!("Cron: {}", self.expression)
101 }
102}
103
/// Trigger configured with a fixed interval and optional start time, end time and
/// repeat limit.
#[derive(Debug, Clone)]
pub struct IntervalTrigger {
    /// Spacing applied between executions.
    interval: Duration,
    /// Optional earliest execution time.
    start_time: Option<SystemTime>,
    /// Optional latest execution time.
    end_time: Option<SystemTime>,
    /// Optional maximum number of executions.
    repeat_count: Option<u32>,
    /// Number of executions recorded so far.
    executed_count: u32,
}

impl IntervalTrigger {
    /// Creates a trigger with the given interval and no start time, end time or
    /// repeat limit.
    pub fn new(interval: Duration) -> Self {
        Self {
            interval,
            start_time: None,
            end_time: None,
            repeat_count: None,
            executed_count: 0,
        }
    }

    /// Sets the earliest execution time.
    pub fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = Some(start_time);
        self
    }

    /// Sets the latest execution time.
    pub fn with_end_time(mut self, end_time: SystemTime) -> Self {
        self.end_time = Some(end_time);
        self
    }

    /// Sets the maximum number of executions.
    pub fn with_repeat_count(mut self, count: u32) -> Self {
        self.repeat_count = Some(count);
        self
    }

    /// Creates a trigger with an interval of `seconds` seconds.
    pub fn every_seconds(seconds: u64) -> Self {
        Self::new(Duration::from_secs(seconds))
    }

    /// Creates a trigger with an interval of `minutes` minutes.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of overflowing.
    pub fn every_minutes(minutes: u64) -> Self {
        Self::new(Duration::from_secs(minutes.saturating_mul(60)))
    }

    /// Creates a trigger with an interval of `hours` hours.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of overflowing.
    pub fn every_hours(hours: u64) -> Self {
        Self::new(Duration::from_secs(hours.saturating_mul(3600)))
    }

    /// Records one more execution, saturating at `u32::MAX`.
    ///
    /// NOTE(review): no caller of this private method is visible in this file; while
    /// `executed_count` stays at zero the repeat limit is never reached — confirm the
    /// intended caller.
    fn increment_executed_count(&mut self) {
        self.executed_count = self.executed_count.saturating_add(1);
    }
}
156
157impl Trigger for IntervalTrigger {
158 fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
159 if let Some(max_count) = self.repeat_count {
161 if self.executed_count >= max_count {
162 return None;
163 }
164 }
165
166 let start = self.start_time.unwrap_or(after);
167
168 if after < start {
170 return Some(start);
171 }
172
173 let next_time = if self.executed_count == 0 {
174 start
175 } else {
176 after + self.interval
177 };
178
179 if let Some(end) = self.end_time {
181 if next_time > end {
182 return None;
183 }
184 }
185
186 Some(next_time)
187 }
188
189 fn has_next(&self, after: SystemTime) -> bool {
190 self.next_execution_time(after).is_some()
191 }
192
193 fn description(&self) -> String {
194 format!("Interval: {:?}", self.interval)
195 }
196}
197
/// One-shot trigger that becomes due a fixed delay after a reference time.
#[derive(Debug, Clone)]
pub struct DelayTrigger {
    /// Time to wait, measured from `start_time`.
    delay: Duration,
    /// Reference time captured when the trigger was constructed.
    start_time: SystemTime,
    /// Whether the single execution has been marked as done.
    executed: bool,
}

impl DelayTrigger {
    /// Creates a trigger that waits `delay`, measured from the current system time.
    pub fn new(delay: Duration) -> Self {
        Self {
            delay,
            start_time: SystemTime::now(),
            executed: false,
        }
    }

    /// Creates a trigger with a delay of `seconds` seconds.
    pub fn after_seconds(seconds: u64) -> Self {
        Self::new(Duration::from_secs(seconds))
    }

    /// Creates a trigger with a delay of `minutes` minutes.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of overflowing.
    pub fn after_minutes(minutes: u64) -> Self {
        Self::new(Duration::from_secs(minutes.saturating_mul(60)))
    }

    /// Creates a trigger with a delay of `hours` hours.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of overflowing.
    pub fn after_hours(hours: u64) -> Self {
        Self::new(Duration::from_secs(hours.saturating_mul(3600)))
    }

    /// Creates a trigger aimed at `execution_time`.
    ///
    /// The delay is the distance from the current system time to `execution_time`;
    /// a target that is not in the future results in a zero delay.
    pub fn at_time(execution_time: SystemTime) -> Self {
        let now = SystemTime::now();
        let delay = execution_time.duration_since(now).unwrap_or(Duration::ZERO);
        Self {
            delay,
            start_time: now,
            executed: false,
        }
    }

    /// Marks the single execution as done.
    pub fn mark_executed(&mut self) {
        self.executed = true;
    }

    /// Reports whether the execution has been marked as done.
    pub fn is_executed(&self) -> bool {
        self.executed
    }
}
246
247impl Trigger for DelayTrigger {
248 fn next_execution_time(&self, _after: SystemTime) -> Option<SystemTime> {
249 if self.executed {
250 None
251 } else {
252 Some(self.start_time + self.delay)
253 }
254 }
255
256 fn has_next(&self, _after: SystemTime) -> bool {
257 !self.executed
258 }
259
260 fn description(&self) -> String {
261 format!("Delay: {:?}", self.delay)
262 }
263
264 fn should_trigger_now(&self, _now: SystemTime) -> bool {
265 if self.executed {
266 return false;
267 }
268
269 match self.start_time.elapsed() {
270 Ok(elapsed) => elapsed >= self.delay,
271 Err(_) => false,
272 }
273 }
274}
275
/// Repeating trigger whose first execution is postponed by an initial delay.
#[derive(Debug, Clone)]
pub struct DelayedIntervalTrigger {
    /// Spacing between consecutive executions.
    interval: Duration,
    /// Time to wait, measured from `start_time`, before the first execution.
    initial_delay: Duration,
    /// Reference time captured when the trigger was constructed.
    start_time: SystemTime,
    /// Time of the most recent recorded execution, if any.
    last_execution: Option<SystemTime>,
    /// Optional time at or after which the trigger stops.
    end_time: Option<SystemTime>,
    /// Optional maximum number of executions.
    repeat_count: Option<u32>,
    /// Number of executions recorded through `mark_executed`.
    executed_count: u32,
}

impl DelayedIntervalTrigger {
    /// Creates a trigger that first waits `initial_delay` (measured from the current
    /// system time) and then repeats every `interval`, with no end time or repeat limit.
    pub fn new(interval: Duration, initial_delay: Duration) -> Self {
        Self {
            interval,
            initial_delay,
            start_time: SystemTime::now(),
            last_execution: None,
            end_time: None,
            repeat_count: None,
            executed_count: 0,
        }
    }

    /// Creates a trigger from an interval and an initial delay given in seconds.
    pub fn every_seconds_with_delay(interval_seconds: u64, delay_seconds: u64) -> Self {
        Self::new(
            Duration::from_secs(interval_seconds),
            Duration::from_secs(delay_seconds),
        )
    }

    /// Creates a trigger from an interval and an initial delay given in minutes.
    ///
    /// Both conversions to seconds saturate at `u64::MAX` instead of overflowing.
    pub fn every_minutes_with_delay(interval_minutes: u64, delay_minutes: u64) -> Self {
        Self::new(
            Duration::from_secs(interval_minutes.saturating_mul(60)),
            Duration::from_secs(delay_minutes.saturating_mul(60)),
        )
    }

    /// Sets the time at or after which the trigger stops.
    pub fn until(mut self, end_time: SystemTime) -> Self {
        self.end_time = Some(end_time);
        self
    }

    /// Sets the maximum number of executions.
    pub fn repeat(mut self, count: u32) -> Self {
        self.repeat_count = Some(count);
        self
    }

    /// Returns the number of executions recorded so far.
    pub fn execution_count(&self) -> u32 {
        self.executed_count
    }

    /// Records an execution that happened at `execution_time`.
    ///
    /// The execution counter saturates at `u32::MAX` instead of overflowing.
    pub fn mark_executed(&mut self, execution_time: SystemTime) {
        self.last_execution = Some(execution_time);
        self.executed_count = self.executed_count.saturating_add(1);
    }

    /// Reports whether the trigger is finished when evaluated at `now`: either `now`
    /// has reached the end time, or the repeat limit has been reached.
    fn should_stop(&self, now: SystemTime) -> bool {
        let past_end = matches!(self.end_time, Some(end_time) if now >= end_time);
        let repeats_exhausted =
            matches!(self.repeat_count, Some(limit) if self.executed_count >= limit);

        past_end || repeats_exhausted
    }

    /// Returns the time of the first execution: `start_time + initial_delay`.
    fn first_execution_time(&self) -> SystemTime {
        self.start_time + self.initial_delay
    }
}
362
363impl Trigger for DelayedIntervalTrigger {
364 fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
365 if self.should_stop(after) {
366 return None;
367 }
368
369 match self.last_execution {
370 None => {
371 let first_time = self.first_execution_time();
373 if first_time > after {
374 Some(first_time)
375 } else {
376 let elapsed_since_first = after.duration_since(first_time).unwrap_or(Duration::ZERO);
378 let intervals_passed = (elapsed_since_first.as_millis() / self.interval.as_millis()) + 1;
379 Some(first_time + Duration::from_millis((intervals_passed * self.interval.as_millis()) as u64))
380 }
381 }
382 Some(last) => {
383 let next_time = last + self.interval;
385 if next_time > after && !self.should_stop(next_time) {
386 Some(next_time)
387 } else if next_time <= after {
388 let elapsed = after.duration_since(last).unwrap_or(Duration::ZERO);
390 let intervals_passed = (elapsed.as_millis() / self.interval.as_millis()) + 1;
391 let calculated_next =
392 last + Duration::from_millis((intervals_passed * self.interval.as_millis()) as u64);
393
394 if !self.should_stop(calculated_next) {
395 Some(calculated_next)
396 } else {
397 None
398 }
399 } else {
400 None
401 }
402 }
403 }
404 }
405
406 fn has_next(&self, after: SystemTime) -> bool {
407 !self.should_stop(after) && self.next_execution_time(after).is_some()
408 }
409
410 fn description(&self) -> String {
411 let mut desc = format!(
412 "DelayedInterval: interval={:?}, initial_delay={:?}, executed_count={}",
413 self.interval, self.initial_delay, self.executed_count
414 );
415
416 if let Some(end_time) = self.end_time {
417 desc.push_str(&format!(", end_time={end_time:?}"));
418 }
419
420 if let Some(repeat_count) = self.repeat_count {
421 desc.push_str(&format!(", repeat_count={repeat_count}"));
422 }
423
424 desc
425 }
426
427 fn should_trigger_now(&self, now: SystemTime) -> bool {
428 if self.should_stop(now) {
429 return false;
430 }
431
432 match self.last_execution {
433 None => {
434 let first_time = self.first_execution_time();
436 now >= first_time
437 }
438 Some(last) => {
439 let next_time = last + self.interval;
441 now >= next_time
442 }
443 }
444 }
445}
446
447fn system_time_to_datetime(system_time: SystemTime) -> DateTime<Utc> {
449 DateTime::from(system_time)
450}
451
452fn datetime_to_system_time(datetime: DateTime<Utc>) -> SystemTime {
453 UNIX_EPOCH + Duration::from_secs(datetime.timestamp() as u64)
454}