Skip to main content

agent_orchestrator/
trigger_engine.rs

1//! Trigger engine: cron scheduler and event-driven task creation.
2//!
3//! The `TriggerEngine` runs as a long-lived tokio task inside `orchestratord`,
4//! watching for cron ticks and task-lifecycle events and creating tasks when
5//! trigger conditions are met.
6
7use crate::config::{TriggerConfig, TriggerCronConfig};
8use crate::dto::CreateTaskPayload;
9use crate::events::insert_event;
10use crate::state::InnerState;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Utc};
13use std::collections::HashSet;
14use std::sync::atomic::Ordering;
15use std::sync::Arc;
16use tokio::sync::mpsc;
17use tracing::{debug, error, info, warn};
18
19/// Cancels a running task for the Replace trigger policy.
20///
21/// This is a simplified version of `scheduler::stop_task_runtime` that avoids
22/// a dependency on the scheduler crate.
23async fn cancel_task_for_trigger(state: &InnerState, task_id: &str) -> Result<()> {
24    // Signal the in-process running task to stop, if present.
25    let runtime = {
26        let running = state.running.lock().await;
27        running.get(task_id).cloned()
28    };
29    if let Some(rt) = runtime {
30        rt.stop_flag.store(true, Ordering::SeqCst);
31    }
32    // Update task status to cancelled.
33    state
34        .db_writer
35        .set_task_status(task_id, "cancelled", false)
36        .await?;
37    insert_event(
38        state,
39        task_id,
40        None,
41        "task_control",
42        serde_json::json!({"status": "cancelled"}),
43    )
44    .await?;
45    Ok(())
46}
47
48// ── Public types ─────────────────────────────────────────────────────────────
49
/// Message broadcast to the trigger engine when a `task_completed` or
/// `task_failed` event fires.
#[derive(Clone, Debug)]
pub struct TriggerEventPayload {
    /// Kind of lifecycle event: `"task_completed"` or `"task_failed"`.
    pub event_type: String,
    /// ID of the task the event originated from.
    pub task_id: String,
}
58
/// Control message delivered to the engine when trigger configuration changes.
#[derive(Debug)]
pub enum TriggerReloadEvent {
    /// Ask the engine to re-read every trigger from the current config
    /// snapshot.
    Reload,
}
65
66/// Handle used by the rest of the system to communicate with a running engine.
67#[derive(Clone)]
68pub struct TriggerEngineHandle {
69    reload_tx: mpsc::Sender<TriggerReloadEvent>,
70}
71
72impl TriggerEngineHandle {
73    /// Notify the engine to reload trigger configuration (async).
74    pub async fn reload(&self) {
75        let _ = self.reload_tx.send(TriggerReloadEvent::Reload).await;
76    }
77
78    /// Notify the engine to reload trigger configuration (sync-safe).
79    ///
80    /// Uses `try_send` so it can be called from synchronous code paths
81    /// like `apply_manifests`. Returns `true` if the notification was sent.
82    pub fn reload_sync(&self) -> bool {
83        self.reload_tx.try_send(TriggerReloadEvent::Reload).is_ok()
84    }
85}
86
87/// The trigger engine itself. Constructed via [`TriggerEngine::new`] and
88/// driven via [`TriggerEngine::run`].
89pub struct TriggerEngine {
90    state: Arc<InnerState>,
91    reload_rx: mpsc::Receiver<TriggerReloadEvent>,
92    trigger_event_rx: tokio::sync::broadcast::Receiver<TriggerEventPayload>,
93    /// Triggers that have been present for at least one config reload cycle.
94    /// A freshly-created trigger (from an agent `apply`) must survive one
95    /// reload before it is eligible to fire — this prevents agent-applied
96    /// triggers from immediately spawning parasitic tasks.
97    stabilized_triggers: HashSet<(String, String)>,
98}
99
100impl TriggerEngine {
101    /// Create a new engine and its control handle.
102    pub fn new(state: Arc<InnerState>) -> (Self, TriggerEngineHandle) {
103        let (reload_tx, reload_rx) = mpsc::channel(16);
104        let trigger_event_rx = state.trigger_event_tx.subscribe();
105        let engine = Self {
106            state,
107            reload_rx,
108            trigger_event_rx,
109            stabilized_triggers: HashSet::new(),
110        };
111        let handle = TriggerEngineHandle { reload_tx };
112        (engine, handle)
113    }
114
115    /// Main run loop. Returns when `shutdown_rx` fires.
116    pub async fn run(mut self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
117        info!("trigger engine started");
118
119        // Load initial trigger set.
120        let mut cron_schedule = self.build_cron_schedule();
121
122        loop {
123            let sleep_duration = next_cron_sleep(&cron_schedule);
124            let sleep_fut = tokio::time::sleep(sleep_duration);
125            tokio::pin!(sleep_fut);
126
127            tokio::select! {
128                // ── Cron tick ───────────────────────────────────────────
129                () = &mut sleep_fut => {
130                    let now = Utc::now();
131                    let fired = collect_due_triggers(&cron_schedule, now);
132                    for (trigger_name, project) in fired {
133                        self.fire_trigger(&trigger_name, &project).await;
134                    }
135                    // Recompute schedule after firing.
136                    cron_schedule = self.build_cron_schedule();
137                }
138
139                // ── Event trigger ───────────────────────────────────────
140                event_result = self.trigger_event_rx.recv() => {
141                    match event_result {
142                        Ok(payload) => {
143                            self.handle_event_trigger(&payload).await;
144                        }
145                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
146                            warn!(skipped = n, "trigger event receiver lagged");
147                        }
148                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
149                            debug!("trigger event channel closed");
150                            break;
151                        }
152                    }
153                }
154
155                // ── Config reload ───────────────────────────────────────
156                Some(_) = self.reload_rx.recv() => {
157                    info!("trigger engine: reloading configuration");
158                    cron_schedule = self.build_cron_schedule();
159                }
160
161                // ── Shutdown ────────────────────────────────────────────
162                _ = shutdown_rx.changed() => {
163                    info!("trigger engine shutting down");
164                    break;
165                }
166            }
167        }
168    }
169
170    // ── Cron helpers ─────────────────────────────────────────────────────────
171
172    fn build_cron_schedule(&mut self) -> Vec<CronEntry> {
173        let snap = self.state.config_runtime.load();
174        let config = &snap.active_config.config;
175        let mut entries = Vec::new();
176
177        // Collect the current set of triggers to update stabilization tracking.
178        let mut current_triggers: HashSet<(String, String)> = HashSet::new();
179        for (project_id, project) in &config.projects {
180            for name in project.triggers.keys() {
181                current_triggers.insert((project_id.clone(), name.clone()));
182            }
183        }
184
185        // Promote triggers that were already known from a prior cycle.
186        // Triggers seen for the first time in THIS reload are NOT yet stabilized
187        // and will only become eligible after the next reload.
188        let previously_known = std::mem::take(&mut self.stabilized_triggers);
189        self.stabilized_triggers = current_triggers.clone();
190
191        for (project_id, project) in &config.projects {
192            for (name, trigger) in &project.triggers {
193                if trigger.suspend {
194                    continue;
195                }
196                // Skip triggers that are new (not in the previous cycle's set).
197                if !previously_known.contains(&(project_id.clone(), name.clone())) {
198                    debug!(
199                        trigger = name.as_str(),
200                        project = project_id.as_str(),
201                        "trigger not yet stabilized, skipping cron schedule"
202                    );
203                    continue;
204                }
205                if let Some(ref cron_spec) = trigger.cron {
206                    match compute_next_fire(cron_spec, Utc::now()) {
207                        Ok(next) => {
208                            entries.push(CronEntry {
209                                trigger_name: name.clone(),
210                                project: project_id.clone(),
211                                next_fire: next,
212                            });
213                        }
214                        Err(e) => {
215                            warn!(
216                                trigger = name.as_str(),
217                                project = project_id.as_str(),
218                                error = %e,
219                                "failed to compute next fire time"
220                            );
221                        }
222                    }
223                }
224            }
225        }
226        entries
227    }
228
229    // ── Event trigger matching ───────────────────────────────────────────────
230
231    async fn handle_event_trigger(&self, payload: &TriggerEventPayload) {
232        // Resolve the source task's workflow from the database (the payload only
233        // carries event_type + task_id to keep the broadcast lightweight).
234        let source_workflow = self.lookup_task_workflow(&payload.task_id).await;
235
236        let snap = self.state.config_runtime.load();
237        let config = &snap.active_config.config;
238
239        for (project_id, project) in &config.projects {
240            for (name, trigger) in &project.triggers {
241                if trigger.suspend {
242                    continue;
243                }
244                // Skip triggers not yet stabilized (first seen in most recent reload).
245                if !self
246                    .stabilized_triggers
247                    .contains(&(project_id.clone(), name.clone()))
248                {
249                    continue;
250                }
251                if let Some(ref event_spec) = trigger.event {
252                    // Match event source type.
253                    if event_spec.source != payload.event_type {
254                        continue;
255                    }
256                    // Match optional workflow filter.
257                    if let Some(ref filter) = event_spec.filter {
258                        if let Some(ref filter_wf) = filter.workflow {
259                            match source_workflow {
260                                Some(ref sw) if sw == filter_wf => {}
261                                _ => continue,
262                            }
263                        }
264                        // CEL condition evaluation — future extension.
265                        // For now, if a condition is set, log a warning and skip.
266                        if filter.condition.is_some() {
267                            debug!(
268                                trigger = name.as_str(),
269                                "CEL condition evaluation not yet implemented, skipping"
270                            );
271                            continue;
272                        }
273                    }
274
275                    info!(
276                        trigger = name.as_str(),
277                        project = project_id.as_str(),
278                        event_type = payload.event_type.as_str(),
279                        source_task = payload.task_id.as_str(),
280                        "event trigger matched"
281                    );
282                    self.fire_trigger_with_config(name, project_id, trigger)
283                        .await;
284                }
285            }
286        }
287    }
288
289    // ── Fire logic ───────────────────────────────────────────────────────────
290
291    async fn fire_trigger(&self, trigger_name: &str, project: &str) {
292        let snap = self.state.config_runtime.load();
293        let config = &snap.active_config.config;
294
295        let trigger = config
296            .projects
297            .get(project)
298            .and_then(|p| p.triggers.get(trigger_name));
299
300        let Some(trigger) = trigger else {
301            warn!(trigger = trigger_name, "trigger not found in config");
302            return;
303        };
304
305        self.fire_trigger_with_config(trigger_name, project, trigger)
306            .await;
307    }
308
309    async fn fire_trigger_with_config(
310        &self,
311        trigger_name: &str,
312        project: &str,
313        trigger: &TriggerConfig,
314    ) {
315        // ── Suspend check ────────────────────────────────────────────────
316        if trigger.suspend {
317            self.emit_trigger_event(trigger_name, "trigger_skipped", "suspended");
318            return;
319        }
320
321        // ── Throttle check (event triggers only) ─────────────────────────
322        if let Some(ref throttle) = trigger.throttle {
323            if throttle.min_interval > 0 {
324                if let Some(last) = self.load_last_fired(trigger_name, project).await {
325                    let elapsed = (Utc::now() - last).num_seconds();
326                    if elapsed >= 0 && (elapsed as u64) < throttle.min_interval {
327                        self.emit_trigger_event(trigger_name, "trigger_skipped", "throttled");
328                        return;
329                    }
330                }
331            }
332        }
333
334        // ── Concurrency policy ───────────────────────────────────────────
335        match trigger.concurrency_policy {
336            crate::cli_types::ConcurrencyPolicy::Forbid => {
337                if self.has_active_task(trigger_name, project).await {
338                    self.emit_trigger_event(
339                        trigger_name,
340                        "trigger_skipped",
341                        "concurrent_task_active",
342                    );
343                    return;
344                }
345            }
346            crate::cli_types::ConcurrencyPolicy::Replace => {
347                // Cancel active tasks created by this trigger before creating a new one.
348                self.cancel_active_tasks(trigger_name, project).await;
349            }
350            crate::cli_types::ConcurrencyPolicy::Allow => {}
351        }
352
353        // ── Create task ──────────────────────────────────────────────────
354        let target_files = trigger
355            .action
356            .args
357            .as_ref()
358            .and_then(|a| a.get("target-file"))
359            .cloned();
360
361        let task_name = format!("trigger-{trigger_name}");
362
363        let payload = CreateTaskPayload {
364            name: Some(task_name),
365            goal: Some(format!("Triggered by: {trigger_name}")),
366            project_id: Some(project.to_string()),
367            workspace_id: Some(trigger.action.workspace.clone()),
368            workflow_id: Some(trigger.action.workflow.clone()),
369            target_files,
370            parent_task_id: None,
371            spawn_reason: None,
372        };
373
374        match crate::task_ops::create_task_as_service(&self.state, payload) {
375            Ok(summary) => {
376                let task_id = summary.id.clone();
377                info!(
378                    trigger = trigger_name,
379                    task_id = task_id.as_str(),
380                    "trigger fired: task created"
381                );
382
383                // Update trigger state.
384                self.update_trigger_state(trigger_name, project, &task_id, "created")
385                    .await;
386
387                // Emit event.
388                self.state.emit_event(
389                    &task_id,
390                    None,
391                    "trigger_fired",
392                    serde_json::json!({
393                        "trigger": trigger_name,
394                        "source": if trigger.cron.is_some() { "cron" } else { "event" },
395                        "task_id": task_id,
396                    }),
397                );
398
399                // Start the task if action.start is true.
400                if trigger.action.start {
401                    let state = self.state.clone();
402                    let tid = task_id.clone();
403                    tokio::spawn(async move {
404                        if let Err(e) =
405                            crate::scheduler_service::enqueue_task_as_service(&state, &tid).await
406                        {
407                            error!(task_id = tid.as_str(), error = %e, "failed to enqueue triggered task");
408                        } else {
409                            state.worker_notify.notify_one();
410                        }
411                    });
412                }
413
414                // History limit cleanup (best-effort, async).
415                if trigger.history_limit.is_some() {
416                    let state = self.state.clone();
417                    let name = trigger_name.to_string();
418                    let proj = project.to_string();
419                    let limit = trigger.history_limit.clone();
420                    tokio::spawn(async move {
421                        if let Err(e) = cleanup_history(&state, &name, &proj, limit.as_ref()).await
422                        {
423                            debug!(trigger = name.as_str(), error = %e, "history cleanup failed");
424                        }
425                    });
426                }
427            }
428            Err(e) => {
429                error!(
430                    trigger = trigger_name,
431                    error = %e,
432                    "trigger failed to create task"
433                );
434                self.update_trigger_state(trigger_name, project, "", "failed_to_create")
435                    .await;
436                self.state.emit_event(
437                    "",
438                    None,
439                    "trigger_error",
440                    serde_json::json!({
441                        "trigger": trigger_name,
442                        "error": e.to_string(),
443                    }),
444                );
445            }
446        }
447    }
448
449    // ── DB helpers ───────────────────────────────────────────────────────────
450
451    /// Look up the workflow_id for a task from the database.
452    async fn lookup_task_workflow(&self, task_id: &str) -> Option<String> {
453        let tid = task_id.to_owned();
454        let result = self
455            .state
456            .async_database
457            .reader()
458            .call(move |conn| {
459                let wf: Option<String> = conn
460                    .query_row(
461                        "SELECT workflow_id FROM tasks WHERE id = ?1",
462                        rusqlite::params![tid],
463                        |row| row.get(0),
464                    )
465                    .ok();
466                Ok(wf)
467            })
468            .await;
469        match result {
470            Ok(wf) => wf,
471            Err(e) => {
472                debug!(task_id, error = %e, "failed to look up task workflow");
473                None
474            }
475        }
476    }
477
478    async fn load_last_fired(&self, trigger_name: &str, project: &str) -> Option<DateTime<Utc>> {
479        let name = trigger_name.to_owned();
480        let proj = project.to_owned();
481        let result = self
482            .state
483            .async_database
484            .reader()
485            .call(move |conn| {
486                let mut stmt = conn
487                    .prepare(
488                        "SELECT last_fired_at FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
489                    )
490                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
491                let ts: Option<String> = stmt
492                    .query_row(rusqlite::params![name, proj], |row| row.get(0))
493                    .ok();
494                Ok(ts)
495            })
496            .await;
497
498        match result {
499            Ok(Some(ts)) => ts.parse::<DateTime<Utc>>().ok(),
500            _ => None,
501        }
502    }
503
504    async fn has_active_task(&self, trigger_name: &str, project: &str) -> bool {
505        let name = trigger_name.to_owned();
506        let proj = project.to_owned();
507        let result = self
508            .state
509            .async_database
510            .reader()
511            .call(move |conn| {
512                let last_task_id: Option<String> = conn
513                    .query_row(
514                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
515                        rusqlite::params![name, proj],
516                        |row| row.get(0),
517                    )
518                    .ok()
519                    .flatten();
520
521                if let Some(ref tid) = last_task_id {
522                    let status: Option<String> = conn
523                        .query_row(
524                            "SELECT status FROM tasks WHERE id = ?1",
525                            rusqlite::params![tid],
526                            |row| row.get(0),
527                        )
528                        .ok();
529                    if let Some(s) = status {
530                        return Ok(matches!(
531                            s.as_str(),
532                            "created" | "pending" | "running" | "restart_pending"
533                        ));
534                    }
535                }
536                Ok(false)
537            })
538            .await;
539
540        result.unwrap_or(false)
541    }
542
543    async fn cancel_active_tasks(&self, trigger_name: &str, project: &str) {
544        let name = trigger_name.to_owned();
545        let proj = project.to_owned();
546        let state = self.state.clone();
547        let result = state
548            .async_database
549            .reader()
550            .call(move |conn| {
551                let tid: Option<String> = conn
552                    .query_row(
553                        "SELECT last_task_id FROM trigger_state WHERE trigger_name = ?1 AND project = ?2",
554                        rusqlite::params![name, proj],
555                        |row| row.get(0),
556                    )
557                    .ok()
558                    .flatten();
559                Ok(tid)
560            })
561            .await;
562
563        if let Ok(Some(task_id)) = result {
564            if let Err(e) = cancel_task_for_trigger(&self.state, &task_id).await {
565                warn!(
566                    trigger = trigger_name,
567                    task_id = task_id.as_str(),
568                    error = %e,
569                    "failed to cancel active task for Replace policy"
570                );
571            }
572        }
573    }
574
575    async fn update_trigger_state(
576        &self,
577        trigger_name: &str,
578        project: &str,
579        task_id: &str,
580        status: &str,
581    ) {
582        let name = trigger_name.to_owned();
583        let proj = project.to_owned();
584        let tid = task_id.to_owned();
585        let st = status.to_owned();
586        let now = Utc::now().to_rfc3339();
587        let now2 = now.clone();
588
589        if let Err(e) = self
590            .state
591            .async_database
592            .writer()
593            .call(move |conn| {
594                conn.execute(
595                    "INSERT INTO trigger_state (trigger_name, project, last_fired_at, fire_count, last_task_id, last_status, created_at, updated_at)
596                     VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?7)
597                     ON CONFLICT(trigger_name, project) DO UPDATE SET
598                       last_fired_at = ?3,
599                       fire_count = fire_count + 1,
600                       last_task_id = ?4,
601                       last_status = ?5,
602                       updated_at = ?7",
603                    rusqlite::params![name, proj, now, tid, st, now2, now2],
604                )
605                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
606                Ok(())
607            })
608            .await
609        {
610            warn!(trigger = trigger_name, error = %e, "failed to update trigger_state");
611        }
612    }
613
614    fn emit_trigger_event(&self, trigger_name: &str, event_type: &str, reason: &str) {
615        debug!(trigger = trigger_name, event_type, reason, "trigger event");
616        self.state.emit_event(
617            "",
618            None,
619            event_type,
620            serde_json::json!({
621                "trigger": trigger_name,
622                "reason": reason,
623            }),
624        );
625    }
626}
627
628// ── Cron schedule helpers ────────────────────────────────────────────────────
629
630struct CronEntry {
631    trigger_name: String,
632    project: String,
633    next_fire: DateTime<Utc>,
634}
635
636fn compute_next_fire(spec: &TriggerCronConfig, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
637    use cron::Schedule;
638    use std::str::FromStr;
639
640    let schedule = Schedule::from_str(&spec.schedule)
641        .with_context(|| format!("invalid cron expression: {}", spec.schedule))?;
642
643    // If a timezone is specified, compute in that timezone, then convert back to UTC.
644    if let Some(ref tz_name) = spec.timezone {
645        let tz: chrono_tz::Tz = tz_name
646            .parse()
647            .map_err(|_| anyhow::anyhow!("invalid timezone: {tz_name}"))?;
648        let local_after = after.with_timezone(&tz);
649        let next = schedule
650            .after(&local_after)
651            .next()
652            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
653        Ok(next.with_timezone(&Utc))
654    } else {
655        let next = schedule
656            .after(&after)
657            .next()
658            .ok_or_else(|| anyhow::anyhow!("no next fire time for schedule"))?;
659        Ok(next.with_timezone(&Utc))
660    }
661}
662
663fn next_cron_sleep(entries: &[CronEntry]) -> std::time::Duration {
664    let now = Utc::now();
665    entries
666        .iter()
667        .map(|e| {
668            let diff = e.next_fire.signed_duration_since(now);
669            if diff.num_milliseconds() <= 0 {
670                std::time::Duration::from_millis(100)
671            } else {
672                std::time::Duration::from_millis(diff.num_milliseconds() as u64)
673            }
674        })
675        .min()
676        // If no cron triggers, sleep for a long time (until event or reload wakes us).
677        .unwrap_or(std::time::Duration::from_secs(3600))
678}
679
680fn collect_due_triggers(entries: &[CronEntry], now: DateTime<Utc>) -> Vec<(String, String)> {
681    entries
682        .iter()
683        .filter(|e| e.next_fire <= now)
684        .map(|e| (e.trigger_name.clone(), e.project.clone()))
685        .collect()
686}
687
688async fn cleanup_history(
689    state: &InnerState,
690    trigger_name: &str,
691    project: &str,
692    limit: Option<&crate::config::TriggerHistoryLimitConfig>,
693) -> Result<()> {
694    let limit = match limit {
695        Some(l) => l,
696        None => return Ok(()),
697    };
698
699    let task_name_pattern = format!("trigger-{trigger_name}");
700    let proj = project.to_owned();
701
702    // For each status category, collect IDs of tasks beyond the retention limit.
703    let mut ids_to_delete: Vec<String> = Vec::new();
704
705    if let Some(max_successful) = limit.successful {
706        let pattern = task_name_pattern.clone();
707        let p = proj.clone();
708        let max = max_successful as usize;
709        let ids = state
710            .async_database
711            .reader()
712            .call(move |conn| {
713                let mut stmt = conn
714                    .prepare(
715                        "SELECT id FROM tasks \
716                         WHERE name = ?1 AND project_id = ?2 AND status = 'completed' \
717                         ORDER BY created_at DESC",
718                    )
719                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
720                let rows = stmt
721                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
722                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
723                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
724                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
725            })
726            .await
727            .context("query completed tasks for history cleanup")?;
728        ids_to_delete.extend(ids);
729    }
730
731    if let Some(max_failed) = limit.failed {
732        let pattern = task_name_pattern.clone();
733        let p = proj.clone();
734        let max = max_failed as usize;
735        let ids = state
736            .async_database
737            .reader()
738            .call(move |conn| {
739                let mut stmt = conn
740                    .prepare(
741                        "SELECT id FROM tasks \
742                         WHERE name = ?1 AND project_id = ?2 AND status = 'failed' \
743                         ORDER BY created_at DESC",
744                    )
745                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
746                let rows = stmt
747                    .query_map(rusqlite::params![pattern, p], |row| row.get::<_, String>(0))
748                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
749                let all: Vec<String> = rows.filter_map(|r| r.ok()).collect();
750                Ok(all.into_iter().skip(max).collect::<Vec<String>>())
751            })
752            .await
753            .context("query failed tasks for history cleanup")?;
754        ids_to_delete.extend(ids);
755    }
756
757    if ids_to_delete.is_empty() {
758        return Ok(());
759    }
760
761    state
762        .async_database
763        .writer()
764        .call(move |conn| {
765            let placeholders: Vec<String> =
766                (1..=ids_to_delete.len()).map(|i| format!("?{i}")).collect();
767            let sql = format!(
768                "DELETE FROM tasks WHERE id IN ({})",
769                placeholders.join(", ")
770            );
771            let params: Vec<Box<dyn rusqlite::types::ToSql>> = ids_to_delete
772                .iter()
773                .map(|id| Box::new(id.clone()) as Box<dyn rusqlite::types::ToSql>)
774                .collect();
775            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
776                params.iter().map(|p| p.as_ref()).collect();
777            conn.execute(&sql, param_refs.as_slice())
778                .map_err(|e| tokio_rusqlite::Error::Other(e.into()))?;
779            Ok(())
780        })
781        .await
782        .context("delete excess trigger history tasks")?;
783
784    Ok(())
785}
786
787// ── Public helper for event broadcasting ─────────────────────────────────────
788
789/// Broadcast a trigger-relevant event (task_completed / task_failed).
790/// Called from the daemon's event handling path.
791pub fn broadcast_task_event(state: &InnerState, payload: TriggerEventPayload) {
792    // Ignore send errors (no subscribers = no triggers configured).
793    let _ = state.trigger_event_tx.send(payload);
794}
795
796/// Notify the trigger engine to reload its configuration.
797/// Safe to call from sync code. No-op if no engine is running.
798pub fn notify_trigger_reload(state: &InnerState) {
799    if let Ok(guard) = state.trigger_engine_handle.lock() {
800        if let Some(ref handle) = *guard {
801            let _ = handle.reload_sync();
802        }
803    }
804}
805
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;

    /// Daily at 02:00:00, in the six-field (seconds-first) syntax used by the
    /// `cron` crate.
    const DAILY_AT_2AM: &str = "0 0 2 * * *";

    /// Builds a cron spec from a schedule string and an optional timezone name.
    fn cron_spec(schedule: &str, timezone: Option<&str>) -> TriggerCronConfig {
        TriggerCronConfig {
            schedule: schedule.to_string(),
            timezone: timezone.map(str::to_string),
        }
    }

    /// 2026-01-01 00:00:00 UTC, the fixed reference instant for these tests.
    fn midnight_jan_1_2026() -> DateTime<Utc> {
        chrono::NaiveDate::from_ymd_opt(2026, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
            .and_utc()
    }

    /// Builds a schedule entry for project `p`.
    fn entry(trigger_name: &str, next_fire: DateTime<Utc>) -> CronEntry {
        CronEntry {
            trigger_name: trigger_name.to_string(),
            project: "p".to_string(),
            next_fire,
        }
    }

    #[test]
    fn compute_next_fire_utc() {
        let after = midnight_jan_1_2026();
        let next = compute_next_fire(&cron_spec(DAILY_AT_2AM, None), after)
            .expect("should compute");
        assert!(next > after);
        assert_eq!(next.hour(), 2);
    }

    #[test]
    fn compute_next_fire_with_timezone() {
        let after = midnight_jan_1_2026();
        let next = compute_next_fire(&cron_spec(DAILY_AT_2AM, Some("Asia/Shanghai")), after)
            .expect("should compute with tz");
        assert!(next > after);
        // 02:00 Shanghai = 18:00 UTC previous day
        assert_eq!(next.hour(), 18);
    }

    #[test]
    fn compute_next_fire_rejects_invalid_schedule() {
        let outcome = compute_next_fire(&cron_spec("not a cron", None), Utc::now());
        assert!(outcome.is_err());
    }

    #[test]
    fn compute_next_fire_rejects_invalid_timezone() {
        let outcome = compute_next_fire(&cron_spec(DAILY_AT_2AM, Some("Invalid/TZ")), Utc::now());
        assert!(outcome.is_err());
    }

    #[test]
    fn next_cron_sleep_empty_returns_1h() {
        let wait = next_cron_sleep(&[]);
        assert_eq!(wait, std::time::Duration::from_secs(3600));
    }

    #[test]
    fn collect_due_triggers_finds_past_entries() {
        let now = Utc::now();
        let entries = vec![
            entry("past", now - chrono::Duration::seconds(10)),
            entry("future", now + chrono::Duration::seconds(300)),
        ];
        let due = collect_due_triggers(&entries, now);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, "past");
    }
}