1use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::atomic::Ordering;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24 let runtime = {
26 let running = state.running.lock().await;
27 running.get(task_id).cloned()
28 };
29 if let Some(rt) = runtime {
30 rt.stop_flag.store(true, Ordering::SeqCst);
31 }
32 state
34 .db_writer
35 .set_task_status(task_id, "cancelled", false)
36 .await?;
37 insert_event(
38 state,
39 task_id,
40 None,
41 "task_control",
42 serde_json::json!({"status": "cancelled"}),
43 )
44 .await?;
45 Ok(())
46}
47
/// Event broadcast on `state.trigger_event_tx` when a task produces something
/// triggers may react to; matched against each trigger's `event` spec.
#[derive(Debug, Clone)]
pub struct TriggerEventPayload {
    // Compared against `event_spec.source` in `handle_event_trigger`.
    pub event_type: String,
    // Task that produced the event; used to look up its workflow for filtering.
    pub task_id: String,
}
58
/// Control message sent to the engine over its reload channel.
#[derive(Debug)]
pub enum TriggerReloadEvent {
    // Re-read trigger configuration and rebuild the cron schedule.
    Reload,
}
65
/// Cheaply-cloneable handle for asking a running [`TriggerEngine`] to reload
/// its configuration.
#[derive(Clone)]
pub struct TriggerEngineHandle {
    // Bounded channel into the engine's select loop (capacity 16).
    reload_tx: mpsc::Sender<TriggerReloadEvent>,
}
71
72impl TriggerEngineHandle {
73 pub async fn reload(&self) {
75 let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
76 }
77
78 pub fn reload_sync(&self) -> bool {
83 self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
84 }
85}
86
/// Long-running engine that fires cron- and event-based triggers.
///
/// Owns the receiving ends of the reload and task-event channels; driven by
/// [`TriggerEngine::run`].
pub struct TriggerEngine {
    state: Arc<InnerState>,
    // Reload requests from `TriggerEngineHandle`.
    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
    // Task lifecycle events published via `broadcast_task_event`.
    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
    // (project_id, trigger_name) pairs seen in the previous schedule build; a
    // trigger only becomes eligible to fire once it appears in two consecutive
    // builds (see `build_cron_schedule`).
    stabilized_triggers: HashSet<(String, String)>,
}
99
impl TriggerEngine {
    /// Build an engine bound to `state` plus a cloneable handle for reload
    /// requests. Subscribes to the task-event broadcast channel up front.
    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
        // Small bounded queue; reload is idempotent, so dropped extras are fine.
        let (reload_tx, reload_rx) = mpsc::channel(16);
        let trigger_event_rx = state.trigger_event_tx.subscribe();
        let engine = Self {
            state,
            reload_rx,
            trigger_event_rx,
            stabilized_triggers: HashSet::new(),
        };
        let handle = TriggerEngineHandle { reload_tx };
        (engine, handle)
    }

    /// Main loop: sleep until the earliest cron fire time, then race that
    /// timer against incoming task events, reload requests, and shutdown.
    ///
    /// Exits when `shutdown_rx` changes or the task-event channel closes.
    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
        info!("trigger engine started");

        let mut cron_schedule = self.build_cron_schedule();

        loop {
            // Recomputed each iteration; defaults to 1h when nothing is scheduled.
            let sleep_duration = next_cron_sleep(&cron_schedule);
            let sleep_fut = tokio::time::sleep(sleep_duration);
            tokio::pin!(sleep_fut);

            tokio::select! {
                () = &mut sleep_fut => {
                    // Timer elapsed: fire everything that is due, then rebuild
                    // the schedule so fired entries get fresh next-fire times.
                    let now = Utc::now();
                    let fired = collect_due_triggers(&cron_schedule, now);
                    for (trigger_name, project) in fired {
                        self.fire_trigger(&trigger_name, &project).await;
                    }
                    cron_schedule = self.build_cron_schedule();
                }

                event_result = self.trigger_event_rx.recv() => {
                    match event_result {
                        Ok(payload) => {
                            self.handle_event_trigger(&payload).await;
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                            // Broadcast buffer overflowed; skipped events are lost.
                            warn!(skipped = n, "trigger event receiver lagged");
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                            debug!("trigger event channel closed");
                            break;
                        }
                    }
                }

                Some(_) = self.reload_rx.recv() => {
                    info!("trigger engine: reloading configuration");
                    cron_schedule = self.build_cron_schedule();
                }

                _ = shutdown_rx.changed() => {
                    info!("trigger engine shutting down");
                    break;
                }
            }
        }
    }

    /// Recompute the cron schedule from the currently-active configuration.
    ///
    /// Side effect: replaces `stabilized_triggers` with the set of triggers in
    /// the current config. A trigger is only scheduled if it was ALSO present
    /// in the previous build ("stabilized"), so a newly-added trigger is
    /// skipped for one rebuild cycle. NOTE(review): this looks like a
    /// deliberate debounce against freshly-edited config — confirm; it also
    /// means no cron entry is produced on the very first build after startup
    /// (the next rebuild happens after the sleep or a reload).
    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;
        let mut entries = Vec::new();

        // Collect every (project, trigger) pair present in the active config,
        // including suspended ones, so they still count as "known".
        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
        for (project_id, project) in &config.projects {
            for name in project.triggers.keys() {
                current_triggers.insert((project_id.clone(), name.clone()));
            }
        }

        // Swap in the current set; keep the previous one for the check below.
        let previously_known = std::mem::take(&mut self.stabilized_triggers);
        self.stabilized_triggers = current_triggers.clone();

        for (project_id, project) in &config.projects {
            for (name, trigger) in &project.triggers {
                if trigger.suspend {
                    continue;
                }
                if !previously_known.contains(&(project_id.clone(), name.clone())) {
                    debug!(
                        trigger = name.as_str(),
                        project = project_id.as_str(),
                        "trigger not yet stabilized, skipping cron schedule"
                    );
                    continue;
                }
                if let Some(ref cron_spec) = trigger.cron {
                    match compute_next_fire(cron_spec, Utc::now()) {
                        Ok(next) => {
                            entries.push(CronEntry {
                                trigger_name: name.clone(),
                                project: project_id.clone(),
                                next_fire: next,
                            });
                        }
                        Err(e) => {
                            // Bad cron spec or timezone: log and continue so one
                            // broken trigger doesn't disable the rest.
                            warn!(
                                trigger = name.as_str(),
                                project = project_id.as_str(),
                                error = %e,
                                "failed to compute next fire time"
                            );
                        }
                    }
                }
            }
        }
        entries
    }

    /// Match an incoming task event against every event trigger in the active
    /// config and fire each one that matches.
    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
        // Workflow of the source task, used by the optional workflow filter.
        let source_workflow = self.lookup_task_workflow(&payload.task_id).await;

        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;

        for (project_id, project) in &config.projects {
            for (name, trigger) in &project.triggers {
                if trigger.suspend {
                    continue;
                }
                // Same stabilization gate as the cron path.
                if !self
                    .stabilized_triggers
                    .contains(&(project_id.clone(), name.clone()))
                {
                    continue;
                }
                if let Some(ref event_spec) = trigger.event {
                    if event_spec.source != payload.event_type {
                        continue;
                    }
                    if let Some(ref filter) = event_spec.filter {
                        // Workflow filter: source task's workflow must match exactly.
                        if let Some(ref filter_wf) = filter.workflow {
                            match source_workflow {
                                Some(ref sw) if sw == filter_wf => {}
                                _ => continue,
                            }
                        }
                        // CEL conditions are not evaluated yet; skip rather than
                        // fire on an unchecked condition.
                        if filter.condition.is_some() {
                            debug!(
                                trigger = name.as_str(),
                                "CEL condition evaluation not yet implemented, skipping"
                            );
                            continue;
                        }
                    }

                    info!(
                        trigger = name.as_str(),
                        project = project_id.as_str(),
                        event_type = payload.event_type.as_str(),
                        source_task = payload.task_id.as_str(),
                        "event trigger matched"
                    );
                    self.fire_trigger_with_config(name, project_id, trigger)
                        .await;
                }
            }
        }
    }

    /// Fire a trigger identified by name, resolving its config from the
    /// current snapshot (cron path — the config may have changed since the
    /// schedule was built).
    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;

        let trigger = config
            .projects
            .get(project)
            .and_then(|p| p.triggers.get(trigger_name));

        let Some(trigger) = trigger else {
            warn!(trigger = trigger_name, "trigger not found in config");
            return;
        };

        self.fire_trigger_with_config(trigger_name, project, trigger)
            .await;
    }

    /// Full firing pipeline: suspend check → throttle → concurrency policy →
    /// task creation → optional auto-start → optional history cleanup.
    async fn fire_trigger_with_config(
        &self,
        trigger_name: &str,
        project: &str,
        trigger: &TriggerConfig,
    ) {
        // Re-checked here because the cron path resolves config lazily.
        if trigger.suspend {
            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
            return;
        }

        // Throttle: skip when the last fire was under `min_interval` seconds ago.
        if let Some(ref throttle) = trigger.throttle {
            if throttle.min_interval > 0 {
                if let Some(last) = self.load_last_fired(trigger_name, project).await {
                    let elapsed = (Utc::now() - last).num_seconds();
                    // elapsed < 0 (clock skew) falls through and fires.
                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
                        return;
                    }
                }
            }
        }

        match trigger.concurrency_policy {
            // Forbid: skip if the previously-fired task is still active.
            crate::cli_types::ConcurrencyPolicy::Forbid => {
                if self.has_active_task(trigger_name, project).await {
                    self.emit_trigger_event(
                        trigger_name,
                        "trigger_skipped",
                        "concurrent_task_active",
                    );
                    return;
                }
            }
            // Replace: cancel the previously-fired task, then proceed.
            crate::cli_types::ConcurrencyPolicy::Replace => {
                self.cancel_active_tasks(trigger_name, project).await;
            }
            crate::cli_types::ConcurrencyPolicy::Allow => {}
        }

        // Optional `target-file` argument passes through to the created task.
        let target_files = trigger
            .action
            .args
            .as_ref()
            .and_then(|a| a.get("target-file"))
            .cloned();

        let task_name = format!("trigger-{trigger_name}");

        let payload = CreateTaskPayload {
            name: Some(task_name),
            goal: Some(format!("Triggered by: {trigger_name}")),
            project_id: Some(project.to_string()),
            workspace_id: Some(trigger.action.workspace.clone()),
            workflow_id: Some(trigger.action.workflow.clone()),
            target_files,
            parent_task_id: None,
            spawn_reason: None,
        };

        match crate::task_ops::create_task_as_service(&self.state, payload) {
            Ok(summary) => {
                let task_id = summary.id.clone();
                info!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    "trigger fired: task created"
                );

                // Record the fire (timestamp, count, last task) before emitting
                // the event so state is consistent when listeners react.
                self.update_trigger_state(trigger_name, project, &task_id, "created")
                    .await;

                self.state.emit_event(
                    &task_id,
                    None,
                    "trigger_fired",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
                        "task_id": task_id,
                    }),
                );

                // Auto-start: enqueue in the background so firing never blocks
                // on the scheduler.
                if trigger.action.start {
                    let state = self.state.clone();
                    let tid = task_id.clone();
                    tokio::spawn(async move {
                        if let Err(e) =
                            crate::scheduler_service::enqueue_task_as_service(&state, &tid).await
                        {
                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
                        } else {
                            state.worker_notify.notify_one();
                        }
                    });
                }

                // History cleanup is best-effort and runs in the background.
                if trigger.history_limit.is_some() {
                    let state = self.state.clone();
                    let name = trigger_name.to_string();
                    let proj = project.to_string();
                    let limit = trigger.history_limit.clone();
                    tokio::spawn(async move {
                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
                        {
                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
                        }
                    });
                }
            }
            Err(e) => {
                error!(
                    trigger = trigger_name,
                    error = %e,
                    "trigger failed to create task"
                );
                // No task exists, so the state row and event use an empty task id.
                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
                    .await;
                self.state.emit_event(
                    "",
                    None,
                    "trigger_error",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "error": e.to_string(),
                    }),
                );
            }
        }
    }

    /// Look up a task's workflow id; returns `None` when the task is missing
    /// or the query fails (failure is logged at debug level only).
    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
        let tid = task_id.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let wf: Option<String> = conn
                    .query_row(
                        "SELECT workflow_id FROM tasks WHERE id = ?1",
                        rusqlite::params![tid],
                        |row| row.get(0),
                    )
                    .ok();
                Ok(wf)
            })
            .await;
        match result {
            Ok(wf) => wf,
            Err(e) => {
                debug!(task_id, error = %e, "failed to look up task workflow");
                None
            }
        }
    }

    /// Read the trigger's last fire time from `trigger_state`; `None` when the
    /// row is missing or the timestamp doesn't parse as RFC 3339.
    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let mut stmt = conn
                    .prepare(
                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                    )
                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
                let ts: Option<String> = stmt
                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
                    .ok();
                Ok(ts)
            })
            .await;

        match result {
            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
            _ => None,
        }
    }

    /// True when the trigger's most recently created task is still in a
    /// non-terminal status. Only the LAST task is checked; query failures
    /// conservatively return `false` (i.e. the trigger may fire).
    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let last_task_id: Option<String> = conn
                    .query_row(
                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                        rusqlite::params![name, proj],
                        |row| row.get(0),
                    )
                    .ok()
                    .flatten();

                if let Some(ref tid) = last_task_id {
                    let status: Option<String> = conn
                        .query_row(
                            "SELECT status FROM tasks WHERE id = ?1",
                            rusqlite::params![tid],
                            |row| row.get(0),
                        )
                        .ok();
                    if let Some(s) = status {
                        // Statuses considered "still active".
                        return Ok(matches!(
                            s.as_str(),
                            "created" | "pending" | "running" | "restart_pending"
                        ));
                    }
                }
                Ok(false)
            })
            .await;

        result.unwrap_or(false)
    }

    /// Cancel the trigger's most recently created task (Replace policy).
    /// Cancels regardless of the task's current status; errors are logged.
    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let state = self.state.clone();
        let result = state
            .async_database
            .reader()
            .call(move |conn| {
                let tid: Option<String> = conn
                    .query_row(
                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                        rusqlite::params![name, proj],
                        |row| row.get(0),
                    )
                    .ok()
                    .flatten();
                Ok(tid)
            })
            .await;

        if let Ok(Some(task_id)) = result {
            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
                warn!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    error = %e,
                    "failed to cancel active task for Replace policy"
                );
            }
        }
    }

    /// Upsert the trigger's row in `trigger_state`: sets last_fired_at /
    /// last_task_id / last_status and increments fire_count. `created_at` is
    /// only written on insert (it is absent from the DO UPDATE SET list).
    async fn update_trigger_state(
        &self,
        trigger_name: &str,
        project: &str,
        task_id: &str,
        status: &str,
    ) {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let tid = task_id.to_owned();
        let st = status.to_owned();
        let now = Utc::now().to_rfc3339();
        let now2 = now.clone();

        if let Err(e) = self
            .state
            .async_database
            .writer()
            .call(move |conn| {
                conn.execute(
                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
                     ON CONFLICT(trigger_name, project) DO UPDATE SET
                       last_fired_at = ?3,
                       fire_count = fire_count + 1,
                       last_task_id = ?4,
                       last_status = ?5,
                       updated_at = ?7",
                    rusqlite::params![name, proj, now, tid, st, now2, now2],
                )
                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
                Ok(())
            })
            .await
        {
            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
        }
    }

    /// Emit a trigger-scoped event with no associated task (empty task id).
    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
        debug!(trigger = trigger_name, event_type, reason, "trigger event");
        self.state.emit_event(
            "",
            None,
            event_type,
            serde_json::json!({
                "trigger": trigger_name,
                "reason": reason,
            }),
        );
    }
}
627
/// One scheduled cron firing: which trigger, in which project, and when.
struct CronEntry {
    trigger_name: String,
    project: String,
    // Next fire instant, already converted to UTC.
    next_fire: DateTime<Utc>,
}
635
636fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
637 use cron::Schedule;
638 use std::str::FromStr;
639
640 let schedule = Schedule::from_str(&spec.schedule)
641 .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
642
643 if let Some(ref tz_name) = spec.timezone {
645 let tz: chrono_tz::Tz = tz_name
646 .parse()
647 .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
648 let local_after = after.with_timezone(&tz);
649 let next = schedule
650 .after(&local_after)
651 .next()
652 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
653 Ok(next.with_timezone(&Utc))
654 } else {
655 let next = schedule
656 .after(&after)
657 .next()
658 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
659 Ok(next.with_timezone(&Utc))
660 }
661}
662
663fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
664 let now = Utc::now();
665 entries
666 .iter()
667 .map(|e| {
668 let diff = e.next_fire.signed_duration_since(now);
669 if diff.num_milliseconds() <= 0 {
670 std::time::Duration::from_millis(100)
671 } else {
672 std::time::Duration::from_millis(diff.num_milliseconds() as u64)
673 }
674 })
675 .min()
676 .unwrap_or(std::time::Duration::from_secs(3600))
678}
679
680fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
681 entries
682 .iter()
683 .filter(|e| e.next_fire <= now)
684 .map(|e| (e.trigger_name.clone(), e.project.clone()))
685 .collect()
686}
687
688async fn cleanup_history(
689 state: &InnerState,
690 trigger_name: &str,
691 project: &str,
692 limit: Option<&crate::config::TriggerHistoryLimitConfig>,
693) -> Result<()> {
694 let limit = match limit {
695 Some(l) => l,
696 None => return Ok(()),
697 };
698
699 let task_name_pattern = format!("trigger-{trigger_name}");
700 let proj = project.to_owned();
701
702 let mut ids_to_delete: Vec<String> = Vec::new();
704
705 if let Some(max_successful) = limit.successful {
706 let pattern = task_name_pattern.clone();
707 let p = proj.clone();
708 let max = max_successful as usize;
709 let ids = state
710 .async_database
711 .reader()
712 .call(move |conn| {
713 let mut stmt = conn
714 .prepare(
715 "SELECT id FROM tasks \
716 WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
717 ORDER BY created_at DESC",
718 )
719 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
720 let rows = stmt
721 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
722 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
723 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
724 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
725 })
726 .await
727 .context("query completed tasks for history cleanup")?;
728 ids_to_delete.extend(ids);
729 }
730
731 if let Some(max_failed) = limit.failed {
732 let pattern = task_name_pattern.clone();
733 let p = proj.clone();
734 let max = max_failed as usize;
735 let ids = state
736 .async_database
737 .reader()
738 .call(move |conn| {
739 let mut stmt = conn
740 .prepare(
741 "SELECT id FROM tasks \
742 WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
743 ORDER BY created_at DESC",
744 )
745 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
746 let rows = stmt
747 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
748 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
749 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
750 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
751 })
752 .await
753 .context("query failed tasks for history cleanup")?;
754 ids_to_delete.extend(ids);
755 }
756
757 if ids_to_delete.is_empty() {
758 return Ok(());
759 }
760
761 state
762 .async_database
763 .writer()
764 .call(move |conn| {
765 let placeholders: Vec<String> =
766 (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
767 let sql = format!(
768 "DELETE FROM tasks WHERE id IN ({})",
769 placeholders.join(", ")
770 );
771 let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
772 .iter()
773 .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
774 .collect();
775 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
776 params.iter().map(|p| p.as_ref()).collect();
777 conn.execute(&sql, param_refs.as_slice())
778 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
779 Ok(())
780 })
781 .await
782 .context("delete excess trigger history tasks")?;
783
784 Ok(())
785}
786
787pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
792 let _ = state.trigger_event_tx.send(payload);
794}
795
796pub fn notify_trigger_reload(state: &InnerState) {
799 if let Ok(guard) = state.trigger_engine_handle.lock() {
800 if let Some(ref handle) = *guard {
801 let _ = handle.reload_sync();
802 }
803 }
804}
805
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Fixed reference instant used by the fire-time tests: 2026-01-01T00:00:00Z.
    fn jan_first_2026() -> DateTime<Utc> {
        chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc()
    }

    #[test]
    fn compute_next_fire_utc() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: None,
        };
        let after = jan_first_2026();
        let next = compute_next_fire(&spec, after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    #[test]
    fn compute_next_fire_with_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Asia/Shanghai".to_string()),
        };
        let after = jan_first_2026();
        let next = compute_next_fire(&spec, after).expect("should compute with tz");
        assert!(next > after);
        // 02:00 in UTC+8 is 18:00 UTC the previous day.
        assert_eq!(next.hour(), 18);
    }

    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = TriggerCronConfig {
            schedule: "not a cron".to_string(),
            timezone: None,
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Invalid/TZ".to_string()),
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        assert_eq!(
            next_cron_sleep(&[]),
            std::time::Duration::from_secs(3600)
        );
    }

    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let reference = Utc::now();
        let entries = vec![
            CronEntry {
                trigger_name: "past".to_string(),
                project: "p".to_string(),
                next_fire: reference - chrono::Duration::seconds(10),
            },
            CronEntry {
                trigger_name: "future".to_string(),
                project: "p".to_string(),
                next_fire: reference + chrono::Duration::seconds(300),
            },
        ];
        let due = collect_due_triggers(&entries, reference);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}
889}