Skip to main content

agent_orchestrator/
trigger_engine.rs

1//! Trigger engine: cron scheduler and event-driven task creation.
2//!
3//! The `TriggerEngine` runs as a long-lived tokio task inside `orchestratord`,
4//! watching for cron ticks and task-lifecycle events and creating tasks when
5//! trigger conditions are met.
6
7use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19/// Cancels a running task for the Replace trigger policy.
20///
21/// This is a simplified version of `scheduler::stop_task_runtime` that avoids
22/// a dependency on the scheduler crate.
23async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24    // Signal the in-process running task to stop, if present.
25    let runtime = {
26        let running = state.running.lock().await;
27        running.get(task_id).cloned()
28    };
29    if let Some(rt) = runtime {
30        rt.stop_flag.store(true, Ordering::SeqCst);
31    }
32    // Update task status to cancelled.
33    state
34        .db_writer
35        .set_task_status(task_id, "cancelled", false)
36        .await?;
37    insert_event(
38        state,
39        task_id,
40        None,
41        "task_control",
42        serde_json::json!({"status": "cancelled"}),
43    )
44    .await?;
45    Ok(())
46}
47
48// ── Public types ─────────────────────────────────────────────────────────────
49
50/// Payload broadcast when a trigger-relevant event fires.
51#[derive(Debug, Clone)]
52pub struct TriggerEventPayload {
53    /// Event type: "task_completed", "task_failed", "webhook", or "filesystem".
54    pub event_type: String,
55    /// Source task ID (empty for webhook/filesystem events).
56    pub task_id: String,
57    /// Optional JSON payload (webhook body, filesystem event context, or task metadata).
58    pub payload: Option<serde_json::Value>,
59    /// Optional project scope. When set, the trigger engine only matches triggers
60    /// in this specific project, preventing cross-project trigger leakage.
61    pub project: Option<String>,
62}
63
64/// Notification sent to the engine when trigger configuration changes.
65#[derive(Debug)]
66pub enum TriggerReloadEvent {
67    /// Re-read all triggers from the current config snapshot.
68    Reload,
69}
70
71/// Handle used by the rest of the system to communicate with a running engine.
72#[derive(Clone)]
73pub struct TriggerEngineHandle {
74    reload_tx: mpsc::Sender<TriggerReloadEvent>,
75}
76
77impl TriggerEngineHandle {
78    /// Notify the engine to reload trigger configuration (async).
79    pub async fn reload(&self) {
80        let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
81    }
82
83    /// Notify the engine to reload trigger configuration (sync-safe).
84    ///
85    /// Uses `try_send` so it can be called from synchronous code paths
86    /// like `apply_manifests`. Returns `true` if the notification was sent.
87    pub fn reload_sync(&self) -> bool {
88        self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
89    }
90}
91
92/// The trigger engine itself. Constructed via [`TriggerEngine::new`] and
93/// driven via [`TriggerEngine::run`].
94pub struct TriggerEngine {
95    state: Arc<InnerState>,
96    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
97    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
98    /// Triggers that have been present for at least one config reload cycle.
99    /// A freshly-created trigger (from an agent `apply`) must survive one
100    /// reload before it is eligible to fire — this prevents agent-applied
101    /// triggers from immediately spawning parasitic tasks.
102    stabilized_triggers: HashSet<(String, String)>,
103}
104
105impl TriggerEngine {
106    /// Create a new engine and its control handle.
107    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
108        let (reload_tx, reload_rx) = mpsc::channel(16);
109        let trigger_event_rx = state.trigger_event_tx.subscribe();
110        let engine = Self {
111            state,
112            reload_rx,
113            trigger_event_rx,
114            stabilized_triggers: HashSet::new(),
115        };
116        let handle = TriggerEngineHandle { reload_tx };
117        (engine, handle)
118    }
119
120    /// Main run loop. Returns when `shutdown_rx` fires.
121    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
122        info!("trigger engine started");
123
124        // Load initial trigger set.
125        let mut cron_schedule = self.build_cron_schedule();
126
127        loop {
128            let sleep_duration = next_cron_sleep(&cron_schedule);
129            let sleep_fut = tokio::time::sleep(sleep_duration);
130            tokio::pin!(sleep_fut);
131
132            tokio::select! {
133                // ── Cron tick ───────────────────────────────────────────
134                () = &mut sleep_fut => {
135                    let now = Utc::now();
136                    let due = collect_due_entries(&cron_schedule, now);
137                    for entry in due {
138                        match &entry.kind {
139                            CronEntryKind::Trigger => {
140                                self.fire_trigger(&entry.trigger_name, &entry.project).await;
141                            }
142                            CronEntryKind::CrdPlugin { crd_kind, plugin } => {
143                                info!(
144                                    crd = crd_kind.as_str(),
145                                    plugin = plugin.name.as_str(),
146                                    "firing CRD cron plugin"
147                                );
148                                if let Err(e) = crate::crd::plugins::execute_cron_plugin(plugin, crd_kind, Some(&self.state.db_path)).await {
149                                    warn!(
150                                        crd = crd_kind.as_str(),
151                                        plugin = plugin.name.as_str(),
152                                        error = %e,
153                                        "CRD cron plugin failed"
154                                    );
155                                }
156                            }
157                        }
158                    }
159                    // Recompute schedule after firing.
160                    cron_schedule = self.build_cron_schedule();
161                }
162
163                // ── Event trigger ───────────────────────────────────────
164                event_result = self.trigger_event_rx.recv() => {
165                    match event_result {
166                        Ok(payload) => {
167                            self.handle_event_trigger(&payload).await;
168                        }
169                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
170                            warn!(skipped = n, "trigger event receiver lagged");
171                        }
172                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
173                            debug!("trigger event channel closed");
174                            break;
175                        }
176                    }
177                }
178
179                // ── Config reload ───────────────────────────────────────
180                Some(_) = self.reload_rx.recv() => {
181                    info!("trigger engine: reloading configuration");
182                    cron_schedule = self.build_cron_schedule();
183                }
184
185                // ── Shutdown ────────────────────────────────────────────
186                _ = shutdown_rx.changed() => {
187                    info!("trigger engine shutting down");
188                    break;
189                }
190            }
191        }
192    }
193
194    // ── Cron helpers ─────────────────────────────────────────────────────────
195
196    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
197        let snap = self.state.config_runtime.load();
198        let config = &snap.active_config.config;
199        let mut entries = Vec::new();
200
201        // Collect the current set of triggers to update stabilization tracking.
202        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
203        for (project_id, project) in &config.projects {
204            for name in project.triggers.keys() {
205                current_triggers.insert((project_id.clone(), name.clone()));
206            }
207        }
208
209        // Promote triggers that were already known from a prior cycle.
210        // Triggers seen for the first time in THIS reload are NOT yet stabilized
211        // and will only become eligible after the next reload.
212        let previously_known = std::mem::take(&mut self.stabilized_triggers);
213        self.stabilized_triggers = current_triggers.clone();
214
215        for (project_id, project) in &config.projects {
216            for (name, trigger) in &project.triggers {
217                if trigger.suspend {
218                    continue;
219                }
220                // Skip triggers that are new (not in the previous cycle's set).
221                if !previously_known.contains(&(project_id.clone(), name.clone())) {
222                    debug!(
223                        trigger = name.as_str(),
224                        project = project_id.as_str(),
225                        "trigger not yet stabilized, skipping cron schedule"
226                    );
227                    continue;
228                }
229                if let Some(ref cron_spec) = trigger.cron {
230                    match compute_next_fire(cron_spec, Utc::now()) {
231                        Ok(next) => {
232                            entries.push(CronEntry {
233                                trigger_name: name.clone(),
234                                project: project_id.clone(),
235                                next_fire: next,
236                                kind: CronEntryKind::Trigger,
237                            });
238                        }
239                        Err(e) => {
240                            warn!(
241                                trigger = name.as_str(),
242                                project = project_id.as_str(),
243                                error = %e,
244                                "failed to compute next fire time"
245                            );
246                        }
247                    }
248                }
249            }
250        }
251
252        // ── CRD plugin cron entries ─────────────────────────────────────
253        for (crd_kind, crd) in &config.custom_resource_definitions {
254            for plugin in crate::crd::plugins::cron_plugins(&crd.plugins) {
255                if let Some(ref schedule) = plugin.schedule {
256                    let cron_spec = TriggerCronConfig {
257                        schedule: schedule.clone(),
258                        timezone: plugin.timezone.clone(),
259                    };
260                    match compute_next_fire(&cron_spec, Utc::now()) {
261                        Ok(next) => {
262                            entries.push(CronEntry {
263                                trigger_name: format!("crd:{}:{}", crd_kind, plugin.name),
264                                project: String::new(),
265                                next_fire: next,
266                                kind: CronEntryKind::CrdPlugin {
267                                    crd_kind: crd_kind.clone(),
268                                    plugin: plugin.clone(),
269                                },
270                            });
271                        }
272                        Err(e) => {
273                            warn!(
274                                crd = crd_kind.as_str(),
275                                plugin = plugin.name.as_str(),
276                                error = %e,
277                                "failed to compute next fire time for CRD cron plugin"
278                            );
279                        }
280                    }
281                }
282            }
283        }
284
285        entries
286    }
287
288    // ── Event trigger matching ───────────────────────────────────────────────
289
290    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
291        // Resolve the source task's workflow from the database (the payload only
292        // carries event_type + task_id to keep the broadcast lightweight).
293        // For webhook events, skip workflow lookup (no source task).
294        let source_workflow = if payload.task_id.is_empty() {
295            None
296        } else {
297            self.lookup_task_workflow(&payload.task_id).await
298        };
299
300        let snap = self.state.config_runtime.load();
301        let config = &snap.active_config.config;
302
303        for (project_id, project) in &config.projects {
304            // When a project scope is specified, only match triggers in that project.
305            if let Some(ref scoped_project) = payload.project {
306                if project_id != scoped_project {
307                    continue;
308                }
309            }
310            for (name, trigger) in &project.triggers {
311                if trigger.suspend {
312                    continue;
313                }
314                // Skip triggers not yet stabilized (first seen in most recent reload).
315                if !self
316                    .stabilized_triggers
317                    .contains(&(project_id.clone(), name.clone()))
318                {
319                    continue;
320                }
321                if let Some(ref event_spec) = trigger.event {
322                    // Match event source type.
323                    if event_spec.source != payload.event_type {
324                        continue;
325                    }
326                    // Match optional workflow filter.
327                    if let Some(ref filter) = event_spec.filter {
328                        if let Some(ref filter_wf) = filter.workflow {
329                            match source_workflow {
330                                Some(ref sw) if sw == filter_wf => {}
331                                _ => continue,
332                            }
333                        }
334                        // CEL condition evaluation on payload.
335                        if let Some(ref condition) = filter.condition {
336                            if let Some(ref event_payload) = payload.payload {
337                                match crate::prehook::evaluate_webhook_filter(
338                                    condition,
339                                    event_payload,
340                                ) {
341                                    Ok(true) => {} // Condition matched, proceed
342                                    Ok(false) => {
343                                        debug!(
344                                            trigger = name.as_str(),
345                                            condition, "CEL filter rejected payload"
346                                        );
347                                        continue;
348                                    }
349                                    Err(e) => {
350                                        warn!(
351                                            trigger = name.as_str(),
352                                            error = %e,
353                                            "CEL filter evaluation failed, skipping"
354                                        );
355                                        continue;
356                                    }
357                                }
358                            } else {
359                                // No payload but condition set — skip (can't evaluate)
360                                debug!(
361                                    trigger = name.as_str(),
362                                    "CEL condition set but no payload available, skipping"
363                                );
364                                continue;
365                            }
366                        }
367                    }
368
369                    info!(
370                        trigger = name.as_str(),
371                        project = project_id.as_str(),
372                        event_type = payload.event_type.as_str(),
373                        "event trigger matched"
374                    );
375                    self.fire_trigger_with_config(
376                        name,
377                        project_id,
378                        trigger,
379                        payload.payload.as_ref(),
380                    )
381                    .await;
382                }
383            }
384        }
385    }
386
387    // ── Fire logic ───────────────────────────────────────────────────────────
388
389    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
390        let snap = self.state.config_runtime.load();
391        let config = &snap.active_config.config;
392
393        let trigger = config
394            .projects
395            .get(project)
396            .and_then(|p| p.triggers.get(trigger_name));
397
398        let Some(trigger) = trigger else {
399            warn!(trigger = trigger_name, "trigger not found in config");
400            return;
401        };
402
403        self.fire_trigger_with_config(trigger_name, project, trigger, None)
404            .await;
405    }
406
407    async fn fire_trigger_with_config(
408        &self,
409        trigger_name: &str,
410        project: &str,
411        trigger: &TriggerConfig,
412        webhook_payload: Option<&serde_json::Value>,
413    ) {
414        // ── Suspend check ────────────────────────────────────────────────
415        if trigger.suspend {
416            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
417            return;
418        }
419
420        // ── Throttle check (event triggers only) ─────────────────────────
421        if let Some(ref throttle) = trigger.throttle {
422            if throttle.min_interval > 0 {
423                if let Some(last) = self.load_last_fired(trigger_name, project).await {
424                    let elapsed = (Utc::now() - last).num_seconds();
425                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
426                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
427                        return;
428                    }
429                }
430            }
431        }
432
433        // ── Concurrency policy ───────────────────────────────────────────
434        match trigger.concurrency_policy {
435            crate::cli_types::ConcurrencyPolicy::Forbid => {
436                if self.has_active_task(trigger_name, project).await {
437                    self.emit_trigger_event(
438                        trigger_name,
439                        "trigger_skipped",
440                        "concurrent_task_active",
441                    );
442                    return;
443                }
444            }
445            crate::cli_types::ConcurrencyPolicy::Replace => {
446                // Cancel active tasks created by this trigger before creating a new one.
447                self.cancel_active_tasks(trigger_name, project).await;
448            }
449            crate::cli_types::ConcurrencyPolicy::Allow => {}
450        }
451
452        // ── Create task ──────────────────────────────────────────────────
453        let target_files = trigger
454            .action
455            .args
456            .as_ref()
457            .and_then(|a| a.get("target-file"))
458            .cloned();
459
460        let task_name = format!("trigger-{trigger_name}");
461
462        let payload = CreateTaskPayload {
463            name: Some(task_name),
464            goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
465            project_id: Some(project.to_string()),
466            workspace_id: Some(trigger.action.workspace.clone()),
467            workflow_id: Some(trigger.action.workflow.clone()),
468            target_files,
469            parent_task_id: None,
470            spawn_reason: None,
471            step_filter: None,
472            initial_vars: None,
473        };
474
475        match crate::task_ops::create_task_as_service(&self.state, payload) {
476            Ok(summary) => {
477                let task_id = summary.id.clone();
478                info!(
479                    trigger = trigger_name,
480                    task_id = task_id.as_str(),
481                    "trigger fired: task created"
482                );
483
484                // Update trigger state.
485                self.update_trigger_state(trigger_name, project, &task_id, "created")
486                    .await;
487
488                // Emit event.
489                self.state.emit_event(
490                    &task_id,
491                    None,
492                    "trigger_fired",
493                    serde_json::json!({
494                        "trigger": trigger_name,
495                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
496                        "task_id": task_id,
497                    }),
498                );
499
500                // Start the task if action.start is true.
501                if trigger.action.start {
502                    let state = self.state.clone();
503                    let tid = task_id.clone();
504                    tokio::spawn(async move {
505                        if let Err(e) = state.task_enqueuer.enqueue_task(&state, &tid).await {
506                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
507                        } else {
508                            state.worker_notify.notify_one();
509                        }
510                    });
511                }
512
513                // History limit cleanup (best-effort, async).
514                if trigger.history_limit.is_some() {
515                    let state = self.state.clone();
516                    let name = trigger_name.to_string();
517                    let proj = project.to_string();
518                    let limit = trigger.history_limit.clone();
519                    tokio::spawn(async move {
520                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
521                        {
522                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
523                        }
524                    });
525                }
526            }
527            Err(e) => {
528                error!(
529                    trigger = trigger_name,
530                    error = %e,
531                    "trigger failed to create task"
532                );
533                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
534                    .await;
535                self.state.emit_event(
536                    "",
537                    None,
538                    "trigger_error",
539                    serde_json::json!({
540                        "trigger": trigger_name,
541                        "error": e.to_string(),
542                    }),
543                );
544            }
545        }
546    }
547
548    // ── DB helpers ───────────────────────────────────────────────────────────
549
550    /// Look up the workflow_id for a task from the database.
551    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
552        let tid = task_id.to_owned();
553        let result = self
554            .state
555            .async_database
556            .reader()
557            .call(move |conn| {
558                let wf: Option<String> = conn
559                    .query_row(
560                        "SELECT workflow_id FROM tasks WHERE id = ?1",
561                        rusqlite::params![tid],
562                        |row| row.get(0),
563                    )
564                    .ok();
565                Ok(wf)
566            })
567            .await;
568        match result {
569            Ok(wf) => wf,
570            Err(e) => {
571                debug!(task_id, error = %e, "failed to look up task workflow");
572                None
573            }
574        }
575    }
576
577    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
578        let name = trigger_name.to_owned();
579        let proj = project.to_owned();
580        let result = self
581            .state
582            .async_database
583            .reader()
584            .call(move |conn| {
585                let mut stmt = conn
586                    .prepare(
587                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
588                    )
589                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
590                let ts: Option<String> = stmt
591                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
592                    .ok();
593                Ok(ts)
594            })
595            .await;
596
597        match result {
598            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
599            _ => None,
600        }
601    }
602
603    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
604        let name = trigger_name.to_owned();
605        let proj = project.to_owned();
606        let result = self
607            .state
608            .async_database
609            .reader()
610            .call(move |conn| {
611                let last_task_id: Option<String> = conn
612                    .query_row(
613                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
614                        rusqlite::params![name, proj],
615                        |row| row.get(0),
616                    )
617                    .ok()
618                    .flatten();
619
620                if let Some(ref tid) = last_task_id {
621                    let status: Option<String> = conn
622                        .query_row(
623                            "SELECT status FROM tasks WHERE id = ?1",
624                            rusqlite::params![tid],
625                            |row| row.get(0),
626                        )
627                        .ok();
628                    if let Some(s) = status {
629                        return Ok(matches!(
630                            s.as_str(),
631                            "created" | "pending" | "running" | "restart_pending"
632                        ));
633                    }
634                }
635                Ok(false)
636            })
637            .await;
638
639        result.unwrap_or(false)
640    }
641
642    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
643        let name = trigger_name.to_owned();
644        let proj = project.to_owned();
645        let state = self.state.clone();
646        let result = state
647            .async_database
648            .reader()
649            .call(move |conn| {
650                let tid: Option<String> = conn
651                    .query_row(
652                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
653                        rusqlite::params![name, proj],
654                        |row| row.get(0),
655                    )
656                    .ok()
657                    .flatten();
658                Ok(tid)
659            })
660            .await;
661
662        if let Ok(Some(task_id)) = result {
663            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
664                warn!(
665                    trigger = trigger_name,
666                    task_id = task_id.as_str(),
667                    error = %e,
668                    "failed to cancel active task for Replace policy"
669                );
670            }
671        }
672    }
673
674    async fn update_trigger_state(
675        &self,
676        trigger_name: &str,
677        project: &str,
678        task_id: &str,
679        status: &str,
680    ) {
681        let name = trigger_name.to_owned();
682        let proj = project.to_owned();
683        let tid = task_id.to_owned();
684        let st = status.to_owned();
685        let now = Utc::now().to_rfc3339();
686        let now2 = now.clone();
687
688        if let Err(e) = self
689            .state
690            .async_database
691            .writer()
692            .call(move |conn| {
693                conn.execute(
694                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
695                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
696                     ON CONFLICT(trigger_name, project) DO UPDATE SET
697                       last_fired_at = ?3,
698                       fire_count = fire_count + 1,
699                       last_task_id = ?4,
700                       last_status = ?5,
701                       updated_at = ?7",
702                    rusqlite::params![name, proj, now, tid, st, now2, now2],
703                )
704                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
705                Ok(())
706            })
707            .await
708        {
709            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
710        }
711    }
712
713    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
714        debug!(trigger = trigger_name, event_type, reason, "trigger event");
715        self.state.emit_event(
716            "",
717            None,
718            event_type,
719            serde_json::json!({
720                "trigger": trigger_name,
721                "reason": reason,
722            }),
723        );
724    }
725}
726
727// ── Cron schedule helpers ────────────────────────────────────────────────────
728
729/// Distinguishes trigger-based cron entries from CRD plugin cron entries.
730#[derive(Debug, Clone, PartialEq, Eq)]
731enum CronEntryKind {
732    /// A regular trigger cron entry.
733    Trigger,
734    /// A CRD plugin cron entry (executes a plugin command directly).
735    CrdPlugin {
736        crd_kind: String,
737        plugin: crate::crd::types::CrdPlugin,
738    },
739}
740
741struct CronEntry {
742    trigger_name: String,
743    project: String,
744    next_fire: DateTime<Utc>,
745    kind: CronEntryKind,
746}
747
748fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
749    use cron::Schedule;
750    use std::str::FromStr;
751
752    let schedule = Schedule::from_str(&spec.schedule)
753        .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
754
755    // If a timezone is specified, compute in that timezone, then convert back to UTC.
756    if let Some(ref tz_name) = spec.timezone {
757        let tz: chrono_tz::Tz = tz_name
758            .parse()
759            .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
760        let local_after = after.with_timezone(&tz);
761        let next = schedule
762            .after(&local_after)
763            .next()
764            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
765        Ok(next.with_timezone(&Utc))
766    } else {
767        let next = schedule
768            .after(&after)
769            .next()
770            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
771        Ok(next.with_timezone(&Utc))
772    }
773}
774
775fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
776    let now = Utc::now();
777    entries
778        .iter()
779        .map(|e| {
780            let diff = e.next_fire.signed_duration_since(now);
781            if diff.num_milliseconds() <= 0 {
782                std::time::Duration::from_millis(100)
783            } else {
784                std::time::Duration::from_millis(diff.num_milliseconds() as u64)
785            }
786        })
787        .min()
788        // If no cron triggers, sleep for a long time (until event or reload wakes us).
789        .unwrap_or(std::time::Duration::from_secs(3600))
790}
791
792fn collect_due_entries(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<&CronEntry> {
793    entries.iter().filter(|e| e.next_fire <= now).collect()
794}
795
796async fn cleanup_history(
797    state: &InnerState,
798    trigger_name: &str,
799    project: &str,
800    limit: Option<&crate::config::TriggerHistoryLimitConfig>,
801) -> Result<()> {
802    let limit = match limit {
803        Some(l) => l,
804        None => return Ok(()),
805    };
806
807    let task_name_pattern = format!("trigger-{trigger_name}");
808    let proj = project.to_owned();
809
810    // For each status category, collect IDs of tasks beyond the retention limit.
811    let mut ids_to_delete: Vec<String> = Vec::new();
812
813    if let Some(max_successful) = limit.successful {
814        let pattern = task_name_pattern.clone();
815        let p = proj.clone();
816        let max = max_successful as usize;
817        let ids = state
818            .async_database
819            .reader()
820            .call(move |conn| {
821                let mut stmt = conn
822                    .prepare(
823                        "SELECT id FROM tasks \
824                         WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
825                         ORDER BY created_at DESC",
826                    )
827                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
828                let rows = stmt
829                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
830                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
831                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
832                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
833            })
834            .await
835            .context("query completed tasks for history cleanup")?;
836        ids_to_delete.extend(ids);
837    }
838
839    if let Some(max_failed) = limit.failed {
840        let pattern = task_name_pattern.clone();
841        let p = proj.clone();
842        let max = max_failed as usize;
843        let ids = state
844            .async_database
845            .reader()
846            .call(move |conn| {
847                let mut stmt = conn
848                    .prepare(
849                        "SELECT id FROM tasks \
850                         WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
851                         ORDER BY created_at DESC",
852                    )
853                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
854                let rows = stmt
855                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
856                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
857                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
858                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
859            })
860            .await
861            .context("query failed tasks for history cleanup")?;
862        ids_to_delete.extend(ids);
863    }
864
865    if ids_to_delete.is_empty() {
866        return Ok(());
867    }
868
869    state
870        .async_database
871        .writer()
872        .call(move |conn| {
873            let placeholders: Vec<String> =
874                (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
875            let sql = format!(
876                "DELETE FROM tasks WHERE id IN ({})",
877                placeholders.join(", ")
878            );
879            let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
880                .iter()
881                .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
882                .collect();
883            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
884                params.iter().map(|p| p.as_ref()).collect();
885            conn.execute(&sql, param_refs.as_slice())
886                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
887            Ok(())
888        })
889        .await
890        .context("delete excess trigger history tasks")?;
891
892    Ok(())
893}
894
895// ── Goal construction ────────────────────────────────────────────────────────
896
897/// Build a task goal string for a trigger fire.
898/// If an event payload is present, include a summary in the goal.
899fn build_trigger_goal(trigger_name: &str, event_payload: Option<&serde_json::Value>) -> String {
900    match event_payload {
901        Some(payload) => {
902            // For filesystem events, use a friendlier format.
903            if let Some(filename) = payload.get("filename").and_then(|v| v.as_str()) {
904                if let Some(event_type) = payload.get("event_type").and_then(|v| v.as_str()) {
905                    return format!(
906                        "Triggered by filesystem '{trigger_name}': {event_type} {filename}"
907                    );
908                }
909            }
910            let summary = serde_json::to_string(payload).unwrap_or_default();
911            let truncated = if summary.len() > 500 {
912                format!("{}...", &summary[..497])
913            } else {
914                summary
915            };
916            format!("Triggered by '{trigger_name}': {truncated}")
917        }
918        None => format!("Triggered by: {trigger_name}"),
919    }
920}
921
922// ── Public helper for event broadcasting ─────────────────────────────────────
923
924/// Broadcast a trigger-relevant event (task_completed / task_failed / webhook).
925/// Called from the daemon's event handling path.
926pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
927    // Ignore send errors (no subscribers = no triggers configured).
928    let _ = state.trigger_event_tx.send(payload);
929}
930
931/// Notify the trigger engine to reload its configuration.
932/// Also notifies the filesystem watcher (if running) to re-evaluate watched paths.
933/// Safe to call from sync code. No-op if no engine/watcher is running.
934pub fn notify_trigger_reload(state: &InnerState) {
935    if let Ok(guard) = state.trigger_engine_handle.lock() {
936        if let Some(ref handle) = *guard {
937            let _ = handle.reload_sync();
938        }
939    }
940    // Also notify filesystem watcher to reload its watch list.
941    if let Ok(guard) = state.fs_watcher_reload_tx.lock() {
942        if let Some(ref tx) = *guard {
943            let _ = tx.try_send(());
944        }
945    }
946}
947
948#[cfg(test)]
949mod tests {
950    use super::*;
951    use chrono::Timelike;
952
953    #[test]
954    fn compute_next_fire_utc() {
955        let spec = TriggerCronConfig {
956            schedule: "0 0 2 * * *".to_string(), // daily at 02:00 (cron crate uses 6 fields)
957            timezone: None,
958        };
959        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
960            .unwrap()
961            .and_hms_opt(0, 0, 0)
962            .unwrap()
963            .and_utc();
964        let next = compute_next_fire(&spec, after).expect("should compute");
965        assert!(next > after);
966        assert_eq!(next.hour(), 2);
967    }
968
969    #[test]
970    fn compute_next_fire_with_timezone() {
971        let spec = TriggerCronConfig {
972            schedule: "0 0 2 * * *".to_string(),
973            timezone: Some("Asia/Shanghai".to_string()),
974        };
975        let after = chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
976            .unwrap()
977            .and_hms_opt(0, 0, 0)
978            .unwrap()
979            .and_utc();
980        let next = compute_next_fire(&spec, after).expect("should compute with tz");
981        assert!(next > after);
982        // 02:00 Shanghai = 18:00 UTC previous day
983        assert_eq!(next.hour(), 18);
984    }
985
986    #[test]
987    fn compute_next_fire_rejects_invalid_schedule() {
988        let spec = TriggerCronConfig {
989            schedule: "not a cron".to_string(),
990            timezone: None,
991        };
992        assert!(compute_next_fire(&spec, Utc::now()).is_err());
993    }
994
995    #[test]
996    fn compute_next_fire_rejects_invalid_timezone() {
997        let spec = TriggerCronConfig {
998            schedule: "0 0 2 * * *".to_string(),
999            timezone: Some("Invalid/TZ".to_string()),
1000        };
1001        assert!(compute_next_fire(&spec, Utc::now()).is_err());
1002    }
1003
1004    #[test]
1005    fn next_cron_sleep_empty_returns_1h() {
1006        let d = next_cron_sleep(&[]);
1007        assert_eq!(d, std::time::Duration::from_secs(3600));
1008    }
1009
1010    #[test]
1011    fn collect_due_entries_finds_past_entries() {
1012        let now = Utc::now();
1013        let past = now - chrono::Duration::seconds(10);
1014        let future = now + chrono::Duration::seconds(300);
1015        let entries = vec![
1016            CronEntry {
1017                trigger_name: "past".to_string(),
1018                project: "p".to_string(),
1019                next_fire: past,
1020                kind: CronEntryKind::Trigger,
1021            },
1022            CronEntry {
1023                trigger_name: "future".to_string(),
1024                project: "p".to_string(),
1025                next_fire: future,
1026                kind: CronEntryKind::Trigger,
1027            },
1028        ];
1029        let due = collect_due_entries(&entries, now);
1030        assert_eq!(due.len(), 1);
1031        assert_eq!(due[0].trigger_name, "past");
1032    }
1033}