1use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24 let runtime = {
26 let running = state.running.lock().await;
27 running.get(task_id).cloned()
28 };
29 if let Some(rt) = runtime {
30 rt.stop_flag.store(true, Ordering::SeqCst);
31 }
32 state
34 .db_writer
35 .set_task_status(task_id, "cancelled", false)
36 .await?;
37 insert_event(
38 state,
39 task_id,
40 None,
41 "task_control",
42 serde_json::json!({"status": "cancelled"}),
43 )
44 .await?;
45 Ok(())
46}
47
/// An event delivered to the trigger engine over the broadcast channel.
#[derive(Debug, Clone)]
pub struct TriggerEventPayload {
    /// Event kind; compared against a trigger's event `source` when matching.
    pub event_type: String,
    /// Task the event originated from. When non-empty, the engine looks up the
    /// task's workflow for workflow filters; an empty string skips that lookup.
    pub task_id: String,
    /// Optional JSON body. Condition filters are evaluated against it and it is
    /// summarized into the goal of the task a matching trigger creates.
    pub payload: Option<serde_json::Value>,
    /// When set, only triggers of the project with this id are considered.
    pub project: Option<String>,
}
63
/// Control messages sent to the trigger engine through [`TriggerEngineHandle`].
#[derive(Debug)]
pub enum TriggerReloadEvent {
    /// Ask the engine to rebuild its cron schedule from the current config.
    Reload,
}
70
/// Cloneable handle used to send reload requests to a running [`TriggerEngine`].
#[derive(Clone)]
pub struct TriggerEngineHandle {
    /// Sending half of the engine's reload channel.
    reload_tx: mpsc::Sender<TriggerReloadEvent>,
}
76
77impl TriggerEngineHandle {
78 pub async fn reload(&self) {
80 let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
81 }
82
83 pub fn reload_sync(&self) -> bool {
88 self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
89 }
90}
91
/// Background engine that fires cron- and event-based triggers.
pub struct TriggerEngine {
    /// Shared daemon state (config snapshot, database handles, event emitter).
    state: Arc<InnerState>,
    /// Receives reload requests sent through [`TriggerEngineHandle`].
    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
    /// Subscription to the broadcast channel carrying [`TriggerEventPayload`]s.
    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
    /// `(project id, trigger name)` pairs seen by the most recent schedule build.
    /// Triggers absent from this set are skipped by both cron scheduling and
    /// event matching.
    stabilized_triggers: HashSet<(String, String)>,
}
104
105impl TriggerEngine {
106 pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
108 let (reload_tx, reload_rx) = mpsc::channel(16);
109 let trigger_event_rx = state.trigger_event_tx.subscribe();
110 let engine = Self {
111 state,
112 reload_rx,
113 trigger_event_rx,
114 stabilized_triggers: HashSet::new(),
115 };
116 let handle = TriggerEngineHandle { reload_tx };
117 (engine, handle)
118 }
119
120 pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
122 info!("trigger engine started");
123
124 let mut cron_schedule = self.build_cron_schedule();
126
127 loop {
128 let sleep_duration = next_cron_sleep(&cron_schedule);
129 let sleep_fut = tokio::time::sleep(sleep_duration);
130 tokio::pin!(sleep_fut);
131
132 tokio::select! {
133 () = &mut sleep_fut => {
135 let now = Utc::now();
136 let due = collect_due_entries(&cron_schedule, now);
137 for entry in due {
138 match &entry.kind {
139 CronEntryKind::Trigger => {
140 self.fire_trigger(&entry.trigger_name, &entry.project).await;
141 }
142 CronEntryKind::CrdPlugin { crd_kind, plugin } => {
143 info!(
144 crd = crd_kind.as_str(),
145 plugin = plugin.name.as_str(),
146 "firing CRD cron plugin"
147 );
148 if let Err(e) = crate::crd::plugins::execute_cron_plugin(plugin, crd_kind, Some(&self.state.db_path)).await {
149 warn!(
150 crd = crd_kind.as_str(),
151 plugin = plugin.name.as_str(),
152 error = %e,
153 "CRD cron plugin failed"
154 );
155 }
156 }
157 }
158 }
159 cron_schedule = self.build_cron_schedule();
161 }
162
163 event_result = self.trigger_event_rx.recv() => {
165 match event_result {
166 Ok(payload) => {
167 self.handle_event_trigger(&payload).await;
168 }
169 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
170 warn!(skipped = n, "trigger event receiver lagged");
171 }
172 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
173 debug!("trigger event channel closed");
174 break;
175 }
176 }
177 }
178
179 Some(_) = self.reload_rx.recv() => {
181 info!("trigger engine: reloading configuration");
182 cron_schedule = self.build_cron_schedule();
183 }
184
185 _ = shutdown_rx.changed() => {
187 info!("trigger engine shutting down");
188 break;
189 }
190 }
191 }
192 }
193
194 fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
197 let snap = self.state.config_runtime.load();
198 let config = &snap.active_config.config;
199 let mut entries = Vec::new();
200
201 let mut current_triggers: HashSet<(String, String)> = HashSet::new();
203 for (project_id, project) in &config.projects {
204 for name in project.triggers.keys() {
205 current_triggers.insert((project_id.clone(), name.clone()));
206 }
207 }
208
209 let previously_known = std::mem::take(&mut self.stabilized_triggers);
213 self.stabilized_triggers = current_triggers.clone();
214
215 for (project_id, project) in &config.projects {
216 for (name, trigger) in &project.triggers {
217 if trigger.suspend {
218 continue;
219 }
220 if !previously_known.contains(&(project_id.clone(), name.clone())) {
222 debug!(
223 trigger = name.as_str(),
224 project = project_id.as_str(),
225 "trigger not yet stabilized, skipping cron schedule"
226 );
227 continue;
228 }
229 if let Some(ref cron_spec) = trigger.cron {
230 match compute_next_fire(cron_spec, Utc::now()) {
231 Ok(next) => {
232 entries.push(CronEntry {
233 trigger_name: name.clone(),
234 project: project_id.clone(),
235 next_fire: next,
236 kind: CronEntryKind::Trigger,
237 });
238 }
239 Err(e) => {
240 warn!(
241 trigger = name.as_str(),
242 project = project_id.as_str(),
243 error = %e,
244 "failed to compute next fire time"
245 );
246 }
247 }
248 }
249 }
250 }
251
252 for (crd_kind, crd) in &config.custom_resource_definitions {
254 for plugin in crate::crd::plugins::cron_plugins(&crd.plugins) {
255 if let Some(ref schedule) = plugin.schedule {
256 let cron_spec = TriggerCronConfig {
257 schedule: schedule.clone(),
258 timezone: plugin.timezone.clone(),
259 };
260 match compute_next_fire(&cron_spec, Utc::now()) {
261 Ok(next) => {
262 entries.push(CronEntry {
263 trigger_name: format!("crd:{}:{}", crd_kind, plugin.name),
264 project: String::new(),
265 next_fire: next,
266 kind: CronEntryKind::CrdPlugin {
267 crd_kind: crd_kind.clone(),
268 plugin: plugin.clone(),
269 },
270 });
271 }
272 Err(e) => {
273 warn!(
274 crd = crd_kind.as_str(),
275 plugin = plugin.name.as_str(),
276 error = %e,
277 "failed to compute next fire time for CRD cron plugin"
278 );
279 }
280 }
281 }
282 }
283 }
284
285 entries
286 }
287
288 async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
291 let source_workflow = if payload.task_id.is_empty() {
295 None
296 } else {
297 self.lookup_task_workflow(&payload.task_id).await
298 };
299
300 let snap = self.state.config_runtime.load();
301 let config = &snap.active_config.config;
302
303 for (project_id, project) in &config.projects {
304 if let Some(ref scoped_project) = payload.project {
306 if project_id != scoped_project {
307 continue;
308 }
309 }
310 for (name, trigger) in &project.triggers {
311 if trigger.suspend {
312 continue;
313 }
314 if !self
316 .stabilized_triggers
317 .contains(&(project_id.clone(), name.clone()))
318 {
319 continue;
320 }
321 if let Some(ref event_spec) = trigger.event {
322 if event_spec.source != payload.event_type {
324 continue;
325 }
326 if let Some(ref filter) = event_spec.filter {
328 if let Some(ref filter_wf) = filter.workflow {
329 match source_workflow {
330 Some(ref sw) if sw == filter_wf => {}
331 _ => continue,
332 }
333 }
334 if let Some(ref condition) = filter.condition {
336 if let Some(ref event_payload) = payload.payload {
337 match crate::prehook::evaluate_webhook_filter(
338 condition,
339 event_payload,
340 ) {
341 Ok(true) => {} Ok(false) => {
343 debug!(
344 trigger = name.as_str(),
345 condition, "CEL filter rejected payload"
346 );
347 continue;
348 }
349 Err(e) => {
350 warn!(
351 trigger = name.as_str(),
352 error = %e,
353 "CEL filter evaluation failed, skipping"
354 );
355 continue;
356 }
357 }
358 } else {
359 debug!(
361 trigger = name.as_str(),
362 "CEL condition set but no payload available, skipping"
363 );
364 continue;
365 }
366 }
367 }
368
369 info!(
370 trigger = name.as_str(),
371 project = project_id.as_str(),
372 event_type = payload.event_type.as_str(),
373 "event trigger matched"
374 );
375 self.fire_trigger_with_config(
376 name,
377 project_id,
378 trigger,
379 payload.payload.as_ref(),
380 )
381 .await;
382 }
383 }
384 }
385 }
386
387 async fn fire_trigger(&self, trigger_name: &str, project: &str) {
390 let snap = self.state.config_runtime.load();
391 let config = &snap.active_config.config;
392
393 let trigger = config
394 .projects
395 .get(project)
396 .and_then(|p| p.triggers.get(trigger_name));
397
398 let Some(trigger) = trigger else {
399 warn!(trigger = trigger_name, "trigger not found in config");
400 return;
401 };
402
403 self.fire_trigger_with_config(trigger_name, project, trigger, None)
404 .await;
405 }
406
407 async fn fire_trigger_with_config(
408 &self,
409 trigger_name: &str,
410 project: &str,
411 trigger: &TriggerConfig,
412 webhook_payload: Option<&serde_json::Value>,
413 ) {
414 if trigger.suspend {
416 self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
417 return;
418 }
419
420 if let Some(ref throttle) = trigger.throttle {
422 if throttle.min_interval > 0 {
423 if let Some(last) = self.load_last_fired(trigger_name, project).await {
424 let elapsed = (Utc::now() - last).num_seconds();
425 if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
426 self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
427 return;
428 }
429 }
430 }
431 }
432
433 match trigger.concurrency_policy {
435 crate::cli_types::ConcurrencyPolicy::Forbid => {
436 if self.has_active_task(trigger_name, project).await {
437 self.emit_trigger_event(
438 trigger_name,
439 "trigger_skipped",
440 "concurrent_task_active",
441 );
442 return;
443 }
444 }
445 crate::cli_types::ConcurrencyPolicy::Replace => {
446 self.cancel_active_tasks(trigger_name, project).await;
448 }
449 crate::cli_types::ConcurrencyPolicy::Allow => {}
450 }
451
452 let target_files = trigger
454 .action
455 .args
456 .as_ref()
457 .and_then(|a| a.get("target-file"))
458 .cloned();
459
460 let task_name = format!("trigger-{trigger_name}");
461
462 let payload = CreateTaskPayload {
463 name: Some(task_name),
464 goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
465 project_id: Some(project.to_string()),
466 workspace_id: Some(trigger.action.workspace.clone()),
467 workflow_id: Some(trigger.action.workflow.clone()),
468 target_files,
469 parent_task_id: None,
470 spawn_reason: None,
471 step_filter: None,
472 initial_vars: None,
473 };
474
475 match crate::task_ops::create_task_as_service(&self.state, payload) {
476 Ok(summary) => {
477 let task_id = summary.id.clone();
478 info!(
479 trigger = trigger_name,
480 task_id = task_id.as_str(),
481 "trigger fired: task created"
482 );
483
484 self.update_trigger_state(trigger_name, project, &task_id, "created")
486 .await;
487
488 self.state.emit_event(
490 &task_id,
491 None,
492 "trigger_fired",
493 serde_json::json!({
494 "trigger": trigger_name,
495 "source": if trigger.cron.is_some() { "cron" } else { "event" },
496 "task_id": task_id,
497 }),
498 );
499
500 if trigger.action.start {
502 let state = self.state.clone();
503 let tid = task_id.clone();
504 tokio::spawn(async move {
505 if let Err(e) = state.task_enqueuer.enqueue_task(&state, &tid).await {
506 error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
507 } else {
508 state.worker_notify.notify_one();
509 }
510 });
511 }
512
513 if trigger.history_limit.is_some() {
515 let state = self.state.clone();
516 let name = trigger_name.to_string();
517 let proj = project.to_string();
518 let limit = trigger.history_limit.clone();
519 tokio::spawn(async move {
520 if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
521 {
522 debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
523 }
524 });
525 }
526 }
527 Err(e) => {
528 error!(
529 trigger = trigger_name,
530 error = %e,
531 "trigger failed to create task"
532 );
533 self.update_trigger_state(trigger_name, project, "", "failed_to_create")
534 .await;
535 self.state.emit_event(
536 "",
537 None,
538 "trigger_error",
539 serde_json::json!({
540 "trigger": trigger_name,
541 "error": e.to_string(),
542 }),
543 );
544 }
545 }
546 }
547
548 async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
552 let tid = task_id.to_owned();
553 let result = self
554 .state
555 .async_database
556 .reader()
557 .call(move |conn| {
558 let wf: Option<String> = conn
559 .query_row(
560 "SELECT workflow_id FROM tasks WHERE id = ?1",
561 rusqlite::params![tid],
562 |row| row.get(0),
563 )
564 .ok();
565 Ok(wf)
566 })
567 .await;
568 match result {
569 Ok(wf) => wf,
570 Err(e) => {
571 debug!(task_id, error = %e, "failed to look up task workflow");
572 None
573 }
574 }
575 }
576
577 async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
578 let name = trigger_name.to_owned();
579 let proj = project.to_owned();
580 let result = self
581 .state
582 .async_database
583 .reader()
584 .call(move |conn| {
585 let mut stmt = conn
586 .prepare(
587 "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
588 )
589 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
590 let ts: Option<String> = stmt
591 .query_row(rusqlite::params![name, proj], |row| row.get(0))
592 .ok();
593 Ok(ts)
594 })
595 .await;
596
597 match result {
598 Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
599 _ => None,
600 }
601 }
602
603 async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
604 let name = trigger_name.to_owned();
605 let proj = project.to_owned();
606 let result = self
607 .state
608 .async_database
609 .reader()
610 .call(move |conn| {
611 let last_task_id: Option<String> = conn
612 .query_row(
613 "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
614 rusqlite::params![name, proj],
615 |row| row.get(0),
616 )
617 .ok()
618 .flatten();
619
620 if let Some(ref tid) = last_task_id {
621 let status: Option<String> = conn
622 .query_row(
623 "SELECT status FROM tasks WHERE id = ?1",
624 rusqlite::params![tid],
625 |row| row.get(0),
626 )
627 .ok();
628 if let Some(s) = status {
629 return Ok(matches!(
630 s.as_str(),
631 "created" | "pending" | "running" | "restart_pending"
632 ));
633 }
634 }
635 Ok(false)
636 })
637 .await;
638
639 result.unwrap_or(false)
640 }
641
642 async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
643 let name = trigger_name.to_owned();
644 let proj = project.to_owned();
645 let state = self.state.clone();
646 let result = state
647 .async_database
648 .reader()
649 .call(move |conn| {
650 let tid: Option<String> = conn
651 .query_row(
652 "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
653 rusqlite::params![name, proj],
654 |row| row.get(0),
655 )
656 .ok()
657 .flatten();
658 Ok(tid)
659 })
660 .await;
661
662 if let Ok(Some(task_id)) = result {
663 if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
664 warn!(
665 trigger = trigger_name,
666 task_id = task_id.as_str(),
667 error = %e,
668 "failed to cancel active task for Replace policy"
669 );
670 }
671 }
672 }
673
674 async fn update_trigger_state(
675 &self,
676 trigger_name: &str,
677 project: &str,
678 task_id: &str,
679 status: &str,
680 ) {
681 let name = trigger_name.to_owned();
682 let proj = project.to_owned();
683 let tid = task_id.to_owned();
684 let st = status.to_owned();
685 let now = Utc::now().to_rfc3339();
686 let now2 = now.clone();
687
688 if let Err(e) = self
689 .state
690 .async_database
691 .writer()
692 .call(move |conn| {
693 conn.execute(
694 "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
695 VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
696 ON CONFLICT(trigger_name, project) DO UPDATE SET
697 last_fired_at = ?3,
698 fire_count = fire_count + 1,
699 last_task_id = ?4,
700 last_status = ?5,
701 updated_at = ?7",
702 rusqlite::params![name, proj, now, tid, st, now2, now2],
703 )
704 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
705 Ok(())
706 })
707 .await
708 {
709 warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
710 }
711 }
712
713 fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
714 debug!(trigger = trigger_name, event_type, reason, "trigger event");
715 self.state.emit_event(
716 "",
717 None,
718 event_type,
719 serde_json::json!({
720 "trigger": trigger_name,
721 "reason": reason,
722 }),
723 );
724 }
725}
726
/// What a due [`CronEntry`] should do when it fires.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CronEntryKind {
    /// Fire the project trigger named by the entry.
    Trigger,
    /// Execute a cron plugin attached to a custom resource definition.
    CrdPlugin {
        /// Key of the custom resource definition that owns the plugin.
        crd_kind: String,
        /// The plugin to execute.
        plugin: crate::crd::types::CrdPlugin,
    },
}
740
/// One scheduled firing in the engine's cron schedule.
struct CronEntry {
    /// Trigger name, or `crd:<kind>:<plugin>` for CRD plugin entries.
    trigger_name: String,
    /// Owning project id; empty for CRD plugin entries.
    project: String,
    /// Next time (UTC) at which this entry is due.
    next_fire: DateTime<Utc>,
    /// Action to perform when the entry is due.
    kind: CronEntryKind,
}
747
748fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
749 use cron::Schedule;
750 use std::str::FromStr;
751
752 let schedule = Schedule::from_str(&spec.schedule)
753 .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
754
755 if let Some(ref tz_name) = spec.timezone {
757 let tz: chrono_tz::Tz = tz_name
758 .parse()
759 .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
760 let local_after = after.with_timezone(&tz);
761 let next = schedule
762 .after(&local_after)
763 .next()
764 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
765 Ok(next.with_timezone(&Utc))
766 } else {
767 let next = schedule
768 .after(&after)
769 .next()
770 .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
771 Ok(next.with_timezone(&Utc))
772 }
773}
774
775fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
776 let now = Utc::now();
777 entries
778 .iter()
779 .map(|e| {
780 let diff = e.next_fire.signed_duration_since(now);
781 if diff.num_milliseconds() <= 0 {
782 std::time::Duration::from_millis(100)
783 } else {
784 std::time::Duration::from_millis(diff.num_milliseconds() as u64)
785 }
786 })
787 .min()
788 .unwrap_or(std::time::Duration::from_secs(3600))
790}
791
792fn collect_due_entries(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<&CronEntry> {
793 entries.iter().filter(|e| e.next_fire <= now).collect()
794}
795
796async fn cleanup_history(
797 state: &InnerState,
798 trigger_name: &str,
799 project: &str,
800 limit: Option<&crate::config::TriggerHistoryLimitConfig>,
801) -> Result<()> {
802 let limit = match limit {
803 Some(l) => l,
804 None => return Ok(()),
805 };
806
807 let task_name_pattern = format!("trigger-{trigger_name}");
808 let proj = project.to_owned();
809
810 let mut ids_to_delete: Vec<String> = Vec::new();
812
813 if let Some(max_successful) = limit.successful {
814 let pattern = task_name_pattern.clone();
815 let p = proj.clone();
816 let max = max_successful as usize;
817 let ids = state
818 .async_database
819 .reader()
820 .call(move |conn| {
821 let mut stmt = conn
822 .prepare(
823 "SELECT id FROM tasks \
824 WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
825 ORDER BY created_at DESC",
826 )
827 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
828 let rows = stmt
829 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
830 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
831 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
832 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
833 })
834 .await
835 .context("query completed tasks for history cleanup")?;
836 ids_to_delete.extend(ids);
837 }
838
839 if let Some(max_failed) = limit.failed {
840 let pattern = task_name_pattern.clone();
841 let p = proj.clone();
842 let max = max_failed as usize;
843 let ids = state
844 .async_database
845 .reader()
846 .call(move |conn| {
847 let mut stmt = conn
848 .prepare(
849 "SELECT id FROM tasks \
850 WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
851 ORDER BY created_at DESC",
852 )
853 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
854 let rows = stmt
855 .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
856 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
857 let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
858 Ok(all.into_iter().skip(max).collect::<Vec<String>>())
859 })
860 .await
861 .context("query failed tasks for history cleanup")?;
862 ids_to_delete.extend(ids);
863 }
864
865 if ids_to_delete.is_empty() {
866 return Ok(());
867 }
868
869 state
870 .async_database
871 .writer()
872 .call(move |conn| {
873 let placeholders: Vec<String> =
874 (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
875 let sql = format!(
876 "DELETE FROM tasks WHERE id IN ({})",
877 placeholders.join(", ")
878 );
879 let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
880 .iter()
881 .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
882 .collect();
883 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
884 params.iter().map(|p| p.as_ref()).collect();
885 conn.execute(&sql, param_refs.as_slice())
886 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
887 Ok(())
888 })
889 .await
890 .context("delete excess trigger history tasks")?;
891
892 Ok(())
893}
894
895fn build_trigger_goal(trigger_name: &str, event_payload: Option<&serde_json::Value>) -> String {
900 match event_payload {
901 Some(payload) => {
902 if let Some(filename) = payload.get("filename").and_then(|v| v.as_str()) {
904 if let Some(event_type) = payload.get("event_type").and_then(|v| v.as_str()) {
905 return format!(
906 "Triggered by filesystem '{trigger_name}': {event_type} {filename}"
907 );
908 }
909 }
910 let summary = serde_json::to_string(payload).unwrap_or_default();
911 let truncated = if summary.len() > 500 {
912 format!("{}...", &summary[..497])
913 } else {
914 summary
915 };
916 format!("Triggered by '{trigger_name}': {truncated}")
917 }
918 None => format!("Triggered by: {trigger_name}"),
919 }
920}
921
922pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
927 let _ = state.trigger_event_tx.send(payload);
929}
930
931pub fn notify_trigger_reload(state: &InnerState) {
935 if let Ok(guard) = state.trigger_engine_handle.lock() {
936 if let Some(ref handle) = *guard {
937 let _ = handle.reload_sync();
938 }
939 }
940 if let Ok(guard) = state.fs_watcher_reload_tx.lock() {
942 if let Some(ref tx) = *guard {
943 let _ = tx.try_send(());
944 }
945 }
946}
947
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// A daily 02:00 schedule without timezone fires at 02:00 UTC.
    #[test]
    fn compute_next_fire_utc() {
        let spec = TriggerCronConfig {
            // Six-field cron expression: sec min hour day-of-month month day-of-week.
            schedule: "0 0 2 * * *".to_string(),
            timezone: None,
        };
        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc();
        let next = compute_next_fire(&spec, after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    /// A daily 02:00 schedule in Asia/Shanghai (UTC+8) fires at 18:00 UTC.
    #[test]
    fn compute_next_fire_with_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Asia/Shanghai".to_string()),
        };
        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc();
        let next = compute_next_fire(&spec, after).expect("should compute with tz");
        assert!(next > after);
        assert_eq!(next.hour(), 18);
    }

    /// An unparsable cron expression is reported as an error.
    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = TriggerCronConfig {
            schedule: "not a cron".to_string(),
            timezone: None,
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    /// An unknown timezone name is reported as an error.
    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: Some("Invalid/TZ".to_string()),
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    /// With nothing scheduled the engine sleeps for the one-hour fallback.
    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        let d = next_cron_sleep(&[]);
        assert_eq!(d, std::time::Duration::from_secs(3600));
    }

    /// Only entries whose fire time is not in the future are collected.
    #[test]
    fn collect_due_entries_finds_past_entries() {
        let now = Utc::now();
        let past = now - chrono::Duration::seconds(10);
        let future = now + chrono::Duration::seconds(300);
        let entries = vec![
            CronEntry {
                trigger_name: "past".to_string(),
                project: "p".to_string(),
                next_fire: past,
                kind: CronEntryKind::Trigger,
            },
            CronEntry {
                trigger_name: "future".to_string(),
                project: "p".to_string(),
                next_fire: future,
                kind: CronEntryKind::Trigger,
            },
        ];
        let due = collect_due_entries(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].trigger_name, "past");
    }
}