Skip to main content

agent_orchestrator/
trigger_engine.rs

1//! Trigger engine: cron scheduler and event-driven task creation.
2//!
3//! The `TriggerEngine` runs as a long-lived tokio task inside `orchestratord`,
4//! watching for cron ticks and task-lifecycle events and creating tasks when
5//! trigger conditions are met.
6
7use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19/// Cancels a running task for the Replace trigger policy.
20///
21/// This is a simplified version of `scheduler::stop_task_runtime` that avoids
22/// a dependency on the scheduler crate.
23async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24    // Signal the in-process running task to stop, if present.
25    let runtime = {
26        let running = state.running.lock().await;
27        running.get(task_id).cloned()
28    };
29    if let Some(rt) = runtime {
30        rt.stop_flag.store(true, Ordering::SeqCst);
31    }
32    // Update task status to cancelled.
33    state
34        .db_writer
35        .set_task_status(task_id, "cancelled", false)
36        .await?;
37    insert_event(
38        state,
39        task_id,
40        None,
41        "task_control",
42        serde_json::json!({"status": "cancelled"}),
43    )
44    .await?;
45    Ok(())
46}
47
48// ── Public types ─────────────────────────────────────────────────────────────
49
50/// Payload broadcast when a trigger-relevant event fires.
51#[derive(Debug, Clone)]
52pub struct TriggerEventPayload {
53    /// Event type: "task_completed", "task_failed", "webhook", or "filesystem".
54    pub event_type: String,
55    /// Source task ID (empty for webhook/filesystem events).
56    pub task_id: String,
57    /// Optional JSON payload (webhook body, filesystem event context, or task metadata).
58    pub payload: Option<serde_json::Value>,
59    /// Optional project scope. When set, the trigger engine only matches triggers
60    /// in this specific project, preventing cross-project trigger leakage.
61    pub project: Option<String>,
62}
63
/// Control message telling the engine that trigger configuration has changed.
#[derive(Debug)]
pub enum TriggerReloadEvent {
    /// Rebuild the trigger set from the current config snapshot.
    Reload,
}
70
71/// Handle used by the rest of the system to communicate with a running engine.
72#[derive(Clone)]
73pub struct TriggerEngineHandle {
74    reload_tx: mpsc::Sender<TriggerReloadEvent>,
75}
76
77impl TriggerEngineHandle {
78    /// Notify the engine to reload trigger configuration (async).
79    pub async fn reload(&self) {
80        let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
81    }
82
83    /// Notify the engine to reload trigger configuration (sync-safe).
84    ///
85    /// Uses `try_send` so it can be called from synchronous code paths
86    /// like `apply_manifests`. Returns `true` if the notification was sent.
87    pub fn reload_sync(&self) -> bool {
88        self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
89    }
90}
91
92/// The trigger engine itself. Constructed via [`TriggerEngine::new`] and
93/// driven via [`TriggerEngine::run`].
94pub struct TriggerEngine {
95    state: Arc<InnerState>,
96    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
97    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
98    /// Triggers that have been present for at least one config reload cycle.
99    /// A freshly-created trigger (from an agent `apply`) must survive one
100    /// reload before it is eligible to fire — this prevents agent-applied
101    /// triggers from immediately spawning parasitic tasks.
102    stabilized_triggers: HashSet<(String, String)>,
103}
104
105impl TriggerEngine {
106    /// Create a new engine and its control handle.
107    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
108        let (reload_tx, reload_rx) = mpsc::channel(16);
109        let trigger_event_rx = state.trigger_event_tx.subscribe();
110        let engine = Self {
111            state,
112            reload_rx,
113            trigger_event_rx,
114            stabilized_triggers: HashSet::new(),
115        };
116        let handle = TriggerEngineHandle { reload_tx };
117        (engine, handle)
118    }
119
120    /// Main run loop. Returns when `shutdown_rx` fires.
121    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
122        info!("trigger engine started");
123
124        // Load initial trigger set.
125        let mut cron_schedule = self.build_cron_schedule();
126
127        loop {
128            let sleep_duration = next_cron_sleep(&cron_schedule);
129            let sleep_fut = tokio::time::sleep(sleep_duration);
130            tokio::pin!(sleep_fut);
131
132            tokio::select! {
133                // ── Cron tick ───────────────────────────────────────────
134                () = &mut sleep_fut => {
135                    let now = Utc::now();
136                    let due = collect_due_entries(&cron_schedule, now);
137                    for entry in due {
138                        match &entry.kind {
139                            CronEntryKind::Trigger => {
140                                self.fire_trigger(&entry.trigger_name, &entry.project).await;
141                            }
142                            CronEntryKind::CrdPlugin { crd_kind, plugin } => {
143                                info!(
144                                    crd = crd_kind.as_str(),
145                                    plugin = plugin.name.as_str(),
146                                    "firing CRD cron plugin"
147                                );
148                                if let Err(e) = crate::crd::plugins::execute_cron_plugin(plugin, crd_kind, Some(&self.state.db_path)) {
149                                    warn!(
150                                        crd = crd_kind.as_str(),
151                                        plugin = plugin.name.as_str(),
152                                        error = %e,
153                                        "CRD cron plugin failed"
154                                    );
155                                }
156                            }
157                        }
158                    }
159                    // Recompute schedule after firing.
160                    cron_schedule = self.build_cron_schedule();
161                }
162
163                // ── Event trigger ───────────────────────────────────────
164                event_result = self.trigger_event_rx.recv() => {
165                    match event_result {
166                        Ok(payload) => {
167                            self.handle_event_trigger(&payload).await;
168                        }
169                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
170                            warn!(skipped = n, "trigger event receiver lagged");
171                        }
172                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
173                            debug!("trigger event channel closed");
174                            break;
175                        }
176                    }
177                }
178
179                // ── Config reload ───────────────────────────────────────
180                Some(_) = self.reload_rx.recv() => {
181                    info!("trigger engine: reloading configuration");
182                    cron_schedule = self.build_cron_schedule();
183                }
184
185                // ── Shutdown ────────────────────────────────────────────
186                _ = shutdown_rx.changed() => {
187                    info!("trigger engine shutting down");
188                    break;
189                }
190            }
191        }
192    }
193
194    // ── Cron helpers ─────────────────────────────────────────────────────────
195
196    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
197        let snap = self.state.config_runtime.load();
198        let config = &snap.active_config.config;
199        let mut entries = Vec::new();
200
201        // Collect the current set of triggers to update stabilization tracking.
202        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
203        for (project_id, project) in &config.projects {
204            for name in project.triggers.keys() {
205                current_triggers.insert((project_id.clone(), name.clone()));
206            }
207        }
208
209        // Promote triggers that were already known from a prior cycle.
210        // Triggers seen for the first time in THIS reload are NOT yet stabilized
211        // and will only become eligible after the next reload.
212        let previously_known = std::mem::take(&mut self.stabilized_triggers);
213        self.stabilized_triggers = current_triggers.clone();
214
215        for (project_id, project) in &config.projects {
216            for (name, trigger) in &project.triggers {
217                if trigger.suspend {
218                    continue;
219                }
220                // Skip triggers that are new (not in the previous cycle's set).
221                if !previously_known.contains(&(project_id.clone(), name.clone())) {
222                    debug!(
223                        trigger = name.as_str(),
224                        project = project_id.as_str(),
225                        "trigger not yet stabilized, skipping cron schedule"
226                    );
227                    continue;
228                }
229                if let Some(ref cron_spec) = trigger.cron {
230                    match compute_next_fire(cron_spec, Utc::now()) {
231                        Ok(next) => {
232                            entries.push(CronEntry {
233                                trigger_name: name.clone(),
234                                project: project_id.clone(),
235                                next_fire: next,
236                                kind: CronEntryKind::Trigger,
237                            });
238                        }
239                        Err(e) => {
240                            warn!(
241                                trigger = name.as_str(),
242                                project = project_id.as_str(),
243                                error = %e,
244                                "failed to compute next fire time"
245                            );
246                        }
247                    }
248                }
249            }
250        }
251
252        // ── CRD plugin cron entries ─────────────────────────────────────
253        for (crd_kind, crd) in &config.custom_resource_definitions {
254            for plugin in crate::crd::plugins::cron_plugins(&crd.plugins) {
255                if let Some(ref schedule) = plugin.schedule {
256                    let cron_spec = TriggerCronConfig {
257                        schedule: schedule.clone(),
258                        timezone: plugin.timezone.clone(),
259                    };
260                    match compute_next_fire(&cron_spec, Utc::now()) {
261                        Ok(next) => {
262                            entries.push(CronEntry {
263                                trigger_name: format!("crd:{}:{}", crd_kind, plugin.name),
264                                project: String::new(),
265                                next_fire: next,
266                                kind: CronEntryKind::CrdPlugin {
267                                    crd_kind: crd_kind.clone(),
268                                    plugin: plugin.clone(),
269                                },
270                            });
271                        }
272                        Err(e) => {
273                            warn!(
274                                crd = crd_kind.as_str(),
275                                plugin = plugin.name.as_str(),
276                                error = %e,
277                                "failed to compute next fire time for CRD cron plugin"
278                            );
279                        }
280                    }
281                }
282            }
283        }
284
285        entries
286    }
287
288    // ── Event trigger matching ───────────────────────────────────────────────
289
290    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
291        // Resolve the source task's workflow from the database (the payload only
292        // carries event_type + task_id to keep the broadcast lightweight).
293        // For webhook events, skip workflow lookup (no source task).
294        let source_workflow = if payload.task_id.is_empty() {
295            None
296        } else {
297            self.lookup_task_workflow(&payload.task_id).await
298        };
299
300        let snap = self.state.config_runtime.load();
301        let config = &snap.active_config.config;
302
303        for (project_id, project) in &config.projects {
304            // When a project scope is specified, only match triggers in that project.
305            if let Some(ref scoped_project) = payload.project {
306                if project_id != scoped_project {
307                    continue;
308                }
309            }
310            for (name, trigger) in &project.triggers {
311                if trigger.suspend {
312                    continue;
313                }
314                // Skip triggers not yet stabilized (first seen in most recent reload).
315                if !self
316                    .stabilized_triggers
317                    .contains(&(project_id.clone(), name.clone()))
318                {
319                    continue;
320                }
321                if let Some(ref event_spec) = trigger.event {
322                    // Match event source type.
323                    if event_spec.source != payload.event_type {
324                        continue;
325                    }
326                    // Match optional workflow filter.
327                    if let Some(ref filter) = event_spec.filter {
328                        if let Some(ref filter_wf) = filter.workflow {
329                            match source_workflow {
330                                Some(ref sw) if sw == filter_wf => {}
331                                _ => continue,
332                            }
333                        }
334                        // CEL condition evaluation on payload.
335                        if let Some(ref condition) = filter.condition {
336                            if let Some(ref event_payload) = payload.payload {
337                                match crate::prehook::evaluate_webhook_filter(
338                                    condition,
339                                    event_payload,
340                                ) {
341                                    Ok(true) => {} // Condition matched, proceed
342                                    Ok(false) => {
343                                        debug!(
344                                            trigger = name.as_str(),
345                                            condition, "CEL filter rejected payload"
346                                        );
347                                        continue;
348                                    }
349                                    Err(e) => {
350                                        warn!(
351                                            trigger = name.as_str(),
352                                            error = %e,
353                                            "CEL filter evaluation failed, skipping"
354                                        );
355                                        continue;
356                                    }
357                                }
358                            } else {
359                                // No payload but condition set — skip (can't evaluate)
360                                debug!(
361                                    trigger = name.as_str(),
362                                    "CEL condition set but no payload available, skipping"
363                                );
364                                continue;
365                            }
366                        }
367                    }
368
369                    info!(
370                        trigger = name.as_str(),
371                        project = project_id.as_str(),
372                        event_type = payload.event_type.as_str(),
373                        "event trigger matched"
374                    );
375                    self.fire_trigger_with_config(
376                        name,
377                        project_id,
378                        trigger,
379                        payload.payload.as_ref(),
380                    )
381                    .await;
382                }
383            }
384        }
385    }
386
387    // ── Fire logic ───────────────────────────────────────────────────────────
388
389    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
390        let snap = self.state.config_runtime.load();
391        let config = &snap.active_config.config;
392
393        let trigger = config
394            .projects
395            .get(project)
396            .and_then(|p| p.triggers.get(trigger_name));
397
398        let Some(trigger) = trigger else {
399            warn!(trigger = trigger_name, "trigger not found in config");
400            return;
401        };
402
403        self.fire_trigger_with_config(trigger_name, project, trigger, None)
404            .await;
405    }
406
407    async fn fire_trigger_with_config(
408        &self,
409        trigger_name: &str,
410        project: &str,
411        trigger: &TriggerConfig,
412        webhook_payload: Option<&serde_json::Value>,
413    ) {
414        // ── Suspend check ────────────────────────────────────────────────
415        if trigger.suspend {
416            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
417            return;
418        }
419
420        // ── Throttle check (event triggers only) ─────────────────────────
421        if let Some(ref throttle) = trigger.throttle {
422            if throttle.min_interval > 0 {
423                if let Some(last) = self.load_last_fired(trigger_name, project).await {
424                    let elapsed = (Utc::now() - last).num_seconds();
425                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
426                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
427                        return;
428                    }
429                }
430            }
431        }
432
433        // ── Concurrency policy ───────────────────────────────────────────
434        match trigger.concurrency_policy {
435            crate::cli_types::ConcurrencyPolicy::Forbid => {
436                if self.has_active_task(trigger_name, project).await {
437                    self.emit_trigger_event(
438                        trigger_name,
439                        "trigger_skipped",
440                        "concurrent_task_active",
441                    );
442                    return;
443                }
444            }
445            crate::cli_types::ConcurrencyPolicy::Replace => {
446                // Cancel active tasks created by this trigger before creating a new one.
447                self.cancel_active_tasks(trigger_name, project).await;
448            }
449            crate::cli_types::ConcurrencyPolicy::Allow => {}
450        }
451
452        // ── Create task ──────────────────────────────────────────────────
453        let target_files = trigger
454            .action
455            .args
456            .as_ref()
457            .and_then(|a| a.get("target-file"))
458            .cloned();
459
460        let task_name = format!("trigger-{trigger_name}");
461
462        let payload = CreateTaskPayload {
463            name: Some(task_name),
464            goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
465            project_id: Some(project.to_string()),
466            workspace_id: Some(trigger.action.workspace.clone()),
467            workflow_id: Some(trigger.action.workflow.clone()),
468            target_files,
469            parent_task_id: None,
470            spawn_reason: None,
471        };
472
473        match crate::task_ops::create_task_as_service(&self.state, payload) {
474            Ok(summary) => {
475                let task_id = summary.id.clone();
476                info!(
477                    trigger = trigger_name,
478                    task_id = task_id.as_str(),
479                    "trigger fired: task created"
480                );
481
482                // Update trigger state.
483                self.update_trigger_state(trigger_name, project, &task_id, "created")
484                    .await;
485
486                // Emit event.
487                self.state.emit_event(
488                    &task_id,
489                    None,
490                    "trigger_fired",
491                    serde_json::json!({
492                        "trigger": trigger_name,
493                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
494                        "task_id": task_id,
495                    }),
496                );
497
498                // Start the task if action.start is true.
499                if trigger.action.start {
500                    let state = self.state.clone();
501                    let tid = task_id.clone();
502                    tokio::spawn(async move {
503                        if let Err(e) = state.task_enqueuer.enqueue_task(&state, &tid).await {
504                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
505                        } else {
506                            state.worker_notify.notify_one();
507                        }
508                    });
509                }
510
511                // History limit cleanup (best-effort, async).
512                if trigger.history_limit.is_some() {
513                    let state = self.state.clone();
514                    let name = trigger_name.to_string();
515                    let proj = project.to_string();
516                    let limit = trigger.history_limit.clone();
517                    tokio::spawn(async move {
518                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
519                        {
520                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
521                        }
522                    });
523                }
524            }
525            Err(e) => {
526                error!(
527                    trigger = trigger_name,
528                    error = %e,
529                    "trigger failed to create task"
530                );
531                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
532                    .await;
533                self.state.emit_event(
534                    "",
535                    None,
536                    "trigger_error",
537                    serde_json::json!({
538                        "trigger": trigger_name,
539                        "error": e.to_string(),
540                    }),
541                );
542            }
543        }
544    }
545
546    // ── DB helpers ───────────────────────────────────────────────────────────
547
548    /// Look up the workflow_id for a task from the database.
549    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
550        let tid = task_id.to_owned();
551        let result = self
552            .state
553            .async_database
554            .reader()
555            .call(move |conn| {
556                let wf: Option<String> = conn
557                    .query_row(
558                        "SELECT workflow_id FROM tasks WHERE id = ?1",
559                        rusqlite::params![tid],
560                        |row| row.get(0),
561                    )
562                    .ok();
563                Ok(wf)
564            })
565            .await;
566        match result {
567            Ok(wf) => wf,
568            Err(e) => {
569                debug!(task_id, error = %e, "failed to look up task workflow");
570                None
571            }
572        }
573    }
574
575    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
576        let name = trigger_name.to_owned();
577        let proj = project.to_owned();
578        let result = self
579            .state
580            .async_database
581            .reader()
582            .call(move |conn| {
583                let mut stmt = conn
584                    .prepare(
585                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
586                    )
587                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
588                let ts: Option<String> = stmt
589                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
590                    .ok();
591                Ok(ts)
592            })
593            .await;
594
595        match result {
596            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
597            _ => None,
598        }
599    }
600
601    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
602        let name = trigger_name.to_owned();
603        let proj = project.to_owned();
604        let result = self
605            .state
606            .async_database
607            .reader()
608            .call(move |conn| {
609                let last_task_id: Option<String> = conn
610                    .query_row(
611                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
612                        rusqlite::params![name, proj],
613                        |row| row.get(0),
614                    )
615                    .ok()
616                    .flatten();
617
618                if let Some(ref tid) = last_task_id {
619                    let status: Option<String> = conn
620                        .query_row(
621                            "SELECT status FROM tasks WHERE id = ?1",
622                            rusqlite::params![tid],
623                            |row| row.get(0),
624                        )
625                        .ok();
626                    if let Some(s) = status {
627                        return Ok(matches!(
628                            s.as_str(),
629                            "created" | "pending" | "running" | "restart_pending"
630                        ));
631                    }
632                }
633                Ok(false)
634            })
635            .await;
636
637        result.unwrap_or(false)
638    }
639
640    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
641        let name = trigger_name.to_owned();
642        let proj = project.to_owned();
643        let state = self.state.clone();
644        let result = state
645            .async_database
646            .reader()
647            .call(move |conn| {
648                let tid: Option<String> = conn
649                    .query_row(
650                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
651                        rusqlite::params![name, proj],
652                        |row| row.get(0),
653                    )
654                    .ok()
655                    .flatten();
656                Ok(tid)
657            })
658            .await;
659
660        if let Ok(Some(task_id)) = result {
661            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
662                warn!(
663                    trigger = trigger_name,
664                    task_id = task_id.as_str(),
665                    error = %e,
666                    "failed to cancel active task for Replace policy"
667                );
668            }
669        }
670    }
671
672    async fn update_trigger_state(
673        &self,
674        trigger_name: &str,
675        project: &str,
676        task_id: &str,
677        status: &str,
678    ) {
679        let name = trigger_name.to_owned();
680        let proj = project.to_owned();
681        let tid = task_id.to_owned();
682        let st = status.to_owned();
683        let now = Utc::now().to_rfc3339();
684        let now2 = now.clone();
685
686        if let Err(e) = self
687            .state
688            .async_database
689            .writer()
690            .call(move |conn| {
691                conn.execute(
692                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
693                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
694                     ON CONFLICT(trigger_name, project) DO UPDATE SET
695                       last_fired_at = ?3,
696                       fire_count = fire_count + 1,
697                       last_task_id = ?4,
698                       last_status = ?5,
699                       updated_at = ?7",
700                    rusqlite::params![name, proj, now, tid, st, now2, now2],
701                )
702                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
703                Ok(())
704            })
705            .await
706        {
707            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
708        }
709    }
710
711    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
712        debug!(trigger = trigger_name, event_type, reason, "trigger event");
713        self.state.emit_event(
714            "",
715            None,
716            event_type,
717            serde_json::json!({
718                "trigger": trigger_name,
719                "reason": reason,
720            }),
721        );
722    }
723}
724
725// ── Cron schedule helpers ────────────────────────────────────────────────────
726
727/// Distinguishes trigger-based cron entries from CRD plugin cron entries.
728#[derive(Debug, Clone, PartialEq, Eq)]
729enum CronEntryKind {
730    /// A regular trigger cron entry.
731    Trigger,
732    /// A CRD plugin cron entry (executes a plugin command directly).
733    CrdPlugin {
734        crd_kind: String,
735        plugin: crate::crd::types::CrdPlugin,
736    },
737}
738
739struct CronEntry {
740    trigger_name: String,
741    project: String,
742    next_fire: DateTime<Utc>,
743    kind: CronEntryKind,
744}
745
746fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
747    use cron::Schedule;
748    use std::str::FromStr;
749
750    let schedule = Schedule::from_str(&spec.schedule)
751        .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
752
753    // If a timezone is specified, compute in that timezone, then convert back to UTC.
754    if let Some(ref tz_name) = spec.timezone {
755        let tz: chrono_tz::Tz = tz_name
756            .parse()
757            .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
758        let local_after = after.with_timezone(&tz);
759        let next = schedule
760            .after(&local_after)
761            .next()
762            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
763        Ok(next.with_timezone(&Utc))
764    } else {
765        let next = schedule
766            .after(&after)
767            .next()
768            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
769        Ok(next.with_timezone(&Utc))
770    }
771}
772
773fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
774    let now = Utc::now();
775    entries
776        .iter()
777        .map(|e| {
778            let diff = e.next_fire.signed_duration_since(now);
779            if diff.num_milliseconds() <= 0 {
780                std::time::Duration::from_millis(100)
781            } else {
782                std::time::Duration::from_millis(diff.num_milliseconds() as u64)
783            }
784        })
785        .min()
786        // If no cron triggers, sleep for a long time (until event or reload wakes us).
787        .unwrap_or(std::time::Duration::from_secs(3600))
788}
789
790fn collect_due_entries(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<&CronEntry> {
791    entries.iter().filter(|e| e.next_fire <= now).collect()
792}
793
794async fn cleanup_history(
795    state: &InnerState,
796    trigger_name: &str,
797    project: &str,
798    limit: Option<&crate::config::TriggerHistoryLimitConfig>,
799) -> Result<()> {
800    let limit = match limit {
801        Some(l) => l,
802        None => return Ok(()),
803    };
804
805    let task_name_pattern = format!("trigger-{trigger_name}");
806    let proj = project.to_owned();
807
808    // For each status category, collect IDs of tasks beyond the retention limit.
809    let mut ids_to_delete: Vec<String> = Vec::new();
810
811    if let Some(max_successful) = limit.successful {
812        let pattern = task_name_pattern.clone();
813        let p = proj.clone();
814        let max = max_successful as usize;
815        let ids = state
816            .async_database
817            .reader()
818            .call(move |conn| {
819                let mut stmt = conn
820                    .prepare(
821                        "SELECT id FROM tasks \
822                         WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
823                         ORDER BY created_at DESC",
824                    )
825                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
826                let rows = stmt
827                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
828                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
829                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
830                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
831            })
832            .await
833            .context("query completed tasks for history cleanup")?;
834        ids_to_delete.extend(ids);
835    }
836
837    if let Some(max_failed) = limit.failed {
838        let pattern = task_name_pattern.clone();
839        let p = proj.clone();
840        let max = max_failed as usize;
841        let ids = state
842            .async_database
843            .reader()
844            .call(move |conn| {
845                let mut stmt = conn
846                    .prepare(
847                        "SELECT id FROM tasks \
848                         WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
849                         ORDER BY created_at DESC",
850                    )
851                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
852                let rows = stmt
853                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
854                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
855                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
856                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
857            })
858            .await
859            .context("query failed tasks for history cleanup")?;
860        ids_to_delete.extend(ids);
861    }
862
863    if ids_to_delete.is_empty() {
864        return Ok(());
865    }
866
867    state
868        .async_database
869        .writer()
870        .call(move |conn| {
871            let placeholders: Vec<String> =
872                (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
873            let sql = format!(
874                "DELETE FROM tasks WHERE id IN ({})",
875                placeholders.join(", ")
876            );
877            let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
878                .iter()
879                .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
880                .collect();
881            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
882                params.iter().map(|p| p.as_ref()).collect();
883            conn.execute(&sql, param_refs.as_slice())
884                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
885            Ok(())
886        })
887        .await
888        .context("delete excess trigger history tasks")?;
889
890    Ok(())
891}
892
893// ── Goal construction ────────────────────────────────────────────────────────
894
895/// Build a task goal string for a trigger fire.
896/// If an event payload is present, include a summary in the goal.
897fn build_trigger_goal(trigger_name: &str, event_payload: Option<&serde_json::Value>) -> String {
898    match event_payload {
899        Some(payload) => {
900            // For filesystem events, use a friendlier format.
901            if let Some(filename) = payload.get("filename").and_then(|v| v.as_str()) {
902                if let Some(event_type) = payload.get("event_type").and_then(|v| v.as_str()) {
903                    return format!(
904                        "Triggered by filesystem '{trigger_name}': {event_type} {filename}"
905                    );
906                }
907            }
908            let summary = serde_json::to_string(payload).unwrap_or_default();
909            let truncated = if summary.len() > 500 {
910                format!("{}...", &summary[..497])
911            } else {
912                summary
913            };
914            format!("Triggered by '{trigger_name}': {truncated}")
915        }
916        None => format!("Triggered by: {trigger_name}"),
917    }
918}
919
920// ── Public helper for event broadcasting ─────────────────────────────────────
921
922/// Broadcast a trigger-relevant event (task_completed / task_failed / webhook).
923/// Called from the daemon's event handling path.
924pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
925    // Ignore send errors (no subscribers = no triggers configured).
926    let _ = state.trigger_event_tx.send(payload);
927}
928
929/// Notify the trigger engine to reload its configuration.
930/// Also notifies the filesystem watcher (if running) to re-evaluate watched paths.
931/// Safe to call from sync code. No-op if no engine/watcher is running.
932pub fn notify_trigger_reload(state: &InnerState) {
933    if let Ok(guard) = state.trigger_engine_handle.lock() {
934        if let Some(ref handle) = *guard {
935            let _ = handle.reload_sync();
936        }
937    }
938    // Also notify filesystem watcher to reload its watch list.
939    if let Ok(guard) = state.fs_watcher_reload_tx.lock() {
940        if let Some(ref tx) = *guard {
941            let _ = tx.try_send(());
942        }
943    }
944}
945
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a UTC instant on the hour for the given calendar date.
    fn utc_at(year: i32, month: u32, day: u32, hour: u32) -> DateTime<Utc> {
        chrono::NaiveDate::from_ymd_opt(year, month, day)
            .expect("valid calendar date")
            .and_hms_opt(hour, 0, 0)
            .expect("valid time of day")
            .and_utc()
    }

    /// Cron spec firing daily at 02:00 (the cron crate uses 6 fields),
    /// optionally evaluated in the given timezone.
    fn daily_at_two(timezone: Option<&str>) -> TriggerCronConfig {
        TriggerCronConfig {
            schedule: "0 0 2 * * *".to_string(),
            timezone: timezone.map(str::to_string),
        }
    }

    /// Plain trigger entry in project `p` that is due at `next_fire`.
    fn trigger_entry(name: &str, next_fire: DateTime<Utc>) -> CronEntry {
        CronEntry {
            trigger_name: name.to_string(),
            project: "p".to_string(),
            next_fire,
            kind: CronEntryKind::Trigger,
        }
    }

    #[test]
    fn compute_next_fire_utc() {
        let after = utc_at(2026, 1, 1, 0);
        let next = compute_next_fire(&daily_at_two(None), after).expect("should compute");
        // Pin the full instant, not just the hour, so a wrong-day result fails.
        assert_eq!(next, utc_at(2026, 1, 1, 2));
    }

    #[test]
    fn compute_next_fire_with_timezone() {
        let after = utc_at(2026, 1, 1, 0);
        let next = compute_next_fire(&daily_at_two(Some("Asia/Shanghai")), after)
            .expect("should compute with tz");
        // `after` is 08:00 in Shanghai, so the next 02:00 local is on Jan 2
        // Shanghai time, which is 18:00 UTC on Jan 1.
        assert_eq!(next, utc_at(2026, 1, 1, 18));
    }

    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = TriggerCronConfig {
            schedule: "not a cron".to_string(),
            timezone: None,
        };
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = daily_at_two(Some("Invalid/TZ"));
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        let d = next_cron_sleep(&[]);
        assert_eq!(d, std::time::Duration::from_secs(3600));
    }

    #[test]
    fn next_cron_sleep_overdue_entry_uses_short_poll() {
        let now = Utc::now();
        let entries = vec![
            trigger_entry("past", now - chrono::Duration::seconds(10)),
            trigger_entry("future", now + chrono::Duration::seconds(300)),
        ];
        // The overdue entry contributes the fixed 100 ms wait, which is the minimum.
        assert_eq!(
            next_cron_sleep(&entries),
            std::time::Duration::from_millis(100)
        );
    }

    #[test]
    fn collect_due_entries_finds_past_entries() {
        let now = Utc::now();
        let entries = vec![
            trigger_entry("past", now - chrono::Duration::seconds(10)),
            trigger_entry("future", now + chrono::Duration::seconds(300)),
        ];
        let due = collect_due_entries(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].trigger_name, "past");
    }

    #[test]
    fn collect_due_entries_includes_exact_boundary() {
        let now = Utc::now();
        let entries = vec![trigger_entry("exact", now)];
        let due = collect_due_entries(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].trigger_name, "exact");
    }

    #[test]
    fn build_trigger_goal_without_payload() {
        assert_eq!(build_trigger_goal("nightly", None), "Triggered by: nightly");
    }

    #[test]
    fn build_trigger_goal_filesystem_payload() {
        let payload = serde_json::json!({"filename": "a.txt", "event_type": "create"});
        assert_eq!(
            build_trigger_goal("watch", Some(&payload)),
            "Triggered by filesystem 'watch': create a.txt"
        );
    }

    #[test]
    fn build_trigger_goal_truncates_long_payload() {
        let payload = serde_json::json!({"data": "x".repeat(600)});
        let goal = build_trigger_goal("t", Some(&payload));
        let prefix = "Triggered by 't': ";
        assert!(goal.starts_with(prefix));
        assert!(goal.ends_with("..."));
        // 497 bytes of payload plus the three-byte ellipsis.
        assert_eq!(goal.len(), prefix.len() + 500);
    }
}
1031}