Skip to main content

agent_orchestrator/
trigger_engine.rs

1//! Trigger engine: cron scheduler and event-driven task creation.
2//!
3//! The `TriggerEngine` runs as a long-lived tokio task inside `orchestratord`,
4//! watching for cron ticks and task-lifecycle events and creating tasks when
5//! trigger conditions are met.
6
7use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19/// Cancels a running task for the Replace trigger policy.
20///
21/// This is a simplified version of `scheduler::stop_task_runtime` that avoids
22/// a dependency on the scheduler crate.
23async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24    // Signal the in-process running task to stop, if present.
25    let runtime = {
26        let running = state.running.lock().await;
27        running.get(task_id).cloned()
28    };
29    if let Some(rt) = runtime {
30        rt.stop_flag.store(true, Ordering::SeqCst);
31    }
32    // Update task status to cancelled.
33    state
34        .db_writer
35        .set_task_status(task_id, "cancelled", false)
36        .await?;
37    insert_event(
38        state,
39        task_id,
40        None,
41        "task_control",
42        serde_json::json!({"status": "cancelled"}),
43    )
44    .await?;
45    Ok(())
46}
47
48// ── Public types ─────────────────────────────────────────────────────────────
49
50/// Payload broadcast when a trigger-relevant event fires.
51#[derive(Debug, Clone)]
52pub struct TriggerEventPayload {
53    /// Event type: "task_completed", "task_failed", "webhook", or "filesystem".
54    pub event_type: String,
55    /// Source task ID (empty for webhook/filesystem events).
56    pub task_id: String,
57    /// Optional JSON payload (webhook body, filesystem event context, or task metadata).
58    pub payload: Option<serde_json::Value>,
59    /// Optional project scope. When set, the trigger engine only matches triggers
60    /// in this specific project, preventing cross-project trigger leakage.
61    pub project: Option<String>,
62}
63
/// Notification sent to the engine when trigger configuration changes.
///
/// The enum is a data-free signal, so it derives the cheap common traits
/// (`Clone`, `Copy`, `PartialEq`, `Eq`) in addition to `Debug`. This lets
/// callers and tests copy and compare notifications without extra code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerReloadEvent {
    /// Re-read all triggers from the current config snapshot.
    Reload,
}
70
71/// Handle used by the rest of the system to communicate with a running engine.
72#[derive(Clone)]
73pub struct TriggerEngineHandle {
74    reload_tx: mpsc::Sender<TriggerReloadEvent>,
75}
76
77impl TriggerEngineHandle {
78    /// Notify the engine to reload trigger configuration (async).
79    pub async fn reload(&self) {
80        let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
81    }
82
83    /// Notify the engine to reload trigger configuration (sync-safe).
84    ///
85    /// Uses `try_send` so it can be called from synchronous code paths
86    /// like `apply_manifests`. Returns `true` if the notification was sent.
87    pub fn reload_sync(&self) -> bool {
88        self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
89    }
90}
91
92/// The trigger engine itself. Constructed via [`TriggerEngine::new`] and
93/// driven via [`TriggerEngine::run`].
94pub struct TriggerEngine {
95    state: Arc<InnerState>,
96    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
97    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
98    /// Triggers that have been present for at least one config reload cycle.
99    /// A freshly-created trigger (from an agent `apply`) must survive one
100    /// reload before it is eligible to fire — this prevents agent-applied
101    /// triggers from immediately spawning parasitic tasks.
102    stabilized_triggers: HashSet<(String, String)>,
103}
104
105impl TriggerEngine {
106    /// Create a new engine and its control handle.
107    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
108        let (reload_tx, reload_rx) = mpsc::channel(16);
109        let trigger_event_rx = state.trigger_event_tx.subscribe();
110        let engine = Self {
111            state,
112            reload_rx,
113            trigger_event_rx,
114            stabilized_triggers: HashSet::new(),
115        };
116        let handle = TriggerEngineHandle { reload_tx };
117        (engine, handle)
118    }
119
120    /// Main run loop. Returns when `shutdown_rx` fires.
121    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
122        info!("trigger engine started");
123
124        // Load initial trigger set.
125        let mut cron_schedule = self.build_cron_schedule();
126
127        loop {
128            let sleep_duration = next_cron_sleep(&cron_schedule);
129            let sleep_fut = tokio::time::sleep(sleep_duration);
130            tokio::pin!(sleep_fut);
131
132            tokio::select! {
133                // ── Cron tick ───────────────────────────────────────────
134                () = &mut sleep_fut => {
135                    let now = Utc::now();
136                    let fired = collect_due_triggers(&cron_schedule, now);
137                    for (trigger_name, project) in fired {
138                        self.fire_trigger(&trigger_name, &project).await;
139                    }
140                    // Recompute schedule after firing.
141                    cron_schedule = self.build_cron_schedule();
142                }
143
144                // ── Event trigger ───────────────────────────────────────
145                event_result = self.trigger_event_rx.recv() => {
146                    match event_result {
147                        Ok(payload) => {
148                            self.handle_event_trigger(&payload).await;
149                        }
150                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
151                            warn!(skipped = n, "trigger event receiver lagged");
152                        }
153                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
154                            debug!("trigger event channel closed");
155                            break;
156                        }
157                    }
158                }
159
160                // ── Config reload ───────────────────────────────────────
161                Some(_) = self.reload_rx.recv() => {
162                    info!("trigger engine: reloading configuration");
163                    cron_schedule = self.build_cron_schedule();
164                }
165
166                // ── Shutdown ────────────────────────────────────────────
167                _ = shutdown_rx.changed() => {
168                    info!("trigger engine shutting down");
169                    break;
170                }
171            }
172        }
173    }
174
175    // ── Cron helpers ─────────────────────────────────────────────────────────
176
177    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
178        let snap = self.state.config_runtime.load();
179        let config = &snap.active_config.config;
180        let mut entries = Vec::new();
181
182        // Collect the current set of triggers to update stabilization tracking.
183        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
184        for (project_id, project) in &config.projects {
185            for name in project.triggers.keys() {
186                current_triggers.insert((project_id.clone(), name.clone()));
187            }
188        }
189
190        // Promote triggers that were already known from a prior cycle.
191        // Triggers seen for the first time in THIS reload are NOT yet stabilized
192        // and will only become eligible after the next reload.
193        let previously_known = std::mem::take(&mut self.stabilized_triggers);
194        self.stabilized_triggers = current_triggers.clone();
195
196        for (project_id, project) in &config.projects {
197            for (name, trigger) in &project.triggers {
198                if trigger.suspend {
199                    continue;
200                }
201                // Skip triggers that are new (not in the previous cycle's set).
202                if !previously_known.contains(&(project_id.clone(), name.clone())) {
203                    debug!(
204                        trigger = name.as_str(),
205                        project = project_id.as_str(),
206                        "trigger not yet stabilized, skipping cron schedule"
207                    );
208                    continue;
209                }
210                if let Some(ref cron_spec) = trigger.cron {
211                    match compute_next_fire(cron_spec, Utc::now()) {
212                        Ok(next) => {
213                            entries.push(CronEntry {
214                                trigger_name: name.clone(),
215                                project: project_id.clone(),
216                                next_fire: next,
217                            });
218                        }
219                        Err(e) => {
220                            warn!(
221                                trigger = name.as_str(),
222                                project = project_id.as_str(),
223                                error = %e,
224                                "failed to compute next fire time"
225                            );
226                        }
227                    }
228                }
229            }
230        }
231        entries
232    }
233
234    // ── Event trigger matching ───────────────────────────────────────────────
235
236    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
237        // Resolve the source task's workflow from the database (the payload only
238        // carries event_type + task_id to keep the broadcast lightweight).
239        // For webhook events, skip workflow lookup (no source task).
240        let source_workflow = if payload.task_id.is_empty() {
241            None
242        } else {
243            self.lookup_task_workflow(&payload.task_id).await
244        };
245
246        let snap = self.state.config_runtime.load();
247        let config = &snap.active_config.config;
248
249        for (project_id, project) in &config.projects {
250            // When a project scope is specified, only match triggers in that project.
251            if let Some(ref scoped_project) = payload.project {
252                if project_id != scoped_project {
253                    continue;
254                }
255            }
256            for (name, trigger) in &project.triggers {
257                if trigger.suspend {
258                    continue;
259                }
260                // Skip triggers not yet stabilized (first seen in most recent reload).
261                if !self
262                    .stabilized_triggers
263                    .contains(&(project_id.clone(), name.clone()))
264                {
265                    continue;
266                }
267                if let Some(ref event_spec) = trigger.event {
268                    // Match event source type.
269                    if event_spec.source != payload.event_type {
270                        continue;
271                    }
272                    // Match optional workflow filter.
273                    if let Some(ref filter) = event_spec.filter {
274                        if let Some(ref filter_wf) = filter.workflow {
275                            match source_workflow {
276                                Some(ref sw) if sw == filter_wf => {}
277                                _ => continue,
278                            }
279                        }
280                        // CEL condition evaluation on payload.
281                        if let Some(ref condition) = filter.condition {
282                            if let Some(ref event_payload) = payload.payload {
283                                match crate::prehook::evaluate_webhook_filter(
284                                    condition,
285                                    event_payload,
286                                ) {
287                                    Ok(true) => {} // Condition matched, proceed
288                                    Ok(false) => {
289                                        debug!(
290                                            trigger = name.as_str(),
291                                            condition, "CEL filter rejected payload"
292                                        );
293                                        continue;
294                                    }
295                                    Err(e) => {
296                                        warn!(
297                                            trigger = name.as_str(),
298                                            error = %e,
299                                            "CEL filter evaluation failed, skipping"
300                                        );
301                                        continue;
302                                    }
303                                }
304                            } else {
305                                // No payload but condition set — skip (can't evaluate)
306                                debug!(
307                                    trigger = name.as_str(),
308                                    "CEL condition set but no payload available, skipping"
309                                );
310                                continue;
311                            }
312                        }
313                    }
314
315                    info!(
316                        trigger = name.as_str(),
317                        project = project_id.as_str(),
318                        event_type = payload.event_type.as_str(),
319                        "event trigger matched"
320                    );
321                    self.fire_trigger_with_config(
322                        name,
323                        project_id,
324                        trigger,
325                        payload.payload.as_ref(),
326                    )
327                    .await;
328                }
329            }
330        }
331    }
332
333    // ── Fire logic ───────────────────────────────────────────────────────────
334
335    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
336        let snap = self.state.config_runtime.load();
337        let config = &snap.active_config.config;
338
339        let trigger = config
340            .projects
341            .get(project)
342            .and_then(|p| p.triggers.get(trigger_name));
343
344        let Some(trigger) = trigger else {
345            warn!(trigger = trigger_name, "trigger not found in config");
346            return;
347        };
348
349        self.fire_trigger_with_config(trigger_name, project, trigger, None)
350            .await;
351    }
352
353    async fn fire_trigger_with_config(
354        &self,
355        trigger_name: &str,
356        project: &str,
357        trigger: &TriggerConfig,
358        webhook_payload: Option<&serde_json::Value>,
359    ) {
360        // ── Suspend check ────────────────────────────────────────────────
361        if trigger.suspend {
362            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
363            return;
364        }
365
366        // ── Throttle check (event triggers only) ─────────────────────────
367        if let Some(ref throttle) = trigger.throttle {
368            if throttle.min_interval > 0 {
369                if let Some(last) = self.load_last_fired(trigger_name, project).await {
370                    let elapsed = (Utc::now() - last).num_seconds();
371                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
372                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
373                        return;
374                    }
375                }
376            }
377        }
378
379        // ── Concurrency policy ───────────────────────────────────────────
380        match trigger.concurrency_policy {
381            crate::cli_types::ConcurrencyPolicy::Forbid => {
382                if self.has_active_task(trigger_name, project).await {
383                    self.emit_trigger_event(
384                        trigger_name,
385                        "trigger_skipped",
386                        "concurrent_task_active",
387                    );
388                    return;
389                }
390            }
391            crate::cli_types::ConcurrencyPolicy::Replace => {
392                // Cancel active tasks created by this trigger before creating a new one.
393                self.cancel_active_tasks(trigger_name, project).await;
394            }
395            crate::cli_types::ConcurrencyPolicy::Allow => {}
396        }
397
398        // ── Create task ──────────────────────────────────────────────────
399        let target_files = trigger
400            .action
401            .args
402            .as_ref()
403            .and_then(|a| a.get("target-file"))
404            .cloned();
405
406        let task_name = format!("trigger-{trigger_name}");
407
408        let payload = CreateTaskPayload {
409            name: Some(task_name),
410            goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
411            project_id: Some(project.to_string()),
412            workspace_id: Some(trigger.action.workspace.clone()),
413            workflow_id: Some(trigger.action.workflow.clone()),
414            target_files,
415            parent_task_id: None,
416            spawn_reason: None,
417        };
418
419        match crate::task_ops::create_task_as_service(&self.state, payload) {
420            Ok(summary) => {
421                let task_id = summary.id.clone();
422                info!(
423                    trigger = trigger_name,
424                    task_id = task_id.as_str(),
425                    "trigger fired: task created"
426                );
427
428                // Update trigger state.
429                self.update_trigger_state(trigger_name, project, &task_id, "created")
430                    .await;
431
432                // Emit event.
433                self.state.emit_event(
434                    &task_id,
435                    None,
436                    "trigger_fired",
437                    serde_json::json!({
438                        "trigger": trigger_name,
439                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
440                        "task_id": task_id,
441                    }),
442                );
443
444                // Start the task if action.start is true.
445                if trigger.action.start {
446                    let state = self.state.clone();
447                    let tid = task_id.clone();
448                    tokio::spawn(async move {
449                        if let Err(e) = state.task_enqueuer.enqueue_task(&state, &tid).await {
450                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
451                        } else {
452                            state.worker_notify.notify_one();
453                        }
454                    });
455                }
456
457                // History limit cleanup (best-effort, async).
458                if trigger.history_limit.is_some() {
459                    let state = self.state.clone();
460                    let name = trigger_name.to_string();
461                    let proj = project.to_string();
462                    let limit = trigger.history_limit.clone();
463                    tokio::spawn(async move {
464                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
465                        {
466                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
467                        }
468                    });
469                }
470            }
471            Err(e) => {
472                error!(
473                    trigger = trigger_name,
474                    error = %e,
475                    "trigger failed to create task"
476                );
477                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
478                    .await;
479                self.state.emit_event(
480                    "",
481                    None,
482                    "trigger_error",
483                    serde_json::json!({
484                        "trigger": trigger_name,
485                        "error": e.to_string(),
486                    }),
487                );
488            }
489        }
490    }
491
492    // ── DB helpers ───────────────────────────────────────────────────────────
493
494    /// Look up the workflow_id for a task from the database.
495    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
496        let tid = task_id.to_owned();
497        let result = self
498            .state
499            .async_database
500            .reader()
501            .call(move |conn| {
502                let wf: Option<String> = conn
503                    .query_row(
504                        "SELECT workflow_id FROM tasks WHERE id = ?1",
505                        rusqlite::params![tid],
506                        |row| row.get(0),
507                    )
508                    .ok();
509                Ok(wf)
510            })
511            .await;
512        match result {
513            Ok(wf) => wf,
514            Err(e) => {
515                debug!(task_id, error = %e, "failed to look up task workflow");
516                None
517            }
518        }
519    }
520
521    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
522        let name = trigger_name.to_owned();
523        let proj = project.to_owned();
524        let result = self
525            .state
526            .async_database
527            .reader()
528            .call(move |conn| {
529                let mut stmt = conn
530                    .prepare(
531                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
532                    )
533                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
534                let ts: Option<String> = stmt
535                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
536                    .ok();
537                Ok(ts)
538            })
539            .await;
540
541        match result {
542            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
543            _ => None,
544        }
545    }
546
547    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
548        let name = trigger_name.to_owned();
549        let proj = project.to_owned();
550        let result = self
551            .state
552            .async_database
553            .reader()
554            .call(move |conn| {
555                let last_task_id: Option<String> = conn
556                    .query_row(
557                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
558                        rusqlite::params![name, proj],
559                        |row| row.get(0),
560                    )
561                    .ok()
562                    .flatten();
563
564                if let Some(ref tid) = last_task_id {
565                    let status: Option<String> = conn
566                        .query_row(
567                            "SELECT status FROM tasks WHERE id = ?1",
568                            rusqlite::params![tid],
569                            |row| row.get(0),
570                        )
571                        .ok();
572                    if let Some(s) = status {
573                        return Ok(matches!(
574                            s.as_str(),
575                            "created" | "pending" | "running" | "restart_pending"
576                        ));
577                    }
578                }
579                Ok(false)
580            })
581            .await;
582
583        result.unwrap_or(false)
584    }
585
586    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
587        let name = trigger_name.to_owned();
588        let proj = project.to_owned();
589        let state = self.state.clone();
590        let result = state
591            .async_database
592            .reader()
593            .call(move |conn| {
594                let tid: Option<String> = conn
595                    .query_row(
596                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
597                        rusqlite::params![name, proj],
598                        |row| row.get(0),
599                    )
600                    .ok()
601                    .flatten();
602                Ok(tid)
603            })
604            .await;
605
606        if let Ok(Some(task_id)) = result {
607            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
608                warn!(
609                    trigger = trigger_name,
610                    task_id = task_id.as_str(),
611                    error = %e,
612                    "failed to cancel active task for Replace policy"
613                );
614            }
615        }
616    }
617
618    async fn update_trigger_state(
619        &self,
620        trigger_name: &str,
621        project: &str,
622        task_id: &str,
623        status: &str,
624    ) {
625        let name = trigger_name.to_owned();
626        let proj = project.to_owned();
627        let tid = task_id.to_owned();
628        let st = status.to_owned();
629        let now = Utc::now().to_rfc3339();
630        let now2 = now.clone();
631
632        if let Err(e) = self
633            .state
634            .async_database
635            .writer()
636            .call(move |conn| {
637                conn.execute(
638                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
639                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
640                     ON CONFLICT(trigger_name, project) DO UPDATE SET
641                       last_fired_at = ?3,
642                       fire_count = fire_count + 1,
643                       last_task_id = ?4,
644                       last_status = ?5,
645                       updated_at = ?7",
646                    rusqlite::params![name, proj, now, tid, st, now2, now2],
647                )
648                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
649                Ok(())
650            })
651            .await
652        {
653            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
654        }
655    }
656
657    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
658        debug!(trigger = trigger_name, event_type, reason, "trigger event");
659        self.state.emit_event(
660            "",
661            None,
662            event_type,
663            serde_json::json!({
664                "trigger": trigger_name,
665                "reason": reason,
666            }),
667        );
668    }
669}
670
671// ── Cron schedule helpers ────────────────────────────────────────────────────
672
673struct CronEntry {
674    trigger_name: String,
675    project: String,
676    next_fire: DateTime<Utc>,
677}
678
679fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
680    use cron::Schedule;
681    use std::str::FromStr;
682
683    let schedule = Schedule::from_str(&spec.schedule)
684        .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
685
686    // If a timezone is specified, compute in that timezone, then convert back to UTC.
687    if let Some(ref tz_name) = spec.timezone {
688        let tz: chrono_tz::Tz = tz_name
689            .parse()
690            .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
691        let local_after = after.with_timezone(&tz);
692        let next = schedule
693            .after(&local_after)
694            .next()
695            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
696        Ok(next.with_timezone(&Utc))
697    } else {
698        let next = schedule
699            .after(&after)
700            .next()
701            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
702        Ok(next.with_timezone(&Utc))
703    }
704}
705
706fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
707    let now = Utc::now();
708    entries
709        .iter()
710        .map(|e| {
711            let diff = e.next_fire.signed_duration_since(now);
712            if diff.num_milliseconds() <= 0 {
713                std::time::Duration::from_millis(100)
714            } else {
715                std::time::Duration::from_millis(diff.num_milliseconds() as u64)
716            }
717        })
718        .min()
719        // If no cron triggers, sleep for a long time (until event or reload wakes us).
720        .unwrap_or(std::time::Duration::from_secs(3600))
721}
722
723fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
724    entries
725        .iter()
726        .filter(|e| e.next_fire <= now)
727        .map(|e| (e.trigger_name.clone(), e.project.clone()))
728        .collect()
729}
730
731async fn cleanup_history(
732    state: &InnerState,
733    trigger_name: &str,
734    project: &str,
735    limit: Option<&crate::config::TriggerHistoryLimitConfig>,
736) -> Result<()> {
737    let limit = match limit {
738        Some(l) => l,
739        None => return Ok(()),
740    };
741
742    let task_name_pattern = format!("trigger-{trigger_name}");
743    let proj = project.to_owned();
744
745    // For each status category, collect IDs of tasks beyond the retention limit.
746    let mut ids_to_delete: Vec<String> = Vec::new();
747
748    if let Some(max_successful) = limit.successful {
749        let pattern = task_name_pattern.clone();
750        let p = proj.clone();
751        let max = max_successful as usize;
752        let ids = state
753            .async_database
754            .reader()
755            .call(move |conn| {
756                let mut stmt = conn
757                    .prepare(
758                        "SELECT id FROM tasks \
759                         WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
760                         ORDER BY created_at DESC",
761                    )
762                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
763                let rows = stmt
764                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
765                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
766                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
767                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
768            })
769            .await
770            .context("query completed tasks for history cleanup")?;
771        ids_to_delete.extend(ids);
772    }
773
774    if let Some(max_failed) = limit.failed {
775        let pattern = task_name_pattern.clone();
776        let p = proj.clone();
777        let max = max_failed as usize;
778        let ids = state
779            .async_database
780            .reader()
781            .call(move |conn| {
782                let mut stmt = conn
783                    .prepare(
784                        "SELECT id FROM tasks \
785                         WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
786                         ORDER BY created_at DESC",
787                    )
788                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
789                let rows = stmt
790                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
791                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
792                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
793                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
794            })
795            .await
796            .context("query failed tasks for history cleanup")?;
797        ids_to_delete.extend(ids);
798    }
799
800    if ids_to_delete.is_empty() {
801        return Ok(());
802    }
803
804    state
805        .async_database
806        .writer()
807        .call(move |conn| {
808            let placeholders: Vec<String> =
809                (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
810            let sql = format!(
811                "DELETE FROM tasks WHERE id IN ({})",
812                placeholders.join(", ")
813            );
814            let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
815                .iter()
816                .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
817                .collect();
818            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
819                params.iter().map(|p| p.as_ref()).collect();
820            conn.execute(&sql, param_refs.as_slice())
821                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
822            Ok(())
823        })
824        .await
825        .context("delete excess trigger history tasks")?;
826
827    Ok(())
828}
829
830// ── Goal construction ────────────────────────────────────────────────────────
831
832/// Build a task goal string for a trigger fire.
833/// If an event payload is present, include a summary in the goal.
834fn build_trigger_goal(trigger_name: &str, event_payload: Option<&serde_json::Value>) -> String {
835    match event_payload {
836        Some(payload) => {
837            // For filesystem events, use a friendlier format.
838            if let Some(filename) = payload.get("filename").and_then(|v| v.as_str()) {
839                if let Some(event_type) = payload.get("event_type").and_then(|v| v.as_str()) {
840                    return format!(
841                        "Triggered by filesystem '{trigger_name}': {event_type} {filename}"
842                    );
843                }
844            }
845            let summary = serde_json::to_string(payload).unwrap_or_default();
846            let truncated = if summary.len() > 500 {
847                format!("{}...", &summary[..497])
848            } else {
849                summary
850            };
851            format!("Triggered by '{trigger_name}': {truncated}")
852        }
853        None => format!("Triggered by: {trigger_name}"),
854    }
855}
856
857// ── Public helper for event broadcasting ─────────────────────────────────────
858
859/// Broadcast a trigger-relevant event (task_completed / task_failed / webhook).
860/// Called from the daemon's event handling path.
861pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
862    // Ignore send errors (no subscribers = no triggers configured).
863    let _ = state.trigger_event_tx.send(payload);
864}
865
866/// Notify the trigger engine to reload its configuration.
867/// Also notifies the filesystem watcher (if running) to re-evaluate watched paths.
868/// Safe to call from sync code. No-op if no engine/watcher is running.
869pub fn notify_trigger_reload(state: &InnerState) {
870    if let Ok(guard) = state.trigger_engine_handle.lock() {
871        if let Some(ref handle) = *guard {
872            let _ = handle.reload_sync();
873        }
874    }
875    // Also notify filesystem watcher to reload its watch list.
876    if let Ok(guard) = state.fs_watcher_reload_tx.lock() {
877        if let Some(ref tx) = *guard {
878            let _ = tx.try_send(());
879        }
880    }
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Daily at 02:00 (cron crate uses 6 fields).
    const DAILY_AT_TWO: &str = "0 0 2 * * *";

    /// Build a cron spec from a schedule string and an optional timezone name.
    fn cron_spec(schedule: &str, timezone: Option<&str>) -> TriggerCronConfig {
        TriggerCronConfig {
            schedule: schedule.to_string(),
            timezone: timezone.map(|tz| tz.to_string()),
        }
    }

    /// Fixed reference instant: 2026-01-01T00:00:00Z.
    fn new_year_2026_utc() -> DateTime<Utc> {
        chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .and_then(|date| date.and_hms_opt(0, 0, 0))
            .expect("2026-01-01 00:00:00 is a valid date-time")
            .and_utc()
    }

    /// Build a cron entry in project "p" with the given name and fire time.
    fn entry(name: &str, next_fire: DateTime<Utc>) -> CronEntry {
        CronEntry {
            trigger_name: name.to_string(),
            project: "p".to_string(),
            next_fire,
        }
    }

    #[test]
    fn compute_next_fire_utc() {
        let after = new_year_2026_utc();
        let next =
            compute_next_fire(&cron_spec(DAILY_AT_TWO, None), after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    #[test]
    fn compute_next_fire_with_timezone() {
        let after = new_year_2026_utc();
        let spec = cron_spec(DAILY_AT_TWO, Some("Asia/Shanghai"));
        let next = compute_next_fire(&spec, after).expect("should compute with tz");
        assert!(next > after);
        // 02:00 Shanghai = 18:00 UTC previous day
        assert_eq!(next.hour(), 18);
    }

    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = cron_spec("not a cron", None);
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = cron_spec(DAILY_AT_TWO, Some("Invalid/TZ"));
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        let one_hour = std::time::Duration::from_secs(3600);
        assert_eq!(next_cron_sleep(&[]), one_hour);
    }

    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let now = Utc::now();
        let entries = vec![
            entry("past", now - chrono::Duration::seconds(10)),
            entry("future", now + chrono::Duration::seconds(300)),
        ];
        let due = collect_due_triggers(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}