1use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24 let runtime = {
26 let running = state.running.lock().await;
27 running.get(task_id).cloned()
28 };
29 if let Some(rt) = runtime {
30 rt.stop_flag.store(true, Ordering::SeqCst);
31 }
32 state
34 .db_writer
35 .set_task_status(task_id, "cancelled", false)
36 .await?;
37 insert_event(
38 state,
39 task_id,
40 None,
41 "task_control",
42 serde_json::json!({"status": "cancelled"}),
43 )
44 .await?;
45 Ok(())
46}
47
/// Event delivered to the trigger engine and matched against event triggers.
#[derive(Debug, Clone)]
pub struct TriggerEventPayload {
    /// Event kind; compared with a trigger's `event.source` during matching.
    pub event_type: String,
    /// Id of the task the event originates from. An empty string means there is
    /// no source task, in which case no workflow lookup is performed.
    pub task_id: String,
    /// Optional JSON body; evaluated by a trigger's filter condition and passed
    /// on when the trigger fires.
    pub payload: Option<serde_json::Value>,
    /// When set, only triggers belonging to this project are considered.
    pub project: Option<String>,
}
63
/// Control message sent to the trigger engine over its reload channel.
#[derive(Debug)]
pub enum TriggerReloadEvent {
    /// Ask the engine to rebuild its cron schedule from the current configuration.
    Reload,
}
70
71#[derive(Clone)]
73pub struct TriggerEngineHandle {
74 reload_tx: mpsc::Sender<TriggerReloadEvent>,
75}
76
77impl TriggerEngineHandle {
78 pub async fn reload(&self) {
80 let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
81 }
82
83 pub fn reload_sync(&self) -> bool {
88 self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
89 }
90}
91
/// Background engine that fires configured triggers on cron schedules and on
/// broadcast events.
pub struct TriggerEngine {
    /// Shared application state (configuration, database access, event emission).
    state: Arc<InnerState>,
    /// Receives reload requests sent through a [`TriggerEngineHandle`].
    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
    /// Subscription to `state.trigger_event_tx`, the source of event triggers.
    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
    /// `(project id, trigger name)` pairs recorded by the most recent schedule
    /// build; triggers absent from this set are skipped by the engine.
    stabilized_triggers: HashSet<(String, String)>,
}
104
105impl TriggerEngine {
106 pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
108 let (reload_tx, reload_rx) = mpsc::channel(16);
109 let trigger_event_rx = state.trigger_event_tx.subscribe();
110 let engine = Self {
111 state,
112 reload_rx,
113 trigger_event_rx,
114 stabilized_triggers: HashSet::new(),
115 };
116 let handle = TriggerEngineHandle { reload_tx };
117 (engine, handle)
118 }
119
120 pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
122 info!("trigger engine started");
123
124 let mut cron_schedule = self.build_cron_schedule();
126
127 loop {
128 let sleep_duration = next_cron_sleep(&cron_schedule);
129 let sleep_fut = tokio::time::sleep(sleep_duration);
130 tokio::pin!(sleep_fut);
131
132 tokio::select! {
133 () = &mut sleep_fut => {
135 let now = Utc::now();
136 let fired = collect_due_triggers(&cron_schedule, now);
137 for (trigger_name, project) in fired {
138 self.fire_trigger(&trigger_name, &project).await;
139 }
140 cron_schedule = self.build_cron_schedule();
142 }
143
144 event_result = self.trigger_event_rx.recv() => {
146 match event_result {
147 Ok(payload) => {
148 self.handle_event_trigger(&payload).await;
149 }
150 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
151 warn!(skipped = n, "trigger event receiver lagged");
152 }
153 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
154 debug!("trigger event channel closed");
155 break;
156 }
157 }
158 }
159
160 Some(_) = self.reload_rx.recv() => {
162 info!("trigger engine: reloading configuration");
163 cron_schedule = self.build_cron_schedule();
164 }
165
166 _ = shutdown_rx.changed() => {
168 info!("trigger engine shutting down");
169 break;
170 }
171 }
172 }
173 }
174
175 fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
178 let snap = self.state.config_runtime.load();
179 let config = &snap.active_config.config;
180 let mut entries = Vec::new();
181
182 let mut current_triggers: HashSet<(String, String)> = HashSet::new();
184 for (project_id, project) in &config.projects {
185 for name in project.triggers.keys() {
186 current_triggers.insert((project_id.clone(), name.clone()));
187 }
188 }
189
190 let previously_known = std::mem::take(&mut self.stabilized_triggers);
194 self.stabilized_triggers = current_triggers.clone();
195
196 for (project_id, project) in &config.projects {
197 for (name, trigger) in &project.triggers {
198 if trigger.suspend {
199 continue;
200 }
201 if !previously_known.contains(&(project_id.clone(), name.clone())) {
203 debug!(
204 trigger = name.as_str(),
205 project = project_id.as_str(),
206 "trigger not yet stabilized, skipping cron schedule"
207 );
208 continue;
209 }
210 if let Some(ref cron_spec) = trigger.cron {
211 match compute_next_fire(cron_spec, Utc::now()) {
212 Ok(next) => {
213 entries.push(CronEntry {
214 trigger_name: name.clone(),
215 project: project_id.clone(),
216 next_fire: next,
217 });
218 }
219 Err(e) => {
220 warn!(
221 trigger = name.as_str(),
222 project = project_id.as_str(),
223 error = %e,
224 "failed to compute next fire time"
225 );
226 }
227 }
228 }
229 }
230 }
231 entries
232 }
233
234 async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
237 let source_workflow = if payload.task_id.is_empty() {
241 None
242 } else {
243 self.lookup_task_workflow(&payload.task_id).await
244 };
245
246 let snap = self.state.config_runtime.load();
247 let config = &snap.active_config.config;
248
249 for (project_id, project) in &config.projects {
250 if let Some(ref scoped_project) = payload.project {
252 if project_id != scoped_project {
253 continue;
254 }
255 }
256 for (name, trigger) in &project.triggers {
257 if trigger.suspend {
258 continue;
259 }
260 if !self
262 .stabilized_triggers
263 .contains(&(project_id.clone(), name.clone()))
264 {
265 continue;
266 }
267 if let Some(ref event_spec) = trigger.event {
268 if event_spec.source != payload.event_type {
270 continue;
271 }
272 if let Some(ref filter) = event_spec.filter {
274 if let Some(ref filter_wf) = filter.workflow {
275 match source_workflow {
276 Some(ref sw) if sw == filter_wf => {}
277 _ => continue,
278 }
279 }
280 if let Some(ref condition) = filter.condition {
282 if let Some(ref event_payload) = payload.payload {
283 match crate::prehook::evaluate_webhook_filter(
284 condition,
285 event_payload,
286 ) {
287 Ok(true) => {} Ok(false) => {
289 debug!(
290 trigger = name.as_str(),
291 condition, "CEL filter rejected payload"
292 );
293 continue;
294 }
295 Err(e) => {
296 warn!(
297 trigger = name.as_str(),
298 error = %e,
299 "CEL filter evaluation failed, skipping"
300 );
301 continue;
302 }
303 }
304 } else {
305 debug!(
307 trigger = name.as_str(),
308 "CEL condition set but no payload available, skipping"
309 );
310 continue;
311 }
312 }
313 }
314
315 info!(
316 trigger = name.as_str(),
317 project = project_id.as_str(),
318 event_type = payload.event_type.as_str(),
319 "event trigger matched"
320 );
321 self.fire_trigger_with_config(
322 name,
323 project_id,
324 trigger,
325 payload.payload.as_ref(),
326 )
327 .await;
328 }
329 }
330 }
331 }
332
333 async fn fire_trigger(&self, trigger_name: &str, project: &str) {
336 let snap = self.state.config_runtime.load();
337 let config = &snap.active_config.config;
338
339 let trigger = config
340 .projects
341 .get(project)
342 .and_then(|p| p.triggers.get(trigger_name));
343
344 let Some(trigger) = trigger else {
345 warn!(trigger = trigger_name, "trigger not found in config");
346 return;
347 };
348
349 self.fire_trigger_with_config(trigger_name, project, trigger, None)
350 .await;
351 }
352
353 async fn fire_trigger_with_config(
354 &self,
355 trigger_name: &str,
356 project: &str,
357 trigger: &TriggerConfig,
358 webhook_payload: Option<&serde_json::Value>,
359 ) {
360 if trigger.suspend {
362 self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
363 return;
364 }
365
366 if let Some(ref throttle) = trigger.throttle {
368 if throttle.min_interval > 0 {
369 if let Some(last) = self.load_last_fired(trigger_name, project).await {
370 let elapsed = (Utc::now() - last).num_seconds();
371 if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
372 self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
373 return;
374 }
375 }
376 }
377 }
378
379 match trigger.concurrency_policy {
381 crate::cli_types::ConcurrencyPolicy::Forbid => {
382 if self.has_active_task(trigger_name, project).await {
383 self.emit_trigger_event(
384 trigger_name,
385 "trigger_skipped",
386 "concurrent_task_active",
387 );
388 return;
389 }
390 }
391 crate::cli_types::ConcurrencyPolicy::Replace => {
392 self.cancel_active_tasks(trigger_name, project).await;
394 }
395 crate::cli_types::ConcurrencyPolicy::Allow => {}
396 }
397
398 let target_files = trigger
400 .action
401 .args
402 .as_ref()
403 .and_then(|a| a.get("target-file"))
404 .cloned();
405
406 let task_name = format!("trigger-{trigger_name}");
407
408 let payload = CreateTaskPayload {
409 name: Some(task_name),
410 goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
411 project_id: Some(project.to_string()),
412 workspace_id: Some(trigger.action.workspace.clone()),
413 workflow_id: Some(trigger.action.workflow.clone()),
414 target_files,
415 parent_task_id: None,
416 spawn_reason: None,
417 };
418
419 match crate::task_ops::create_task_as_service(&self.state, payload) {
420 Ok(summary) => {
421 let task_id = summary.id.clone();
422 info!(
423 trigger = trigger_name,
424 task_id = task_id.as_str(),
425 "trigger fired: task created"
426 );
427
428 self.update_trigger_state(trigger_name, project, &task_id, "created")
430 .await;
431
432 self.state.emit_event(
434 &task_id,
435 None,
436 "trigger_fired",
437 serde_json::json!({
438 "trigger": trigger_name,
439 "source": if trigger.cron.is_some() { "cron" } else { "event" },
440 "task_id": task_id,
441 }),
442 );
443
444 if trigger.action.start {
446 let state = self.state.clone();
447 let tid = task_id.clone();
448 tokio::spawn(async move {
449 if let Err(e) = state.task_enqueuer.enqueue_task(&state, &tid).await {
450 error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
451 } else {
452 state.worker_notify.notify_one();
453 }
454 });
455 }
456
457 if trigger.history_limit.is_some() {
459 let state = self.state.clone();
460 let name = trigger_name.to_string();
461 let proj = project.to_string();
462 let limit = trigger.history_limit.clone();
463 tokio::spawn(async move {
464 if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
465 {
466 debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
467 }
468 });
469 }
470 }
471 Err(e) => {
472 error!(
473 trigger = trigger_name,
474 error = %e,
475 "trigger failed to create task"
476 );
477 self.update_trigger_state(trigger_name, project, "", "failed_to_create")
478 .await;
479 self.state.emit_event(
480 "",
481 None,
482 "trigger_error",
483 serde_json::json!({
484 "trigger": trigger_name,
485 "error": e.to_string(),
486 }),
487 );
488 }
489 }
490 }
491
492 async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
496 let tid = task_id.to_owned();
497 let result = self
498 .state
499 .async_database
500 .reader()
501 .call(move |conn| {
502 let wf: Option<String> = conn
503 .query_row(
504 "SELECT workflow_id FROM tasks WHERE id = ?1",
505 rusqlite::params![tid],
506 |row| row.get(0),
507 )
508 .ok();
509 Ok(wf)
510 })
511 .await;
512 match result {
513 Ok(wf) => wf,
514 Err(e) => {
515 debug!(task_id, error = %e, "failed to look up task workflow");
516 None
517 }
518 }
519 }
520
521 async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
522 let name = trigger_name.to_owned();
523 let proj = project.to_owned();
524 let result = self
525 .state
526 .async_database
527 .reader()
528 .call(move |conn| {
529 let mut stmt = conn
530 .prepare(
531 "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
532 )
533 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
534 let ts: Option<String> = stmt
535 .query_row(rusqlite::params![name, proj], |row| row.get(0))
536 .ok();
537 Ok(ts)
538 })
539 .await;
540
541 match result {
542 Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
543 _ => None,
544 }
545 }
546
547 async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
548 let name = trigger_name.to_owned();
549 let proj = project.to_owned();
550 let result = self
551 .state
552 .async_database
553 .reader()
554 .call(move |conn| {
555 let last_task_id: Option<String> = conn
556 .query_row(
557 "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
558 rusqlite::params![name, proj],
559 |row| row.get(0),
560 )
561 .ok()
562 .flatten();
563
564 if let Some(ref tid) = last_task_id {
565 let status: Option<String> = conn
566 .query_row(
567 "SELECT status FROM tasks WHERE id = ?1",
568 rusqlite::params![tid],
569 |row| row.get(0),
570 )
571 .ok();
572 if let Some(s) = status {
573 return Ok(matches!(
574 s.as_str(),
575 "created" | "pending" | "running" | "restart_pending"
576 ));
577 }
578 }
579 Ok(false)
580 })
581 .await;
582
583 result.unwrap_or(false)
584 }
585
586 async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
587 let name = trigger_name.to_owned();
588 let proj = project.to_owned();
589 let state = self.state.clone();
590 let result = state
591 .async_database
592 .reader()
593 .call(move |conn| {
594 let tid: Option<String> = conn
595 .query_row(
596 "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
597 rusqlite::params![name, proj],
598 |row| row.get(0),
599 )
600 .ok()
601 .flatten();
602 Ok(tid)
603 })
604 .await;
605
606 if let Ok(Some(task_id)) = result {
607 if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
608 warn!(
609 trigger = trigger_name,
610 task_id = task_id.as_str(),
611 error = %e,
612 "failed to cancel active task for Replace policy"
613 );
614 }
615 }
616 }
617
618 async fn update_trigger_state(
619 &self,
620 trigger_name: &str,
621 project: &str,
622 task_id: &str,
623 status: &str,
624 ) {
625 let name = trigger_name.to_owned();
626 let proj = project.to_owned();
627 let tid = task_id.to_owned();
628 let st = status.to_owned();
629 let now = Utc::now().to_rfc3339();
630 let now2 = now.clone();
631
632 if let Err(e) = self
633 .state
634 .async_database
635 .writer()
636 .call(move |conn| {
637 conn.execute(
638 "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
639 VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
640 ON CONFLICT(trigger_name, project) DO UPDATE SET
641 last_fired_at = ?3,
642 fire_count = fire_count + 1,
643 last_task_id = ?4,
644 last_status = ?5,
645 updated_at = ?7",
646 rusqlite::params![name, proj, now, tid, st, now2, now2],
647 )
648 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
649 Ok(())
650 })
651 .await
652 {
653 warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
654 }
655 }
656
657 fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
658 debug!(trigger = trigger_name, event_type, reason, "trigger event");
659 self.state.emit_event(
660 "",
661 None,
662 event_type,
663 serde_json::json!({
664 "trigger": trigger_name,
665 "reason": reason,
666 }),
667 );
668 }
669}
670
/// One scheduled cron trigger together with its next computed fire time.
struct CronEntry {
    /// Name of the trigger within its project.
    trigger_name: String,
    /// Id of the project the trigger belongs to.
    project: String,
    /// Next instant (UTC) at which the trigger is due.
    next_fire: DateTime<Utc>,
}
678
679fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
680 use cron::Schedule;
681 use std::str::FromStr;
682
683 let schedule = Schedule::from_str(&spec.schedule)
684 .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
685
686 if let Some(ref tz_name) = spec.timezone {
688 let tz: chrono_tz::Tz = tz_name
689 .parse()
690 .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
691 let local_after = after.with_timezone(&tz);
692 let next = schedule
693 .after(&local_after)
694 .next()
695 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
696 Ok(next.with_timezone(&Utc))
697 } else {
698 let next = schedule
699 .after(&after)
700 .next()
701 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
702 Ok(next.with_timezone(&Utc))
703 }
704}
705
706fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
707 let now = Utc::now();
708 entries
709 .iter()
710 .map(|e| {
711 let diff = e.next_fire.signed_duration_since(now);
712 if diff.num_milliseconds() <= 0 {
713 std::time::Duration::from_millis(100)
714 } else {
715 std::time::Duration::from_millis(diff.num_milliseconds() as u64)
716 }
717 })
718 .min()
719 .unwrap_or(std::time::Duration::from_secs(3600))
721}
722
723fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
724 entries
725 .iter()
726 .filter(|e| e.next_fire <= now)
727 .map(|e| (e.trigger_name.clone(), e.project.clone()))
728 .collect()
729}
730
731async fn cleanup_history(
732 state: &InnerState,
733 trigger_name: &str,
734 project: &str,
735 limit: Option<&crate::config::TriggerHistoryLimitConfig>,
736) -> Result<()> {
737 let limit = match limit {
738 Some(l) => l,
739 None => return Ok(()),
740 };
741
742 let task_name_pattern = format!("trigger-{trigger_name}");
743 let proj = project.to_owned();
744
745 let mut ids_to_delete: Vec<String> = Vec::new();
747
748 if let Some(max_successful) = limit.successful {
749 let pattern = task_name_pattern.clone();
750 let p = proj.clone();
751 let max = max_successful as usize;
752 let ids = state
753 .async_database
754 .reader()
755 .call(move |conn| {
756 let mut stmt = conn
757 .prepare(
758 "SELECT id FROM tasks \
759 WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
760 ORDER BY created_at DESC",
761 )
762 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
763 let rows = stmt
764 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
765 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
766 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
767 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
768 })
769 .await
770 .context("query completed tasks for history cleanup")?;
771 ids_to_delete.extend(ids);
772 }
773
774 if let Some(max_failed) = limit.failed {
775 let pattern = task_name_pattern.clone();
776 let p = proj.clone();
777 let max = max_failed as usize;
778 let ids = state
779 .async_database
780 .reader()
781 .call(move |conn| {
782 let mut stmt = conn
783 .prepare(
784 "SELECT id FROM tasks \
785 WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
786 ORDER BY created_at DESC",
787 )
788 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
789 let rows = stmt
790 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
791 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
792 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
793 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
794 })
795 .await
796 .context("query failed tasks for history cleanup")?;
797 ids_to_delete.extend(ids);
798 }
799
800 if ids_to_delete.is_empty() {
801 return Ok(());
802 }
803
804 state
805 .async_database
806 .writer()
807 .call(move |conn| {
808 let placeholders: Vec<String> =
809 (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
810 let sql = format!(
811 "DELETE FROM tasks WHERE id IN ({})",
812 placeholders.join(", ")
813 );
814 let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
815 .iter()
816 .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
817 .collect();
818 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
819 params.iter().map(|p| p.as_ref()).collect();
820 conn.execute(&sql, param_refs.as_slice())
821 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
822 Ok(())
823 })
824 .await
825 .context("delete excess trigger history tasks")?;
826
827 Ok(())
828}
829
830fn build_trigger_goal(trigger_name: &str, event_payload: Option<&serde_json::Value>) -> String {
835 match event_payload {
836 Some(payload) => {
837 if let Some(filename) = payload.get("filename").and_then(|v| v.as_str()) {
839 if let Some(event_type) = payload.get("event_type").and_then(|v| v.as_str()) {
840 return format!(
841 "Triggered by filesystem '{trigger_name}': {event_type} {filename}"
842 );
843 }
844 }
845 let summary = serde_json::to_string(payload).unwrap_or_default();
846 let truncated = if summary.len() > 500 {
847 format!("{}...", &summary[..497])
848 } else {
849 summary
850 };
851 format!("Triggered by '{trigger_name}': {truncated}")
852 }
853 None => format!("Triggered by: {trigger_name}"),
854 }
855}
856
857pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
862 let _ = state.trigger_event_tx.send(payload);
864}
865
866pub fn notify_trigger_reload(state: &InnerState) {
870 if let Ok(guard) = state.trigger_engine_handle.lock() {
871 if let Some(ref handle) = *guard {
872 let _ = handle.reload_sync();
873 }
874 }
875 if let Ok(guard) = state.fs_watcher_reload_tx.lock() {
877 if let Some(ref tx) = *guard {
878 let _ = tx.try_send(());
879 }
880 }
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Cron spec firing daily at 02:00:00, optionally in a named timezone.
    fn daily_at_two(timezone: Option<&str>) -> TriggerCronConfig {
        TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: timezone.map(String::from),
        }
    }

    /// 2026-01-01 00:00:00 UTC, the reference instant for the schedule tests.
    fn new_year_2026() -> DateTime<Utc> {
        chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc()
    }

    #[test]
    fn compute_next_fire_utc() {
        let after = new_year_2026();
        let next = compute_next_fire(&daily_at_two(None), after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    #[test]
    fn compute_next_fire_with_timezone() {
        let after = new_year_2026();
        let next = compute_next_fire(&daily_at_two(Some("Asia/Shanghai")), after)
            .expect("should compute with tz");
        assert!(next > after);
        // 02:00 in Shanghai (UTC+8) is 18:00 UTC on the previous day.
        assert_eq!(next.hour(), 18);
    }

    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = TriggerCronConfig {
            schedule: "not a cron".to_string(),
            timezone: None,
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = daily_at_two(Some("Invalid/TZ"));
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        assert_eq!(next_cron_sleep(&[]), std::time::Duration::from_secs(3600));
    }

    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let now = Utc::now();
        let entry_at = |trigger_name: &str, offset_secs: i64| CronEntry {
            trigger_name: trigger_name.to_string(),
            project: "p".to_string(),
            next_fire: now + chrono::Duration::seconds(offset_secs),
        };
        let entries = vec![entry_at("past", -10), entry_at("future", 300)];

        let due = collect_due_triggers(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}