Skip to main content

agent_orchestrator/
trigger_engine.rs

1//! Trigger engine: cron scheduler and event-driven task creation.
2//!
3//! The `TriggerEngine` runs as a long-lived tokio task inside `orchestratord`,
4//! watching for cron ticks and task-lifecycle events and creating tasks when
5//! trigger conditions are met.
6
7use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::Arc;
15use std::sync::atomic::Ordering;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19/// Cancels a running task for the Replace trigger policy.
20///
21/// This is a simplified version of `scheduler::stop_task_runtime` that avoids
22/// a dependency on the scheduler crate.
23async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24    // Signal the in-process running task to stop, if present.
25    let runtime = {
26        let running = state.running.lock().await;
27        running.get(task_id).cloned()
28    };
29    if let Some(rt) = runtime {
30        rt.stop_flag.store(true, Ordering::SeqCst);
31    }
32    // Update task status to cancelled.
33    state
34        .db_writer
35        .set_task_status(task_id, "cancelled", false)
36        .await?;
37    insert_event(
38        state,
39        task_id,
40        None,
41        "task_control",
42        serde_json::json!({"status": "cancelled"}),
43    )
44    .await?;
45    Ok(())
46}
47
48// ── Public types ─────────────────────────────────────────────────────────────
49
/// Payload broadcast when a trigger-relevant event fires.
///
/// The engine receives these on its broadcast subscription and compares
/// `event_type` against each trigger's configured event source.
#[derive(Debug, Clone)]
pub struct TriggerEventPayload {
    /// Event type: "task_completed", "task_failed", or "webhook".
    pub event_type: String,
    /// Source task ID (empty for webhook events). When this is empty the
    /// engine skips the source-workflow lookup.
    pub task_id: String,
    /// Optional JSON payload (webhook body or event metadata). Trigger filter
    /// conditions are evaluated against this value; a trigger that declares a
    /// condition does not match when this is `None`.
    pub payload: Option<serde_json::Value>,
}
60
/// Notification sent to the engine when trigger configuration changes.
///
/// Delivered over the channel whose sending half lives in
/// [`TriggerEngineHandle`].
#[derive(Debug)]
pub enum TriggerReloadEvent {
    /// Re-read all triggers from the current config snapshot and rebuild the
    /// cron schedule.
    Reload,
}
67
/// Handle used by the rest of the system to communicate with a running engine.
///
/// Clones share the same underlying reload channel.
#[derive(Clone)]
pub struct TriggerEngineHandle {
    /// Sending half of the reload channel; the engine holds the receiving half.
    reload_tx: mpsc::Sender<TriggerReloadEvent>,
}
73
74impl TriggerEngineHandle {
75    /// Notify the engine to reload trigger configuration (async).
76    pub async fn reload(&self) {
77        let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
78    }
79
80    /// Notify the engine to reload trigger configuration (sync-safe).
81    ///
82    /// Uses `try_send` so it can be called from synchronous code paths
83    /// like `apply_manifests`. Returns `true` if the notification was sent.
84    pub fn reload_sync(&self) -> bool {
85        self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
86    }
87}
88
/// The trigger engine itself. Constructed via [`TriggerEngine::new`] and
/// driven via [`TriggerEngine::run`].
pub struct TriggerEngine {
    /// Shared daemon state: config snapshots, database handles, event emitters.
    state: Arc<InnerState>,
    /// Receives reload notifications sent through [`TriggerEngineHandle`].
    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
    /// Subscription to the broadcast of trigger-relevant events.
    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
    /// `(project_id, trigger_name)` pairs that were present in the config the
    /// last time the cron schedule was built.
    ///
    /// Intent: a freshly-created trigger (from an agent `apply`) must survive
    /// one reload before it is eligible to fire — this prevents agent-applied
    /// triggers from immediately spawning parasitic tasks.
    ///
    /// As implemented, cron scheduling only admits triggers that were in this
    /// set *before* the current rebuild, whereas event matching consults the
    /// set as left *after* the most recent rebuild.
    /// NOTE(review): event triggers therefore become eligible one rebuild
    /// earlier than cron triggers — confirm this asymmetry is intended.
    stabilized_triggers: HashSet<(String, String)>,
}
101
102impl TriggerEngine {
103    /// Create a new engine and its control handle.
104    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
105        let (reload_tx, reload_rx) = mpsc::channel(16);
106        let trigger_event_rx = state.trigger_event_tx.subscribe();
107        let engine = Self {
108            state,
109            reload_rx,
110            trigger_event_rx,
111            stabilized_triggers: HashSet::new(),
112        };
113        let handle = TriggerEngineHandle { reload_tx };
114        (engine, handle)
115    }
116
    /// Main run loop.
    ///
    /// Consumes the engine and multiplexes four sources with `tokio::select!`:
    /// a sleep until the next cron entry is due, the trigger-event broadcast,
    /// reload notifications, and `shutdown_rx`.
    ///
    /// Returns when `shutdown_rx.changed()` resolves (the watched value itself
    /// is not inspected) or when the trigger-event broadcast channel closes.
    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
        info!("trigger engine started");

        // Load initial trigger set. For an engine that comes straight from
        // `new` the stabilization set is still empty, so this first schedule
        // contains no entries.
        // NOTE(review): cron triggers present at startup therefore appear to
        // wait for the next rebuild (a reload, or the idle sleep expiring)
        // before being scheduled — confirm this is intended.
        let mut cron_schedule = self.build_cron_schedule();

        loop {
            // The sleep is recreated on every iteration, so any wake-up
            // (event, reload) re-evaluates how long to wait for the next cron.
            let sleep_duration = next_cron_sleep(&cron_schedule);
            let sleep_fut = tokio::time::sleep(sleep_duration);
            tokio::pin!(sleep_fut);

            tokio::select! {
                // ── Cron tick ───────────────────────────────────────────
                () = &mut sleep_fut => {
                    let now = Utc::now();
                    let fired = collect_due_triggers(&cron_schedule, now);
                    for (trigger_name, project) in fired {
                        self.fire_trigger(&trigger_name, &project).await;
                    }
                    // Recompute schedule after firing. This also refreshes the
                    // stabilization set, exactly like a reload does.
                    cron_schedule = self.build_cron_schedule();
                }

                // ── Event trigger ───────────────────────────────────────
                event_result = self.trigger_event_rx.recv() => {
                    match event_result {
                        Ok(payload) => {
                            self.handle_event_trigger(&payload).await;
                        }
                        // The receiver fell behind; the skipped events are
                        // only counted in the log, not replayed.
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                            warn!(skipped = n, "trigger event receiver lagged");
                        }
                        // No sender left: the engine stops.
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                            debug!("trigger event channel closed");
                            break;
                        }
                    }
                }

                // ── Config reload ───────────────────────────────────────
                // Only `Some` matches, so a closed reload channel (`None`)
                // does not run this branch.
                Some(_) = self.reload_rx.recv() => {
                    info!("trigger engine: reloading configuration");
                    cron_schedule = self.build_cron_schedule();
                }

                // ── Shutdown ────────────────────────────────────────────
                _ = shutdown_rx.changed() => {
                    info!("trigger engine shutting down");
                    break;
                }
            }
        }
    }
171
172    // ── Cron helpers ─────────────────────────────────────────────────────────
173
174    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
175        let snap = self.state.config_runtime.load();
176        let config = &snap.active_config.config;
177        let mut entries = Vec::new();
178
179        // Collect the current set of triggers to update stabilization tracking.
180        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
181        for (project_id, project) in &config.projects {
182            for name in project.triggers.keys() {
183                current_triggers.insert((project_id.clone(), name.clone()));
184            }
185        }
186
187        // Promote triggers that were already known from a prior cycle.
188        // Triggers seen for the first time in THIS reload are NOT yet stabilized
189        // and will only become eligible after the next reload.
190        let previously_known = std::mem::take(&mut self.stabilized_triggers);
191        self.stabilized_triggers = current_triggers.clone();
192
193        for (project_id, project) in &config.projects {
194            for (name, trigger) in &project.triggers {
195                if trigger.suspend {
196                    continue;
197                }
198                // Skip triggers that are new (not in the previous cycle's set).
199                if !previously_known.contains(&(project_id.clone(), name.clone())) {
200                    debug!(
201                        trigger = name.as_str(),
202                        project = project_id.as_str(),
203                        "trigger not yet stabilized, skipping cron schedule"
204                    );
205                    continue;
206                }
207                if let Some(ref cron_spec) = trigger.cron {
208                    match compute_next_fire(cron_spec, Utc::now()) {
209                        Ok(next) => {
210                            entries.push(CronEntry {
211                                trigger_name: name.clone(),
212                                project: project_id.clone(),
213                                next_fire: next,
214                            });
215                        }
216                        Err(e) => {
217                            warn!(
218                                trigger = name.as_str(),
219                                project = project_id.as_str(),
220                                error = %e,
221                                "failed to compute next fire time"
222                            );
223                        }
224                    }
225                }
226            }
227        }
228        entries
229    }
230
231    // ── Event trigger matching ───────────────────────────────────────────────
232
233    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
234        // Resolve the source task's workflow from the database (the payload only
235        // carries event_type + task_id to keep the broadcast lightweight).
236        // For webhook events, skip workflow lookup (no source task).
237        let source_workflow = if payload.task_id.is_empty() {
238            None
239        } else {
240            self.lookup_task_workflow(&payload.task_id).await
241        };
242
243        let snap = self.state.config_runtime.load();
244        let config = &snap.active_config.config;
245
246        for (project_id, project) in &config.projects {
247            for (name, trigger) in &project.triggers {
248                if trigger.suspend {
249                    continue;
250                }
251                // Skip triggers not yet stabilized (first seen in most recent reload).
252                if !self
253                    .stabilized_triggers
254                    .contains(&(project_id.clone(), name.clone()))
255                {
256                    continue;
257                }
258                if let Some(ref event_spec) = trigger.event {
259                    // Match event source type.
260                    if event_spec.source != payload.event_type {
261                        continue;
262                    }
263                    // Match optional workflow filter.
264                    if let Some(ref filter) = event_spec.filter {
265                        if let Some(ref filter_wf) = filter.workflow {
266                            match source_workflow {
267                                Some(ref sw) if sw == filter_wf => {}
268                                _ => continue,
269                            }
270                        }
271                        // CEL condition evaluation on payload.
272                        if let Some(ref condition) = filter.condition {
273                            if let Some(ref event_payload) = payload.payload {
274                                match crate::prehook::evaluate_webhook_filter(
275                                    condition,
276                                    event_payload,
277                                ) {
278                                    Ok(true) => {} // Condition matched, proceed
279                                    Ok(false) => {
280                                        debug!(
281                                            trigger = name.as_str(),
282                                            condition, "CEL filter rejected payload"
283                                        );
284                                        continue;
285                                    }
286                                    Err(e) => {
287                                        warn!(
288                                            trigger = name.as_str(),
289                                            error = %e,
290                                            "CEL filter evaluation failed, skipping"
291                                        );
292                                        continue;
293                                    }
294                                }
295                            } else {
296                                // No payload but condition set — skip (can't evaluate)
297                                debug!(
298                                    trigger = name.as_str(),
299                                    "CEL condition set but no payload available, skipping"
300                                );
301                                continue;
302                            }
303                        }
304                    }
305
306                    info!(
307                        trigger = name.as_str(),
308                        project = project_id.as_str(),
309                        event_type = payload.event_type.as_str(),
310                        "event trigger matched"
311                    );
312                    self.fire_trigger_with_config(
313                        name,
314                        project_id,
315                        trigger,
316                        payload.payload.as_ref(),
317                    )
318                    .await;
319                }
320            }
321        }
322    }
323
324    // ── Fire logic ───────────────────────────────────────────────────────────
325
326    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
327        let snap = self.state.config_runtime.load();
328        let config = &snap.active_config.config;
329
330        let trigger = config
331            .projects
332            .get(project)
333            .and_then(|p| p.triggers.get(trigger_name));
334
335        let Some(trigger) = trigger else {
336            warn!(trigger = trigger_name, "trigger not found in config");
337            return;
338        };
339
340        self.fire_trigger_with_config(trigger_name, project, trigger, None)
341            .await;
342    }
343
    /// Apply the trigger's gating rules and, if they pass, create (and
    /// optionally start) a task for it.
    ///
    /// Order of checks: suspend → throttle → concurrency policy. A rejected
    /// fire emits a `trigger_skipped` event carrying the reason and returns
    /// without touching `trigger_state`.
    ///
    /// On successful task creation this records the fire in `trigger_state`,
    /// emits `trigger_fired`, spawns the enqueue when `action.start` is set,
    /// and spawns a best-effort history cleanup when a history limit is
    /// configured. On creation failure it records `failed_to_create` and emits
    /// `trigger_error`.
    ///
    /// `webhook_payload` is only used to build the task goal.
    async fn fire_trigger_with_config(
        &self,
        trigger_name: &str,
        project: &str,
        trigger: &TriggerConfig,
        webhook_payload: Option<&serde_json::Value>,
    ) {
        // ── Suspend check ────────────────────────────────────────────────
        if trigger.suspend {
            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
            return;
        }

        // ── Throttle check ───────────────────────────────────────────────
        // Runs for every trigger that has a throttle config with a non-zero
        // `min_interval`; the elapsed time is measured in whole seconds since
        // the stored last-fired timestamp.
        // NOTE(review): the previous comment said "event triggers only", but
        // nothing here distinguishes cron from event fires — confirm intent.
        if let Some(ref throttle) = trigger.throttle {
            if throttle.min_interval > 0 {
                if let Some(last) = self.load_last_fired(trigger_name, project).await {
                    let elapsed = (Utc::now() - last).num_seconds();
                    // A negative elapsed value (stored timestamp in the
                    // future) never throttles.
                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
                        return;
                    }
                }
            }
        }

        // ── Concurrency policy ───────────────────────────────────────────
        match trigger.concurrency_policy {
            crate::cli_types::ConcurrencyPolicy::Forbid => {
                if self.has_active_task(trigger_name, project).await {
                    self.emit_trigger_event(
                        trigger_name,
                        "trigger_skipped",
                        "concurrent_task_active",
                    );
                    return;
                }
            }
            crate::cli_types::ConcurrencyPolicy::Replace => {
                // Cancel active tasks created by this trigger before creating a new one.
                self.cancel_active_tasks(trigger_name, project).await;
            }
            crate::cli_types::ConcurrencyPolicy::Allow => {}
        }

        // ── Create task ──────────────────────────────────────────────────
        // The optional "target-file" action argument is forwarded as the
        // task's target files.
        let target_files = trigger
            .action
            .args
            .as_ref()
            .and_then(|a| a.get("target-file"))
            .cloned();

        // History cleanup later finds this trigger's tasks by this exact name.
        let task_name = format!("trigger-{trigger_name}");

        let payload = CreateTaskPayload {
            name: Some(task_name),
            goal: Some(build_trigger_goal(trigger_name, webhook_payload)),
            project_id: Some(project.to_string()),
            workspace_id: Some(trigger.action.workspace.clone()),
            workflow_id: Some(trigger.action.workflow.clone()),
            target_files,
            parent_task_id: None,
            spawn_reason: None,
        };

        match crate::task_ops::create_task_as_service(&self.state, payload) {
            Ok(summary) => {
                let task_id = summary.id.clone();
                info!(
                    trigger = trigger_name,
                    task_id = task_id.as_str(),
                    "trigger fired: task created"
                );

                // Update trigger state.
                self.update_trigger_state(trigger_name, project, &task_id, "created")
                    .await;

                // Emit event.
                // NOTE(review): "source" reflects whether the trigger has a
                // cron spec, not how this particular fire was initiated —
                // confirm that is what consumers expect.
                self.state.emit_event(
                    &task_id,
                    None,
                    "trigger_fired",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
                        "task_id": task_id,
                    }),
                );

                // Start the task if action.start is true. The enqueue runs in
                // a detached task; the worker is only notified on success.
                if trigger.action.start {
                    let state = self.state.clone();
                    let tid = task_id.clone();
                    tokio::spawn(async move {
                        if let Err(e) =
                            crate::scheduler_service::enqueue_task_as_service(&state, &tid).await
                        {
                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
                        } else {
                            state.worker_notify.notify_one();
                        }
                    });
                }

                // History limit cleanup (best-effort, async). Failures are
                // only logged at debug level.
                if trigger.history_limit.is_some() {
                    let state = self.state.clone();
                    let name = trigger_name.to_string();
                    let proj = project.to_string();
                    let limit = trigger.history_limit.clone();
                    tokio::spawn(async move {
                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
                        {
                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
                        }
                    });
                }
            }
            Err(e) => {
                error!(
                    trigger = trigger_name,
                    error = %e,
                    "trigger failed to create task"
                );
                // Recorded with an empty task id; the state update still
                // refreshes the last-fired timestamp and the fire count.
                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
                    .await;
                self.state.emit_event(
                    "",
                    None,
                    "trigger_error",
                    serde_json::json!({
                        "trigger": trigger_name,
                        "error": e.to_string(),
                    }),
                );
            }
        }
    }
484
485    // ── DB helpers ───────────────────────────────────────────────────────────
486
487    /// Look up the workflow_id for a task from the database.
488    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
489        let tid = task_id.to_owned();
490        let result = self
491            .state
492            .async_database
493            .reader()
494            .call(move |conn| {
495                let wf: Option<String> = conn
496                    .query_row(
497                        "SELECT workflow_id FROM tasks WHERE id = ?1",
498                        rusqlite::params![tid],
499                        |row| row.get(0),
500                    )
501                    .ok();
502                Ok(wf)
503            })
504            .await;
505        match result {
506            Ok(wf) => wf,
507            Err(e) => {
508                debug!(task_id, error = %e, "failed to look up task workflow");
509                None
510            }
511        }
512    }
513
514    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
515        let name = trigger_name.to_owned();
516        let proj = project.to_owned();
517        let result = self
518            .state
519            .async_database
520            .reader()
521            .call(move |conn| {
522                let mut stmt = conn
523                    .prepare(
524                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
525                    )
526                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
527                let ts: Option<String> = stmt
528                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
529                    .ok();
530                Ok(ts)
531            })
532            .await;
533
534        match result {
535            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
536            _ => None,
537        }
538    }
539
540    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
541        let name = trigger_name.to_owned();
542        let proj = project.to_owned();
543        let result = self
544            .state
545            .async_database
546            .reader()
547            .call(move |conn| {
548                let last_task_id: Option<String> = conn
549                    .query_row(
550                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
551                        rusqlite::params![name, proj],
552                        |row| row.get(0),
553                    )
554                    .ok()
555                    .flatten();
556
557                if let Some(ref tid) = last_task_id {
558                    let status: Option<String> = conn
559                        .query_row(
560                            "SELECT status FROM tasks WHERE id = ?1",
561                            rusqlite::params![tid],
562                            |row| row.get(0),
563                        )
564                        .ok();
565                    if let Some(s) = status {
566                        return Ok(matches!(
567                            s.as_str(),
568                            "created" | "pending" | "running" | "restart_pending"
569                        ));
570                    }
571                }
572                Ok(false)
573            })
574            .await;
575
576        result.unwrap_or(false)
577    }
578
579    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
580        let name = trigger_name.to_owned();
581        let proj = project.to_owned();
582        let state = self.state.clone();
583        let result = state
584            .async_database
585            .reader()
586            .call(move |conn| {
587                let tid: Option<String> = conn
588                    .query_row(
589                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
590                        rusqlite::params![name, proj],
591                        |row| row.get(0),
592                    )
593                    .ok()
594                    .flatten();
595                Ok(tid)
596            })
597            .await;
598
599        if let Ok(Some(task_id)) = result {
600            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
601                warn!(
602                    trigger = trigger_name,
603                    task_id = task_id.as_str(),
604                    error = %e,
605                    "failed to cancel active task for Replace policy"
606                );
607            }
608        }
609    }
610
611    async fn update_trigger_state(
612        &self,
613        trigger_name: &str,
614        project: &str,
615        task_id: &str,
616        status: &str,
617    ) {
618        let name = trigger_name.to_owned();
619        let proj = project.to_owned();
620        let tid = task_id.to_owned();
621        let st = status.to_owned();
622        let now = Utc::now().to_rfc3339();
623        let now2 = now.clone();
624
625        if let Err(e) = self
626            .state
627            .async_database
628            .writer()
629            .call(move |conn| {
630                conn.execute(
631                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
632                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
633                     ON CONFLICT(trigger_name, project) DO UPDATE SET
634                       last_fired_at = ?3,
635                       fire_count = fire_count + 1,
636                       last_task_id = ?4,
637                       last_status = ?5,
638                       updated_at = ?7",
639                    rusqlite::params![name, proj, now, tid, st, now2, now2],
640                )
641                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
642                Ok(())
643            })
644            .await
645        {
646            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
647        }
648    }
649
650    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
651        debug!(trigger = trigger_name, event_type, reason, "trigger event");
652        self.state.emit_event(
653            "",
654            None,
655            event_type,
656            serde_json::json!({
657                "trigger": trigger_name,
658                "reason": reason,
659            }),
660        );
661    }
662}
663
664// ── Cron schedule helpers ────────────────────────────────────────────────────
665
/// One scheduled cron trigger: which trigger to fire and when it is next due.
struct CronEntry {
    /// Name of the trigger within its project.
    trigger_name: String,
    /// ID of the project that owns the trigger.
    project: String,
    /// Next fire time in UTC, as computed when the schedule was built.
    next_fire: DateTime<Utc>,
}
671
672fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
673    use cron::Schedule;
674    use std::str::FromStr;
675
676    let schedule = Schedule::from_str(&spec.schedule)
677        .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
678
679    // If a timezone is specified, compute in that timezone, then convert back to UTC.
680    if let Some(ref tz_name) = spec.timezone {
681        let tz: chrono_tz::Tz = tz_name
682            .parse()
683            .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
684        let local_after = after.with_timezone(&tz);
685        let next = schedule
686            .after(&local_after)
687            .next()
688            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
689        Ok(next.with_timezone(&Utc))
690    } else {
691        let next = schedule
692            .after(&after)
693            .next()
694            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
695        Ok(next.with_timezone(&Utc))
696    }
697}
698
699fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
700    let now = Utc::now();
701    entries
702        .iter()
703        .map(|e| {
704            let diff = e.next_fire.signed_duration_since(now);
705            if diff.num_milliseconds() <= 0 {
706                std::time::Duration::from_millis(100)
707            } else {
708                std::time::Duration::from_millis(diff.num_milliseconds() as u64)
709            }
710        })
711        .min()
712        // If no cron triggers, sleep for a long time (until event or reload wakes us).
713        .unwrap_or(std::time::Duration::from_secs(3600))
714}
715
716fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
717    entries
718        .iter()
719        .filter(|e| e.next_fire <= now)
720        .map(|e| (e.trigger_name.clone(), e.project.clone()))
721        .collect()
722}
723
724async fn cleanup_history(
725    state: &InnerState,
726    trigger_name: &str,
727    project: &str,
728    limit: Option<&crate::config::TriggerHistoryLimitConfig>,
729) -> Result<()> {
730    let limit = match limit {
731        Some(l) => l,
732        None => return Ok(()),
733    };
734
735    let task_name_pattern = format!("trigger-{trigger_name}");
736    let proj = project.to_owned();
737
738    // For each status category, collect IDs of tasks beyond the retention limit.
739    let mut ids_to_delete: Vec<String> = Vec::new();
740
741    if let Some(max_successful) = limit.successful {
742        let pattern = task_name_pattern.clone();
743        let p = proj.clone();
744        let max = max_successful as usize;
745        let ids = state
746            .async_database
747            .reader()
748            .call(move |conn| {
749                let mut stmt = conn
750                    .prepare(
751                        "SELECT id FROM tasks \
752                         WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
753                         ORDER BY created_at DESC",
754                    )
755                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
756                let rows = stmt
757                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
758                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
759                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
760                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
761            })
762            .await
763            .context("query completed tasks for history cleanup")?;
764        ids_to_delete.extend(ids);
765    }
766
767    if let Some(max_failed) = limit.failed {
768        let pattern = task_name_pattern.clone();
769        let p = proj.clone();
770        let max = max_failed as usize;
771        let ids = state
772            .async_database
773            .reader()
774            .call(move |conn| {
775                let mut stmt = conn
776                    .prepare(
777                        "SELECT id FROM tasks \
778                         WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
779                         ORDER BY created_at DESC",
780                    )
781                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
782                let rows = stmt
783                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
784                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
785                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
786                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
787            })
788            .await
789            .context("query failed tasks for history cleanup")?;
790        ids_to_delete.extend(ids);
791    }
792
793    if ids_to_delete.is_empty() {
794        return Ok(());
795    }
796
797    state
798        .async_database
799        .writer()
800        .call(move |conn| {
801            let placeholders: Vec<String> =
802                (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
803            let sql = format!(
804                "DELETE FROM tasks WHERE id IN ({})",
805                placeholders.join(", ")
806            );
807            let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
808                .iter()
809                .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
810                .collect();
811            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
812                params.iter().map(|p| p.as_ref()).collect();
813            conn.execute(&sql, param_refs.as_slice())
814                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
815            Ok(())
816        })
817        .await
818        .context("delete excess trigger history tasks")?;
819
820    Ok(())
821}
822
823// ── Goal construction ────────────────────────────────────────────────────────
824
825/// Build a task goal string for a trigger fire.
826/// If a webhook payload is present, include a summary in the goal.
827fn build_trigger_goal(trigger_name: &str, webhook_payload: Option<&serde_json::Value>) -> String {
828    match webhook_payload {
829        Some(payload) => {
830            let summary = serde_json::to_string(payload).unwrap_or_default();
831            let truncated = if summary.len() > 500 {
832                format!("{}...", &summary[..497])
833            } else {
834                summary
835            };
836            format!("Triggered by webhook '{trigger_name}': {truncated}")
837        }
838        None => format!("Triggered by: {trigger_name}"),
839    }
840}
841
842// ── Public helper for event broadcasting ─────────────────────────────────────
843
844/// Broadcast a trigger-relevant event (task_completed / task_failed / webhook).
845/// Called from the daemon's event handling path.
846pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
847    // Ignore send errors (no subscribers = no triggers configured).
848    let _ = state.trigger_event_tx.send(payload);
849}
850
851/// Notify the trigger engine to reload its configuration.
852/// Safe to call from sync code. No-op if no engine is running.
853pub fn notify_trigger_reload(state: &InnerState) {
854    if let Ok(guard) = state.trigger_engine_handle.lock() {
855        if let Some(ref handle) = *guard {
856            let _ = handle.reload_sync();
857        }
858    }
859}
860
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Schedule shared by the cron tests: daily at 02:00 (cron crate uses 6 fields).
    const DAILY_AT_TWO: &str = "0 0 2 * * *";

    /// Builds a cron spec from a schedule expression and an optional timezone name.
    fn cron_spec(schedule: &str, timezone: Option<&str>) -> TriggerCronConfig {
        TriggerCronConfig {
            schedule: schedule.to_string(),
            timezone: timezone.map(str::to_string),
        }
    }

    /// Fixed reference instant used by the deterministic tests: 2026-01-01 00:00:00 UTC.
    fn new_year_2026() -> DateTime<Utc> {
        chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .and_then(|date| date.and_hms_opt(0, 0, 0))
            .unwrap()
            .and_utc()
    }

    #[test]
    fn compute_next_fire_utc() {
        let after = new_year_2026();
        let next = compute_next_fire(&cron_spec(DAILY_AT_TWO, None), after).expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    #[test]
    fn compute_next_fire_with_timezone() {
        let after = new_year_2026();
        let spec = cron_spec(DAILY_AT_TWO, Some("Asia/Shanghai"));
        let next = compute_next_fire(&spec, after).expect("should compute with tz");
        assert!(next > after);
        // 02:00 in Shanghai (UTC+8) is 18:00 UTC on the preceding calendar day.
        assert_eq!(next.hour(), 18);
    }

    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let spec = cron_spec("not a cron", None);
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let spec = cron_spec(DAILY_AT_TWO, Some("Invalid/TZ"));
        assert!(compute_next_fire(&spec, Utc::now()).is_err());
    }

    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        // With no cron entries the expected sleep is one hour.
        let one_hour = std::time::Duration::from_secs(3600);
        assert_eq!(next_cron_sleep(&[]), one_hour);
    }

    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let now = Utc::now();
        // Entry in project "p" whose fire time is `offset_secs` away from `now`.
        let entry_at = |name: &str, offset_secs: i64| CronEntry {
            trigger_name: name.to_string(),
            project: "p".to_string(),
            next_fire: now + chrono::Duration::seconds(offset_secs),
        };
        let entries = vec![entry_at("past", -10), entry_at("future", 300)];

        let due = collect_due_triggers(&entries, now);

        // Only the entry whose fire time has already passed is reported.
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}