rocketmq_rust/schedule/
trigger.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::str::FromStr;
19use std::time::Duration;
20use std::time::SystemTime;
21use std::time::UNIX_EPOCH;
22
23use chrono::DateTime;
24use chrono::Utc;
25use cron::Schedule;
26
27use crate::schedule::SchedulerError;
28
/// Scheduling policy that decides when a task is due to run.
///
/// Implementors must be `Send + Sync`.
pub trait Trigger: Send + Sync {
    /// Returns the next execution time computed relative to `after`, or `None`
    /// when no further execution is scheduled.
    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime>;

    /// Reports whether the trigger will fire again relative to `after`.
    fn has_next(&self, after: SystemTime) -> bool;

    /// Returns a human-readable description of the trigger.
    fn description(&self) -> String;

    /// Reports whether the trigger is due at `now`.
    ///
    /// The default implementation asks [`Trigger::next_execution_time`] for the
    /// next time relative to `now` and answers `true` only when such a time
    /// exists and is not later than `now`.
    fn should_trigger_now(&self, now: SystemTime) -> bool {
        matches!(self.next_execution_time(now), Some(due) if due <= now)
    }
}
49
50/// Cron-based trigger
51#[derive(Debug, Clone)]
52pub struct CronTrigger {
53    schedule: Schedule,
54    expression: String,
55}
56
57impl CronTrigger {
58    pub fn new(expression: impl Into<String>) -> Result<Self, SchedulerError> {
59        let expression = expression.into();
60        let schedule = Schedule::from_str(&expression)
61            .map_err(|e| SchedulerError::TriggerError(format!("Invalid cron expression: {e}")))?;
62
63        Ok(Self {
64            schedule,
65            expression,
66        })
67    }
68
69    /// Create a trigger that fires every minute
70    pub fn every_minute() -> Result<Self, SchedulerError> {
71        Self::new("0 * * * * *")
72    }
73
74    /// Create a trigger that fires every hour at minute 0
75    pub fn hourly() -> Result<Self, SchedulerError> {
76        Self::new("0 0 * * * *")
77    }
78
79    /// Create a trigger that fires daily at midnight
80    pub fn daily() -> Result<Self, SchedulerError> {
81        Self::new("0 0 0 * * *")
82    }
83
84    /// Create a trigger that fires weekly on Sunday at midnight
85    pub fn weekly() -> Result<Self, SchedulerError> {
86        Self::new("0 0 0 * * SUN")
87    }
88
89    /// Create a trigger that fires monthly on the 1st at midnight
90    pub fn monthly() -> Result<Self, SchedulerError> {
91        Self::new("0 0 0 1 * *")
92    }
93}
94
95impl Trigger for CronTrigger {
96    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
97        let after_datetime = system_time_to_datetime(after);
98        self.schedule
99            .after(&after_datetime)
100            .next()
101            .map(datetime_to_system_time)
102    }
103
104    fn has_next(&self, after: SystemTime) -> bool {
105        self.next_execution_time(after).is_some()
106    }
107
108    fn description(&self) -> String {
109        format!("Cron: {}", self.expression)
110    }
111}
112
/// Interval-based trigger
#[derive(Debug, Clone)]
pub struct IntervalTrigger {
    /// Spacing between consecutive executions.
    interval: Duration,
    /// Optional earliest execution time.
    start_time: Option<SystemTime>,
    /// Optional latest execution time.
    end_time: Option<SystemTime>,
    /// Optional cap on the number of executions.
    repeat_count: Option<u32>,
    /// Number of executions recorded so far.
    executed_count: u32,
}

impl IntervalTrigger {
    /// Creates a trigger with the given `interval` and no start time, end time
    /// or repeat limit.
    pub fn new(interval: Duration) -> Self {
        Self {
            interval,
            start_time: None,
            end_time: None,
            repeat_count: None,
            executed_count: 0,
        }
    }

    /// Sets the earliest time at which the trigger may fire.
    pub fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = Some(start_time);
        self
    }

    /// Sets the latest time at which the trigger may fire.
    pub fn with_end_time(mut self, end_time: SystemTime) -> Self {
        self.end_time = Some(end_time);
        self
    }

    /// Sets the maximum number of executions.
    pub fn with_repeat_count(mut self, count: u32) -> Self {
        self.repeat_count = Some(count);
        self
    }

    /// Creates a trigger that fires every `seconds` seconds.
    pub fn every_seconds(seconds: u64) -> Self {
        Self::new(Duration::from_secs(seconds))
    }

    /// Creates a trigger that fires every `minutes` minutes.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of
    /// overflowing (which would panic in debug builds and silently wrap to an
    /// unrelated interval in release builds).
    pub fn every_minutes(minutes: u64) -> Self {
        Self::new(Duration::from_secs(minutes.saturating_mul(60)))
    }

    /// Creates a trigger that fires every `hours` hours.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of overflowing.
    pub fn every_hours(hours: u64) -> Self {
        Self::new(Duration::from_secs(hours.saturating_mul(3600)))
    }

    /// Records one more execution, saturating at `u32::MAX`.
    ///
    /// NOTE(review): nothing in the visible part of this file calls this, so
    /// `executed_count` appears to stay at 0 and `repeat_count` would never be
    /// reached — confirm how the scheduler is expected to advance the counter.
    fn increment_executed_count(&mut self) {
        self.executed_count = self.executed_count.saturating_add(1);
    }
}
165
166impl Trigger for IntervalTrigger {
167    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
168        // Check repeat count limit
169        if let Some(max_count) = self.repeat_count {
170            if self.executed_count >= max_count {
171                return None;
172            }
173        }
174
175        let start = self.start_time.unwrap_or(after);
176
177        // If we haven't started yet, return start time
178        if after < start {
179            return Some(start);
180        }
181
182        let next_time = if self.executed_count == 0 {
183            start
184        } else {
185            after + self.interval
186        };
187
188        // Check end time limit
189        if let Some(end) = self.end_time {
190            if next_time > end {
191                return None;
192            }
193        }
194
195        Some(next_time)
196    }
197
198    fn has_next(&self, after: SystemTime) -> bool {
199        self.next_execution_time(after).is_some()
200    }
201
202    fn description(&self) -> String {
203        format!("Interval: {:?}", self.interval)
204    }
205}
206
/// One-time delay trigger
#[derive(Debug, Clone)]
pub struct DelayTrigger {
    /// How long to wait, measured from `start_time`.
    delay: Duration,
    /// Moment the trigger was created.
    start_time: SystemTime,
    /// Whether the single execution has already been recorded.
    executed: bool,
}

impl DelayTrigger {
    /// Creates a trigger that becomes due `delay` after the moment of creation.
    pub fn new(delay: Duration) -> Self {
        Self {
            delay,
            start_time: SystemTime::now(),
            executed: false,
        }
    }

    /// Creates a trigger that becomes due after `seconds` seconds.
    pub fn after_seconds(seconds: u64) -> Self {
        Self::new(Duration::from_secs(seconds))
    }

    /// Creates a trigger that becomes due after `minutes` minutes.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of
    /// overflowing (which would panic in debug builds and silently wrap to an
    /// unrelated delay in release builds).
    pub fn after_minutes(minutes: u64) -> Self {
        Self::new(Duration::from_secs(minutes.saturating_mul(60)))
    }

    /// Creates a trigger that becomes due after `hours` hours.
    ///
    /// The conversion to seconds saturates at `u64::MAX` instead of overflowing.
    pub fn after_hours(hours: u64) -> Self {
        Self::new(Duration::from_secs(hours.saturating_mul(3600)))
    }

    /// Creates a trigger that becomes due at `execution_time`.
    ///
    /// An `execution_time` that is already in the past yields a zero delay.
    pub fn at_time(execution_time: SystemTime) -> Self {
        let now = SystemTime::now();
        // duration_since fails when execution_time is earlier than now.
        let delay = execution_time.duration_since(now).unwrap_or(Duration::ZERO);
        Self {
            delay,
            start_time: now,
            executed: false,
        }
    }

    /// Mark as executed (used internally by scheduler)
    pub fn mark_executed(&mut self) {
        self.executed = true;
    }

    /// Check if already executed
    pub fn is_executed(&self) -> bool {
        self.executed
    }
}
255
256impl Trigger for DelayTrigger {
257    fn next_execution_time(&self, _after: SystemTime) -> Option<SystemTime> {
258        if self.executed {
259            None
260        } else {
261            Some(self.start_time + self.delay)
262        }
263    }
264
265    fn has_next(&self, _after: SystemTime) -> bool {
266        !self.executed
267    }
268
269    fn description(&self) -> String {
270        format!("Delay: {:?}", self.delay)
271    }
272
273    fn should_trigger_now(&self, _now: SystemTime) -> bool {
274        if self.executed {
275            return false;
276        }
277
278        match self.start_time.elapsed() {
279            Ok(elapsed) => elapsed >= self.delay,
280            Err(_) => false,
281        }
282    }
283}
284
/// Interval trigger with initial delay
#[derive(Debug, Clone)]
pub struct DelayedIntervalTrigger {
    /// Spacing between consecutive executions.
    interval: Duration,
    /// Wait before the first execution, measured from `start_time`.
    initial_delay: Duration,
    /// Moment the trigger was created.
    start_time: SystemTime,
    /// Time of the most recently recorded execution, if any.
    last_execution: Option<SystemTime>,
    /// Optional time at or after which the trigger stops.
    end_time: Option<SystemTime>,
    /// Optional cap on the number of executions.
    repeat_count: Option<u32>,
    /// Number of executions recorded so far.
    executed_count: u32,
}

impl DelayedIntervalTrigger {
    /// Creates a trigger that first fires `initial_delay` after creation and
    /// then every `interval`, with no end time or repeat limit.
    pub fn new(interval: Duration, initial_delay: Duration) -> Self {
        Self {
            interval,
            initial_delay,
            start_time: SystemTime::now(),
            last_execution: None,
            end_time: None,
            repeat_count: None,
            executed_count: 0,
        }
    }

    /// Creates a trigger from an interval and an initial delay given in seconds.
    pub fn every_seconds_with_delay(interval_seconds: u64, delay_seconds: u64) -> Self {
        Self::new(
            Duration::from_secs(interval_seconds),
            Duration::from_secs(delay_seconds),
        )
    }

    /// Creates a trigger from an interval and an initial delay given in minutes.
    ///
    /// Both conversions to seconds saturate at `u64::MAX` instead of
    /// overflowing (which would panic in debug builds and silently wrap to an
    /// unrelated duration in release builds).
    pub fn every_minutes_with_delay(interval_minutes: u64, delay_minutes: u64) -> Self {
        Self::new(
            Duration::from_secs(interval_minutes.saturating_mul(60)),
            Duration::from_secs(delay_minutes.saturating_mul(60)),
        )
    }

    /// Set an end time for the trigger
    pub fn until(mut self, end_time: SystemTime) -> Self {
        self.end_time = Some(end_time);
        self
    }

    /// Set a maximum number of executions
    pub fn repeat(mut self, count: u32) -> Self {
        self.repeat_count = Some(count);
        self
    }

    /// Get the number of times this trigger has been executed
    pub fn execution_count(&self) -> u32 {
        self.executed_count
    }

    /// Records an execution at `execution_time`: remembers it as the latest
    /// execution and bumps the execution counter (saturating at `u32::MAX`).
    pub fn mark_executed(&mut self, execution_time: SystemTime) {
        self.last_execution = Some(execution_time);
        self.executed_count = self.executed_count.saturating_add(1);
    }

    /// Reports whether the trigger must stop as of `now`: either `now` has
    /// reached the end time, or the repeat count has been used up.
    fn should_stop(&self, now: SystemTime) -> bool {
        let past_end = matches!(self.end_time, Some(end_time) if now >= end_time);
        let repeats_exhausted =
            matches!(self.repeat_count, Some(limit) if self.executed_count >= limit);

        past_end || repeats_exhausted
    }

    /// Calculate the first execution time
    fn first_execution_time(&self) -> SystemTime {
        self.start_time + self.initial_delay
    }
}
371
372impl Trigger for DelayedIntervalTrigger {
373    fn next_execution_time(&self, after: SystemTime) -> Option<SystemTime> {
374        if self.should_stop(after) {
375            return None;
376        }
377
378        match self.last_execution {
379            None => {
380                // First execution
381                let first_time = self.first_execution_time();
382                if first_time > after {
383                    Some(first_time)
384                } else {
385                    // Calculate next proper interval if first time passed
386                    let elapsed_since_first =
387                        after.duration_since(first_time).unwrap_or(Duration::ZERO);
388                    let intervals_passed =
389                        (elapsed_since_first.as_millis() / self.interval.as_millis()) + 1;
390                    Some(
391                        first_time
392                            + Duration::from_millis(
393                                (intervals_passed * self.interval.as_millis()) as u64,
394                            ),
395                    )
396                }
397            }
398            Some(last) => {
399                // Subsequent executions
400                let next_time = last + self.interval;
401                if next_time > after && !self.should_stop(next_time) {
402                    Some(next_time)
403                } else if next_time <= after {
404                    // Calculate proper next interval if time passed
405                    let elapsed = after.duration_since(last).unwrap_or(Duration::ZERO);
406                    let intervals_passed = (elapsed.as_millis() / self.interval.as_millis()) + 1;
407                    let calculated_next = last
408                        + Duration::from_millis(
409                            (intervals_passed * self.interval.as_millis()) as u64,
410                        );
411
412                    if !self.should_stop(calculated_next) {
413                        Some(calculated_next)
414                    } else {
415                        None
416                    }
417                } else {
418                    None
419                }
420            }
421        }
422    }
423
424    fn has_next(&self, after: SystemTime) -> bool {
425        !self.should_stop(after) && self.next_execution_time(after).is_some()
426    }
427
428    fn description(&self) -> String {
429        let mut desc = format!(
430            "DelayedInterval: interval={:?}, initial_delay={:?}, executed_count={}",
431            self.interval, self.initial_delay, self.executed_count
432        );
433
434        if let Some(end_time) = self.end_time {
435            desc.push_str(&format!(", end_time={end_time:?}"));
436        }
437
438        if let Some(repeat_count) = self.repeat_count {
439            desc.push_str(&format!(", repeat_count={repeat_count}"));
440        }
441
442        desc
443    }
444
445    fn should_trigger_now(&self, now: SystemTime) -> bool {
446        if self.should_stop(now) {
447            return false;
448        }
449
450        match self.last_execution {
451            None => {
452                // Check if first execution time has arrived
453                let first_time = self.first_execution_time();
454                now >= first_time
455            }
456            Some(last) => {
457                // Check if next execution time has arrived
458                let next_time = last + self.interval;
459                now >= next_time
460            }
461        }
462    }
463}
464
465// Helper functions for time conversion
466fn system_time_to_datetime(system_time: SystemTime) -> DateTime<Utc> {
467    /*let duration = system_time.duration_since(UNIX_EPOCH).unwrap();
468    let naive = NaiveDateTime::from_timestamp(duration.as_secs() as i64, duration.subsec_nanos());
469    DateTime::from_utc(naive, Utc)*/
470    DateTime::from(system_time)
471}
472
473fn datetime_to_system_time(datetime: DateTime<Utc>) -> SystemTime {
474    UNIX_EPOCH + Duration::from_secs(datetime.timestamp() as u64)
475}