Skip to main content

bamboo_server/schedule_app/
trigger_engine.rs

1use std::sync::Arc;
2
3use chrono::{
4    DateTime, Datelike, Days, Duration, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone,
5    Utc, Weekday,
6};
7use chrono_tz::Tz;
8use cron::Schedule as CronSchedule;
9use thiserror::Error;
10
11use bamboo_domain::{ScheduleTrigger, ScheduleWeekday, ScheduleWindow};
12
13pub type DynTriggerEngine = Arc<dyn TriggerEngine>;
14
15/// Interface for computing schedule occurrences.
16///
17/// Bamboo owns schedule state, policies, and execution. Concrete trigger engines
18/// only answer recurrence questions and can be swapped independently.
19pub trait TriggerEngine: Send + Sync {
20    fn kind(&self) -> TriggerEngineKind;
21
22    fn next_after(
23        &self,
24        trigger: &ScheduleTrigger,
25        timezone: Option<&str>,
26        after: DateTime<Utc>,
27        window: &ScheduleWindow,
28    ) -> Result<Option<DateTime<Utc>>, TriggerComputationError>;
29
30    fn due_between(
31        &self,
32        trigger: &ScheduleTrigger,
33        timezone: Option<&str>,
34        from: DateTime<Utc>,
35        to: DateTime<Utc>,
36        window: &ScheduleWindow,
37        limit: usize,
38    ) -> Result<Vec<DateTime<Utc>>, TriggerComputationError> {
39        if limit == 0 {
40            return Ok(Vec::new());
41        }
42
43        let mut current = from;
44        let mut out = Vec::new();
45        while out.len() < limit {
46            let Some(next) = self.next_after(trigger, timezone, current, window)? else {
47                break;
48            };
49            if next > to {
50                break;
51            }
52            out.push(next);
53            current = next;
54        }
55        Ok(out)
56    }
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum TriggerEngineKind {
61    Native,
62    CronBacked,
63    RRuleBacked,
64}
65
66#[derive(Debug, Error, Clone, PartialEq, Eq)]
67pub enum TriggerComputationError {
68    #[error("unsupported trigger kind for engine: {0}")]
69    UnsupportedTrigger(&'static str),
70    #[error("invalid trigger: {0}")]
71    InvalidTrigger(String),
72    #[error("timezone support is not available for engine kind {0:?}")]
73    UnsupportedTimezone(TriggerEngineKind),
74}
75
76/// Minimal built-in trigger engine used as the default interface implementation.
77///
78/// It owns Bamboo's native recurrence logic for interval/daily/weekly/monthly.
79#[derive(Debug, Default)]
80pub struct NativeTriggerEngine;
81
82#[derive(Debug, Default)]
83pub struct CronBackedTriggerEngine;
84
85#[derive(Debug, Default)]
86pub struct CompositeTriggerEngine {
87    native: NativeTriggerEngine,
88    cron: CronBackedTriggerEngine,
89}
90
91impl NativeTriggerEngine {
92    pub fn new() -> Self {
93        Self
94    }
95}
96
97impl CronBackedTriggerEngine {
98    pub fn new() -> Self {
99        Self
100    }
101}
102
103impl CompositeTriggerEngine {
104    pub fn new() -> Self {
105        Self {
106            native: NativeTriggerEngine::new(),
107            cron: CronBackedTriggerEngine::new(),
108        }
109    }
110}
111
112impl TriggerEngine for NativeTriggerEngine {
113    fn kind(&self) -> TriggerEngineKind {
114        TriggerEngineKind::Native
115    }
116
117    fn next_after(
118        &self,
119        trigger: &ScheduleTrigger,
120        timezone: Option<&str>,
121        after: DateTime<Utc>,
122        window: &ScheduleWindow,
123    ) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
124        let next = match trigger {
125            ScheduleTrigger::Interval {
126                every_seconds,
127                anchor_at,
128            } => {
129                if timezone.is_some() {
130                    return Err(TriggerComputationError::UnsupportedTimezone(self.kind()));
131                }
132                compute_interval_next(*every_seconds, *anchor_at, after)?
133            }
134            ScheduleTrigger::Daily {
135                hour,
136                minute,
137                second,
138            } => compute_daily_next(*hour, *minute, *second, timezone, after)?,
139            ScheduleTrigger::Weekly {
140                weekdays,
141                hour,
142                minute,
143                second,
144            } => compute_weekly_next(weekdays, *hour, *minute, *second, timezone, after)?,
145            ScheduleTrigger::Monthly {
146                days,
147                hour,
148                minute,
149                second,
150            } => compute_monthly_next(days, *hour, *minute, *second, timezone, after)?,
151            ScheduleTrigger::Cron { .. } => {
152                return Err(TriggerComputationError::UnsupportedTrigger("cron"));
153            }
154        };
155
156        Ok(apply_window(next, window))
157    }
158}
159
160fn parse_timezone(timezone: Option<&str>) -> Result<Tz, TriggerComputationError> {
161    match timezone {
162        Some(value) => value.parse::<Tz>().map_err(|_| {
163            TriggerComputationError::InvalidTrigger(format!("invalid timezone: {value}"))
164        }),
165        None => Ok(chrono_tz::UTC),
166    }
167}
168
169fn resolve_local_datetime(
170    timezone: Tz,
171    local: NaiveDateTime,
172) -> Result<DateTime<Utc>, TriggerComputationError> {
173    match timezone.from_local_datetime(&local) {
174        LocalResult::Single(dt) => Ok(dt.with_timezone(&Utc)),
175        LocalResult::Ambiguous(first, second) => Ok(first.min(second).with_timezone(&Utc)),
176        LocalResult::None => Err(TriggerComputationError::InvalidTrigger(format!(
177            "local datetime does not exist in timezone {}: {}",
178            timezone, local
179        ))),
180    }
181}
182
183fn local_candidate_utc(
184    timezone: Tz,
185    date: NaiveDate,
186    hour: u8,
187    minute: u8,
188    second: u8,
189) -> Result<DateTime<Utc>, TriggerComputationError> {
190    let time =
191        NaiveTime::from_hms_opt(hour as u32, minute as u32, second as u32).ok_or_else(|| {
192            TriggerComputationError::InvalidTrigger(format!(
193                "invalid time components: {:02}:{:02}:{:02}",
194                hour, minute, second
195            ))
196        })?;
197    resolve_local_datetime(timezone, NaiveDateTime::new(date, time))
198}
199
200fn compute_daily_next(
201    hour: u8,
202    minute: u8,
203    second: u8,
204    timezone: Option<&str>,
205    after: DateTime<Utc>,
206) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
207    let timezone = parse_timezone(timezone)?;
208    let after_local = after.with_timezone(&timezone);
209    let mut date = after_local.date_naive();
210
211    for _ in 0..370 {
212        let candidate = local_candidate_utc(timezone, date, hour, minute, second)?;
213        if candidate > after {
214            return Ok(Some(candidate));
215        }
216        date = date.checked_add_days(Days::new(1)).ok_or_else(|| {
217            TriggerComputationError::InvalidTrigger(
218                "daily schedule overflowed date range".to_string(),
219            )
220        })?;
221    }
222
223    Err(TriggerComputationError::InvalidTrigger(
224        "daily schedule failed to produce next occurrence".to_string(),
225    ))
226}
227
228fn to_chrono_weekday(weekday: ScheduleWeekday) -> Weekday {
229    match weekday {
230        ScheduleWeekday::Mon => Weekday::Mon,
231        ScheduleWeekday::Tue => Weekday::Tue,
232        ScheduleWeekday::Wed => Weekday::Wed,
233        ScheduleWeekday::Thu => Weekday::Thu,
234        ScheduleWeekday::Fri => Weekday::Fri,
235        ScheduleWeekday::Sat => Weekday::Sat,
236        ScheduleWeekday::Sun => Weekday::Sun,
237    }
238}
239
240fn compute_weekly_next(
241    weekdays: &[ScheduleWeekday],
242    hour: u8,
243    minute: u8,
244    second: u8,
245    timezone: Option<&str>,
246    after: DateTime<Utc>,
247) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
248    if weekdays.is_empty() {
249        return Err(TriggerComputationError::InvalidTrigger(
250            "weekly.weekdays must not be empty".to_string(),
251        ));
252    }
253
254    let timezone = parse_timezone(timezone)?;
255    let after_local = after.with_timezone(&timezone);
256    let mut date = after_local.date_naive();
257    let allowed: Vec<Weekday> = weekdays.iter().copied().map(to_chrono_weekday).collect();
258
259    for _ in 0..370 {
260        if allowed.contains(&date.weekday()) {
261            let candidate = local_candidate_utc(timezone, date, hour, minute, second)?;
262            if candidate > after {
263                return Ok(Some(candidate));
264            }
265        }
266        date = date.checked_add_days(Days::new(1)).ok_or_else(|| {
267            TriggerComputationError::InvalidTrigger(
268                "weekly schedule overflowed date range".to_string(),
269            )
270        })?;
271    }
272
273    Err(TriggerComputationError::InvalidTrigger(
274        "weekly schedule failed to produce next occurrence".to_string(),
275    ))
276}
277
278fn days_in_month(year: i32, month: u32) -> Result<u32, TriggerComputationError> {
279    let first = NaiveDate::from_ymd_opt(year, month, 1).ok_or_else(|| {
280        TriggerComputationError::InvalidTrigger(format!("invalid year-month: {year}-{month}"))
281    })?;
282    let next_month = if month == 12 {
283        NaiveDate::from_ymd_opt(year + 1, 1, 1)
284    } else {
285        NaiveDate::from_ymd_opt(year, month + 1, 1)
286    }
287    .ok_or_else(|| {
288        TriggerComputationError::InvalidTrigger(format!(
289            "invalid next year-month from {year}-{month}"
290        ))
291    })?;
292    Ok(next_month.signed_duration_since(first).num_days() as u32)
293}
294
295fn compute_monthly_next(
296    days: &[u8],
297    hour: u8,
298    minute: u8,
299    second: u8,
300    timezone: Option<&str>,
301    after: DateTime<Utc>,
302) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
303    if days.is_empty() {
304        return Err(TriggerComputationError::InvalidTrigger(
305            "monthly.days must not be empty".to_string(),
306        ));
307    }
308    if days.iter().any(|day| *day == 0 || *day > 31) {
309        return Err(TriggerComputationError::InvalidTrigger(
310            "monthly.days values must be between 1 and 31".to_string(),
311        ));
312    }
313
314    let timezone = parse_timezone(timezone)?;
315    let after_local = after.with_timezone(&timezone);
316    let mut year = after_local.year();
317    let mut month = after_local.month();
318    let mut sorted_days: Vec<u32> = days.iter().copied().map(u32::from).collect();
319    sorted_days.sort_unstable();
320    sorted_days.dedup();
321
322    for _ in 0..1200 {
323        let max_day = days_in_month(year, month)?;
324        for day in &sorted_days {
325            if *day > max_day {
326                continue;
327            }
328            let date = NaiveDate::from_ymd_opt(year, month, *day).ok_or_else(|| {
329                TriggerComputationError::InvalidTrigger(format!(
330                    "invalid monthly occurrence date: {year}-{month}-{day}"
331                ))
332            })?;
333            let candidate = local_candidate_utc(timezone, date, hour, minute, second)?;
334            if candidate > after {
335                return Ok(Some(candidate));
336            }
337        }
338
339        if month == 12 {
340            year += 1;
341            month = 1;
342        } else {
343            month += 1;
344        }
345    }
346
347    Err(TriggerComputationError::InvalidTrigger(
348        "monthly schedule failed to produce next occurrence".to_string(),
349    ))
350}
351
352fn compute_interval_next(
353    every_seconds: u64,
354    anchor_at: Option<DateTime<Utc>>,
355    after: DateTime<Utc>,
356) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
357    if every_seconds == 0 {
358        return Err(TriggerComputationError::InvalidTrigger(
359            "interval.every_seconds must be > 0".to_string(),
360        ));
361    }
362
363    let duration = Duration::seconds(every_seconds as i64);
364    let next = match anchor_at {
365        Some(anchor) if anchor > after => anchor,
366        Some(anchor) => {
367            let elapsed = after.signed_duration_since(anchor).num_seconds();
368            let step = every_seconds as i64;
369            let intervals_elapsed = elapsed.div_euclid(step) + 1;
370            anchor + Duration::seconds(intervals_elapsed * step)
371        }
372        None => after + duration,
373    };
374
375    Ok(Some(next))
376}
377
378fn apply_window(
379    candidate: Option<DateTime<Utc>>,
380    window: &ScheduleWindow,
381) -> Option<DateTime<Utc>> {
382    let candidate = candidate?;
383    if let Some(start_at) = window.start_at {
384        if candidate < start_at {
385            return Some(start_at);
386        }
387    }
388    if let Some(end_at) = window.end_at {
389        if candidate > end_at {
390            return None;
391        }
392    }
393    Some(candidate)
394}
395
396impl TriggerEngine for CronBackedTriggerEngine {
397    fn kind(&self) -> TriggerEngineKind {
398        TriggerEngineKind::CronBacked
399    }
400
401    fn next_after(
402        &self,
403        trigger: &ScheduleTrigger,
404        timezone: Option<&str>,
405        after: DateTime<Utc>,
406        window: &ScheduleWindow,
407    ) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
408        let ScheduleTrigger::Cron { expr } = trigger else {
409            return Err(TriggerComputationError::UnsupportedTrigger(
410                trigger.kind_name(),
411            ));
412        };
413
414        let schedule = expr.parse::<CronSchedule>().map_err(|error| {
415            TriggerComputationError::InvalidTrigger(format!(
416                "invalid cron expression '{}': {}",
417                expr, error
418            ))
419        })?;
420
421        let timezone = parse_timezone(timezone)?;
422        let after_local = after.with_timezone(&timezone);
423        let next_local = schedule.after(&after_local).next();
424        Ok(apply_window(
425            next_local.map(|candidate| candidate.with_timezone(&Utc)),
426            window,
427        ))
428    }
429}
430
431impl TriggerEngine for CompositeTriggerEngine {
432    fn kind(&self) -> TriggerEngineKind {
433        TriggerEngineKind::CronBacked
434    }
435
436    fn next_after(
437        &self,
438        trigger: &ScheduleTrigger,
439        timezone: Option<&str>,
440        after: DateTime<Utc>,
441        window: &ScheduleWindow,
442    ) -> Result<Option<DateTime<Utc>>, TriggerComputationError> {
443        match trigger {
444            ScheduleTrigger::Cron { .. } => self.cron.next_after(trigger, timezone, after, window),
445            _ => self.native.next_after(trigger, timezone, after, window),
446        }
447    }
448}
449
450pub fn default_trigger_engine() -> DynTriggerEngine {
451    Arc::new(CompositeTriggerEngine::new())
452}
453
454#[cfg(test)]
455mod tests {
456    use super::*;
457    use bamboo_domain::{ScheduleTrigger, ScheduleWeekday};
458
459    #[test]
460    fn interval_next_uses_anchor_alignment() {
461        let engine = NativeTriggerEngine::new();
462        let anchor = DateTime::parse_from_rfc3339("2026-04-04T00:00:00Z")
463            .unwrap()
464            .with_timezone(&Utc);
465        let after = DateTime::parse_from_rfc3339("2026-04-04T00:01:01Z")
466            .unwrap()
467            .with_timezone(&Utc);
468        let next = engine
469            .next_after(
470                &ScheduleTrigger::legacy_interval(60, Some(anchor)),
471                None,
472                after,
473                &ScheduleWindow::default(),
474            )
475            .unwrap();
476        assert_eq!(next, Some(anchor + Duration::seconds(120)));
477    }
478
479    #[test]
480    fn due_between_returns_bounded_sequence() {
481        let engine = NativeTriggerEngine::new();
482        let anchor = DateTime::parse_from_rfc3339("2026-04-04T00:00:00Z")
483            .unwrap()
484            .with_timezone(&Utc);
485        let from = anchor;
486        let to = anchor + Duration::seconds(181);
487        let due = engine
488            .due_between(
489                &ScheduleTrigger::legacy_interval(60, Some(anchor)),
490                None,
491                from,
492                to,
493                &ScheduleWindow::default(),
494                10,
495            )
496            .unwrap();
497        assert_eq!(due.len(), 3);
498        assert_eq!(due[0], anchor + Duration::seconds(60));
499        assert_eq!(due[1], anchor + Duration::seconds(120));
500        assert_eq!(due[2], anchor + Duration::seconds(180));
501    }
502
503    #[test]
504    fn daily_next_works_in_utc() {
505        let engine = NativeTriggerEngine::new();
506        let after = DateTime::parse_from_rfc3339("2026-04-04T09:30:00Z")
507            .unwrap()
508            .with_timezone(&Utc);
509        let next = engine
510            .next_after(
511                &ScheduleTrigger::Daily {
512                    hour: 10,
513                    minute: 0,
514                    second: 0,
515                },
516                None,
517                after,
518                &ScheduleWindow::default(),
519            )
520            .unwrap();
521        assert_eq!(
522            next,
523            Some(
524                DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
525                    .unwrap()
526                    .with_timezone(&Utc)
527            )
528        );
529    }
530
531    #[test]
532    fn weekly_next_works_in_utc() {
533        let engine = NativeTriggerEngine::new();
534        let after = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
535            .unwrap()
536            .with_timezone(&Utc);
537        let next = engine
538            .next_after(
539                &ScheduleTrigger::Weekly {
540                    weekdays: vec![ScheduleWeekday::Mon],
541                    hour: 9,
542                    minute: 0,
543                    second: 0,
544                },
545                None,
546                after,
547                &ScheduleWindow::default(),
548            )
549            .unwrap();
550        assert_eq!(
551            next,
552            Some(
553                DateTime::parse_from_rfc3339("2026-04-06T09:00:00Z")
554                    .unwrap()
555                    .with_timezone(&Utc)
556            )
557        );
558    }
559
560    #[test]
561    fn daily_next_respects_timezone() {
562        let engine = NativeTriggerEngine::new();
563        let after = DateTime::parse_from_rfc3339("2026-04-04T00:30:00Z")
564            .unwrap()
565            .with_timezone(&Utc);
566        let next = engine
567            .next_after(
568                &ScheduleTrigger::Daily {
569                    hour: 9,
570                    minute: 0,
571                    second: 0,
572                },
573                Some("Asia/Shanghai"),
574                after,
575                &ScheduleWindow::default(),
576            )
577            .unwrap();
578        assert_eq!(
579            next,
580            Some(
581                DateTime::parse_from_rfc3339("2026-04-04T01:00:00Z")
582                    .unwrap()
583                    .with_timezone(&Utc)
584            )
585        );
586    }
587
588    #[test]
589    fn monthly_next_uses_same_month_when_valid_day_remains() {
590        let engine = NativeTriggerEngine::new();
591        let after = DateTime::parse_from_rfc3339("2026-04-04T10:00:00Z")
592            .unwrap()
593            .with_timezone(&Utc);
594        let next = engine
595            .next_after(
596                &ScheduleTrigger::Monthly {
597                    days: vec![10, 20],
598                    hour: 9,
599                    minute: 0,
600                    second: 0,
601                },
602                None,
603                after,
604                &ScheduleWindow::default(),
605            )
606            .unwrap();
607        assert_eq!(
608            next,
609            Some(
610                DateTime::parse_from_rfc3339("2026-04-10T09:00:00Z")
611                    .unwrap()
612                    .with_timezone(&Utc)
613            )
614        );
615    }
616
617    #[test]
618    fn monthly_next_skips_short_month_for_missing_day() {
619        let engine = NativeTriggerEngine::new();
620        let after = DateTime::parse_from_rfc3339("2026-04-30T23:00:00Z")
621            .unwrap()
622            .with_timezone(&Utc);
623        let next = engine
624            .next_after(
625                &ScheduleTrigger::Monthly {
626                    days: vec![31],
627                    hour: 9,
628                    minute: 0,
629                    second: 0,
630                },
631                None,
632                after,
633                &ScheduleWindow::default(),
634            )
635            .unwrap();
636        assert_eq!(
637            next,
638            Some(
639                DateTime::parse_from_rfc3339("2026-05-31T09:00:00Z")
640                    .unwrap()
641                    .with_timezone(&Utc)
642            )
643        );
644    }
645
646    #[test]
647    fn monthly_next_respects_timezone() {
648        let engine = NativeTriggerEngine::new();
649        let after = DateTime::parse_from_rfc3339("2026-04-30T15:30:00Z")
650            .unwrap()
651            .with_timezone(&Utc);
652        let next = engine
653            .next_after(
654                &ScheduleTrigger::Monthly {
655                    days: vec![1],
656                    hour: 9,
657                    minute: 0,
658                    second: 0,
659                },
660                Some("Asia/Shanghai"),
661                after,
662                &ScheduleWindow::default(),
663            )
664            .unwrap();
665        assert_eq!(
666            next,
667            Some(
668                DateTime::parse_from_rfc3339("2026-05-01T01:00:00Z")
669                    .unwrap()
670                    .with_timezone(&Utc)
671            )
672        );
673    }
674
675    #[test]
676    fn cron_backed_next_works_in_utc() {
677        let engine = CronBackedTriggerEngine::new();
678        let after = DateTime::parse_from_rfc3339("2026-04-04T10:15:00Z")
679            .unwrap()
680            .with_timezone(&Utc);
681        let next = engine
682            .next_after(
683                &ScheduleTrigger::Cron {
684                    expr: "0 */30 * * * * *".to_string(),
685                },
686                Some("UTC"),
687                after,
688                &ScheduleWindow::default(),
689            )
690            .unwrap();
691        assert_eq!(
692            next,
693            Some(
694                DateTime::parse_from_rfc3339("2026-04-04T10:30:00Z")
695                    .unwrap()
696                    .with_timezone(&Utc)
697            )
698        );
699    }
700
701    #[test]
702    fn cron_backed_next_respects_timezone() {
703        let engine = CronBackedTriggerEngine::new();
704        let after = DateTime::parse_from_rfc3339("2026-04-04T00:30:00Z")
705            .unwrap()
706            .with_timezone(&Utc);
707        let next = engine
708            .next_after(
709                &ScheduleTrigger::Cron {
710                    expr: "0 0 9 * * * *".to_string(),
711                },
712                Some("Asia/Shanghai"),
713                after,
714                &ScheduleWindow::default(),
715            )
716            .unwrap();
717        assert_eq!(
718            next,
719            Some(
720                DateTime::parse_from_rfc3339("2026-04-04T01:00:00Z")
721                    .unwrap()
722                    .with_timezone(&Utc)
723            )
724        );
725    }
726}