1use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24 let runtime = {
26 let running = state.running.lock().await;
27 running.get(task_id).cloned()
28 };
29 if let Some(rt) = runtime {
30 rt.stop_flag.store(true, Ordering::SeqCst);
31 }
32 state
34 .db_writer
35 .set_task_status(task_id, "cancelled", false)
36 .await?;
37 insert_event(
38 state,
39 task_id,
40 None,
41 "task_control",
42 serde_json::json!({"status": "cancelled"}),
43 )
44 .await?;
45 Ok(())
46}
47
/// An event broadcast to the trigger engine for matching against
/// event-based triggers.
#[derive(Debug, Clone)]
pub struct TriggerEventPayload {
    /// Event kind; compared against each trigger's `event.source` spec.
    /// CEL filter conditions are only evaluated when this is "webhook".
    pub event_type: String,
    /// Task the event originated from; empty string when there is no
    /// source task (e.g. webhook-originated events).
    pub task_id: String,
    /// Optional structured payload (e.g. a webhook request body), forwarded
    /// into the created task's goal text.
    pub payload: Option<serde_json::Value>,
}
60
/// Control messages accepted on the trigger engine's reload channel.
#[derive(Debug)]
pub enum TriggerReloadEvent {
    /// Re-read the active configuration and rebuild the cron schedule.
    Reload,
}
67
/// Cloneable handle for asking a running trigger engine to reload its
/// trigger configuration.
#[derive(Clone)]
pub struct TriggerEngineHandle {
    // Bounded channel into the engine's event loop.
    reload_tx: mpsc::Sender<TriggerReloadEvent>,
}
73
74impl TriggerEngineHandle {
75 pub async fn reload(&self) {
77 let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
78 }
79
80 pub fn reload_sync(&self) -> bool {
85 self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
86 }
87}
88
/// Background engine that fires cron- and event-based triggers.
pub struct TriggerEngine {
    // Shared daemon state: configuration, database handles, event channels.
    state: Arc<InnerState>,
    // Reload requests sent by `TriggerEngineHandle`s.
    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
    // Broadcast stream of task/webhook events, matched against event triggers.
    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
    // (project_id, trigger_name) pairs observed in a previous schedule build.
    // A trigger only becomes eligible to fire after it has been seen once,
    // so a freshly added trigger sits out one build cycle.
    stabilized_triggers: HashSet<(String, String)>,
}
101
impl TriggerEngine {
    /// Creates the engine and a cloneable handle for requesting reloads.
    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
        // Small bound: reload requests are idempotent, so excess ones may
        // safely be dropped by `reload_sync`.
        let (reload_tx, reload_rx) = mpsc::channel(16);
        let trigger_event_rx = state.trigger_event_tx.subscribe();
        let engine = Self {
            state,
            reload_rx,
            trigger_event_rx,
            stabilized_triggers: HashSet::new(),
        };
        let handle = TriggerEngineHandle { reload_tx };
        (engine, handle)
    }

    /// Event loop: sleeps until the earliest cron deadline while concurrently
    /// servicing trigger events, reload requests, and shutdown.
    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
        info!("trigger engine started");

        let mut cron_schedule = self.build_cron_schedule();

        loop {
            let sleep_duration = next_cron_sleep(&cron_schedule);
            let sleep_fut = tokio::time::sleep(sleep_duration);
            tokio::pin!(sleep_fut);

            tokio::select! {
                // Cron deadline reached: fire everything due, then recompute
                // every entry's next fire time.
                () = &mut sleep_fut => {
                    let now = Utc::now();
                    let fired = collect_due_triggers(&cron_schedule, now);
                    for (trigger_name, project) in fired {
                        self.fire_trigger(&trigger_name, &project).await;
                    }
                    cron_schedule = self.build_cron_schedule();
                }

                // A task/webhook event arrived on the broadcast channel.
                event_result = self.trigger_event_rx.recv() => {
                    match event_result {
                        Ok(payload) => {
                            self.handle_event_trigger(&payload).await;
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                            // Slow consumer: events were dropped; log and move on.
                            warn!(skipped = n, "trigger event receiver lagged");
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                            debug!("trigger event channel closed");
                            break;
                        }
                    }
                }

                // Explicit reload request from a handle.
                Some(_) = self.reload_rx.recv() => {
                    info!("trigger engine: reloading configuration");
                    cron_schedule = self.build_cron_schedule();
                }

                _ = shutdown_rx.changed() => {
                    info!("trigger engine shutting down");
                    break;
                }
            }
        }
    }

    /// Rebuilds the cron entry list from the active configuration.
    ///
    /// Also refreshes `stabilized_triggers`: only triggers that were present
    /// in the previous build are scheduled, so a trigger new to the config
    /// is skipped once before it can fire.
    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;
        let mut entries = Vec::new();

        // Every (project, trigger) pair present in the current config.
        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
        for (project_id, project) in &config.projects {
            for name in project.triggers.keys() {
                current_triggers.insert((project_id.clone(), name.clone()));
            }
        }

        // Swap in the current set; the previous set gates eligibility below.
        let previously_known = std::mem::take(&mut self.stabilized_triggers);
        self.stabilized_triggers = current_triggers.clone();

        for (project_id, project) in &config.projects {
            for (name, trigger) in &project.triggers {
                if trigger.suspend {
                    continue;
                }
                if !previously_known.contains(&(project_id.clone(), name.clone())) {
                    debug!(
                        trigger = name.as_str(),
                        project = project_id.as_str(),
                        "trigger not yet stabilized, skipping cron schedule"
                    );
                    continue;
                }
                if let Some(ref cron_spec) = trigger.cron {
                    match compute_next_fire(cron_spec, Utc::now()) {
                        Ok(next) => {
                            entries.push(CronEntry {
                                trigger_name: name.clone(),
                                project: project_id.clone(),
                                next_fire: next,
                            });
                        }
                        Err(e) => {
                            // Bad cron spec or no future fire time: log and
                            // leave this trigger out of the schedule.
                            warn!(
                                trigger = name.as_str(),
                                project = project_id.as_str(),
                                error = %e,
                                "failed to compute next fire time"
                            );
                        }
                    }
                }
            }
        }
        entries
    }

    /// Matches an incoming event against every event trigger in the config
    /// and fires those whose source and filter accept it.
    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
        // Resolve the originating task's workflow (if any) so triggers can
        // filter on it.
        let source_workflow = if payload.task_id.is_empty() {
            None
        } else {
            self.lookup_task_workflow(&payload.task_id).await
        };

        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;

        for (project_id, project) in &config.projects {
            for (name, trigger) in &project.triggers {
                if trigger.suspend {
                    continue;
                }
                // Same stabilization gate as the cron path.
                if !self
                    .stabilized_triggers
                    .contains(&(project_id.clone(), name.clone()))
                {
                    continue;
                }
                if let Some(ref event_spec) = trigger.event {
                    if event_spec.source != payload.event_type {
                        continue;
                    }
                    if let Some(ref filter) = event_spec.filter {
                        // Workflow filter: require an exact match on the
                        // source task's workflow.
                        if let Some(ref filter_wf) = filter.workflow {
                            match source_workflow {
                                Some(ref sw) if sw == filter_wf => {}
                                _ => continue,
                            }
                        }
                        // CEL conditions are only supported on webhook events
                        // for now; other event types are skipped outright.
                        if let Some(ref _condition) = filter.condition {
                            if payload.event_type != "webhook" {
                                debug!(
                                    trigger = name.as_str(),
                                    "CEL condition on non-webhook trigger not yet implemented, skipping"
                                );
                                continue;
                            }
                        }
                    }

                    info!(
                        trigger = name.as_str(),
                        project = project_id.as_str(),
                        event_type = payload.event_type.as_str(),
                        "event trigger matched"
                    );
                    self.fire_trigger_with_config(
                        name,
                        project_id,
                        trigger,
                        payload.payload.as_ref(),
                    )
                    .await;
                }
            }
        }
    }

    /// Fires a cron trigger by name after looking up its current config.
    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;

        let trigger = config
            .projects
            .get(project)
            .and_then(|p| p.triggers.get(trigger_name));

        let Some(trigger) = trigger else {
            // Config may have changed between scheduling and firing.
            warn!(trigger = trigger_name, "trigger not found in config");
            return;
        };

        self.fire_trigger_with_config(trigger_name, project, trigger, None)
            .await;
    }

    /// Core firing path shared by cron and event triggers: applies suspend,
    /// throttle and concurrency policies, creates the task, and records the
    /// outcome in `trigger_state` plus the event log.
    async fn fire_trigger_with_config(
        &self,
        trigger_name: &str,
        project: &str,
        trigger: &TriggerConfig,
        webhook_payload: Option<&serde_json::Value>,
    ) {
        if trigger.suspend {
            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
            return;
        }

        // Throttle: skip when the last firing was under min_interval seconds
        // ago. NOTE(review): a negative elapsed (last_fired_at in the future,
        // e.g. clock skew) bypasses the throttle — confirm that is intended.
        if let Some(ref throttle) = trigger.throttle {
            if throttle.min_interval > 0 {
                if let Some(last) = self.load_last_fired(trigger_name, project).await {
                    let elapsed = (Utc::now() - last).num_seconds();
                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
                        return;
                    }
                }
            }
        }

        match trigger.concurrency_policy {
            // Forbid: never run two tasks from this trigger at once.
            crate::cli_types::ConcurrencyPolicy::Forbid => {
                if self.has_active_task(trigger_name, project).await {
                    self.emit_trigger_event(
                        trigger_name,
                        "trigger_skipped",
                        "concurrent_task_active",
                    );
                    return;
                }
            }
            // Replace: cancel the previous task before creating a new one.
            crate::cli_types::ConcurrencyPolicy::Replace => {
                self.cancel_active_tasks(trigger_name, project).await;
            }
            crate::cli_types::ConcurrencyPolicy::Allow => {}
        }

        // Optional "target-file" action argument is forwarded to the task.
        let target_files = trigger
            .action
            .args
            .as_ref()
            .and_then(|a| a.get("target-file"))
            .cloned();

        let task_name = format!("trigger-{trigger_name}");

        let payload = CreateTaskPayload {
            name: Some(task_name),
            goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
            project_id: Some(project.to_string()),
            workspace_id: Some(trigger.action.workspace.clone()),
            workflow_id: Some(trigger.action.workflow.clone()),
            target_files,
            parent_task_id: None,
            spawn_reason: None,
        };

        match crate::task_ops::create_task_as_service(&self.state, payload) {
            Ok(summary) => {
                let task_id = summary.id.clone();
                info!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    "trigger fired: task created"
                );

                self.update_trigger_state(trigger_name, project, &task_id, "created")
                    .await;

                self.state.emit_event(
                    &task_id,
                    None,
                    "trigger_fired",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
                        "task_id": task_id,
                    }),
                );

                // Optionally enqueue the task right away (fire-and-forget
                // so the engine loop is not blocked on the scheduler).
                if trigger.action.start {
                    let state = self.state.clone();
                    let tid = task_id.clone();
                    tokio::spawn(async move {
                        if let Err(e) =
                            crate::scheduler_service::enqueue_task_as_service(&state, &tid).await
                        {
                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
                        } else {
                            state.worker_notify.notify_one();
                        }
                    });
                }

                // Prune old trigger-created tasks in the background.
                if trigger.history_limit.is_some() {
                    let state = self.state.clone();
                    let name = trigger_name.to_string();
                    let proj = project.to_string();
                    let limit = trigger.history_limit.clone();
                    tokio::spawn(async move {
                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
                        {
                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
                        }
                    });
                }
            }
            Err(e) => {
                error!(
                    trigger = trigger_name,
                    error = %e,
                    "trigger failed to create task"
                );
                // No task exists; record the failure with an empty task id.
                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
                    .await;
                self.state.emit_event(
                    "",
                    None,
                    "trigger_error",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "error": e.to_string(),
                    }),
                );
            }
        }
    }

    /// Returns the workflow id of a task, or `None` when the task is unknown
    /// or the lookup fails.
    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
        let tid = task_id.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let wf: Option<String> = conn
                    .query_row(
                        "SELECT workflow_id FROM tasks WHERE id = ?1",
                        rusqlite::params![tid],
                        |row| row.get(0),
                    )
                    .ok();
                Ok(wf)
            })
            .await;
        match result {
            Ok(wf) => wf,
            Err(e) => {
                debug!(task_id, error = %e, "failed to look up task workflow");
                None
            }
        }
    }

    /// Loads the last firing time recorded in `trigger_state`, if any.
    /// Unparseable timestamps and query errors both yield `None`.
    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let mut stmt = conn
                    .prepare(
                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                    )
                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
                let ts: Option<String> = stmt
                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
                    .ok();
                Ok(ts)
            })
            .await;

        match result {
            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
            _ => None,
        }
    }

    /// True when the trigger's most recent task is still in a non-terminal
    /// status. Any missing row or DB error counts as "no active task".
    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let last_task_id: Option<String> = conn
                    .query_row(
                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                        rusqlite::params![name, proj],
                        |row| row.get(0),
                    )
                    .ok()
                    .flatten();

                if let Some(ref tid) = last_task_id {
                    let status: Option<String> = conn
                        .query_row(
                            "SELECT status FROM tasks WHERE id = ?1",
                            rusqlite::params![tid],
                            |row| row.get(0),
                        )
                        .ok();
                    if let Some(s) = status {
                        // Statuses considered "still running" for Forbid.
                        return Ok(matches!(
                            s.as_str(),
                            "created" | "pending" | "running" | "restart_pending"
                        ));
                    }
                }
                Ok(false)
            })
            .await;

        result.unwrap_or(false)
    }

    /// Cancels the trigger's most recent task (Replace concurrency policy).
    /// Only the task recorded in `trigger_state.last_task_id` is targeted.
    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let state = self.state.clone();
        let result = state
            .async_database
            .reader()
            .call(move |conn| {
                let tid: Option<String> = conn
                    .query_row(
                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                        rusqlite::params![name, proj],
                        |row| row.get(0),
                    )
                    .ok()
                    .flatten();
                Ok(tid)
            })
            .await;

        if let Ok(Some(task_id)) = result {
            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
                warn!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    error = %e,
                    "failed to cancel active task for Replace policy"
                );
            }
        }
    }

    /// Upserts this trigger's row in `trigger_state`, bumping `fire_count`
    /// and recording the task id / status of the latest firing.
    async fn update_trigger_state(
        &self,
        trigger_name: &str,
        project: &str,
        task_id: &str,
        status: &str,
    ) {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let tid = task_id.to_owned();
        let st = status.to_owned();
        let now = Utc::now().to_rfc3339();
        let now2 = now.clone();

        if let Err(e) = self
            .state
            .async_database
            .writer()
            .call(move |conn| {
                conn.execute(
                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
                     ON CONFLICT(trigger_name, project) DO UPDATE SET
                       last_fired_at = ?3,
                       fire_count = fire_count + 1,
                       last_task_id = ?4,
                       last_status = ?5,
                       updated_at = ?7",
                    rusqlite::params![name, proj, now, tid, st, now2, now2],
                )
                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
                Ok(())
            })
            .await
        {
            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
        }
    }

    /// Emits a trigger bookkeeping event with no associated task id.
    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
        debug!(trigger = trigger_name, event_type, reason, "trigger event");
        self.state.emit_event(
            "",
            None,
            event_type,
            serde_json::json!({
                "trigger": trigger_name,
                "reason": reason,
            }),
        );
    }
}
643
/// One scheduled cron firing: which trigger, in which project, and when it
/// should next fire.
struct CronEntry {
    trigger_name: String,
    project: String,
    next_fire: DateTime<Utc>,
}
651
652fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
653 use cron::Schedule;
654 use std::str::FromStr;
655
656 let schedule = Schedule::from_str(&spec.schedule)
657 .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
658
659 if let Some(ref tz_name) = spec.timezone {
661 let tz: chrono_tz::Tz = tz_name
662 .parse()
663 .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
664 let local_after = after.with_timezone(&tz);
665 let next = schedule
666 .after(&local_after)
667 .next()
668 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
669 Ok(next.with_timezone(&Utc))
670 } else {
671 let next = schedule
672 .after(&after)
673 .next()
674 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
675 Ok(next.with_timezone(&Utc))
676 }
677}
678
679fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
680 let now = Utc::now();
681 entries
682 .iter()
683 .map(|e| {
684 let diff = e.next_fire.signed_duration_since(now);
685 if diff.num_milliseconds() <= 0 {
686 std::time::Duration::from_millis(100)
687 } else {
688 std::time::Duration::from_millis(diff.num_milliseconds() as u64)
689 }
690 })
691 .min()
692 .unwrap_or(std::time::Duration::from_secs(3600))
694}
695
696fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
697 entries
698 .iter()
699 .filter(|e| e.next_fire <= now)
700 .map(|e| (e.trigger_name.clone(), e.project.clone()))
701 .collect()
702}
703
704async fn cleanup_history(
705 state: &InnerState,
706 trigger_name: &str,
707 project: &str,
708 limit: Option<&crate::config::TriggerHistoryLimitConfig>,
709) -> Result<()> {
710 let limit = match limit {
711 Some(l) => l,
712 None => return Ok(()),
713 };
714
715 let task_name_pattern = format!("trigger-{trigger_name}");
716 let proj = project.to_owned();
717
718 let mut ids_to_delete: Vec<String> = Vec::new();
720
721 if let Some(max_successful) = limit.successful {
722 let pattern = task_name_pattern.clone();
723 let p = proj.clone();
724 let max = max_successful as usize;
725 let ids = state
726 .async_database
727 .reader()
728 .call(move |conn| {
729 let mut stmt = conn
730 .prepare(
731 "SELECT id FROM tasks \
732 WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
733 ORDER BY created_at DESC",
734 )
735 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
736 let rows = stmt
737 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
738 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
739 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
740 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
741 })
742 .await
743 .context("query completed tasks for history cleanup")?;
744 ids_to_delete.extend(ids);
745 }
746
747 if let Some(max_failed) = limit.failed {
748 let pattern = task_name_pattern.clone();
749 let p = proj.clone();
750 let max = max_failed as usize;
751 let ids = state
752 .async_database
753 .reader()
754 .call(move |conn| {
755 let mut stmt = conn
756 .prepare(
757 "SELECT id FROM tasks \
758 WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
759 ORDER BY created_at DESC",
760 )
761 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
762 let rows = stmt
763 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
764 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
765 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
766 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
767 })
768 .await
769 .context("query failed tasks for history cleanup")?;
770 ids_to_delete.extend(ids);
771 }
772
773 if ids_to_delete.is_empty() {
774 return Ok(());
775 }
776
777 state
778 .async_database
779 .writer()
780 .call(move |conn| {
781 let placeholders: Vec<String> =
782 (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
783 let sql = format!(
784 "DELETE FROM tasks WHERE id IN ({})",
785 placeholders.join(", ")
786 );
787 let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
788 .iter()
789 .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
790 .collect();
791 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
792 params.iter().map(|p| p.as_ref()).collect();
793 conn.execute(&sql, param_refs.as_slice())
794 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
795 Ok(())
796 })
797 .await
798 .context("delete excess trigger history tasks")?;
799
800 Ok(())
801}
802
803fn build_trigger_goal(trigger_name: &str, webhook_payload: Option<&serde_json::Value>) -> String {
808 match webhook_payload {
809 Some(payload) => {
810 let summary = serde_json::to_string(payload).unwrap_or_default();
811 let truncated = if summary.len() > 500 {
812 format!("{}...", &summary[..497])
813 } else {
814 summary
815 };
816 format!("Triggered by webhook '{trigger_name}': {truncated}")
817 }
818 None => format!("Triggered by: {trigger_name}"),
819 }
820}
821
822pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
827 let _ = state.trigger_event_tx.send(payload);
829}
830
831pub fn notify_trigger_reload(state: &InnerState) {
834 if let Ok(guard) = state.trigger_engine_handle.lock() {
835 if let Some(ref handle) = *guard {
836 let _ = handle.reload_sync();
837 }
838 }
839}
840
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    // Daily 02:00 schedule in UTC: next fire after midnight is 02:00.
    #[test]
    fn compute_next_fire_utc() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(), timezone: None,
        };
        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc();
        let next = compute_next_fire(&spec, after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    // 02:00 Asia/Shanghai (UTC+8) is 18:00 UTC the previous day.
    #[test]
    fn compute_next_fire_with_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Asia/Shanghai".to_string()),
        };
        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc();
        let next = compute_next_fire(&spec, after).expect("should compute with tz");
        assert!(next > after);
        assert_eq!(next.hour(), 18);
    }

    // Malformed cron expressions must surface as errors, not panics.
    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = TriggerCronConfig {
            schedule: "not a cron".to_string(),
            timezone: None,
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    // Unknown IANA timezone names must also surface as errors.
    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Invalid/TZ".to_string()),
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    // With no cron entries the engine idles for the 1-hour fallback.
    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        let d = next_cron_sleep(&[]);
        assert_eq!(d, std::time::Duration::from_secs(3600));
    }

    // Only entries at or before `now` are reported as due.
    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let now = Utc::now();
        let past = now - chrono::Duration::seconds(10);
        let future = now + chrono::Duration::seconds(300);
        let entries = vec![
            CronEntry {
                trigger_name: "past".to_string(),
                project: "p".to_string(),
                next_fire: past,
            },
            CronEntry {
                trigger_name: "future".to_string(),
                project: "p".to_string(),
                next_fire: future,
            },
        ];
        let due = collect_due_triggers(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}