Skip to main content

construct/sop/
engine.rs

1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::path::{Path, PathBuf};
4
5use anyhow::{Result, bail};
6use tracing::{info, warn};
7
8use super::condition::evaluate_condition;
9use super::load_sops;
10use super::types::{
11    DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
12    SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
13    SopTrigger, SopTriggerSource,
14};
15use crate::config::SopConfig;
16
17/// Central SOP orchestrator: loads SOPs, matches triggers, manages run lifecycle.
18pub struct SopEngine {
19    sops: Vec<Sop>,
20    active_runs: HashMap<String, SopRun>,
21    /// Completed/failed/cancelled runs (kept for status queries).
22    finished_runs: Vec<SopRun>,
23    config: SopConfig,
24    run_counter: u64,
25    /// Cumulative savings from deterministic execution.
26    deterministic_savings: DeterministicSavings,
27}
28
29impl SopEngine {
30    /// Create a new engine with the given config. Call `reload()` to load SOPs.
31    pub fn new(config: SopConfig) -> Self {
32        Self {
33            sops: Vec::new(),
34            active_runs: HashMap::new(),
35            finished_runs: Vec::new(),
36            config,
37            run_counter: 0,
38            deterministic_savings: DeterministicSavings::default(),
39        }
40    }
41
42    /// Load/reload SOPs from the configured directory.
43    pub fn reload(&mut self, workspace_dir: &Path) {
44        self.sops = load_sops(
45            workspace_dir,
46            self.config.sops_dir.as_deref(),
47            super::parse_execution_mode(&self.config.default_execution_mode),
48        );
49        info!("SOP engine loaded {} SOPs", self.sops.len());
50    }
51
52    /// Return all loaded SOP definitions.
53    pub fn sops(&self) -> &[Sop] {
54        &self.sops
55    }
56
57    /// Return all active (in-flight) runs.
58    pub fn active_runs(&self) -> &HashMap<String, SopRun> {
59        &self.active_runs
60    }
61
62    /// Look up a run by ID (active or finished).
63    pub fn get_run(&self, run_id: &str) -> Option<&SopRun> {
64        self.active_runs
65            .get(run_id)
66            .or_else(|| self.finished_runs.iter().find(|r| r.run_id == run_id))
67    }
68
69    /// Look up an SOP by name.
70    pub fn get_sop(&self, name: &str) -> Option<&Sop> {
71        self.sops.iter().find(|s| s.name == name)
72    }
73
74    // ── Trigger matching ────────────────────────────────────────
75
76    /// Match an incoming event against all loaded SOPs and return the names of
77    /// SOPs whose triggers match.
78    pub fn match_trigger(&self, event: &SopEvent) -> Vec<&Sop> {
79        self.sops
80            .iter()
81            .filter(|sop| sop.triggers.iter().any(|t| trigger_matches(t, event)))
82            .collect()
83    }
84
85    // ── Run lifecycle ───────────────────────────────────────────
86
87    /// Check whether a new run can be started for the given SOP
88    /// (respects cooldown and concurrency limits).
89    pub fn can_start(&self, sop_name: &str) -> bool {
90        let sop = match self.get_sop(sop_name) {
91            Some(s) => s,
92            None => return false,
93        };
94
95        // Per-SOP concurrency limit
96        let active_for_sop = self
97            .active_runs
98            .values()
99            .filter(|r| r.sop_name == sop_name)
100            .count();
101        if active_for_sop >= sop.max_concurrent as usize {
102            return false;
103        }
104
105        // Global concurrency limit
106        if self.active_runs.len() >= self.config.max_concurrent_total {
107            return false;
108        }
109
110        // Cooldown: check most recent finished run for this SOP
111        if sop.cooldown_secs > 0 {
112            if let Some(last) = self.last_finished_run(sop_name) {
113                if let Some(ref completed_at) = last.completed_at {
114                    if !cooldown_elapsed(completed_at, sop.cooldown_secs) {
115                        return false;
116                    }
117                }
118            }
119        }
120
121        true
122    }
123
124    /// Start a new SOP run. Returns the first action to take.
125    /// Deterministic SOPs are automatically routed to `start_deterministic_run`.
126    pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result<SopRunAction> {
127        // Route deterministic SOPs to dedicated path
128        if self.get_sop(sop_name).map_or(false, |s| {
129            s.execution_mode == SopExecutionMode::Deterministic
130        }) {
131            return self.start_deterministic_run(sop_name, event);
132        }
133
134        let sop = self
135            .get_sop(sop_name)
136            .ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
137            .clone();
138
139        if !self.can_start(sop_name) {
140            bail!(
141                "Cannot start SOP '{}': cooldown or concurrency limit reached",
142                sop_name
143            );
144        }
145
146        if sop.steps.is_empty() {
147            bail!("SOP '{}' has no steps defined", sop_name);
148        }
149
150        self.run_counter += 1;
151        let dur = std::time::SystemTime::now()
152            .duration_since(std::time::UNIX_EPOCH)
153            .unwrap_or_default();
154        let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
155        let run_id = format!("run-{epoch_ms}-{:04}", self.run_counter);
156        let now = now_iso8601();
157
158        let run = SopRun {
159            run_id: run_id.clone(),
160            sop_name: sop_name.to_string(),
161            trigger_event: event,
162            status: SopRunStatus::Running,
163            current_step: 1,
164            total_steps: u32::try_from(sop.steps.len()).unwrap_or(u32::MAX),
165            started_at: now,
166            completed_at: None,
167            step_results: Vec::new(),
168            waiting_since: None,
169            llm_calls_saved: 0,
170        };
171
172        self.active_runs.insert(run_id.clone(), run);
173
174        info!("SOP run {} started for '{}'", run_id, sop_name);
175
176        // Determine first action based on execution mode
177        let step = sop.steps[0].clone();
178        let context = format_step_context(&sop, &self.active_runs[&run_id], &step);
179        let action = resolve_step_action(&sop, &step, run_id.clone(), context);
180
181        // If the action is WaitApproval, update run status and record timestamp
182        if matches!(action, SopRunAction::WaitApproval { .. }) {
183            if let Some(run) = self.active_runs.get_mut(&run_id) {
184                run.status = SopRunStatus::WaitingApproval;
185                run.waiting_since = Some(now_iso8601());
186            }
187        }
188
189        Ok(action)
190    }
191
192    /// Report the result of the current step and advance the run.
193    /// Returns the next action to take.
194    pub fn advance_step(&mut self, run_id: &str, result: SopStepResult) -> Result<SopRunAction> {
195        let run = self
196            .active_runs
197            .get_mut(run_id)
198            .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
199
200        let sop = self
201            .sops
202            .iter()
203            .find(|s| s.name == run.sop_name)
204            .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
205            .clone();
206
207        // Record step result
208        run.step_results.push(result.clone());
209
210        // Check if step failed
211        if result.status == SopStepStatus::Failed {
212            let reason = format!("Step {} failed: {}", result.step_number, result.output);
213            warn!("SOP run {run_id}: {reason}");
214            return Ok(self.finish_run(run_id, SopRunStatus::Failed, Some(reason)));
215        }
216
217        // Advance to next step
218        let next_step_num = run.current_step + 1;
219        if next_step_num > run.total_steps {
220            // All steps completed
221            info!("SOP run {run_id} completed successfully");
222            return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
223        }
224
225        // Update run state
226        let run = self.active_runs.get_mut(run_id).unwrap();
227        run.current_step = next_step_num;
228
229        let step_idx = (next_step_num - 1) as usize;
230        let step = sop.steps[step_idx].clone();
231        let context = format_step_context(&sop, run, &step);
232        let run_id_str = run_id.to_string();
233        let action = resolve_step_action(&sop, &step, run_id_str.clone(), context);
234
235        // If the action is WaitApproval, update run status and record timestamp
236        if matches!(action, SopRunAction::WaitApproval { .. }) {
237            if let Some(run) = self.active_runs.get_mut(&run_id_str) {
238                run.status = SopRunStatus::WaitingApproval;
239                run.waiting_since = Some(now_iso8601());
240            }
241        }
242
243        Ok(action)
244    }
245
246    /// Cancel an active run.
247    pub fn cancel_run(&mut self, run_id: &str) -> Result<()> {
248        if !self.active_runs.contains_key(run_id) {
249            bail!("Active run not found: {run_id}");
250        }
251        self.finish_run(run_id, SopRunStatus::Cancelled, None);
252        info!("SOP run {run_id} cancelled");
253        Ok(())
254    }
255
256    /// Approve a step that is waiting for approval, transitioning back to Running.
257    pub fn approve_step(&mut self, run_id: &str) -> Result<SopRunAction> {
258        let run = self
259            .active_runs
260            .get_mut(run_id)
261            .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
262
263        if run.status != SopRunStatus::WaitingApproval {
264            bail!(
265                "Run {run_id} is not waiting for approval (status: {})",
266                run.status
267            );
268        }
269
270        run.status = SopRunStatus::Running;
271        run.waiting_since = None;
272
273        let sop = self
274            .sops
275            .iter()
276            .find(|s| s.name == run.sop_name)
277            .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
278            .clone();
279
280        let step_idx = (run.current_step - 1) as usize;
281        let step = sop.steps[step_idx].clone();
282        let context = format_step_context(&sop, run, &step);
283
284        Ok(SopRunAction::ExecuteStep {
285            run_id: run_id.to_string(),
286            step,
287            context,
288        })
289    }
290
291    /// List finished runs, optionally filtered by SOP name.
292    pub fn finished_runs(&self, sop_name: Option<&str>) -> Vec<&SopRun> {
293        self.finished_runs
294            .iter()
295            .filter(|r| sop_name.map_or(true, |name| r.sop_name == name))
296            .collect()
297    }
298
299    /// Return cumulative deterministic execution savings.
300    pub fn deterministic_savings(&self) -> &DeterministicSavings {
301        &self.deterministic_savings
302    }
303
304    // ── Deterministic execution ─────────────────────────────────
305
306    /// Start a deterministic SOP run. Steps execute sequentially without LLM
307    /// round-trips. Returns the first action (DeterministicStep or CheckpointWait).
308    pub fn start_deterministic_run(
309        &mut self,
310        sop_name: &str,
311        event: SopEvent,
312    ) -> Result<SopRunAction> {
313        let sop = self
314            .get_sop(sop_name)
315            .ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
316            .clone();
317
318        if sop.execution_mode != SopExecutionMode::Deterministic {
319            bail!(
320                "SOP '{}' is not in deterministic mode (mode: {})",
321                sop_name,
322                sop.execution_mode
323            );
324        }
325
326        if !self.can_start(sop_name) {
327            bail!(
328                "Cannot start SOP '{}': cooldown or concurrency limit reached",
329                sop_name
330            );
331        }
332
333        if sop.steps.is_empty() {
334            bail!("SOP '{}' has no steps defined", sop_name);
335        }
336
337        self.run_counter += 1;
338        let dur = std::time::SystemTime::now()
339            .duration_since(std::time::UNIX_EPOCH)
340            .unwrap_or_default();
341        let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
342        let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter);
343        let now = now_iso8601();
344
345        let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX);
346        let run = SopRun {
347            run_id: run_id.clone(),
348            sop_name: sop_name.to_string(),
349            trigger_event: event,
350            status: SopRunStatus::Running,
351            current_step: 1,
352            total_steps,
353            started_at: now,
354            completed_at: None,
355            step_results: Vec::new(),
356            waiting_since: None,
357            llm_calls_saved: 0,
358        };
359
360        self.active_runs.insert(run_id.clone(), run);
361        info!(
362            "Deterministic SOP run {} started for '{}'",
363            run_id, sop_name
364        );
365
366        // Produce first step action
367        let step = sop.steps[0].clone();
368        let input = serde_json::Value::Null;
369        self.resolve_deterministic_action(&sop, &run_id, &step, input)
370    }
371
372    /// Advance a deterministic run with the output of the current step.
373    /// The output is piped as input to the next step.
374    pub fn advance_deterministic_step(
375        &mut self,
376        run_id: &str,
377        step_output: serde_json::Value,
378    ) -> Result<SopRunAction> {
379        let run = self
380            .active_runs
381            .get_mut(run_id)
382            .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
383
384        let sop = self
385            .sops
386            .iter()
387            .find(|s| s.name == run.sop_name)
388            .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
389            .clone();
390
391        // Record step result
392        let now = now_iso8601();
393        let step_result = SopStepResult {
394            step_number: run.current_step,
395            status: SopStepStatus::Completed,
396            output: step_output.to_string(),
397            started_at: run.started_at.clone(),
398            completed_at: Some(now),
399        };
400        run.step_results.push(step_result);
401
402        // Each deterministic step saves one LLM call
403        run.llm_calls_saved += 1;
404
405        // Advance to next step
406        let next_step_num = run.current_step + 1;
407        if next_step_num > run.total_steps {
408            info!(
409                "Deterministic SOP run {run_id} completed ({} LLM calls saved)",
410                run.llm_calls_saved
411            );
412            let saved = run.llm_calls_saved;
413            self.deterministic_savings.total_llm_calls_saved += saved;
414            self.deterministic_savings.total_runs += 1;
415            return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
416        }
417
418        let run = self.active_runs.get_mut(run_id).unwrap();
419        run.current_step = next_step_num;
420
421        let step_idx = (next_step_num - 1) as usize;
422        let step = sop.steps[step_idx].clone();
423        let run_id_owned = run_id.to_string();
424
425        self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output)
426    }
427
428    /// Resume a deterministic run from persisted state.
429    pub fn resume_deterministic_run(
430        &mut self,
431        state: DeterministicRunState,
432    ) -> Result<SopRunAction> {
433        let run = self
434            .active_runs
435            .get_mut(&state.run_id)
436            .ok_or_else(|| anyhow::anyhow!("Active run not found: {}", state.run_id))?;
437
438        if run.status != SopRunStatus::PausedCheckpoint {
439            bail!(
440                "Run {} is not paused at checkpoint (status: {})",
441                state.run_id,
442                run.status
443            );
444        }
445
446        let sop = self
447            .sops
448            .iter()
449            .find(|s| s.name == run.sop_name)
450            .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
451            .clone();
452
453        run.status = SopRunStatus::Running;
454        run.waiting_since = None;
455        run.llm_calls_saved = state.llm_calls_saved;
456
457        // Resume from the step after the last completed one
458        let next_step_num = state.last_completed_step + 1;
459        if next_step_num > state.total_steps {
460            info!(
461                "Deterministic SOP run {} completed on resume ({} LLM calls saved)",
462                state.run_id, state.llm_calls_saved
463            );
464            self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved;
465            self.deterministic_savings.total_runs += 1;
466            return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None));
467        }
468
469        let run = self.active_runs.get_mut(&state.run_id).unwrap();
470        run.current_step = next_step_num;
471
472        let step_idx = (next_step_num - 1) as usize;
473        let step = sop.steps[step_idx].clone();
474
475        // Use last step's output as input, or Null
476        let last_output = state
477            .step_outputs
478            .get(&state.last_completed_step)
479            .cloned()
480            .unwrap_or(serde_json::Value::Null);
481
482        let run_id = state.run_id.clone();
483        self.resolve_deterministic_action(&sop, &run_id, &step, last_output)
484    }
485
486    /// Resolve the action for a deterministic step (execute or checkpoint).
487    fn resolve_deterministic_action(
488        &mut self,
489        sop: &Sop,
490        run_id: &str,
491        step: &SopStep,
492        input: serde_json::Value,
493    ) -> Result<SopRunAction> {
494        if step.kind == SopStepKind::Checkpoint {
495            // Pause at checkpoint — persist state and wait for approval
496            if let Some(run) = self.active_runs.get_mut(run_id) {
497                run.status = SopRunStatus::PausedCheckpoint;
498                run.waiting_since = Some(now_iso8601());
499            }
500
501            let state_file = self.persist_deterministic_state(run_id, sop)?;
502
503            info!(
504                "Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}",
505                step.number,
506                step.title,
507                state_file.display()
508            );
509
510            Ok(SopRunAction::CheckpointWait {
511                run_id: run_id.to_string(),
512                step: step.clone(),
513                state_file,
514            })
515        } else {
516            Ok(SopRunAction::DeterministicStep {
517                run_id: run_id.to_string(),
518                step: step.clone(),
519                input,
520            })
521        }
522    }
523
524    /// Persist the current deterministic run state to a JSON file.
525    fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result<PathBuf> {
526        let run = self
527            .active_runs
528            .get(run_id)
529            .ok_or_else(|| anyhow::anyhow!("Run not found: {run_id}"))?;
530
531        let mut step_outputs = HashMap::new();
532        for result in &run.step_results {
533            // Try to parse output as JSON, fall back to string value
534            let value = serde_json::from_str(&result.output)
535                .unwrap_or_else(|_| serde_json::Value::String(result.output.clone()));
536            step_outputs.insert(result.step_number, value);
537        }
538
539        let state = DeterministicRunState {
540            run_id: run_id.to_string(),
541            sop_name: run.sop_name.clone(),
542            last_completed_step: run.current_step.saturating_sub(1),
543            total_steps: run.total_steps,
544            step_outputs,
545            persisted_at: now_iso8601(),
546            llm_calls_saved: run.llm_calls_saved,
547            paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint,
548        };
549
550        // Write to SOP location directory, or system temp dir
551        let temp_dir = std::env::temp_dir();
552        let dir = sop
553            .location
554            .as_deref()
555            .unwrap_or_else(|| temp_dir.as_path());
556        let state_file = dir.join(format!("{run_id}.state.json"));
557        let json = serde_json::to_string_pretty(&state)?;
558        std::fs::write(&state_file, json)?;
559
560        Ok(state_file)
561    }
562
563    /// Load a persisted deterministic run state from a JSON file.
564    pub fn load_deterministic_state(path: &Path) -> Result<DeterministicRunState> {
565        let content = std::fs::read_to_string(path)?;
566        let state: DeterministicRunState = serde_json::from_str(&content)?;
567        Ok(state)
568    }
569
570    // ── Approval timeout ──────────────────────────────────────────
571
572    /// Check all WaitingApproval runs for timeout. For Critical/High-priority SOPs,
573    /// auto-approve and return the resulting actions. Non-critical SOPs stay
574    /// in WaitingApproval indefinitely (or until explicitly approved/cancelled).
575    pub fn check_approval_timeouts(&mut self) -> Vec<SopRunAction> {
576        let timeout_secs = self.config.approval_timeout_secs;
577        if timeout_secs == 0 {
578            return Vec::new();
579        }
580
581        // Collect timed-out runs with their priority classification
582        // cooldown_elapsed(ts, secs) returns true when (now - ts) >= secs
583        let timed_out: Vec<(String, bool)> = self
584            .active_runs
585            .values()
586            .filter(|r| r.status == SopRunStatus::WaitingApproval)
587            .filter(|r| {
588                r.waiting_since
589                    .as_deref()
590                    .map_or(false, |ts| cooldown_elapsed(ts, timeout_secs))
591            })
592            .map(|r| {
593                let is_critical = self
594                    .sops
595                    .iter()
596                    .find(|s| s.name == r.sop_name)
597                    .map_or(false, |s| {
598                        matches!(s.priority, SopPriority::Critical | SopPriority::High)
599                    });
600                (r.run_id.clone(), is_critical)
601            })
602            .collect();
603
604        let mut actions = Vec::new();
605        for (run_id, is_critical) in timed_out {
606            if is_critical {
607                // Auto-approve: Critical/High priority SOPs fall back to Auto on timeout
608                info!(
609                    "SOP run {run_id}: approval timeout — auto-approving (critical/high priority)"
610                );
611                match self.approve_step(&run_id) {
612                    Ok(action) => actions.push(action),
613                    Err(e) => warn!("SOP run {run_id}: auto-approve failed: {e}"),
614                }
615            } else {
616                info!("SOP run {run_id}: approval timeout — waiting indefinitely (non-critical)");
617            }
618        }
619
620        actions
621    }
622
623    // ── Test helpers ──────────────────────────────────────────────
624
625    /// Replace loaded SOPs (for testing from other modules).
626    #[cfg(test)]
627    pub(crate) fn set_sops_for_test(&mut self, sops: Vec<Sop>) {
628        self.sops = sops;
629    }
630
631    // ── Internal helpers ────────────────────────────────────────
632
633    fn last_finished_run(&self, sop_name: &str) -> Option<&SopRun> {
634        self.finished_runs
635            .iter()
636            .rev()
637            .find(|r| r.sop_name == sop_name)
638    }
639
640    fn finish_run(
641        &mut self,
642        run_id: &str,
643        status: SopRunStatus,
644        reason: Option<String>,
645    ) -> SopRunAction {
646        let mut run = self.active_runs.remove(run_id).unwrap();
647        run.status = status;
648        run.completed_at = Some(now_iso8601());
649        let sop_name = run.sop_name.clone();
650        let run_id_owned = run.run_id.clone();
651        self.finished_runs.push(run);
652
653        // Evict oldest finished runs when over capacity
654        let max = self.config.max_finished_runs;
655        if max > 0 && self.finished_runs.len() > max {
656            let excess = self.finished_runs.len() - max;
657            self.finished_runs.drain(..excess);
658        }
659
660        match status {
661            SopRunStatus::Failed => SopRunAction::Failed {
662                run_id: run_id_owned,
663                sop_name,
664                reason: reason.unwrap_or_default(),
665            },
666            _ => SopRunAction::Completed {
667                run_id: run_id_owned,
668                sop_name,
669            },
670        }
671    }
672}
673
674// ── Trigger matching ────────────────────────────────────────────
675
676/// Check whether a single trigger definition matches an incoming event.
677fn trigger_matches(trigger: &SopTrigger, event: &SopEvent) -> bool {
678    match (trigger, event.source) {
679        (SopTrigger::Mqtt { topic, condition }, SopTriggerSource::Mqtt) => {
680            let topic_match = event
681                .topic
682                .as_deref()
683                .map_or(false, |t| mqtt_topic_matches(topic, t));
684            if !topic_match {
685                return false;
686            }
687            // Evaluate condition against payload (None condition = unconditional)
688            match condition {
689                Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
690                None => true,
691            }
692        }
693
694        (SopTrigger::Webhook { path }, SopTriggerSource::Webhook) => {
695            event.topic.as_deref().map_or(false, |t| t == path)
696        }
697
698        (
699            SopTrigger::Peripheral {
700                board,
701                signal,
702                condition,
703            },
704            SopTriggerSource::Peripheral,
705        ) => {
706            let topic_match = event.topic.as_deref().map_or(false, |t| {
707                let expected = format!("{board}/{signal}");
708                t == expected
709            });
710            if !topic_match {
711                return false;
712            }
713            // Evaluate condition against payload (None condition = unconditional)
714            match condition {
715                Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
716                None => true,
717            }
718        }
719
720        (SopTrigger::Cron { expression }, SopTriggerSource::Cron) => {
721            event.topic.as_deref().map_or(false, |t| t == expression)
722        }
723
724        (SopTrigger::Manual, SopTriggerSource::Manual) => true,
725
726        _ => false,
727    }
728}
729
/// MQTT topic-filter matching with `+` (single-level) and `#` (multi-level)
/// wildcards.
///
/// `+` matches exactly one level (which may be empty). `#` matches all
/// remaining levels, including none at all, so `sport/#` matches `sport` as
/// well as `sport/tennis` (MQTT 3.1.1 §4.7.1.2). All other levels must match
/// literally, and both filter and topic must be fully consumed.
fn mqtt_topic_matches(pattern: &str, topic: &str) -> bool {
    let mut filter_levels = pattern.split('/');
    let mut topic_levels = topic.split('/');

    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // Multi-level wildcard: matches whatever remains, even nothing
            // (the parent level itself).
            (Some("#"), _) => return true,
            // Single-level wildcard: consumes exactly one topic level.
            (Some("+"), Some(_)) => {}
            (Some(expected), Some(actual)) if expected == actual => {}
            // Both exhausted together: full match.
            (None, None) => return true,
            // Literal mismatch, or one side has levels left over.
            _ => return false,
        }
    }
}
759
760// ── Execution mode resolution ───────────────────────────────────
761
762/// Determine the action for a step based on SOP execution mode.
763fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: String) -> SopRunAction {
764    // Steps with requires_confirmation always need approval
765    if step.requires_confirmation {
766        return SopRunAction::WaitApproval {
767            run_id,
768            step: step.clone(),
769            context,
770        };
771    }
772
773    let needs_approval = match sop.execution_mode {
774        // Deterministic mode is handled via start_deterministic_run;
775        // if we reach here via the standard path, treat as Auto.
776        SopExecutionMode::Auto | SopExecutionMode::Deterministic => false,
777        SopExecutionMode::Supervised => {
778            // Supervised: approval only before the first step
779            step.number == 1
780        }
781        SopExecutionMode::StepByStep => true,
782        SopExecutionMode::PriorityBased => match sop.priority {
783            SopPriority::Critical | SopPriority::High => false,
784            SopPriority::Normal | SopPriority::Low => {
785                // Supervised behavior for normal/low
786                step.number == 1
787            }
788        },
789    };
790
791    if needs_approval {
792        SopRunAction::WaitApproval {
793            run_id,
794            step: step.clone(),
795            context,
796        }
797    } else {
798        SopRunAction::ExecuteStep {
799            run_id,
800            step: step.clone(),
801            context,
802        }
803    }
804}
805
806// ── Step context formatting ─────────────────────────────────────
807
808/// Build the structured context message that gets injected into the agent.
809fn format_step_context(sop: &Sop, run: &SopRun, step: &SopStep) -> String {
810    let mut ctx = format!(
811        "[SOP: {} (run {}) — Step {} of {}]\n\n",
812        sop.name, run.run_id, step.number, run.total_steps
813    );
814
815    let _ = writeln!(
816        ctx,
817        "Trigger: {} {}",
818        run.trigger_event.source,
819        run.trigger_event.topic.as_deref().unwrap_or("(no topic)")
820    );
821
822    if let Some(ref payload) = run.trigger_event.payload {
823        let _ = writeln!(ctx, "Payload: {payload}");
824    }
825
826    // Previous step summary
827    if let Some(prev) = run.step_results.last() {
828        let _ = writeln!(
829            ctx,
830            "Previous: Step {} {} — {}",
831            prev.step_number, prev.status, prev.output
832        );
833    }
834
835    let _ = write!(ctx, "\nCurrent step: **{}**\n{}\n", step.title, step.body);
836
837    if !step.suggested_tools.is_empty() {
838        let _ = write!(
839            ctx,
840            "\nSuggested tools: {}\n",
841            step.suggested_tools.join(", ")
842        );
843    }
844
845    ctx.push_str("\nWhen done, report your result.\n");
846
847    ctx
848}
849
850// ── Utilities ───────────────────────────────────────────────────
851
852pub(crate) fn now_iso8601() -> String {
853    // Use chrono if available, otherwise fallback to SystemTime
854    let now = std::time::SystemTime::now()
855        .duration_since(std::time::UNIX_EPOCH)
856        .unwrap_or_default();
857    // Simple UTC timestamp without chrono dependency
858    let secs = now.as_secs();
859    let days = secs / 86400;
860    let time_secs = secs % 86400;
861    let hours = time_secs / 3600;
862    let minutes = (time_secs % 3600) / 60;
863    let seconds = time_secs % 60;
864
865    // Days since epoch to Y-M-D (simplified — good enough for run IDs)
866    let (year, month, day) = days_to_ymd(days);
867    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
868}
869
870/// Convert days since Unix epoch to (year, month, day).
871fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
872    // Algorithm from https://howardhinnant.github.io/date_algorithms.html
873    days += 719_468;
874    let era = days / 146_097;
875    let doe = days - era * 146_097;
876    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
877    let y = yoe + era * 400;
878    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
879    let mp = (5 * doy + 2) / 153;
880    let d = doy - (153 * mp + 2) / 5 + 1;
881    let m = if mp < 10 { mp + 3 } else { mp - 9 };
882    let y = if m <= 2 { y + 1 } else { y };
883    (y, m, d)
884}
885
886/// Check if enough time has elapsed since a timestamp string.
887fn cooldown_elapsed(completed_at: &str, cooldown_secs: u64) -> bool {
888    // Parse the ISO-8601 timestamp we generate
889    let completed = parse_iso8601_secs(completed_at);
890    let now = std::time::SystemTime::now()
891        .duration_since(std::time::UNIX_EPOCH)
892        .unwrap_or_default()
893        .as_secs();
894
895    match completed {
896        Some(ts) => now.saturating_sub(ts) >= cooldown_secs,
897        None => true, // Can't parse timestamp; allow start
898    }
899}
900
901/// Minimal ISO-8601 parser returning seconds since epoch.
902fn parse_iso8601_secs(input: &str) -> Option<u64> {
903    // Expected format: YYYY-MM-DDTHH:MM:SSZ
904    let input = input.trim_end_matches('Z');
905    let parts: Vec<&str> = input.split('T').collect();
906    if parts.len() != 2 {
907        return None;
908    }
909    let date_parts: Vec<u64> = parts[0].split('-').filter_map(|p| p.parse().ok()).collect();
910    let time_parts: Vec<u64> = parts[1].split(':').filter_map(|p| p.parse().ok()).collect();
911    if date_parts.len() != 3 || time_parts.len() != 3 {
912        return None;
913    }
914    let (year, month, day) = (date_parts[0], date_parts[1], date_parts[2]);
915    let (hour, min, sec) = (time_parts[0], time_parts[1], time_parts[2]);
916
917    // Reverse of days_to_ymd: compute days since epoch
918    let year_adj = if month <= 2 { year - 1 } else { year };
919    let month_adj = if month > 2 { month - 3 } else { month + 9 };
920    let era = year_adj / 400;
921    let yoe = year_adj - era * 400;
922    let doy = (153 * month_adj + 2) / 5 + day - 1;
923    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
924    let days = era * 146_097 + doe - 719_468;
925
926    Some(days * 86400 + hour * 3600 + min * 60 + sec)
927}
928
929#[cfg(test)]
930mod tests {
931    use super::*;
932    use crate::sop::types::SopExecutionMode;
933
934    fn manual_event() -> SopEvent {
935        SopEvent {
936            source: SopTriggerSource::Manual,
937            topic: None,
938            payload: None,
939            timestamp: now_iso8601(),
940        }
941    }
942
943    fn mqtt_event(topic: &str, payload: &str) -> SopEvent {
944        SopEvent {
945            source: SopTriggerSource::Mqtt,
946            topic: Some(topic.into()),
947            payload: Some(payload.into()),
948            timestamp: now_iso8601(),
949        }
950    }
951
952    fn test_sop(name: &str, mode: SopExecutionMode, priority: SopPriority) -> Sop {
953        Sop {
954            name: name.into(),
955            description: format!("Test SOP: {name}"),
956            version: "1.0.0".into(),
957            priority,
958            execution_mode: mode,
959            triggers: vec![SopTrigger::Manual],
960            steps: vec![
961                SopStep {
962                    number: 1,
963                    title: "Step one".into(),
964                    body: "Do step one".into(),
965                    suggested_tools: vec!["shell".into()],
966                    requires_confirmation: false,
967                    kind: SopStepKind::default(),
968                    schema: None,
969                },
970                SopStep {
971                    number: 2,
972                    title: "Step two".into(),
973                    body: "Do step two".into(),
974                    suggested_tools: vec![],
975                    requires_confirmation: false,
976                    kind: SopStepKind::default(),
977                    schema: None,
978                },
979            ],
980            cooldown_secs: 0,
981            max_concurrent: 1,
982            location: None,
983            deterministic: false,
984        }
985    }
986
987    fn engine_with_sops(sops: Vec<Sop>) -> SopEngine {
988        let mut engine = SopEngine::new(SopConfig::default());
989        engine.sops = sops;
990        engine
991    }
992
993    /// Extract run_id from any SopRunAction variant.
994    fn extract_run_id(action: &SopRunAction) -> &str {
995        match action {
996            SopRunAction::ExecuteStep { run_id, .. }
997            | SopRunAction::WaitApproval { run_id, .. }
998            | SopRunAction::DeterministicStep { run_id, .. }
999            | SopRunAction::CheckpointWait { run_id, .. }
1000            | SopRunAction::Completed { run_id, .. }
1001            | SopRunAction::Failed { run_id, .. } => run_id,
1002        }
1003    }
1004
1005    /// Get the first active run_id from the engine (for tests with a single run).
1006    fn first_active_run_id(engine: &SopEngine) -> String {
1007        engine
1008            .active_runs()
1009            .keys()
1010            .next()
1011            .expect("expected at least one active run")
1012            .clone()
1013    }
1014
1015    // ── Trigger matching ────────────────────────────────
1016
1017    #[test]
1018    fn match_manual_trigger() {
1019        let engine = engine_with_sops(vec![test_sop(
1020            "s1",
1021            SopExecutionMode::Auto,
1022            SopPriority::Normal,
1023        )]);
1024        let matches = engine.match_trigger(&manual_event());
1025        assert_eq!(matches.len(), 1);
1026        assert_eq!(matches[0].name, "s1");
1027    }
1028
1029    #[test]
1030    fn no_match_for_wrong_source() {
1031        let engine = engine_with_sops(vec![test_sop(
1032            "s1",
1033            SopExecutionMode::Auto,
1034            SopPriority::Normal,
1035        )]);
1036        let event = mqtt_event("sensors/temp", "{}");
1037        let matches = engine.match_trigger(&event);
1038        assert!(matches.is_empty());
1039    }
1040
1041    #[test]
1042    fn match_mqtt_trigger_exact() {
1043        let sop = Sop {
1044            triggers: vec![SopTrigger::Mqtt {
1045                topic: "plant/pump/pressure".into(),
1046                condition: None,
1047            }],
1048            ..test_sop(
1049                "pressure-sop",
1050                SopExecutionMode::Auto,
1051                SopPriority::Critical,
1052            )
1053        };
1054        let engine = engine_with_sops(vec![sop]);
1055        let matches = engine.match_trigger(&mqtt_event("plant/pump/pressure", "87.3"));
1056        assert_eq!(matches.len(), 1);
1057    }
1058
1059    #[test]
1060    fn match_mqtt_wildcard_plus() {
1061        let sop = Sop {
1062            triggers: vec![SopTrigger::Mqtt {
1063                topic: "plant/+/pressure".into(),
1064                condition: None,
1065            }],
1066            ..test_sop("wildcard-sop", SopExecutionMode::Auto, SopPriority::Normal)
1067        };
1068        let engine = engine_with_sops(vec![sop]);
1069        assert_eq!(
1070            engine
1071                .match_trigger(&mqtt_event("plant/pump_3/pressure", "87"))
1072                .len(),
1073            1
1074        );
1075        assert!(
1076            engine
1077                .match_trigger(&mqtt_event("plant/pump_3/temperature", "50"))
1078                .is_empty()
1079        );
1080    }
1081
1082    #[test]
1083    fn match_mqtt_wildcard_hash() {
1084        let sop = Sop {
1085            triggers: vec![SopTrigger::Mqtt {
1086                topic: "plant/#".into(),
1087                condition: None,
1088            }],
1089            ..test_sop("hash-sop", SopExecutionMode::Auto, SopPriority::Normal)
1090        };
1091        let engine = engine_with_sops(vec![sop]);
1092        assert_eq!(
1093            engine
1094                .match_trigger(&mqtt_event("plant/pump/pressure", "87"))
1095                .len(),
1096            1
1097        );
1098        assert_eq!(
1099            engine
1100                .match_trigger(&mqtt_event("plant/a/b/c/d", "x"))
1101                .len(),
1102            1
1103        );
1104    }
1105
1106    #[test]
1107    fn mqtt_topic_matching_edge_cases() {
1108        assert!(mqtt_topic_matches("a/b/c", "a/b/c"));
1109        assert!(!mqtt_topic_matches("a/b/c", "a/b/d"));
1110        assert!(!mqtt_topic_matches("a/b/c", "a/b"));
1111        assert!(!mqtt_topic_matches("a/b", "a/b/c"));
1112        assert!(mqtt_topic_matches("+/+/+", "a/b/c"));
1113        assert!(!mqtt_topic_matches("+/+", "a/b/c"));
1114        assert!(mqtt_topic_matches("#", "a/b/c"));
1115        assert!(mqtt_topic_matches("a/#", "a/b/c"));
1116        assert!(!mqtt_topic_matches("b/#", "a/b/c"));
1117    }
1118
1119    // ── Webhook trigger matching ─────────────────────
1120
1121    #[test]
1122    fn webhook_trigger_matches_exact_path() {
1123        let sop = Sop {
1124            triggers: vec![SopTrigger::Webhook {
1125                path: "/webhook".into(),
1126            }],
1127            ..test_sop("webhook-sop", SopExecutionMode::Auto, SopPriority::Normal)
1128        };
1129        let engine = engine_with_sops(vec![sop]);
1130
1131        // Exact match — should match
1132        let event = SopEvent {
1133            source: SopTriggerSource::Webhook,
1134            topic: Some("/webhook".into()),
1135            payload: None,
1136            timestamp: now_iso8601(),
1137        };
1138        assert_eq!(engine.match_trigger(&event).len(), 1);
1139    }
1140
1141    #[test]
1142    fn webhook_trigger_rejects_different_path() {
1143        let sop = Sop {
1144            triggers: vec![SopTrigger::Webhook {
1145                path: "/sop/deploy".into(),
1146            }],
1147            ..test_sop("deploy-sop", SopExecutionMode::Auto, SopPriority::Normal)
1148        };
1149        let engine = engine_with_sops(vec![sop]);
1150
1151        // Path /webhook does NOT match /sop/deploy
1152        let event = SopEvent {
1153            source: SopTriggerSource::Webhook,
1154            topic: Some("/webhook".into()),
1155            payload: None,
1156            timestamp: now_iso8601(),
1157        };
1158        assert!(engine.match_trigger(&event).is_empty());
1159
1160        // But /sop/deploy matches /sop/deploy
1161        let event = SopEvent {
1162            source: SopTriggerSource::Webhook,
1163            topic: Some("/sop/deploy".into()),
1164            payload: None,
1165            timestamp: now_iso8601(),
1166        };
1167        assert_eq!(engine.match_trigger(&event).len(), 1);
1168    }
1169
1170    // ── Cron trigger matching ─────────────────────────
1171
1172    #[test]
1173    fn cron_trigger_matches_only_matching_expression() {
1174        let sop = Sop {
1175            triggers: vec![SopTrigger::Cron {
1176                expression: "0 */5 * * *".into(),
1177            }],
1178            ..test_sop("cron-sop", SopExecutionMode::Auto, SopPriority::Normal)
1179        };
1180        let engine = engine_with_sops(vec![sop]);
1181
1182        // Matching expression
1183        let event = SopEvent {
1184            source: SopTriggerSource::Cron,
1185            topic: Some("0 */5 * * *".into()),
1186            payload: None,
1187            timestamp: now_iso8601(),
1188        };
1189        assert_eq!(engine.match_trigger(&event).len(), 1);
1190
1191        // Different expression — should NOT match
1192        let event = SopEvent {
1193            source: SopTriggerSource::Cron,
1194            topic: Some("0 */10 * * *".into()),
1195            payload: None,
1196            timestamp: now_iso8601(),
1197        };
1198        assert!(engine.match_trigger(&event).is_empty());
1199
1200        // No topic — should NOT match
1201        let event = SopEvent {
1202            source: SopTriggerSource::Cron,
1203            topic: None,
1204            payload: None,
1205            timestamp: now_iso8601(),
1206        };
1207        assert!(engine.match_trigger(&event).is_empty());
1208    }
1209
1210    // ── Condition-based trigger matching ────────────────
1211
1212    #[test]
1213    fn mqtt_condition_filters_by_payload() {
1214        let sop = Sop {
1215            triggers: vec![SopTrigger::Mqtt {
1216                topic: "sensors/pressure".into(),
1217                condition: Some("$.value > 85".into()),
1218            }],
1219            ..test_sop("cond-sop", SopExecutionMode::Auto, SopPriority::Critical)
1220        };
1221        let engine = engine_with_sops(vec![sop]);
1222
1223        // Payload meets condition
1224        let matches = engine.match_trigger(&mqtt_event("sensors/pressure", r#"{"value": 90}"#));
1225        assert_eq!(matches.len(), 1);
1226
1227        // Payload does not meet condition
1228        let matches = engine.match_trigger(&mqtt_event("sensors/pressure", r#"{"value": 50}"#));
1229        assert!(matches.is_empty());
1230    }
1231
1232    #[test]
1233    fn mqtt_no_condition_matches_any_payload() {
1234        let sop = Sop {
1235            triggers: vec![SopTrigger::Mqtt {
1236                topic: "sensors/temp".into(),
1237                condition: None,
1238            }],
1239            ..test_sop("no-cond", SopExecutionMode::Auto, SopPriority::Normal)
1240        };
1241        let engine = engine_with_sops(vec![sop]);
1242
1243        let matches = engine.match_trigger(&mqtt_event("sensors/temp", "anything"));
1244        assert_eq!(matches.len(), 1);
1245    }
1246
1247    #[test]
1248    fn mqtt_condition_no_payload_fails_closed() {
1249        let sop = Sop {
1250            triggers: vec![SopTrigger::Mqtt {
1251                topic: "sensors/temp".into(),
1252                condition: Some("$.value > 0".into()),
1253            }],
1254            ..test_sop("no-payload", SopExecutionMode::Auto, SopPriority::Normal)
1255        };
1256        let engine = engine_with_sops(vec![sop]);
1257
1258        // Event with no payload
1259        let event = SopEvent {
1260            source: SopTriggerSource::Mqtt,
1261            topic: Some("sensors/temp".into()),
1262            payload: None,
1263            timestamp: now_iso8601(),
1264        };
1265        assert!(engine.match_trigger(&event).is_empty());
1266    }
1267
1268    #[test]
1269    fn peripheral_condition_filters_by_payload() {
1270        let sop = Sop {
1271            triggers: vec![SopTrigger::Peripheral {
1272                board: "nucleo".into(),
1273                signal: "pin_3".into(),
1274                condition: Some("> 0".into()),
1275            }],
1276            ..test_sop("periph-cond", SopExecutionMode::Auto, SopPriority::High)
1277        };
1278        let engine = engine_with_sops(vec![sop]);
1279
1280        // Positive signal
1281        let event = SopEvent {
1282            source: SopTriggerSource::Peripheral,
1283            topic: Some("nucleo/pin_3".into()),
1284            payload: Some("1".into()),
1285            timestamp: now_iso8601(),
1286        };
1287        assert_eq!(engine.match_trigger(&event).len(), 1);
1288
1289        // Zero signal — does not meet condition
1290        let event = SopEvent {
1291            source: SopTriggerSource::Peripheral,
1292            topic: Some("nucleo/pin_3".into()),
1293            payload: Some("0".into()),
1294            timestamp: now_iso8601(),
1295        };
1296        assert!(engine.match_trigger(&event).is_empty());
1297    }
1298
1299    #[test]
1300    fn peripheral_no_condition_matches_any() {
1301        let sop = Sop {
1302            triggers: vec![SopTrigger::Peripheral {
1303                board: "rpi".into(),
1304                signal: "gpio_5".into(),
1305                condition: None,
1306            }],
1307            ..test_sop("periph-nocond", SopExecutionMode::Auto, SopPriority::Normal)
1308        };
1309        let engine = engine_with_sops(vec![sop]);
1310
1311        let event = SopEvent {
1312            source: SopTriggerSource::Peripheral,
1313            topic: Some("rpi/gpio_5".into()),
1314            payload: Some("0".into()),
1315            timestamp: now_iso8601(),
1316        };
1317        assert_eq!(engine.match_trigger(&event).len(), 1);
1318    }
1319
1320    // ── Run lifecycle ───────────────────────────────────
1321
1322    #[test]
1323    fn start_run_returns_first_step() {
1324        let mut engine = engine_with_sops(vec![test_sop(
1325            "s1",
1326            SopExecutionMode::Auto,
1327            SopPriority::Normal,
1328        )]);
1329        let action = engine.start_run("s1", manual_event()).unwrap();
1330        let run_id = extract_run_id(&action);
1331        assert!(run_id.starts_with("run-"));
1332        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1333        assert_eq!(engine.active_runs().len(), 1);
1334    }
1335
1336    #[test]
1337    fn start_run_unknown_sop_fails() {
1338        let mut engine = engine_with_sops(vec![]);
1339        assert!(engine.start_run("nonexistent", manual_event()).is_err());
1340    }
1341
1342    #[test]
1343    fn advance_step_to_completion() {
1344        let mut engine = engine_with_sops(vec![test_sop(
1345            "s1",
1346            SopExecutionMode::Auto,
1347            SopPriority::Normal,
1348        )]);
1349        let action = engine.start_run("s1", manual_event()).unwrap();
1350        let run_id = extract_run_id(&action).to_string();
1351
1352        // Complete step 1
1353        let action = engine
1354            .advance_step(
1355                &run_id,
1356                SopStepResult {
1357                    step_number: 1,
1358                    status: SopStepStatus::Completed,
1359                    output: "done".into(),
1360                    started_at: now_iso8601(),
1361                    completed_at: Some(now_iso8601()),
1362                },
1363            )
1364            .unwrap();
1365
1366        // Should get step 2
1367        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1368
1369        // Complete step 2
1370        let action = engine
1371            .advance_step(
1372                &run_id,
1373                SopStepResult {
1374                    step_number: 2,
1375                    status: SopStepStatus::Completed,
1376                    output: "done".into(),
1377                    started_at: now_iso8601(),
1378                    completed_at: Some(now_iso8601()),
1379                },
1380            )
1381            .unwrap();
1382
1383        assert!(matches!(action, SopRunAction::Completed { .. }));
1384        assert!(engine.active_runs().is_empty());
1385        assert_eq!(engine.finished_runs(None).len(), 1);
1386    }
1387
1388    #[test]
1389    fn step_failure_ends_run() {
1390        let mut engine = engine_with_sops(vec![test_sop(
1391            "s1",
1392            SopExecutionMode::Auto,
1393            SopPriority::Normal,
1394        )]);
1395        let action = engine.start_run("s1", manual_event()).unwrap();
1396        let run_id = extract_run_id(&action).to_string();
1397
1398        let action = engine
1399            .advance_step(
1400                &run_id,
1401                SopStepResult {
1402                    step_number: 1,
1403                    status: SopStepStatus::Failed,
1404                    output: "valve stuck".into(),
1405                    started_at: now_iso8601(),
1406                    completed_at: Some(now_iso8601()),
1407                },
1408            )
1409            .unwrap();
1410
1411        assert!(
1412            matches!(action, SopRunAction::Failed { ref reason, .. } if reason.contains("valve stuck"))
1413        );
1414        assert!(engine.active_runs().is_empty());
1415    }
1416
1417    #[test]
1418    fn cancel_run() {
1419        let mut engine = engine_with_sops(vec![test_sop(
1420            "s1",
1421            SopExecutionMode::Auto,
1422            SopPriority::Normal,
1423        )]);
1424        let action = engine.start_run("s1", manual_event()).unwrap();
1425        let run_id = extract_run_id(&action).to_string();
1426        engine.cancel_run(&run_id).unwrap();
1427        assert!(engine.active_runs().is_empty());
1428        let finished = engine.finished_runs(None);
1429        assert_eq!(finished[0].status, SopRunStatus::Cancelled);
1430    }
1431
1432    #[test]
1433    fn cancel_unknown_run_fails() {
1434        let mut engine = engine_with_sops(vec![]);
1435        assert!(engine.cancel_run("nonexistent").is_err());
1436    }
1437
1438    // ── Concurrency ─────────────────────────────────────
1439
1440    #[test]
1441    fn per_sop_concurrency_limit() {
1442        let mut engine = engine_with_sops(vec![test_sop(
1443            "s1",
1444            SopExecutionMode::Auto,
1445            SopPriority::Normal,
1446        )]);
1447        // max_concurrent = 1 by default
1448        engine.start_run("s1", manual_event()).unwrap();
1449        assert!(!engine.can_start("s1"));
1450        assert!(engine.start_run("s1", manual_event()).is_err());
1451    }
1452
1453    #[test]
1454    fn global_concurrency_limit() {
1455        let sops = vec![
1456            test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal),
1457            test_sop("s2", SopExecutionMode::Auto, SopPriority::Normal),
1458        ];
1459        let mut engine = SopEngine::new(SopConfig {
1460            max_concurrent_total: 1,
1461            ..SopConfig::default()
1462        });
1463        engine.sops = sops;
1464
1465        engine.start_run("s1", manual_event()).unwrap();
1466        assert!(!engine.can_start("s2"));
1467    }
1468
1469    // ── Cooldown ────────────────────────────────────────
1470
1471    #[test]
1472    fn cooldown_blocks_immediate_restart() {
1473        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1474        sop.cooldown_secs = 3600; // 1 hour
1475        let mut engine = engine_with_sops(vec![sop]);
1476
1477        let action = engine.start_run("s1", manual_event()).unwrap();
1478        let run_id = extract_run_id(&action).to_string();
1479        // Complete both steps
1480        engine
1481            .advance_step(
1482                &run_id,
1483                SopStepResult {
1484                    step_number: 1,
1485                    status: SopStepStatus::Completed,
1486                    output: "ok".into(),
1487                    started_at: now_iso8601(),
1488                    completed_at: Some(now_iso8601()),
1489                },
1490            )
1491            .unwrap();
1492        engine
1493            .advance_step(
1494                &run_id,
1495                SopStepResult {
1496                    step_number: 2,
1497                    status: SopStepStatus::Completed,
1498                    output: "ok".into(),
1499                    started_at: now_iso8601(),
1500                    completed_at: Some(now_iso8601()),
1501                },
1502            )
1503            .unwrap();
1504
1505        // Cooldown not elapsed — should block
1506        assert!(!engine.can_start("s1"));
1507    }
1508
1509    // ── Execution modes ─────────────────────────────────
1510
1511    #[test]
1512    fn auto_mode_executes_immediately() {
1513        let mut engine = engine_with_sops(vec![test_sop(
1514            "s1",
1515            SopExecutionMode::Auto,
1516            SopPriority::Normal,
1517        )]);
1518        let action = engine.start_run("s1", manual_event()).unwrap();
1519        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1520    }
1521
1522    #[test]
1523    fn supervised_mode_waits_on_first_step() {
1524        let mut engine = engine_with_sops(vec![test_sop(
1525            "s1",
1526            SopExecutionMode::Supervised,
1527            SopPriority::Normal,
1528        )]);
1529        let action = engine.start_run("s1", manual_event()).unwrap();
1530        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1531    }
1532
1533    #[test]
1534    fn step_by_step_waits_on_every_step() {
1535        let mut engine = engine_with_sops(vec![test_sop(
1536            "s1",
1537            SopExecutionMode::StepByStep,
1538            SopPriority::Normal,
1539        )]);
1540
1541        // Step 1: WaitApproval
1542        let action = engine.start_run("s1", manual_event()).unwrap();
1543        let run_id = extract_run_id(&action).to_string();
1544        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1545
1546        // Approve step 1
1547        let action = engine.approve_step(&run_id).unwrap();
1548        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1549
1550        // Complete step 1, step 2 should also WaitApproval
1551        let action = engine
1552            .advance_step(
1553                &run_id,
1554                SopStepResult {
1555                    step_number: 1,
1556                    status: SopStepStatus::Completed,
1557                    output: "ok".into(),
1558                    started_at: now_iso8601(),
1559                    completed_at: Some(now_iso8601()),
1560                },
1561            )
1562            .unwrap();
1563        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1564    }
1565
1566    #[test]
1567    fn priority_based_critical_auto() {
1568        let mut engine = engine_with_sops(vec![test_sop(
1569            "s1",
1570            SopExecutionMode::PriorityBased,
1571            SopPriority::Critical,
1572        )]);
1573        let action = engine.start_run("s1", manual_event()).unwrap();
1574        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1575    }
1576
1577    #[test]
1578    fn priority_based_normal_supervised() {
1579        let mut engine = engine_with_sops(vec![test_sop(
1580            "s1",
1581            SopExecutionMode::PriorityBased,
1582            SopPriority::Normal,
1583        )]);
1584        let action = engine.start_run("s1", manual_event()).unwrap();
1585        // Normal + PriorityBased → Supervised → WaitApproval on step 1
1586        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1587    }
1588
1589    #[test]
1590    fn requires_confirmation_overrides_auto() {
1591        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Critical);
1592        sop.steps[0].requires_confirmation = true;
1593        let mut engine = engine_with_sops(vec![sop]);
1594        let action = engine.start_run("s1", manual_event()).unwrap();
1595        // Even in Auto mode, requires_confirmation forces WaitApproval
1596        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1597    }
1598
1599    // ── Approve ─────────────────────────────────────────
1600
1601    #[test]
1602    fn approve_transitions_to_execute() {
1603        let mut engine = engine_with_sops(vec![test_sop(
1604            "s1",
1605            SopExecutionMode::Supervised,
1606            SopPriority::Normal,
1607        )]);
1608        let action = engine.start_run("s1", manual_event()).unwrap();
1609        let run_id = extract_run_id(&action).to_string();
1610
1611        // Run should be WaitingApproval
1612        let run = engine.active_runs().get(&run_id).unwrap();
1613        assert_eq!(run.status, SopRunStatus::WaitingApproval);
1614
1615        // Approve
1616        let action = engine.approve_step(&run_id).unwrap();
1617        assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1618
1619        let run = engine.active_runs().get(&run_id).unwrap();
1620        assert_eq!(run.status, SopRunStatus::Running);
1621    }
1622
1623    #[test]
1624    fn approve_non_waiting_fails() {
1625        let mut engine = engine_with_sops(vec![test_sop(
1626            "s1",
1627            SopExecutionMode::Auto,
1628            SopPriority::Normal,
1629        )]);
1630        let action = engine.start_run("s1", manual_event()).unwrap();
1631        let run_id = extract_run_id(&action).to_string();
1632        assert!(engine.approve_step(&run_id).is_err());
1633    }
1634
1635    // ── Context formatting ──────────────────────────────
1636
1637    #[test]
1638    fn step_context_includes_sop_name_and_step() {
1639        let sop = test_sop(
1640            "pump-shutdown",
1641            SopExecutionMode::Auto,
1642            SopPriority::Critical,
1643        );
1644        let run = SopRun {
1645            run_id: "run-001".into(),
1646            sop_name: "pump-shutdown".into(),
1647            trigger_event: manual_event(),
1648            status: SopRunStatus::Running,
1649            current_step: 1,
1650            total_steps: 2,
1651            started_at: now_iso8601(),
1652            completed_at: None,
1653            step_results: Vec::new(),
1654            waiting_since: None,
1655            llm_calls_saved: 0,
1656        };
1657        let ctx = format_step_context(&sop, &run, &sop.steps[0]);
1658        assert!(ctx.contains("pump-shutdown"));
1659        assert!(ctx.contains("Step 1 of 2"));
1660        assert!(ctx.contains("Step one"));
1661    }
1662
1663    // ── Get run (active + finished) ─────────────────────
1664
1665    #[test]
1666    fn get_run_finds_active_and_finished() {
1667        let mut engine = engine_with_sops(vec![test_sop(
1668            "s1",
1669            SopExecutionMode::Auto,
1670            SopPriority::Normal,
1671        )]);
1672        let action = engine.start_run("s1", manual_event()).unwrap();
1673        let run_id = extract_run_id(&action).to_string();
1674
1675        // Active
1676        assert!(engine.get_run(&run_id).is_some());
1677        assert_eq!(
1678            engine.get_run(&run_id).unwrap().status,
1679            SopRunStatus::Running
1680        );
1681
1682        // Complete
1683        engine
1684            .advance_step(
1685                &run_id,
1686                SopStepResult {
1687                    step_number: 1,
1688                    status: SopStepStatus::Completed,
1689                    output: "ok".into(),
1690                    started_at: now_iso8601(),
1691                    completed_at: Some(now_iso8601()),
1692                },
1693            )
1694            .unwrap();
1695        engine
1696            .advance_step(
1697                &run_id,
1698                SopStepResult {
1699                    step_number: 2,
1700                    status: SopStepStatus::Completed,
1701                    output: "ok".into(),
1702                    started_at: now_iso8601(),
1703                    completed_at: Some(now_iso8601()),
1704                },
1705            )
1706            .unwrap();
1707
1708        // Now finished — still findable
1709        assert!(engine.get_run(&run_id).is_some());
1710        assert_eq!(
1711            engine.get_run(&run_id).unwrap().status,
1712            SopRunStatus::Completed
1713        );
1714
1715        // Unknown
1716        assert!(engine.get_run("nonexistent").is_none());
1717    }
1718
1719    // ── ISO-8601 helpers ────────────────────────────────
1720
1721    #[test]
1722    fn iso8601_roundtrip() {
1723        let ts = now_iso8601();
1724        let secs = parse_iso8601_secs(&ts);
1725        assert!(secs.is_some());
1726        // Should be close to current time
1727        let now = std::time::SystemTime::now()
1728            .duration_since(std::time::UNIX_EPOCH)
1729            .unwrap()
1730            .as_secs();
1731        assert!(now.abs_diff(secs.unwrap()) < 2);
1732    }
1733
1734    #[test]
1735    fn parse_known_timestamp() {
1736        // 2026-01-01T00:00:00Z
1737        let secs = parse_iso8601_secs("2026-01-01T00:00:00Z").unwrap();
1738        // Jan 1 2026 = 20454 days since epoch * 86400
1739        assert_eq!(secs, 20454 * 86400);
1740    }
1741
1742    // ── Approval timeout ─────────────────────────────────
1743
1744    #[test]
1745    fn timeout_auto_approves_critical() {
1746        let mut engine = SopEngine::new(SopConfig {
1747            approval_timeout_secs: 1, // 1 second for test
1748            ..SopConfig::default()
1749        });
1750        let mut sop = test_sop("s1", SopExecutionMode::Supervised, SopPriority::Critical);
1751        // PriorityBased would auto-execute critical, so use Supervised to force WaitApproval
1752        sop.execution_mode = SopExecutionMode::Supervised;
1753        engine.set_sops_for_test(vec![sop]);
1754
1755        let action = engine.start_run("s1", manual_event()).unwrap();
1756        let run_id = extract_run_id(&action).to_string();
1757        assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1758
1759        // Manually backdate waiting_since to simulate timeout
1760        let run = engine.active_runs.get_mut(&run_id).unwrap();
1761        run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1762
1763        let actions = engine.check_approval_timeouts();
1764        assert_eq!(actions.len(), 1);
1765        assert!(matches!(actions[0], SopRunAction::ExecuteStep { .. }));
1766    }
1767
1768    #[test]
1769    fn timeout_does_not_auto_approve_normal() {
1770        let mut engine = SopEngine::new(SopConfig {
1771            approval_timeout_secs: 1,
1772            ..SopConfig::default()
1773        });
1774        engine.set_sops_for_test(vec![test_sop(
1775            "s1",
1776            SopExecutionMode::Supervised,
1777            SopPriority::Normal,
1778        )]);
1779
1780        let action = engine.start_run("s1", manual_event()).unwrap();
1781        let run_id = extract_run_id(&action).to_string();
1782
1783        // Backdate waiting_since
1784        let run = engine.active_runs.get_mut(&run_id).unwrap();
1785        run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1786
1787        // Normal priority → no auto-approve
1788        let actions = engine.check_approval_timeouts();
1789        assert!(actions.is_empty());
1790        // Run should still be WaitingApproval
1791        assert_eq!(
1792            engine.get_run(&run_id).unwrap().status,
1793            SopRunStatus::WaitingApproval
1794        );
1795    }
1796
1797    #[test]
1798    fn timeout_zero_disables_check() {
1799        let mut engine = SopEngine::new(SopConfig {
1800            approval_timeout_secs: 0,
1801            ..SopConfig::default()
1802        });
1803        engine.set_sops_for_test(vec![test_sop(
1804            "s1",
1805            SopExecutionMode::Supervised,
1806            SopPriority::Critical,
1807        )]);
1808        let action = engine.start_run("s1", manual_event()).unwrap();
1809        let run_id = extract_run_id(&action).to_string();
1810
1811        let run = engine.active_runs.get_mut(&run_id).unwrap();
1812        run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1813
1814        let actions = engine.check_approval_timeouts();
1815        assert!(actions.is_empty());
1816    }
1817
1818    #[test]
1819    fn waiting_since_set_on_wait_approval() {
1820        let mut engine = engine_with_sops(vec![test_sop(
1821            "s1",
1822            SopExecutionMode::Supervised,
1823            SopPriority::Normal,
1824        )]);
1825        let action = engine.start_run("s1", manual_event()).unwrap();
1826        let run_id = extract_run_id(&action).to_string();
1827
1828        let run = engine.get_run(&run_id).unwrap();
1829        assert_eq!(run.status, SopRunStatus::WaitingApproval);
1830        assert!(run.waiting_since.is_some());
1831    }
1832
1833    // ── Eviction ──────────────────────────────────────
1834
1835    #[test]
1836    fn max_finished_runs_evicts_oldest() {
1837        let mut engine = SopEngine::new(SopConfig {
1838            max_finished_runs: 2,
1839            ..SopConfig::default()
1840        });
1841        // SOP with 1 step so each run completes in one advance
1842        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1843        sop.steps = vec![sop.steps[0].clone()];
1844        sop.max_concurrent = 10;
1845        engine.sops = vec![sop];
1846
1847        // Complete 3 runs
1848        let mut finished_ids = Vec::new();
1849        for _ in 0..3 {
1850            let action = engine.start_run("s1", manual_event()).unwrap();
1851            let rid = extract_run_id(&action).to_string();
1852            engine
1853                .advance_step(
1854                    &rid,
1855                    SopStepResult {
1856                        step_number: 1,
1857                        status: SopStepStatus::Completed,
1858                        output: "ok".into(),
1859                        started_at: now_iso8601(),
1860                        completed_at: Some(now_iso8601()),
1861                    },
1862                )
1863                .unwrap();
1864            finished_ids.push(rid);
1865        }
1866
1867        // Only 2 should be kept (max_finished_runs=2)
1868        let finished = engine.finished_runs(None);
1869        assert_eq!(
1870            finished.len(),
1871            2,
1872            "eviction should cap at max_finished_runs"
1873        );
1874        // Oldest (first) run should be evicted, newest two remain
1875        assert_eq!(finished[0].run_id, finished_ids[1]);
1876        assert_eq!(finished[1].run_id, finished_ids[2]);
1877    }
1878
1879    #[test]
1880    fn max_finished_runs_zero_means_unlimited() {
1881        let mut engine = SopEngine::new(SopConfig {
1882            max_finished_runs: 0,
1883            ..SopConfig::default()
1884        });
1885        let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1886        sop.steps = vec![sop.steps[0].clone()];
1887        sop.max_concurrent = 10;
1888        engine.sops = vec![sop];
1889
1890        for _ in 0..5 {
1891            let action = engine.start_run("s1", manual_event()).unwrap();
1892            let rid = extract_run_id(&action).to_string();
1893            engine
1894                .advance_step(
1895                    &rid,
1896                    SopStepResult {
1897                        step_number: 1,
1898                        status: SopStepStatus::Completed,
1899                        output: "ok".into(),
1900                        started_at: now_iso8601(),
1901                        completed_at: Some(now_iso8601()),
1902                    },
1903                )
1904                .unwrap();
1905        }
1906
1907        assert_eq!(engine.finished_runs(None).len(), 5, "zero means unlimited");
1908    }
1909
1910    #[test]
1911    fn waiting_since_cleared_on_approve() {
1912        let mut engine = engine_with_sops(vec![test_sop(
1913            "s1",
1914            SopExecutionMode::Supervised,
1915            SopPriority::Normal,
1916        )]);
1917        let action = engine.start_run("s1", manual_event()).unwrap();
1918        let run_id = extract_run_id(&action).to_string();
1919        engine.approve_step(&run_id).unwrap();
1920
1921        let run = engine.get_run(&run_id).unwrap();
1922        assert_eq!(run.status, SopRunStatus::Running);
1923        assert!(run.waiting_since.is_none());
1924    }
1925
1926    // ── Deterministic execution ─────────────────────────
1927
1928    fn deterministic_sop(name: &str) -> Sop {
1929        Sop {
1930            name: name.into(),
1931            description: format!("Deterministic SOP: {name}"),
1932            version: "1.0.0".into(),
1933            priority: SopPriority::Normal,
1934            execution_mode: SopExecutionMode::Deterministic,
1935            triggers: vec![SopTrigger::Manual],
1936            steps: vec![
1937                SopStep {
1938                    number: 1,
1939                    title: "Step one".into(),
1940                    body: "Do step one".into(),
1941                    suggested_tools: vec![],
1942                    requires_confirmation: false,
1943                    kind: SopStepKind::Execute,
1944                    schema: None,
1945                },
1946                SopStep {
1947                    number: 2,
1948                    title: "Checkpoint".into(),
1949                    body: "Pause for approval".into(),
1950                    suggested_tools: vec![],
1951                    requires_confirmation: false,
1952                    kind: SopStepKind::Checkpoint,
1953                    schema: None,
1954                },
1955                SopStep {
1956                    number: 3,
1957                    title: "Step three".into(),
1958                    body: "Final step".into(),
1959                    suggested_tools: vec![],
1960                    requires_confirmation: false,
1961                    kind: SopStepKind::Execute,
1962                    schema: None,
1963                },
1964            ],
1965            cooldown_secs: 0,
1966            max_concurrent: 1,
1967            location: None,
1968            deterministic: true,
1969        }
1970    }
1971
1972    #[test]
1973    fn deterministic_start_returns_deterministic_step() {
1974        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
1975        let action = engine.start_run("det-sop", manual_event()).unwrap();
1976        assert!(
1977            matches!(action, SopRunAction::DeterministicStep { ref step, .. } if step.number == 1),
1978            "First action should be DeterministicStep for step 1"
1979        );
1980        let run_id = extract_run_id(&action).to_string();
1981        assert!(run_id.starts_with("det-"));
1982    }
1983
1984    #[test]
1985    fn deterministic_start_routes_through_start_run() {
1986        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
1987        // start_run should auto-route to start_deterministic_run
1988        let action = engine.start_run("det-sop", manual_event()).unwrap();
1989        assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
1990    }
1991
1992    #[test]
1993    fn deterministic_advance_pipes_output() {
1994        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
1995        let action = engine.start_run("det-sop", manual_event()).unwrap();
1996        let run_id = extract_run_id(&action).to_string();
1997
1998        // Advance step 1 with output
1999        let output = serde_json::json!({"result": "step1_done"});
2000        let action = engine
2001            .advance_deterministic_step(&run_id, output.clone())
2002            .unwrap();
2003
2004        // Step 2 is a checkpoint — should pause
2005        assert!(
2006            matches!(action, SopRunAction::CheckpointWait { ref step, .. } if step.number == 2),
2007            "Step 2 (checkpoint) should return CheckpointWait"
2008        );
2009    }
2010
2011    #[test]
2012    fn deterministic_checkpoint_pauses_run() {
2013        let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2014        let action = engine.start_run("det-sop", manual_event()).unwrap();
2015        let run_id = extract_run_id(&action).to_string();
2016
2017        // Complete step 1
2018        let action = engine
2019            .advance_deterministic_step(&run_id, serde_json::json!({"ok": true}))
2020            .unwrap();
2021
2022        // Should be at checkpoint
2023        assert!(matches!(action, SopRunAction::CheckpointWait { .. }));
2024
2025        // Run should be PausedCheckpoint
2026        let run = engine.get_run(&run_id).unwrap();
2027        assert_eq!(run.status, SopRunStatus::PausedCheckpoint);
2028        assert!(run.waiting_since.is_some());
2029    }
2030
2031    #[test]
2032    fn deterministic_completion_tracks_savings() {
2033        let mut sop = deterministic_sop("det-sop");
2034        // Simplify: 2 execute steps, no checkpoint
2035        sop.steps = vec![
2036            SopStep {
2037                number: 1,
2038                title: "Step one".into(),
2039                body: "Do it".into(),
2040                suggested_tools: vec![],
2041                requires_confirmation: false,
2042                kind: SopStepKind::Execute,
2043                schema: None,
2044            },
2045            SopStep {
2046                number: 2,
2047                title: "Step two".into(),
2048                body: "Do it too".into(),
2049                suggested_tools: vec![],
2050                requires_confirmation: false,
2051                kind: SopStepKind::Execute,
2052                schema: None,
2053            },
2054        ];
2055        let mut engine = engine_with_sops(vec![sop]);
2056
2057        let action = engine.start_run("det-sop", manual_event()).unwrap();
2058        let run_id = extract_run_id(&action).to_string();
2059
2060        // Complete step 1
2061        let action = engine
2062            .advance_deterministic_step(&run_id, serde_json::json!("s1"))
2063            .unwrap();
2064        assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
2065
2066        // Complete step 2
2067        let action = engine
2068            .advance_deterministic_step(&run_id, serde_json::json!("s2"))
2069            .unwrap();
2070        assert!(matches!(action, SopRunAction::Completed { .. }));
2071
2072        // Check savings
2073        let savings = engine.deterministic_savings();
2074        assert_eq!(savings.total_runs, 1);
2075        assert_eq!(savings.total_llm_calls_saved, 2);
2076    }
2077
2078    #[test]
2079    fn deterministic_non_deterministic_sop_rejected() {
2080        let mut engine = engine_with_sops(vec![test_sop(
2081            "s1",
2082            SopExecutionMode::Auto,
2083            SopPriority::Normal,
2084        )]);
2085        let result = engine.start_deterministic_run("s1", manual_event());
2086        assert!(result.is_err());
2087        assert!(
2088            result
2089                .unwrap_err()
2090                .to_string()
2091                .contains("not in deterministic mode")
2092        );
2093    }
2094}