Skip to main content

rocketmq_rust/schedule/
trigger.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::time::Duration;
17use std::time::SystemTime;
18use std::time::UNIX_EPOCH;
19
20use chrono::DateTime;
21use chrono::Utc;
22use cron::Schedule;
23
24use crate::schedule::SchedulerError;
25
/// Decides when a scheduled task is due to run.
pub trait Trigger: Send + Sync {
    /// Returns the next execution time computed relative to `after`, or
    /// `None` when the trigger has nothing further to offer.
    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime>;

    /// Reports whether the trigger can still fire when evaluated at `after`.
    fn has_next(&self, after: SystemTime) -> bool;

    /// Returns a human-readable description of the trigger.
    fn description(&self) -> String;

    /// Reports whether the trigger is due at `now`.
    ///
    /// The default implementation answers `true` only when
    /// `next_execution_time(now)` yields a time that is not later than `now`.
    /// Implementors with their own notion of "due" override this method.
    fn should_trigger_now(&self, now: SystemTime) -> bool {
        matches!(self.next_execution_time(now), Some(due) if due <= now)
    }
}
46
47/// Cron-based trigger
48#[derive(Debug, Clone)]
49pub struct CronTrigger {
50    schedule: Schedule,
51    expression: String,
52}
53
54impl CronTrigger {
55    pub fn new(expression: impl Into<String>) -> Result<Self, SchedulerError> {
56        let expression = expression.into();
57        let schedule = Schedule::from_str(&expression)
58            .map_err(|e| SchedulerError::TriggerError(format!("Invalid cron expression: {e}")))?;
59
60        Ok(Self { schedule, expression })
61    }
62
63    /// Create a trigger that fires every minute
64    pub fn every_minute() -> Result<Self, SchedulerError> {
65        Self::new("0 * * * * *")
66    }
67
68    /// Create a trigger that fires every hour at minute 0
69    pub fn hourly() -> Result<Self, SchedulerError> {
70        Self::new("0 0 * * * *")
71    }
72
73    /// Create a trigger that fires daily at midnight
74    pub fn daily() -> Result<Self, SchedulerError> {
75        Self::new("0 0 0 * * *")
76    }
77
78    /// Create a trigger that fires weekly on Sunday at midnight
79    pub fn weekly() -> Result<Self, SchedulerError> {
80        Self::new("0 0 0 * * SUN")
81    }
82
83    /// Create a trigger that fires monthly on the 1st at midnight
84    pub fn monthly() -> Result<Self, SchedulerError> {
85        Self::new("0 0 0 1 * *")
86    }
87}
88
89impl Trigger for CronTrigger {
90    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
91        let after_datetime = system_time_to_datetime(after);
92        self.schedule.after(&after_datetime).next().map(datetime_to_system_time)
93    }
94
95    fn has_next(&self, after: SystemTime) -> bool {
96        self.next_execution_time(after).is_some()
97    }
98
99    fn description(&self) -> String {
100        format!("Cron: {}", self.expression)
101    }
102}
103
/// Interval-based trigger.
///
/// Carries a fixed `interval` plus optional bounds: a start time, an end
/// time and a maximum number of executions.
#[derive(Debug, Clone)]
pub struct IntervalTrigger {
    interval: Duration,
    start_time: Option<SystemTime>,
    end_time: Option<SystemTime>,
    repeat_count: Option<u32>,
    executed_count: u32,
}

impl IntervalTrigger {
    /// Creates an unbounded trigger with the given `interval`.
    pub fn new(interval: Duration) -> Self {
        Self {
            interval,
            start_time: None,
            end_time: None,
            repeat_count: None,
            executed_count: 0,
        }
    }

    /// Sets the start time of the trigger.
    pub fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = Some(start_time);
        self
    }

    /// Sets the end time of the trigger.
    pub fn with_end_time(mut self, end_time: SystemTime) -> Self {
        self.end_time = Some(end_time);
        self
    }

    /// Sets the maximum number of executions.
    pub fn with_repeat_count(mut self, count: u32) -> Self {
        self.repeat_count = Some(count);
        self
    }

    /// Creates a trigger with an interval of `seconds` seconds.
    pub fn every_seconds(seconds: u64) -> Self {
        Self::new(Duration::from_secs(seconds))
    }

    /// Creates a trigger with an interval of `minutes` minutes.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of
    /// overflowing (a plain multiplication panics in debug builds and wraps
    /// to a much shorter interval in release builds).
    pub fn every_minutes(minutes: u64) -> Self {
        Self::new(Duration::from_secs(minutes.saturating_mul(60)))
    }

    /// Creates a trigger with an interval of `hours` hours.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of
    /// overflowing.
    pub fn every_hours(hours: u64) -> Self {
        Self::new(Duration::from_secs(hours.saturating_mul(3600)))
    }

    /// Records one more completed execution.
    fn increment_executed_count(&mut self) {
        self.executed_count += 1;
    }
}
156
157impl Trigger for IntervalTrigger {
158    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
159        // Check repeat count limit
160        if let Some(max_count) = self.repeat_count {
161            if self.executed_count >= max_count {
162                return None;
163            }
164        }
165
166        let start = self.start_time.unwrap_or(after);
167
168        // If we haven't started yet, return start time
169        if after < start {
170            return Some(start);
171        }
172
173        let next_time = if self.executed_count == 0 {
174            start
175        } else {
176            after + self.interval
177        };
178
179        // Check end time limit
180        if let Some(end) = self.end_time {
181            if next_time > end {
182                return None;
183            }
184        }
185
186        Some(next_time)
187    }
188
189    fn has_next(&self, after: SystemTime) -> bool {
190        self.next_execution_time(after).is_some()
191    }
192
193    fn description(&self) -> String {
194        format!("Interval: {:?}", self.interval)
195    }
196}
197
/// One-time delay trigger.
///
/// Holds a delay, the time the delay is measured from, and a flag recording
/// whether the single execution has already happened.
#[derive(Debug, Clone)]
pub struct DelayTrigger {
    delay: Duration,
    start_time: SystemTime,
    executed: bool,
}

impl DelayTrigger {
    /// Creates a trigger whose `delay` is measured from the moment of
    /// construction.
    pub fn new(delay: Duration) -> Self {
        Self {
            delay,
            start_time: SystemTime::now(),
            executed: false,
        }
    }

    /// Creates a trigger with a delay of `seconds` seconds.
    pub fn after_seconds(seconds: u64) -> Self {
        Self::new(Duration::from_secs(seconds))
    }

    /// Creates a trigger with a delay of `minutes` minutes.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of
    /// overflowing (a plain multiplication panics in debug builds and wraps
    /// to a much shorter delay in release builds).
    pub fn after_minutes(minutes: u64) -> Self {
        Self::new(Duration::from_secs(minutes.saturating_mul(60)))
    }

    /// Creates a trigger with a delay of `hours` hours.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of
    /// overflowing.
    pub fn after_hours(hours: u64) -> Self {
        Self::new(Duration::from_secs(hours.saturating_mul(3600)))
    }

    /// Creates a trigger aimed at the absolute time `execution_time`.
    ///
    /// The delay is the distance from the current time to `execution_time`;
    /// a time that is already in the past yields a zero delay.
    pub fn at_time(execution_time: SystemTime) -> Self {
        let now = SystemTime::now();
        let delay = execution_time.duration_since(now).unwrap_or(Duration::ZERO);
        Self {
            delay,
            start_time: now,
            executed: false,
        }
    }

    /// Mark as executed (used internally by scheduler)
    pub fn mark_executed(&mut self) {
        self.executed = true;
    }

    /// Check if already executed
    pub fn is_executed(&self) -> bool {
        self.executed
    }
}
246
247impl Trigger for DelayTrigger {
248    fn next_execution_time(&self, _after: SystemTime) -> Option<SystemTime> {
249        if self.executed {
250            None
251        } else {
252            Some(self.start_time + self.delay)
253        }
254    }
255
256    fn has_next(&self, _after: SystemTime) -> bool {
257        !self.executed
258    }
259
260    fn description(&self) -> String {
261        format!("Delay: {:?}", self.delay)
262    }
263
264    fn should_trigger_now(&self, _now: SystemTime) -> bool {
265        if self.executed {
266            return false;
267        }
268
269        match self.start_time.elapsed() {
270            Ok(elapsed) => elapsed >= self.delay,
271            Err(_) => false,
272        }
273    }
274}
275
/// Interval trigger with initial delay.
///
/// Tracks the interval, the initial delay, the construction time the delay
/// is measured from, the most recent recorded execution, and optional end
/// time / repeat-count limits.
#[derive(Debug, Clone)]
pub struct DelayedIntervalTrigger {
    interval: Duration,
    initial_delay: Duration,
    start_time: SystemTime,
    last_execution: Option<SystemTime>,
    end_time: Option<SystemTime>,
    repeat_count: Option<u32>,
    executed_count: u32,
}

impl DelayedIntervalTrigger {
    /// Creates an unbounded trigger; `initial_delay` is measured from the
    /// moment of construction.
    pub fn new(interval: Duration, initial_delay: Duration) -> Self {
        Self {
            interval,
            initial_delay,
            start_time: SystemTime::now(),
            last_execution: None,
            end_time: None,
            repeat_count: None,
            executed_count: 0,
        }
    }

    /// Creates a trigger from an interval and an initial delay given in seconds.
    pub fn every_seconds_with_delay(interval_seconds: u64, delay_seconds: u64) -> Self {
        Self::new(
            Duration::from_secs(interval_seconds),
            Duration::from_secs(delay_seconds),
        )
    }

    /// Creates a trigger from an interval and an initial delay given in minutes.
    ///
    /// Both conversions to seconds saturate at `u64::MAX` instead of
    /// overflowing (a plain multiplication panics in debug builds and wraps
    /// to a much shorter duration in release builds).
    pub fn every_minutes_with_delay(interval_minutes: u64, delay_minutes: u64) -> Self {
        Self::new(
            Duration::from_secs(interval_minutes.saturating_mul(60)),
            Duration::from_secs(delay_minutes.saturating_mul(60)),
        )
    }

    /// Set an end time for the trigger
    pub fn until(mut self, end_time: SystemTime) -> Self {
        self.end_time = Some(end_time);
        self
    }

    /// Set a maximum number of executions
    pub fn repeat(mut self, count: u32) -> Self {
        self.repeat_count = Some(count);
        self
    }

    /// Get the number of times this trigger has been executed
    pub fn execution_count(&self) -> u32 {
        self.executed_count
    }

    /// Records an execution at `execution_time` and bumps the execution count.
    pub fn mark_executed(&mut self, execution_time: SystemTime) {
        self.last_execution = Some(execution_time);
        self.executed_count += 1;
    }

    /// Reports whether the trigger is finished when evaluated at `now`:
    /// either `now` has reached the end time or the repeat count is used up.
    fn should_stop(&self, now: SystemTime) -> bool {
        let past_end = matches!(self.end_time, Some(end_time) if now >= end_time);
        let count_reached =
            matches!(self.repeat_count, Some(repeat_count) if self.executed_count >= repeat_count);

        past_end || count_reached
    }

    /// Calculate the first execution time
    fn first_execution_time(&self) -> SystemTime {
        self.start_time + self.initial_delay
    }
}
362
363impl Trigger for DelayedIntervalTrigger {
364    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
365        if self.should_stop(after) {
366            return None;
367        }
368
369        match self.last_execution {
370            None => {
371                // First execution
372                let first_time = self.first_execution_time();
373                if first_time > after {
374                    Some(first_time)
375                } else {
376                    // Calculate next proper interval if first time passed
377                    let elapsed_since_first = after.duration_since(first_time).unwrap_or(Duration::ZERO);
378                    let intervals_passed = (elapsed_since_first.as_millis() / self.interval.as_millis()) + 1;
379                    Some(first_time + Duration::from_millis((intervals_passed * self.interval.as_millis()) as u64))
380                }
381            }
382            Some(last) => {
383                // Subsequent executions
384                let next_time = last + self.interval;
385                if next_time > after && !self.should_stop(next_time) {
386                    Some(next_time)
387                } else if next_time <= after {
388                    // Calculate proper next interval if time passed
389                    let elapsed = after.duration_since(last).unwrap_or(Duration::ZERO);
390                    let intervals_passed = (elapsed.as_millis() / self.interval.as_millis()) + 1;
391                    let calculated_next =
392                        last + Duration::from_millis((intervals_passed * self.interval.as_millis()) as u64);
393
394                    if !self.should_stop(calculated_next) {
395                        Some(calculated_next)
396                    } else {
397                        None
398                    }
399                } else {
400                    None
401                }
402            }
403        }
404    }
405
406    fn has_next(&self, after: SystemTime) -> bool {
407        !self.should_stop(after) && self.next_execution_time(after).is_some()
408    }
409
410    fn description(&self) -> String {
411        let mut desc = format!(
412            "DelayedInterval: interval={:?}, initial_delay={:?}, executed_count={}",
413            self.interval, self.initial_delay, self.executed_count
414        );
415
416        if let Some(end_time) = self.end_time {
417            desc.push_str(&format!(", end_time={end_time:?}"));
418        }
419
420        if let Some(repeat_count) = self.repeat_count {
421            desc.push_str(&format!(", repeat_count={repeat_count}"));
422        }
423
424        desc
425    }
426
427    fn should_trigger_now(&self, now: SystemTime) -> bool {
428        if self.should_stop(now) {
429            return false;
430        }
431
432        match self.last_execution {
433            None => {
434                // Check if first execution time has arrived
435                let first_time = self.first_execution_time();
436                now >= first_time
437            }
438            Some(last) => {
439                // Check if next execution time has arrived
440                let next_time = last + self.interval;
441                now >= next_time
442            }
443        }
444    }
445}
446
447// Helper functions for time conversion
448fn system_time_to_datetime(system_time: SystemTime) -> DateTime<Utc> {
449    DateTime::from(system_time)
450}
451
452fn datetime_to_system_time(datetime: DateTime<Utc>) -> SystemTime {
453    UNIX_EPOCH + Duration::from_secs(datetime.timestamp() as u64)
454}