1use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24 let runtime = {
26 let running = state.running.lock().await;
27 running.get(task_id).cloned()
28 };
29 if let Some(rt) = runtime {
30 rt.stop_flag.store(true, Ordering::SeqCst);
31 }
32 state
34 .db_writer
35 .set_task_status(task_id, "cancelled", false)
36 .await?;
37 insert_event(
38 state,
39 task_id,
40 None,
41 "task_control",
42 serde_json::json!({"status": "cancelled"}),
43 )
44 .await?;
45 Ok(())
46}
47
/// A task lifecycle event broadcast to the trigger engine for matching
/// against event-based trigger definitions.
#[derive(Debug, Clone)]
pub struct TriggerEventPayload {
    // Event kind; compared against each trigger's `event.source` spec.
    pub event_type: String,
    // Id of the task that produced the event; may be empty when the event
    // has no associated task (the engine checks `is_empty()` before lookup).
    pub task_id: String,
    // Optional JSON payload; evaluated by a trigger's CEL filter condition
    // when one is configured.
    pub payload: Option<serde_json::Value>,
}
60
/// Messages accepted on the trigger engine's reload channel.
#[derive(Debug)]
pub enum TriggerReloadEvent {
    // Re-read the active configuration and rebuild the cron schedule.
    Reload,
}
67
/// Cloneable handle for asking a running trigger engine to reload its
/// configuration.
#[derive(Clone)]
pub struct TriggerEngineHandle {
    // Bounded channel into the engine's run loop.
    reload_tx: mpsc::Sender<TriggerReloadEvent>,
}
73
impl TriggerEngineHandle {
    /// Requests a configuration reload, waiting for channel capacity.
    /// A send error (engine stopped) is deliberately ignored.
    pub async fn reload(&self) {
        let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
    }

    /// Non-blocking reload request for synchronous contexts.
    ///
    /// Returns `true` if the request was enqueued, `false` if the channel
    /// was full or closed. Losing a request is acceptable: reloads are
    /// idempotent and a queued one covers any dropped duplicates.
    pub fn reload_sync(&self) -> bool {
        self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
    }
}
88
/// Background engine that fires triggers from cron schedules and from task
/// lifecycle events, creating (and optionally starting) tasks in response.
pub struct TriggerEngine {
    // Shared daemon state: config snapshot, databases, event emission.
    state: Arc<InnerState>,
    // Reload requests from `TriggerEngineHandle`s.
    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
    // Broadcast stream of task events matched against event triggers.
    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
    // (project_id, trigger_name) pairs observed during the previous schedule
    // build. A trigger that first appears in the config is skipped for one
    // build cycle ("stabilization") before it is allowed to fire.
    stabilized_triggers: HashSet<(String, String)>,
}
101
impl TriggerEngine {
    /// Builds an engine bound to `state` and a handle for requesting reloads.
    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
        // Small bounded buffer: reload requests are idempotent, so a full
        // channel (dropped request) is harmless.
        let (reload_tx, reload_rx) = mpsc::channel(16);
        let trigger_event_rx = state.trigger_event_tx.subscribe();
        let engine = Self {
            state,
            reload_rx,
            trigger_event_rx,
            stabilized_triggers: HashSet::new(),
        };
        let handle = TriggerEngineHandle { reload_tx };
        (engine, handle)
    }

    /// Main loop. Each iteration races four sources: the earliest cron
    /// deadline, an incoming task event, a reload request, and shutdown.
    /// Exits when shutdown is signalled or the event channel closes.
    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
        info!("trigger engine started");

        let mut cron_schedule = self.build_cron_schedule();

        loop {
            // Sleep only until the nearest cron entry is due.
            let sleep_duration = next_cron_sleep(&cron_schedule);
            let sleep_fut = tokio::time::sleep(sleep_duration);
            tokio::pin!(sleep_fut);

            tokio::select! {
                () = &mut sleep_fut => {
                    // Fire everything that has come due, then recompute the
                    // schedule so each entry gets a fresh next-fire time.
                    let now = Utc::now();
                    let fired = collect_due_triggers(&cron_schedule, now);
                    for (trigger_name, project) in fired {
                        self.fire_trigger(&trigger_name, &project).await;
                    }
                    cron_schedule = self.build_cron_schedule();
                }

                event_result = self.trigger_event_rx.recv() => {
                    match event_result {
                        Ok(payload) => {
                            self.handle_event_trigger(&payload).await;
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                            // Broadcast buffer overflowed; events were dropped
                            // but the engine keeps running.
                            warn!(skipped = n, "trigger event receiver lagged");
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                            debug!("trigger event channel closed");
                            break;
                        }
                    }
                }

                Some(_) = self.reload_rx.recv() => {
                    info!("trigger engine: reloading configuration");
                    cron_schedule = self.build_cron_schedule();
                }

                _ = shutdown_rx.changed() => {
                    info!("trigger engine shutting down");
                    break;
                }
            }
        }
    }

    /// Recomputes the cron schedule from the active config snapshot.
    ///
    /// Also refreshes `stabilized_triggers`: a trigger must have been present
    /// in the previous build to be scheduled now, so newly added triggers sit
    /// out one build cycle before their cron can fire.
    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;
        let mut entries = Vec::new();

        // Collect every (project, trigger) pair present in the current config.
        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
        for (project_id, project) in &config.projects {
            for name in project.triggers.keys() {
                current_triggers.insert((project_id.clone(), name.clone()));
            }
        }

        // Swap in the current set; `previously_known` is what the last build
        // saw, which gates scheduling below.
        let previously_known = std::mem::take(&mut self.stabilized_triggers);
        self.stabilized_triggers = current_triggers.clone();

        for (project_id, project) in &config.projects {
            for (name, trigger) in &project.triggers {
                if trigger.suspend {
                    continue;
                }
                if !previously_known.contains(&(project_id.clone(), name.clone())) {
                    debug!(
                        trigger = name.as_str(),
                        project = project_id.as_str(),
                        "trigger not yet stabilized, skipping cron schedule"
                    );
                    continue;
                }
                if let Some(ref cron_spec) = trigger.cron {
                    match compute_next_fire(cron_spec, Utc::now()) {
                        Ok(next) => {
                            entries.push(CronEntry {
                                trigger_name: name.clone(),
                                project: project_id.clone(),
                                next_fire: next,
                            });
                        }
                        Err(e) => {
                            // Bad cron spec or timezone: log and leave the
                            // trigger unscheduled rather than failing the build.
                            warn!(
                                trigger = name.as_str(),
                                project = project_id.as_str(),
                                error = %e,
                                "failed to compute next fire time"
                            );
                        }
                    }
                }
            }
        }
        entries
    }

    /// Matches one broadcast task event against every event trigger in the
    /// active config and fires those whose source/filters accept it.
    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
        // Resolve the source task's workflow once, up front; only needed when
        // the event is tied to a task and a trigger filters by workflow.
        let source_workflow = if payload.task_id.is_empty() {
            None
        } else {
            self.lookup_task_workflow(&payload.task_id).await
        };

        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;

        for (project_id, project) in &config.projects {
            for (name, trigger) in &project.triggers {
                if trigger.suspend {
                    continue;
                }
                // Same stabilization gate as cron: never fire a trigger the
                // engine has not seen in a previous schedule build.
                if !self
                    .stabilized_triggers
                    .contains(&(project_id.clone(), name.clone()))
                {
                    continue;
                }
                if let Some(ref event_spec) = trigger.event {
                    if event_spec.source != payload.event_type {
                        continue;
                    }
                    if let Some(ref filter) = event_spec.filter {
                        // Optional workflow filter: source task's workflow
                        // must match exactly.
                        if let Some(ref filter_wf) = filter.workflow {
                            match source_workflow {
                                Some(ref sw) if sw == filter_wf => {}
                                _ => continue,
                            }
                        }
                        // Optional CEL condition, evaluated against the event
                        // payload. No payload means the condition can't be
                        // checked, so the trigger is skipped (fail-closed).
                        if let Some(ref condition) = filter.condition {
                            if let Some(ref event_payload) = payload.payload {
                                match crate::prehook::evaluate_webhook_filter(
                                    condition,
                                    event_payload,
                                ) {
                                    Ok(true) => {}
                                    Ok(false) => {
                                        debug!(
                                            trigger = name.as_str(),
                                            condition, "CEL filter rejected payload"
                                        );
                                        continue;
                                    }
                                    Err(e) => {
                                        warn!(
                                            trigger = name.as_str(),
                                            error = %e,
                                            "CEL filter evaluation failed, skipping"
                                        );
                                        continue;
                                    }
                                }
                            } else {
                                debug!(
                                    trigger = name.as_str(),
                                    "CEL condition set but no payload available, skipping"
                                );
                                continue;
                            }
                        }
                    }

                    info!(
                        trigger = name.as_str(),
                        project = project_id.as_str(),
                        event_type = payload.event_type.as_str(),
                        "event trigger matched"
                    );
                    self.fire_trigger_with_config(
                        name,
                        project_id,
                        trigger,
                        payload.payload.as_ref(),
                    )
                    .await;
                }
            }
        }
    }

    /// Fires a cron trigger by name, re-resolving its config first (the
    /// config may have changed since the schedule was built).
    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
        let snap = self.state.config_runtime.load();
        let config = &snap.active_config.config;

        let trigger = config
            .projects
            .get(project)
            .and_then(|p| p.triggers.get(trigger_name));

        let Some(trigger) = trigger else {
            warn!(trigger = trigger_name, "trigger not found in config");
            return;
        };

        self.fire_trigger_with_config(trigger_name, project, trigger, None)
            .await;
    }

    /// Core firing path shared by cron and event triggers: applies suspend,
    /// throttle, and concurrency policy, then creates (and optionally starts)
    /// the task, updates trigger state, and kicks off history cleanup.
    async fn fire_trigger_with_config(
        &self,
        trigger_name: &str,
        project: &str,
        trigger: &TriggerConfig,
        webhook_payload: Option<&serde_json::Value>,
    ) {
        if trigger.suspend {
            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
            return;
        }

        // Throttle: skip if the trigger fired less than `min_interval` seconds
        // ago. A negative elapsed (clock skew) bypasses the throttle.
        if let Some(ref throttle) = trigger.throttle {
            if throttle.min_interval > 0 {
                if let Some(last) = self.load_last_fired(trigger_name, project).await {
                    let elapsed = (Utc::now() - last).num_seconds();
                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
                        return;
                    }
                }
            }
        }

        // Concurrency policy against the trigger's previous task:
        // Forbid = skip if still active, Replace = cancel it, Allow = ignore.
        match trigger.concurrency_policy {
            crate::cli_types::ConcurrencyPolicy::Forbid => {
                if self.has_active_task(trigger_name, project).await {
                    self.emit_trigger_event(
                        trigger_name,
                        "trigger_skipped",
                        "concurrent_task_active",
                    );
                    return;
                }
            }
            crate::cli_types::ConcurrencyPolicy::Replace => {
                self.cancel_active_tasks(trigger_name, project).await;
            }
            crate::cli_types::ConcurrencyPolicy::Allow => {}
        }

        // Optional target-file hint forwarded from the trigger's action args.
        let target_files = trigger
            .action
            .args
            .as_ref()
            .and_then(|a| a.get("target-file"))
            .cloned();

        // Naming convention also relied on by history cleanup.
        let task_name = format!("trigger-{trigger_name}");

        let payload = CreateTaskPayload {
            name: Some(task_name),
            goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
            project_id: Some(project.to_string()),
            workspace_id: Some(trigger.action.workspace.clone()),
            workflow_id: Some(trigger.action.workflow.clone()),
            target_files,
            parent_task_id: None,
            spawn_reason: None,
        };

        match crate::task_ops::create_task_as_service(&self.state, payload) {
            Ok(summary) => {
                let task_id = summary.id.clone();
                info!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    "trigger fired: task created"
                );

                self.update_trigger_state(trigger_name, project, &task_id, "created")
                    .await;

                self.state.emit_event(
                    &task_id,
                    None,
                    "trigger_fired",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
                        "task_id": task_id,
                    }),
                );

                // Auto-start: enqueue in the background so firing never blocks
                // on the scheduler.
                if trigger.action.start {
                    let state = self.state.clone();
                    let tid = task_id.clone();
                    tokio::spawn(async move {
                        if let Err(e) =
                            crate::scheduler_service::enqueue_task_as_service(&state, &tid).await
                        {
                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
                        } else {
                            state.worker_notify.notify_one();
                        }
                    });
                }

                // History cleanup is best-effort and runs detached; failures
                // are only logged at debug level.
                if trigger.history_limit.is_some() {
                    let state = self.state.clone();
                    let name = trigger_name.to_string();
                    let proj = project.to_string();
                    let limit = trigger.history_limit.clone();
                    tokio::spawn(async move {
                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
                        {
                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
                        }
                    });
                }
            }
            Err(e) => {
                error!(
                    trigger = trigger_name,
                    error = %e,
                    "trigger failed to create task"
                );
                // Record the failure with an empty task id so the trigger's
                // state row still reflects the attempt.
                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
                    .await;
                self.state.emit_event(
                    "",
                    None,
                    "trigger_error",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "error": e.to_string(),
                    }),
                );
            }
        }
    }

    /// Looks up the workflow id of a task; returns `None` if the task does
    /// not exist or the query fails (logged at debug level).
    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
        let tid = task_id.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let wf: Option<String> = conn
                    .query_row(
                        "SELECT workflow_id FROM tasks WHERE id = ?1",
                        rusqlite::params![tid],
                        |row| row.get(0),
                    )
                    .ok();
                Ok(wf)
            })
            .await;
        match result {
            Ok(wf) => wf,
            Err(e) => {
                debug!(task_id, error = %e, "failed to look up task workflow");
                None
            }
        }
    }

    /// Reads the trigger's last fire time from `trigger_state`; any missing
    /// row, query error, or unparseable timestamp yields `None`.
    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let mut stmt = conn
                    .prepare(
                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                    )
                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
                let ts: Option<String> = stmt
                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
                    .ok();
                Ok(ts)
            })
            .await;

        match result {
            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
            _ => None,
        }
    }

    /// Returns whether the trigger's most recent task is still in a
    /// non-terminal status. Unknown/missing task or DB error counts as
    /// "not active" (best-effort check for the Forbid policy).
    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let result = self
            .state
            .async_database
            .reader()
            .call(move |conn| {
                let last_task_id: Option<String> = conn
                    .query_row(
                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                        rusqlite::params![name, proj],
                        |row| row.get(0),
                    )
                    .ok()
                    .flatten();

                if let Some(ref tid) = last_task_id {
                    let status: Option<String> = conn
                        .query_row(
                            "SELECT status FROM tasks WHERE id = ?1",
                            rusqlite::params![tid],
                            |row| row.get(0),
                        )
                        .ok();
                    if let Some(s) = status {
                        return Ok(matches!(
                            s.as_str(),
                            "created" | "pending" | "running" | "restart_pending"
                        ));
                    }
                }
                Ok(false)
            })
            .await;

        result.unwrap_or(false)
    }

    /// Cancels the trigger's most recent task, if any (Replace policy).
    /// Only the `last_task_id` recorded in `trigger_state` is considered.
    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let state = self.state.clone();
        let result = state
            .async_database
            .reader()
            .call(move |conn| {
                let tid: Option<String> = conn
                    .query_row(
                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
                        rusqlite::params![name, proj],
                        |row| row.get(0),
                    )
                    .ok()
                    .flatten();
                Ok(tid)
            })
            .await;

        if let Ok(Some(task_id)) = result {
            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
                warn!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    error = %e,
                    "failed to cancel active task for Replace policy"
                );
            }
        }
    }

    /// Upserts the trigger's row in `trigger_state`: sets last fire time,
    /// increments the fire counter, and records the last task id and status.
    /// Failures are logged but not propagated.
    async fn update_trigger_state(
        &self,
        trigger_name: &str,
        project: &str,
        task_id: &str,
        status: &str,
    ) {
        let name = trigger_name.to_owned();
        let proj = project.to_owned();
        let tid = task_id.to_owned();
        let st = status.to_owned();
        let now = Utc::now().to_rfc3339();
        let now2 = now.clone();

        if let Err(e) = self
            .state
            .async_database
            .writer()
            .call(move |conn| {
                conn.execute(
                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
                     ON CONFLICT(trigger_name, project) DO UPDATE SET
                       last_fired_at = ?3,
                       fire_count = fire_count + 1,
                       last_task_id = ?4,
                       last_status = ?5,
                       updated_at = ?7",
                    rusqlite::params![name, proj, now, tid, st, now2, now2],
                )
                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
                Ok(())
            })
            .await
        {
            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
        }
    }

    /// Emits a trigger diagnostic event (e.g. `trigger_skipped`) with no
    /// associated task id.
    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
        debug!(trigger = trigger_name, event_type, reason, "trigger event");
        self.state.emit_event(
            "",
            None,
            event_type,
            serde_json::json!({
                "trigger": trigger_name,
                "reason": reason,
            }),
        );
    }
}
663
/// One scheduled cron firing: which trigger, in which project, and the next
/// UTC instant at which it is due.
struct CronEntry {
    trigger_name: String,
    project: String,
    next_fire: DateTime<Utc>,
}
671
672fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
673 use cron::Schedule;
674 use std::str::FromStr;
675
676 let schedule = Schedule::from_str(&spec.schedule)
677 .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
678
679 if let Some(ref tz_name) = spec.timezone {
681 let tz: chrono_tz::Tz = tz_name
682 .parse()
683 .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
684 let local_after = after.with_timezone(&tz);
685 let next = schedule
686 .after(&local_after)
687 .next()
688 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
689 Ok(next.with_timezone(&Utc))
690 } else {
691 let next = schedule
692 .after(&after)
693 .next()
694 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
695 Ok(next.with_timezone(&Utc))
696 }
697}
698
699fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
700 let now = Utc::now();
701 entries
702 .iter()
703 .map(|e| {
704 let diff = e.next_fire.signed_duration_since(now);
705 if diff.num_milliseconds() <= 0 {
706 std::time::Duration::from_millis(100)
707 } else {
708 std::time::Duration::from_millis(diff.num_milliseconds() as u64)
709 }
710 })
711 .min()
712 .unwrap_or(std::time::Duration::from_secs(3600))
714}
715
716fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
717 entries
718 .iter()
719 .filter(|e| e.next_fire <= now)
720 .map(|e| (e.trigger_name.clone(), e.project.clone()))
721 .collect()
722}
723
724async fn cleanup_history(
725 state: &InnerState,
726 trigger_name: &str,
727 project: &str,
728 limit: Option<&crate::config::TriggerHistoryLimitConfig>,
729) -> Result<()> {
730 let limit = match limit {
731 Some(l) => l,
732 None => return Ok(()),
733 };
734
735 let task_name_pattern = format!("trigger-{trigger_name}");
736 let proj = project.to_owned();
737
738 let mut ids_to_delete: Vec<String> = Vec::new();
740
741 if let Some(max_successful) = limit.successful {
742 let pattern = task_name_pattern.clone();
743 let p = proj.clone();
744 let max = max_successful as usize;
745 let ids = state
746 .async_database
747 .reader()
748 .call(move |conn| {
749 let mut stmt = conn
750 .prepare(
751 "SELECT id FROM tasks \
752 WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
753 ORDER BY created_at DESC",
754 )
755 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
756 let rows = stmt
757 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
758 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
759 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
760 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
761 })
762 .await
763 .context("query completed tasks for history cleanup")?;
764 ids_to_delete.extend(ids);
765 }
766
767 if let Some(max_failed) = limit.failed {
768 let pattern = task_name_pattern.clone();
769 let p = proj.clone();
770 let max = max_failed as usize;
771 let ids = state
772 .async_database
773 .reader()
774 .call(move |conn| {
775 let mut stmt = conn
776 .prepare(
777 "SELECT id FROM tasks \
778 WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
779 ORDER BY created_at DESC",
780 )
781 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
782 let rows = stmt
783 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
784 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
785 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
786 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
787 })
788 .await
789 .context("query failed tasks for history cleanup")?;
790 ids_to_delete.extend(ids);
791 }
792
793 if ids_to_delete.is_empty() {
794 return Ok(());
795 }
796
797 state
798 .async_database
799 .writer()
800 .call(move |conn| {
801 let placeholders: Vec<String> =
802 (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
803 let sql = format!(
804 "DELETE FROM tasks WHERE id IN ({})",
805 placeholders.join(", ")
806 );
807 let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
808 .iter()
809 .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
810 .collect();
811 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
812 params.iter().map(|p| p.as_ref()).collect();
813 conn.execute(&sql, param_refs.as_slice())
814 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
815 Ok(())
816 })
817 .await
818 .context("delete excess trigger history tasks")?;
819
820 Ok(())
821}
822
823fn build_trigger_goal(trigger_name: &str, webhook_payload: Option<&serde_json::Value>) -> String {
828 match webhook_payload {
829 Some(payload) => {
830 let summary = serde_json::to_string(payload).unwrap_or_default();
831 let truncated = if summary.len() > 500 {
832 format!("{}...", &summary[..497])
833 } else {
834 summary
835 };
836 format!("Triggered by webhook '{trigger_name}': {truncated}")
837 }
838 None => format!("Triggered by: {trigger_name}"),
839 }
840}
841
/// Publishes a task event onto the trigger broadcast channel.
/// A send error (no active receivers, e.g. engine not running) is ignored:
/// trigger delivery is best-effort.
pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
    let _ = state.trigger_event_tx.send(payload);
}
850
851pub fn notify_trigger_reload(state: &InnerState) {
854 if let Ok(guard) = state.trigger_engine_handle.lock() {
855 if let Some(ref handle) = *guard {
856 let _ = handle.reload_sync();
857 }
858 }
859}
860
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    // A daily 02:00 UTC schedule computed from midnight should land at 02:00
    // later the same day.
    #[test]
    fn compute_next_fire_utc() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: None,
        };
        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc();
        let next = compute_next_fire(&spec, after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    // 02:00 Asia/Shanghai (UTC+8, no DST) corresponds to 18:00 UTC the
    // previous day, so the returned UTC hour must be 18.
    #[test]
    fn compute_next_fire_with_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Asia/Shanghai".to_string()),
        };
        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc();
        let next = compute_next_fire(&spec, after).expect("should compute with tz");
        assert!(next > after);
        assert_eq!(next.hour(), 18);
    }

    // Malformed cron expressions must surface as errors, not panics.
    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = TriggerCronConfig {
            schedule: "not a cron".to_string(),
            timezone: None,
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    // Unknown IANA timezone names must also surface as errors.
    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Invalid/TZ".to_string()),
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    // With no scheduled entries the loop falls back to a one-hour sleep.
    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        let d = next_cron_sleep(&[]);
        assert_eq!(d, std::time::Duration::from_secs(3600));
    }

    // Only entries whose next_fire is at or before `now` are collected.
    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let now = Utc::now();
        let past = now - chrono::Duration::seconds(10);
        let future = now + chrono::Duration::seconds(300);
        let entries = vec![
            CronEntry {
                trigger_name: "past".to_string(),
                project: "p".to_string(),
                next_fire: past,
            },
            CronEntry {
                trigger_name: "future".to_string(),
                project: "p".to_string(),
                next_fire: future,
            },
        ];
        let due = collect_due_triggers(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}