Skip to main content

construct/sop/
dispatch.rs

1//! Unified SOP event dispatch helpers.
2//!
3//! All event sources (MQTT, webhook, cron, peripheral) route through
4//! `dispatch_sop_event` so that locking, audit, and health bookkeeping
5//! happen in exactly one place.
6
7use std::sync::{Arc, Mutex};
8
9use tracing::{debug, info, warn};
10
11use super::audit::SopAuditLogger;
12use super::engine::{SopEngine, now_iso8601};
13use super::types::{SopEvent, SopRun, SopRunAction, SopTriggerSource};
14
15// ── Dispatch result ─────────────────────────────────────────────
16
17/// Outcome of attempting to dispatch an event to the SOP engine.
18#[derive(Debug, Clone)]
19pub enum DispatchResult {
20    /// A new SOP run was started. `action` carries the next step the runtime
21    /// must execute (or wait for approval on). Callers that cannot act on the
22    /// action (e.g. headless fan-in) must still audit/log it — never silently
23    /// drop.
24    Started {
25        run_id: String,
26        sop_name: String,
27        action: Box<SopRunAction>,
28    },
29    /// A matching SOP was found but could not start (cooldown / concurrency).
30    Skipped { sop_name: String, reason: String },
31    /// No loaded SOP matched the event.
32    NoMatch,
33}
34
35// ── Action helpers ──────────────────────────────────────────────
36
37/// Extract the `run_id` from any `SopRunAction` variant.
38fn extract_run_id_from_action(action: &SopRunAction) -> &str {
39    match action {
40        SopRunAction::ExecuteStep { run_id, .. }
41        | SopRunAction::WaitApproval { run_id, .. }
42        | SopRunAction::DeterministicStep { run_id, .. }
43        | SopRunAction::CheckpointWait { run_id, .. }
44        | SopRunAction::Completed { run_id, .. }
45        | SopRunAction::Failed { run_id, .. } => run_id,
46    }
47}
48
49/// Short label for logging which action was returned.
50fn action_label(action: &SopRunAction) -> &'static str {
51    match action {
52        SopRunAction::ExecuteStep { .. } => "ExecuteStep",
53        SopRunAction::WaitApproval { .. } => "WaitApproval",
54        SopRunAction::DeterministicStep { .. } => "DeterministicStep",
55        SopRunAction::CheckpointWait { .. } => "CheckpointWait",
56        SopRunAction::Completed { .. } => "Completed",
57        SopRunAction::Failed { .. } => "Failed",
58    }
59}
60
61// ── Core dispatch ───────────────────────────────────────────────
62
63/// Dispatch an incoming event to the SOP engine.
64///
65/// Pattern (batch lock — exactly 2 acquisitions):
66/// 1. Lock → `match_trigger` → collect SOP names → drop lock
67/// 2. Lock → for each name: `start_run` → collect results → drop lock
68/// 3. Async (no lock): audit each started run
69pub async fn dispatch_sop_event(
70    engine: &Arc<Mutex<SopEngine>>,
71    audit: &SopAuditLogger,
72    event: SopEvent,
73) -> Vec<DispatchResult> {
74    // Phase 1: match
75    let matched_names: Vec<String> = match engine.lock() {
76        Ok(eng) => eng
77            .match_trigger(&event)
78            .iter()
79            .map(|s| s.name.clone())
80            .collect(),
81        Err(e) => {
82            crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
83            warn!("SOP dispatch: engine lock poisoned during match phase: {e}");
84            return vec![];
85        }
86    };
87
88    if matched_names.is_empty() {
89        debug!("SOP dispatch: no match for event");
90        return vec![DispatchResult::NoMatch];
91    }
92
93    info!(
94        "SOP dispatch: {} SOP(s) matched: {:?}",
95        matched_names.len(),
96        matched_names
97    );
98
99    // Phase 2: start runs
100    let mut results = Vec::new();
101    let mut started_runs: Vec<SopRun> = Vec::new();
102
103    {
104        let mut eng = match engine.lock() {
105            Ok(e) => e,
106            Err(e) => {
107                crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
108                warn!("SOP dispatch: engine lock poisoned during start phase: {e}");
109                return vec![];
110            }
111        };
112
113        for sop_name in &matched_names {
114            match eng.start_run(sop_name, event.clone()) {
115                Ok(action) => {
116                    // Extract run_id from the action (authoritative source)
117                    let run_id = extract_run_id_from_action(&action).to_string();
118                    // Snapshot the run for audit (must be done under lock)
119                    if let Some(run) = eng.active_runs().get(&run_id) {
120                        started_runs.push(run.clone());
121                    }
122                    info!(
123                        "SOP dispatch: started '{}' run {run_id} (action: {})",
124                        sop_name,
125                        action_label(&action),
126                    );
127                    results.push(DispatchResult::Started {
128                        run_id,
129                        sop_name: sop_name.clone(),
130                        action: Box::new(action),
131                    });
132                }
133                Err(e) => {
134                    info!("SOP dispatch: skipped '{}': {e}", sop_name);
135                    results.push(DispatchResult::Skipped {
136                        sop_name: sop_name.clone(),
137                        reason: e.to_string(),
138                    });
139                }
140            }
141        }
142    } // lock dropped
143
144    // Phase 3: audit (async, no lock)
145    for run in &started_runs {
146        if let Err(e) = audit.log_run_start(run).await {
147            warn!("SOP dispatch: audit log failed for run {}: {e}", run.run_id);
148        }
149    }
150
151    crate::health::mark_component_ok("sop_dispatch");
152    results
153}
154
155// ── Headless result processing ──────────────────────────────────
156
157/// Process dispatch results in headless (non-agent-loop) callers.
158///
159/// This handles audit and logging for fan-in callers (MQTT, webhook, cron)
160/// that cannot execute SOP steps interactively. For `WaitApproval` actions,
161/// approval timeout polling in the scheduler handles progression.
162/// For `ExecuteStep` actions, the run is started in the engine but steps
163/// cannot be executed without an agent loop — this is logged as a warning.
164pub fn process_headless_results(results: &[DispatchResult]) {
165    for result in results {
166        match result {
167            DispatchResult::Started {
168                run_id,
169                sop_name,
170                action,
171            } => match action.as_ref() {
172                SopRunAction::ExecuteStep { step, .. } => {
173                    warn!(
174                        "SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \
175                         '{}' but no agent loop available to execute",
176                        step.number, step.title,
177                    );
178                }
179                SopRunAction::WaitApproval { step, .. } => {
180                    info!(
181                        "SOP headless dispatch: run {run_id} ('{sop_name}') waiting for approval \
182                         on step {} '{}'. Timeout polling will handle progression",
183                        step.number, step.title,
184                    );
185                }
186                SopRunAction::DeterministicStep { step, .. } => {
187                    info!(
188                        "SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \
189                         '{}'",
190                        step.number, step.title,
191                    );
192                }
193                SopRunAction::CheckpointWait {
194                    step, state_file, ..
195                } => {
196                    info!(
197                        "SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \
198                         '{}', state persisted to {}",
199                        step.number,
200                        step.title,
201                        state_file.display(),
202                    );
203                }
204                SopRunAction::Completed { .. } => {
205                    info!(
206                        "SOP headless dispatch: run {run_id} ('{sop_name}') completed immediately"
207                    );
208                }
209                SopRunAction::Failed { reason, .. } => {
210                    warn!("SOP headless dispatch: run {run_id} ('{sop_name}') failed: {reason}");
211                }
212            },
213            DispatchResult::Skipped { sop_name, reason } => {
214                info!("SOP headless dispatch: skipped '{sop_name}': {reason}");
215            }
216            DispatchResult::NoMatch => {}
217        }
218    }
219}
220
221// ── Peripheral signal helper ────────────────────────────────────
222
223/// Convenience wrapper for peripheral hardware callbacks.
224///
225/// Builds a `SopEvent` with source `Peripheral` and topic `"{board}/{signal}"`
226/// then dispatches it through the standard path.
227pub async fn dispatch_peripheral_signal(
228    engine: &Arc<Mutex<SopEngine>>,
229    audit: &SopAuditLogger,
230    board: &str,
231    signal: &str,
232    payload: Option<&str>,
233) -> Vec<DispatchResult> {
234    let event = SopEvent {
235        source: SopTriggerSource::Peripheral,
236        topic: Some(format!("{board}/{signal}")),
237        payload: payload.map(String::from),
238        timestamp: now_iso8601(),
239    };
240    dispatch_sop_event(engine, audit, event).await
241}
242
243// ── Cron SOP cache + check ──────────────────────────────────────
244
245/// Pre-parsed cron schedules for SOP triggers.
246///
247/// Built once at daemon startup to avoid re-parsing cron expressions
248/// on every scheduler tick.
249#[derive(Clone)]
250pub struct SopCronCache {
251    /// (sop_name, raw_expression, parsed_schedule)
252    schedules: Vec<(String, String, cron::Schedule)>,
253}
254
255impl SopCronCache {
256    /// Build cache from the current engine state.
257    ///
258    /// Locks the engine once, iterates SOPs, parses Cron trigger expressions.
259    /// Invalid expressions are logged and skipped (fail-closed).
260    pub fn from_engine(engine: &Arc<Mutex<SopEngine>>) -> Self {
261        let mut schedules = Vec::new();
262        let eng = match engine.lock() {
263            Ok(e) => e,
264            Err(e) => {
265                warn!("SopCronCache: engine lock poisoned: {e}");
266                return Self { schedules };
267            }
268        };
269
270        for sop in eng.sops() {
271            for trigger in &sop.triggers {
272                if let super::types::SopTrigger::Cron { expression } = trigger {
273                    // Normalize 5-field crontab to 6-field (prepend seconds)
274                    let normalized = match crate::cron::normalize_expression(expression) {
275                        Ok(n) => n,
276                        Err(e) => {
277                            warn!(
278                                "SopCronCache: invalid cron expression '{}' in SOP '{}': {e}",
279                                expression, sop.name
280                            );
281                            continue;
282                        }
283                    };
284                    match normalized.parse::<cron::Schedule>() {
285                        Ok(schedule) => {
286                            schedules.push((sop.name.clone(), expression.clone(), schedule));
287                        }
288                        Err(e) => {
289                            warn!(
290                                "SopCronCache: failed to parse cron schedule '{}' for SOP '{}': {e}",
291                                normalized, sop.name
292                            );
293                        }
294                    }
295                }
296            }
297        }
298
299        info!("SopCronCache: cached {} cron schedule(s)", schedules.len());
300        Self { schedules }
301    }
302
303    /// Return the cached schedules (for testing).
304    #[cfg(test)]
305    pub fn schedules(&self) -> &[(String, String, cron::Schedule)] {
306        &self.schedules
307    }
308}
309
310/// Check all cached cron SOP triggers for firings in the window
311/// `(last_check, now]` and dispatch events for each.
312///
313/// Uses window-based evaluation so ticks between polls are never missed.
314pub async fn check_sop_cron_triggers(
315    engine: &Arc<Mutex<SopEngine>>,
316    audit: &SopAuditLogger,
317    cache: &SopCronCache,
318    last_check: &mut chrono::DateTime<chrono::Utc>,
319) -> Vec<DispatchResult> {
320    let now = chrono::Utc::now();
321    let mut all_results = Vec::new();
322
323    for (_sop_name, expression, schedule) in &cache.schedules {
324        // Check if any occurrence fell in the window (last_check, now].
325        // At-most-once semantics: even if multiple ticks of the same expression
326        // fell in the window (e.g., scheduler delayed), we fire only once.
327        // This is intentional — SOP triggers should not retroactively batch-fire.
328        let mut upcoming = schedule.after(last_check);
329        if let Some(next) = upcoming.next() {
330            if next <= now {
331                // This expression fired in the window
332                let event = SopEvent {
333                    source: SopTriggerSource::Cron,
334                    topic: Some(expression.clone()),
335                    payload: None,
336                    timestamp: now_iso8601(),
337                };
338                let results = dispatch_sop_event(engine, audit, event).await;
339                all_results.extend(results);
340            }
341        }
342    }
343
344    *last_check = now;
345    all_results
346}
347
348// ── Tests ───────────────────────────────────────────────────────
349
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SopConfig;
    use crate::memory::traits::Memory;
    use crate::sop::types::{
        Sop, SopExecutionMode, SopPriority, SopRunAction, SopStep, SopTrigger, SopTriggerSource,
    };

    // ── Fixtures ────────────────────────────────────────────────

    /// Auto-mode SOP with a single step, no cooldown and room for two
    /// concurrent runs; individual tests tweak fields as needed.
    fn test_sop(name: &str, triggers: Vec<SopTrigger>) -> Sop {
        let only_step = SopStep {
            number: 1,
            title: "Step one".into(),
            body: "Do step one".into(),
            suggested_tools: vec![],
            requires_confirmation: false,
            kind: crate::sop::SopStepKind::default(),
            schema: None,
        };

        Sop {
            name: name.into(),
            description: format!("Test SOP: {name}"),
            version: "1.0.0".into(),
            priority: SopPriority::Normal,
            execution_mode: SopExecutionMode::Auto,
            triggers,
            steps: vec![only_step],
            cooldown_secs: 0,
            max_concurrent: 2,
            location: None,
            deterministic: false,
        }
    }

    /// Engine with default config, pre-loaded with `sops`.
    fn test_engine(sops: Vec<Sop>) -> Arc<Mutex<SopEngine>> {
        let mut inner = SopEngine::new(SopConfig::default());
        inner.set_sops_for_test(sops);
        Arc::new(Mutex::new(inner))
    }

    /// Audit logger backed by the in-memory test store.
    fn test_audit() -> SopAuditLogger {
        let backing: Arc<dyn Memory> = Arc::new(crate::memory::test_memory::TestMemory::new());
        SopAuditLogger::new(backing)
    }

    /// Event stamped with the current time.
    fn event_from(
        source: SopTriggerSource,
        topic: Option<&str>,
        payload: Option<&str>,
    ) -> SopEvent {
        SopEvent {
            source,
            topic: topic.map(String::from),
            payload: payload.map(String::from),
            timestamp: now_iso8601(),
        }
    }

    /// `test_sop` with a single cron trigger.
    fn cron_sop(name: &str, expression: &str) -> Sop {
        test_sop(
            name,
            vec![SopTrigger::Cron {
                expression: expression.into(),
            }],
        )
    }

    /// Number of `Started` entries in `results`.
    fn count_started(results: &[DispatchResult]) -> usize {
        results
            .iter()
            .filter(|r| matches!(r, DispatchResult::Started { .. }))
            .count()
    }

    // ── dispatch_sop_event ──────────────────────────────────────

    #[tokio::test]
    async fn dispatch_starts_matching_sop() {
        let mqtt_sop = test_sop(
            "mqtt-sop",
            vec![SopTrigger::Mqtt {
                topic: "sensors/temp".into(),
                condition: None,
            }],
        );
        let engine = test_engine(vec![mqtt_sop]);
        let audit = test_audit();

        let event = event_from(
            SopTriggerSource::Mqtt,
            Some("sensors/temp"),
            Some(r#"{"value": 42}"#),
        );

        let results = dispatch_sop_event(&engine, &audit, event).await;
        assert_eq!(results.len(), 1);
        let started_with_execute_step = matches!(
            &results[0],
            DispatchResult::Started { sop_name, action, .. }
                if sop_name == "mqtt-sop"
                    && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. })
        );
        assert!(started_with_execute_step);
    }

    #[tokio::test]
    async fn dispatch_skips_when_cooldown_active() {
        let mut sop = test_sop("cooldown-sop", vec![SopTrigger::Manual]);
        sop.cooldown_secs = 3600;
        sop.max_concurrent = 1;
        let engine = test_engine(vec![sop]);
        let audit = test_audit();

        // Run the SOP to completion by hand so the cooldown clock starts.
        {
            let mut eng = engine.lock().unwrap();
            let _action = eng
                .start_run(
                    "cooldown-sop",
                    event_from(SopTriggerSource::Manual, None, None),
                )
                .unwrap();

            let run_id = eng.active_runs().keys().next().unwrap().clone();
            let finished_step = crate::sop::types::SopStepResult {
                step_number: 1,
                status: crate::sop::types::SopStepStatus::Completed,
                output: "done".into(),
                started_at: now_iso8601(),
                completed_at: Some(now_iso8601()),
            };
            eng.advance_step(&run_id, finished_step).unwrap();
        }

        // A fresh dispatch must now be refused because of the cooldown.
        let event = event_from(SopTriggerSource::Manual, None, None);
        let results = dispatch_sop_event(&engine, &audit, event).await;
        assert_eq!(results.len(), 1);
        assert!(matches!(
            &results[0],
            DispatchResult::Skipped { sop_name, .. } if sop_name == "cooldown-sop"
        ));
    }

    #[tokio::test]
    async fn dispatch_returns_no_match_for_unknown_event() {
        let engine = test_engine(vec![test_sop("manual-sop", vec![SopTrigger::Manual])]);
        let audit = test_audit();

        // The only SOP has a Manual trigger, so an MQTT event matches nothing.
        let event = event_from(SopTriggerSource::Mqtt, Some("some/topic"), None);
        let results = dispatch_sop_event(&engine, &audit, event).await;
        assert_eq!(results.len(), 1);
        assert!(matches!(&results[0], DispatchResult::NoMatch));
    }

    #[tokio::test]
    async fn dispatch_batch_lock_starts_multiple_sops() {
        let deploy_hook = || SopTrigger::Webhook {
            path: "/api/deploy".into(),
        };
        let sop1 = test_sop("webhook-sop-1", vec![deploy_hook()]);
        let sop2 = test_sop("webhook-sop-2", vec![deploy_hook()]);
        let engine = test_engine(vec![sop1, sop2]);
        let audit = test_audit();

        let event = event_from(SopTriggerSource::Webhook, Some("/api/deploy"), None);

        let results = dispatch_sop_event(&engine, &audit, event).await;
        assert_eq!(count_started(&results), 2);
    }

    /// B1 DoD: the action returned by `start_run` must be carried in
    /// `DispatchResult::Started`, not silently dropped.
    #[tokio::test]
    async fn dispatch_captures_action_for_wait_approval() {
        // Supervised mode makes step 1 wait for approval.
        let mut sop = test_sop(
            "supervised-sop",
            vec![SopTrigger::Mqtt {
                topic: "alert".into(),
                condition: None,
            }],
        );
        sop.execution_mode = SopExecutionMode::Supervised;
        let engine = test_engine(vec![sop]);
        let audit = test_audit();

        let event = event_from(SopTriggerSource::Mqtt, Some("alert"), None);

        let results = dispatch_sop_event(&engine, &audit, event).await;
        assert_eq!(results.len(), 1);
        let DispatchResult::Started {
            run_id,
            sop_name,
            action,
        } = &results[0]
        else {
            panic!("Expected Started, got {:?}", &results[0]);
        };
        assert_eq!(sop_name, "supervised-sop");
        assert!(!run_id.is_empty());
        assert!(
            matches!(action.as_ref(), SopRunAction::WaitApproval { .. }),
            "Supervised SOP must return WaitApproval, got {:?}",
            action
        );
    }

    /// B1 DoD: an Auto-mode SOP surfaces `ExecuteStep` in the dispatch result.
    #[tokio::test]
    async fn dispatch_captures_action_for_execute_step() {
        let engine = test_engine(vec![test_sop("auto-sop", vec![SopTrigger::Manual])]);
        let audit = test_audit();

        let event = event_from(SopTriggerSource::Manual, None, None);

        let results = dispatch_sop_event(&engine, &audit, event).await;
        assert_eq!(results.len(), 1);
        let DispatchResult::Started { action, .. } = &results[0] else {
            panic!("Expected Started, got {:?}", &results[0]);
        };
        assert!(
            matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }),
            "Auto SOP must return ExecuteStep, got {:?}",
            action
        );
    }

    // ── dispatch_peripheral_signal ──────────────────────────────

    /// SOP triggered by pin 3 of the "nucleo" board.
    fn gpio_sop() -> Sop {
        test_sop(
            "gpio-sop",
            vec![SopTrigger::Peripheral {
                board: "nucleo".into(),
                signal: "pin_3".into(),
                condition: None,
            }],
        )
    }

    #[tokio::test]
    async fn peripheral_signal_dispatches_to_matching_sop() {
        let engine = test_engine(vec![gpio_sop()]);
        let audit = test_audit();

        let results =
            dispatch_peripheral_signal(&engine, &audit, "nucleo", "pin_3", Some("1")).await;
        assert_eq!(results.len(), 1);
        assert!(matches!(
            &results[0],
            DispatchResult::Started { sop_name, .. } if sop_name == "gpio-sop"
        ));
    }

    /// Despite the name, an unmatched signal yields a single `NoMatch` entry
    /// rather than an empty vector.
    #[tokio::test]
    async fn peripheral_signal_no_match_returns_empty() {
        let engine = test_engine(vec![gpio_sop()]);
        let audit = test_audit();

        let results = dispatch_peripheral_signal(&engine, &audit, "rpi", "gpio_5", None).await;
        assert_eq!(results.len(), 1);
        assert!(matches!(&results[0], DispatchResult::NoMatch));
    }

    // ── SopCronCache ────────────────────────────────────────────

    #[test]
    fn cron_cache_skips_invalid_expression() {
        let engine = test_engine(vec![cron_sop("bad-cron", "not a valid cron")]);
        let cache = SopCronCache::from_engine(&engine);
        assert!(cache.schedules().is_empty());
    }

    #[test]
    fn cron_cache_parses_valid_expression() {
        let engine = test_engine(vec![cron_sop("valid-cron", "0 */5 * * *")]);
        let cache = SopCronCache::from_engine(&engine);

        let cached = cache.schedules();
        assert_eq!(cached.len(), 1);
        assert_eq!(cached[0].0, "valid-cron");
        assert_eq!(cached[0].1, "0 */5 * * *");
    }

    // ── check_sop_cron_triggers ─────────────────────────────────

    #[tokio::test]
    async fn cron_sop_trigger_fires_on_schedule() {
        let engine = test_engine(vec![cron_sop("cron-sop", "* * * * *")]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        // A two-minute window is guaranteed to contain a per-minute tick.
        let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(2);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        let started = count_started(&results);
        assert!(started >= 1, "Expected at least 1 started SOP from cron");
    }

    #[tokio::test]
    async fn cron_sop_only_matching_expression_fires() {
        let every_minute = cron_sop("every-min", "* * * * *");
        // "0 0 1 1 *" = midnight on Jan 1, which will not fall inside a
        // two-minute window ending now.
        let yearly = cron_sop("yearly", "0 0 1 1 *");
        let engine = test_engine(vec![every_minute, yearly]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(2);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        // Only "every-min" should have been started.
        let started_names: Vec<&str> = results
            .iter()
            .filter_map(|r| match r {
                DispatchResult::Started { sop_name, .. } => Some(sop_name.as_str()),
                _ => None,
            })
            .collect();
        assert!(started_names.contains(&"every-min"));
        assert!(!started_names.contains(&"yearly"));
    }

    #[tokio::test]
    async fn cron_sop_window_check_does_not_miss_tick() {
        let engine = test_engine(vec![cron_sop("every-min", "* * * * *")]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        // Pretend the previous poll happened five minutes ago.
        let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(5);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        // The ticks that elapsed in between must be noticed.
        let started = count_started(&results);
        assert!(
            started >= 1,
            "Window-based check should catch ticks from 5 minutes ago"
        );

        // The poll must have moved `last_check` to (approximately) now.
        let now = chrono::Utc::now();
        assert!(
            (now - last_check).num_seconds() < 2,
            "last_check should be updated to now"
        );
    }
}