1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::path::{Path, PathBuf};
4
5use anyhow::{Result, bail};
6use tracing::{info, warn};
7
8use super::condition::evaluate_condition;
9use super::load_sops;
10use super::types::{
11 DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
12 SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
13 SopTrigger, SopTriggerSource,
14};
15use crate::config::SopConfig;
16
17pub struct SopEngine {
19 sops: Vec<Sop>,
20 active_runs: HashMap<String, SopRun>,
21 finished_runs: Vec<SopRun>,
23 config: SopConfig,
24 run_counter: u64,
25 deterministic_savings: DeterministicSavings,
27}
28
29impl SopEngine {
30 pub fn new(config: SopConfig) -> Self {
32 Self {
33 sops: Vec::new(),
34 active_runs: HashMap::new(),
35 finished_runs: Vec::new(),
36 config,
37 run_counter: 0,
38 deterministic_savings: DeterministicSavings::default(),
39 }
40 }
41
42 pub fn reload(&mut self, workspace_dir: &Path) {
44 self.sops = load_sops(
45 workspace_dir,
46 self.config.sops_dir.as_deref(),
47 super::parse_execution_mode(&self.config.default_execution_mode),
48 );
49 info!("SOP engine loaded {} SOPs", self.sops.len());
50 }
51
52 pub fn sops(&self) -> &[Sop] {
54 &self.sops
55 }
56
57 pub fn active_runs(&self) -> &HashMap<String, SopRun> {
59 &self.active_runs
60 }
61
62 pub fn get_run(&self, run_id: &str) -> Option<&SopRun> {
64 self.active_runs
65 .get(run_id)
66 .or_else(|| self.finished_runs.iter().find(|r| r.run_id == run_id))
67 }
68
69 pub fn get_sop(&self, name: &str) -> Option<&Sop> {
71 self.sops.iter().find(|s| s.name == name)
72 }
73
74 pub fn match_trigger(&self, event: &SopEvent) -> Vec<&Sop> {
79 self.sops
80 .iter()
81 .filter(|sop| sop.triggers.iter().any(|t| trigger_matches(t, event)))
82 .collect()
83 }
84
85 pub fn can_start(&self, sop_name: &str) -> bool {
90 let sop = match self.get_sop(sop_name) {
91 Some(s) => s,
92 None => return false,
93 };
94
95 let active_for_sop = self
97 .active_runs
98 .values()
99 .filter(|r| r.sop_name == sop_name)
100 .count();
101 if active_for_sop >= sop.max_concurrent as usize {
102 return false;
103 }
104
105 if self.active_runs.len() >= self.config.max_concurrent_total {
107 return false;
108 }
109
110 if sop.cooldown_secs > 0 {
112 if let Some(last) = self.last_finished_run(sop_name) {
113 if let Some(ref completed_at) = last.completed_at {
114 if !cooldown_elapsed(completed_at, sop.cooldown_secs) {
115 return false;
116 }
117 }
118 }
119 }
120
121 true
122 }
123
124 pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result<SopRunAction> {
127 if self.get_sop(sop_name).map_or(false, |s| {
129 s.execution_mode == SopExecutionMode::Deterministic
130 }) {
131 return self.start_deterministic_run(sop_name, event);
132 }
133
134 let sop = self
135 .get_sop(sop_name)
136 .ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
137 .clone();
138
139 if !self.can_start(sop_name) {
140 bail!(
141 "Cannot start SOP '{}': cooldown or concurrency limit reached",
142 sop_name
143 );
144 }
145
146 if sop.steps.is_empty() {
147 bail!("SOP '{}' has no steps defined", sop_name);
148 }
149
150 self.run_counter += 1;
151 let dur = std::time::SystemTime::now()
152 .duration_since(std::time::UNIX_EPOCH)
153 .unwrap_or_default();
154 let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
155 let run_id = format!("run-{epoch_ms}-{:04}", self.run_counter);
156 let now = now_iso8601();
157
158 let run = SopRun {
159 run_id: run_id.clone(),
160 sop_name: sop_name.to_string(),
161 trigger_event: event,
162 status: SopRunStatus::Running,
163 current_step: 1,
164 total_steps: u32::try_from(sop.steps.len()).unwrap_or(u32::MAX),
165 started_at: now,
166 completed_at: None,
167 step_results: Vec::new(),
168 waiting_since: None,
169 llm_calls_saved: 0,
170 };
171
172 self.active_runs.insert(run_id.clone(), run);
173
174 info!("SOP run {} started for '{}'", run_id, sop_name);
175
176 let step = sop.steps[0].clone();
178 let context = format_step_context(&sop, &self.active_runs[&run_id], &step);
179 let action = resolve_step_action(&sop, &step, run_id.clone(), context);
180
181 if matches!(action, SopRunAction::WaitApproval { .. }) {
183 if let Some(run) = self.active_runs.get_mut(&run_id) {
184 run.status = SopRunStatus::WaitingApproval;
185 run.waiting_since = Some(now_iso8601());
186 }
187 }
188
189 Ok(action)
190 }
191
192 pub fn advance_step(&mut self, run_id: &str, result: SopStepResult) -> Result<SopRunAction> {
195 let run = self
196 .active_runs
197 .get_mut(run_id)
198 .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
199
200 let sop = self
201 .sops
202 .iter()
203 .find(|s| s.name == run.sop_name)
204 .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
205 .clone();
206
207 run.step_results.push(result.clone());
209
210 if result.status == SopStepStatus::Failed {
212 let reason = format!("Step {} failed: {}", result.step_number, result.output);
213 warn!("SOP run {run_id}: {reason}");
214 return Ok(self.finish_run(run_id, SopRunStatus::Failed, Some(reason)));
215 }
216
217 let next_step_num = run.current_step + 1;
219 if next_step_num > run.total_steps {
220 info!("SOP run {run_id} completed successfully");
222 return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
223 }
224
225 let run = self.active_runs.get_mut(run_id).unwrap();
227 run.current_step = next_step_num;
228
229 let step_idx = (next_step_num - 1) as usize;
230 let step = sop.steps[step_idx].clone();
231 let context = format_step_context(&sop, run, &step);
232 let run_id_str = run_id.to_string();
233 let action = resolve_step_action(&sop, &step, run_id_str.clone(), context);
234
235 if matches!(action, SopRunAction::WaitApproval { .. }) {
237 if let Some(run) = self.active_runs.get_mut(&run_id_str) {
238 run.status = SopRunStatus::WaitingApproval;
239 run.waiting_since = Some(now_iso8601());
240 }
241 }
242
243 Ok(action)
244 }
245
246 pub fn cancel_run(&mut self, run_id: &str) -> Result<()> {
248 if !self.active_runs.contains_key(run_id) {
249 bail!("Active run not found: {run_id}");
250 }
251 self.finish_run(run_id, SopRunStatus::Cancelled, None);
252 info!("SOP run {run_id} cancelled");
253 Ok(())
254 }
255
256 pub fn approve_step(&mut self, run_id: &str) -> Result<SopRunAction> {
258 let run = self
259 .active_runs
260 .get_mut(run_id)
261 .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
262
263 if run.status != SopRunStatus::WaitingApproval {
264 bail!(
265 "Run {run_id} is not waiting for approval (status: {})",
266 run.status
267 );
268 }
269
270 run.status = SopRunStatus::Running;
271 run.waiting_since = None;
272
273 let sop = self
274 .sops
275 .iter()
276 .find(|s| s.name == run.sop_name)
277 .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
278 .clone();
279
280 let step_idx = (run.current_step - 1) as usize;
281 let step = sop.steps[step_idx].clone();
282 let context = format_step_context(&sop, run, &step);
283
284 Ok(SopRunAction::ExecuteStep {
285 run_id: run_id.to_string(),
286 step,
287 context,
288 })
289 }
290
291 pub fn finished_runs(&self, sop_name: Option<&str>) -> Vec<&SopRun> {
293 self.finished_runs
294 .iter()
295 .filter(|r| sop_name.map_or(true, |name| r.sop_name == name))
296 .collect()
297 }
298
299 pub fn deterministic_savings(&self) -> &DeterministicSavings {
301 &self.deterministic_savings
302 }
303
304 pub fn start_deterministic_run(
309 &mut self,
310 sop_name: &str,
311 event: SopEvent,
312 ) -> Result<SopRunAction> {
313 let sop = self
314 .get_sop(sop_name)
315 .ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
316 .clone();
317
318 if sop.execution_mode != SopExecutionMode::Deterministic {
319 bail!(
320 "SOP '{}' is not in deterministic mode (mode: {})",
321 sop_name,
322 sop.execution_mode
323 );
324 }
325
326 if !self.can_start(sop_name) {
327 bail!(
328 "Cannot start SOP '{}': cooldown or concurrency limit reached",
329 sop_name
330 );
331 }
332
333 if sop.steps.is_empty() {
334 bail!("SOP '{}' has no steps defined", sop_name);
335 }
336
337 self.run_counter += 1;
338 let dur = std::time::SystemTime::now()
339 .duration_since(std::time::UNIX_EPOCH)
340 .unwrap_or_default();
341 let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
342 let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter);
343 let now = now_iso8601();
344
345 let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX);
346 let run = SopRun {
347 run_id: run_id.clone(),
348 sop_name: sop_name.to_string(),
349 trigger_event: event,
350 status: SopRunStatus::Running,
351 current_step: 1,
352 total_steps,
353 started_at: now,
354 completed_at: None,
355 step_results: Vec::new(),
356 waiting_since: None,
357 llm_calls_saved: 0,
358 };
359
360 self.active_runs.insert(run_id.clone(), run);
361 info!(
362 "Deterministic SOP run {} started for '{}'",
363 run_id, sop_name
364 );
365
366 let step = sop.steps[0].clone();
368 let input = serde_json::Value::Null;
369 self.resolve_deterministic_action(&sop, &run_id, &step, input)
370 }
371
372 pub fn advance_deterministic_step(
375 &mut self,
376 run_id: &str,
377 step_output: serde_json::Value,
378 ) -> Result<SopRunAction> {
379 let run = self
380 .active_runs
381 .get_mut(run_id)
382 .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
383
384 let sop = self
385 .sops
386 .iter()
387 .find(|s| s.name == run.sop_name)
388 .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
389 .clone();
390
391 let now = now_iso8601();
393 let step_result = SopStepResult {
394 step_number: run.current_step,
395 status: SopStepStatus::Completed,
396 output: step_output.to_string(),
397 started_at: run.started_at.clone(),
398 completed_at: Some(now),
399 };
400 run.step_results.push(step_result);
401
402 run.llm_calls_saved += 1;
404
405 let next_step_num = run.current_step + 1;
407 if next_step_num > run.total_steps {
408 info!(
409 "Deterministic SOP run {run_id} completed ({} LLM calls saved)",
410 run.llm_calls_saved
411 );
412 let saved = run.llm_calls_saved;
413 self.deterministic_savings.total_llm_calls_saved += saved;
414 self.deterministic_savings.total_runs += 1;
415 return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
416 }
417
418 let run = self.active_runs.get_mut(run_id).unwrap();
419 run.current_step = next_step_num;
420
421 let step_idx = (next_step_num - 1) as usize;
422 let step = sop.steps[step_idx].clone();
423 let run_id_owned = run_id.to_string();
424
425 self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output)
426 }
427
428 pub fn resume_deterministic_run(
430 &mut self,
431 state: DeterministicRunState,
432 ) -> Result<SopRunAction> {
433 let run = self
434 .active_runs
435 .get_mut(&state.run_id)
436 .ok_or_else(|| anyhow::anyhow!("Active run not found: {}", state.run_id))?;
437
438 if run.status != SopRunStatus::PausedCheckpoint {
439 bail!(
440 "Run {} is not paused at checkpoint (status: {})",
441 state.run_id,
442 run.status
443 );
444 }
445
446 let sop = self
447 .sops
448 .iter()
449 .find(|s| s.name == run.sop_name)
450 .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
451 .clone();
452
453 run.status = SopRunStatus::Running;
454 run.waiting_since = None;
455 run.llm_calls_saved = state.llm_calls_saved;
456
457 let next_step_num = state.last_completed_step + 1;
459 if next_step_num > state.total_steps {
460 info!(
461 "Deterministic SOP run {} completed on resume ({} LLM calls saved)",
462 state.run_id, state.llm_calls_saved
463 );
464 self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved;
465 self.deterministic_savings.total_runs += 1;
466 return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None));
467 }
468
469 let run = self.active_runs.get_mut(&state.run_id).unwrap();
470 run.current_step = next_step_num;
471
472 let step_idx = (next_step_num - 1) as usize;
473 let step = sop.steps[step_idx].clone();
474
475 let last_output = state
477 .step_outputs
478 .get(&state.last_completed_step)
479 .cloned()
480 .unwrap_or(serde_json::Value::Null);
481
482 let run_id = state.run_id.clone();
483 self.resolve_deterministic_action(&sop, &run_id, &step, last_output)
484 }
485
486 fn resolve_deterministic_action(
488 &mut self,
489 sop: &Sop,
490 run_id: &str,
491 step: &SopStep,
492 input: serde_json::Value,
493 ) -> Result<SopRunAction> {
494 if step.kind == SopStepKind::Checkpoint {
495 if let Some(run) = self.active_runs.get_mut(run_id) {
497 run.status = SopRunStatus::PausedCheckpoint;
498 run.waiting_since = Some(now_iso8601());
499 }
500
501 let state_file = self.persist_deterministic_state(run_id, sop)?;
502
503 info!(
504 "Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}",
505 step.number,
506 step.title,
507 state_file.display()
508 );
509
510 Ok(SopRunAction::CheckpointWait {
511 run_id: run_id.to_string(),
512 step: step.clone(),
513 state_file,
514 })
515 } else {
516 Ok(SopRunAction::DeterministicStep {
517 run_id: run_id.to_string(),
518 step: step.clone(),
519 input,
520 })
521 }
522 }
523
524 fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result<PathBuf> {
526 let run = self
527 .active_runs
528 .get(run_id)
529 .ok_or_else(|| anyhow::anyhow!("Run not found: {run_id}"))?;
530
531 let mut step_outputs = HashMap::new();
532 for result in &run.step_results {
533 let value = serde_json::from_str(&result.output)
535 .unwrap_or_else(|_| serde_json::Value::String(result.output.clone()));
536 step_outputs.insert(result.step_number, value);
537 }
538
539 let state = DeterministicRunState {
540 run_id: run_id.to_string(),
541 sop_name: run.sop_name.clone(),
542 last_completed_step: run.current_step.saturating_sub(1),
543 total_steps: run.total_steps,
544 step_outputs,
545 persisted_at: now_iso8601(),
546 llm_calls_saved: run.llm_calls_saved,
547 paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint,
548 };
549
550 let temp_dir = std::env::temp_dir();
552 let dir = sop
553 .location
554 .as_deref()
555 .unwrap_or_else(|| temp_dir.as_path());
556 let state_file = dir.join(format!("{run_id}.state.json"));
557 let json = serde_json::to_string_pretty(&state)?;
558 std::fs::write(&state_file, json)?;
559
560 Ok(state_file)
561 }
562
563 pub fn load_deterministic_state(path: &Path) -> Result<DeterministicRunState> {
565 let content = std::fs::read_to_string(path)?;
566 let state: DeterministicRunState = serde_json::from_str(&content)?;
567 Ok(state)
568 }
569
570 pub fn check_approval_timeouts(&mut self) -> Vec<SopRunAction> {
576 let timeout_secs = self.config.approval_timeout_secs;
577 if timeout_secs == 0 {
578 return Vec::new();
579 }
580
581 let timed_out: Vec<(String, bool)> = self
584 .active_runs
585 .values()
586 .filter(|r| r.status == SopRunStatus::WaitingApproval)
587 .filter(|r| {
588 r.waiting_since
589 .as_deref()
590 .map_or(false, |ts| cooldown_elapsed(ts, timeout_secs))
591 })
592 .map(|r| {
593 let is_critical = self
594 .sops
595 .iter()
596 .find(|s| s.name == r.sop_name)
597 .map_or(false, |s| {
598 matches!(s.priority, SopPriority::Critical | SopPriority::High)
599 });
600 (r.run_id.clone(), is_critical)
601 })
602 .collect();
603
604 let mut actions = Vec::new();
605 for (run_id, is_critical) in timed_out {
606 if is_critical {
607 info!(
609 "SOP run {run_id}: approval timeout — auto-approving (critical/high priority)"
610 );
611 match self.approve_step(&run_id) {
612 Ok(action) => actions.push(action),
613 Err(e) => warn!("SOP run {run_id}: auto-approve failed: {e}"),
614 }
615 } else {
616 info!("SOP run {run_id}: approval timeout — waiting indefinitely (non-critical)");
617 }
618 }
619
620 actions
621 }
622
623 #[cfg(test)]
627 pub(crate) fn set_sops_for_test(&mut self, sops: Vec<Sop>) {
628 self.sops = sops;
629 }
630
631 fn last_finished_run(&self, sop_name: &str) -> Option<&SopRun> {
634 self.finished_runs
635 .iter()
636 .rev()
637 .find(|r| r.sop_name == sop_name)
638 }
639
640 fn finish_run(
641 &mut self,
642 run_id: &str,
643 status: SopRunStatus,
644 reason: Option<String>,
645 ) -> SopRunAction {
646 let mut run = self.active_runs.remove(run_id).unwrap();
647 run.status = status;
648 run.completed_at = Some(now_iso8601());
649 let sop_name = run.sop_name.clone();
650 let run_id_owned = run.run_id.clone();
651 self.finished_runs.push(run);
652
653 let max = self.config.max_finished_runs;
655 if max > 0 && self.finished_runs.len() > max {
656 let excess = self.finished_runs.len() - max;
657 self.finished_runs.drain(..excess);
658 }
659
660 match status {
661 SopRunStatus::Failed => SopRunAction::Failed {
662 run_id: run_id_owned,
663 sop_name,
664 reason: reason.unwrap_or_default(),
665 },
666 _ => SopRunAction::Completed {
667 run_id: run_id_owned,
668 sop_name,
669 },
670 }
671 }
672}
673
674fn trigger_matches(trigger: &SopTrigger, event: &SopEvent) -> bool {
678 match (trigger, event.source) {
679 (SopTrigger::Mqtt { topic, condition }, SopTriggerSource::Mqtt) => {
680 let topic_match = event
681 .topic
682 .as_deref()
683 .map_or(false, |t| mqtt_topic_matches(topic, t));
684 if !topic_match {
685 return false;
686 }
687 match condition {
689 Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
690 None => true,
691 }
692 }
693
694 (SopTrigger::Webhook { path }, SopTriggerSource::Webhook) => {
695 event.topic.as_deref().map_or(false, |t| t == path)
696 }
697
698 (
699 SopTrigger::Peripheral {
700 board,
701 signal,
702 condition,
703 },
704 SopTriggerSource::Peripheral,
705 ) => {
706 let topic_match = event.topic.as_deref().map_or(false, |t| {
707 let expected = format!("{board}/{signal}");
708 t == expected
709 });
710 if !topic_match {
711 return false;
712 }
713 match condition {
715 Some(cond) => evaluate_condition(cond, event.payload.as_deref()),
716 None => true,
717 }
718 }
719
720 (SopTrigger::Cron { expression }, SopTriggerSource::Cron) => {
721 event.topic.as_deref().map_or(false, |t| t == expression)
722 }
723
724 (SopTrigger::Manual, SopTriggerSource::Manual) => true,
725
726 _ => false,
727 }
728}
729
730fn mqtt_topic_matches(pattern: &str, topic: &str) -> bool {
732 let pat_parts: Vec<&str> = pattern.split('/').collect();
733 let top_parts: Vec<&str> = topic.split('/').collect();
734
735 let mut pi = 0;
736 let mut ti = 0;
737
738 while pi < pat_parts.len() && ti < top_parts.len() {
739 match pat_parts[pi] {
740 "#" => return true, "+" => {
742 pi += 1;
744 ti += 1;
745 }
746 seg => {
747 if seg != top_parts[ti] {
748 return false;
749 }
750 pi += 1;
751 ti += 1;
752 }
753 }
754 }
755
756 pi == pat_parts.len() && ti == top_parts.len()
758}
759
760fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: String) -> SopRunAction {
764 if step.requires_confirmation {
766 return SopRunAction::WaitApproval {
767 run_id,
768 step: step.clone(),
769 context,
770 };
771 }
772
773 let needs_approval = match sop.execution_mode {
774 SopExecutionMode::Auto | SopExecutionMode::Deterministic => false,
777 SopExecutionMode::Supervised => {
778 step.number == 1
780 }
781 SopExecutionMode::StepByStep => true,
782 SopExecutionMode::PriorityBased => match sop.priority {
783 SopPriority::Critical | SopPriority::High => false,
784 SopPriority::Normal | SopPriority::Low => {
785 step.number == 1
787 }
788 },
789 };
790
791 if needs_approval {
792 SopRunAction::WaitApproval {
793 run_id,
794 step: step.clone(),
795 context,
796 }
797 } else {
798 SopRunAction::ExecuteStep {
799 run_id,
800 step: step.clone(),
801 context,
802 }
803 }
804}
805
806fn format_step_context(sop: &Sop, run: &SopRun, step: &SopStep) -> String {
810 let mut ctx = format!(
811 "[SOP: {} (run {}) — Step {} of {}]\n\n",
812 sop.name, run.run_id, step.number, run.total_steps
813 );
814
815 let _ = writeln!(
816 ctx,
817 "Trigger: {} {}",
818 run.trigger_event.source,
819 run.trigger_event.topic.as_deref().unwrap_or("(no topic)")
820 );
821
822 if let Some(ref payload) = run.trigger_event.payload {
823 let _ = writeln!(ctx, "Payload: {payload}");
824 }
825
826 if let Some(prev) = run.step_results.last() {
828 let _ = writeln!(
829 ctx,
830 "Previous: Step {} {} — {}",
831 prev.step_number, prev.status, prev.output
832 );
833 }
834
835 let _ = write!(ctx, "\nCurrent step: **{}**\n{}\n", step.title, step.body);
836
837 if !step.suggested_tools.is_empty() {
838 let _ = write!(
839 ctx,
840 "\nSuggested tools: {}\n",
841 step.suggested_tools.join(", ")
842 );
843 }
844
845 ctx.push_str("\nWhen done, report your result.\n");
846
847 ctx
848}
849
850pub(crate) fn now_iso8601() -> String {
853 let now = std::time::SystemTime::now()
855 .duration_since(std::time::UNIX_EPOCH)
856 .unwrap_or_default();
857 let secs = now.as_secs();
859 let days = secs / 86400;
860 let time_secs = secs % 86400;
861 let hours = time_secs / 3600;
862 let minutes = (time_secs % 3600) / 60;
863 let seconds = time_secs % 60;
864
865 let (year, month, day) = days_to_ymd(days);
867 format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
868}
869
870fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
872 days += 719_468;
874 let era = days / 146_097;
875 let doe = days - era * 146_097;
876 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
877 let y = yoe + era * 400;
878 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
879 let mp = (5 * doy + 2) / 153;
880 let d = doy - (153 * mp + 2) / 5 + 1;
881 let m = if mp < 10 { mp + 3 } else { mp - 9 };
882 let y = if m <= 2 { y + 1 } else { y };
883 (y, m, d)
884}
885
886fn cooldown_elapsed(completed_at: &str, cooldown_secs: u64) -> bool {
888 let completed = parse_iso8601_secs(completed_at);
890 let now = std::time::SystemTime::now()
891 .duration_since(std::time::UNIX_EPOCH)
892 .unwrap_or_default()
893 .as_secs();
894
895 match completed {
896 Some(ts) => now.saturating_sub(ts) >= cooldown_secs,
897 None => true, }
899}
900
901fn parse_iso8601_secs(input: &str) -> Option<u64> {
903 let input = input.trim_end_matches('Z');
905 let parts: Vec<&str> = input.split('T').collect();
906 if parts.len() != 2 {
907 return None;
908 }
909 let date_parts: Vec<u64> = parts[0].split('-').filter_map(|p| p.parse().ok()).collect();
910 let time_parts: Vec<u64> = parts[1].split(':').filter_map(|p| p.parse().ok()).collect();
911 if date_parts.len() != 3 || time_parts.len() != 3 {
912 return None;
913 }
914 let (year, month, day) = (date_parts[0], date_parts[1], date_parts[2]);
915 let (hour, min, sec) = (time_parts[0], time_parts[1], time_parts[2]);
916
917 let year_adj = if month <= 2 { year - 1 } else { year };
919 let month_adj = if month > 2 { month - 3 } else { month + 9 };
920 let era = year_adj / 400;
921 let yoe = year_adj - era * 400;
922 let doy = (153 * month_adj + 2) / 5 + day - 1;
923 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
924 let days = era * 146_097 + doe - 719_468;
925
926 Some(days * 86400 + hour * 3600 + min * 60 + sec)
927}
928
929#[cfg(test)]
930mod tests {
931 use super::*;
932 use crate::sop::types::SopExecutionMode;
933
934 fn manual_event() -> SopEvent {
935 SopEvent {
936 source: SopTriggerSource::Manual,
937 topic: None,
938 payload: None,
939 timestamp: now_iso8601(),
940 }
941 }
942
943 fn mqtt_event(topic: &str, payload: &str) -> SopEvent {
944 SopEvent {
945 source: SopTriggerSource::Mqtt,
946 topic: Some(topic.into()),
947 payload: Some(payload.into()),
948 timestamp: now_iso8601(),
949 }
950 }
951
952 fn test_sop(name: &str, mode: SopExecutionMode, priority: SopPriority) -> Sop {
953 Sop {
954 name: name.into(),
955 description: format!("Test SOP: {name}"),
956 version: "1.0.0".into(),
957 priority,
958 execution_mode: mode,
959 triggers: vec![SopTrigger::Manual],
960 steps: vec![
961 SopStep {
962 number: 1,
963 title: "Step one".into(),
964 body: "Do step one".into(),
965 suggested_tools: vec!["shell".into()],
966 requires_confirmation: false,
967 kind: SopStepKind::default(),
968 schema: None,
969 },
970 SopStep {
971 number: 2,
972 title: "Step two".into(),
973 body: "Do step two".into(),
974 suggested_tools: vec![],
975 requires_confirmation: false,
976 kind: SopStepKind::default(),
977 schema: None,
978 },
979 ],
980 cooldown_secs: 0,
981 max_concurrent: 1,
982 location: None,
983 deterministic: false,
984 }
985 }
986
987 fn engine_with_sops(sops: Vec<Sop>) -> SopEngine {
988 let mut engine = SopEngine::new(SopConfig::default());
989 engine.sops = sops;
990 engine
991 }
992
993 fn extract_run_id(action: &SopRunAction) -> &str {
995 match action {
996 SopRunAction::ExecuteStep { run_id, .. }
997 | SopRunAction::WaitApproval { run_id, .. }
998 | SopRunAction::DeterministicStep { run_id, .. }
999 | SopRunAction::CheckpointWait { run_id, .. }
1000 | SopRunAction::Completed { run_id, .. }
1001 | SopRunAction::Failed { run_id, .. } => run_id,
1002 }
1003 }
1004
1005 fn first_active_run_id(engine: &SopEngine) -> String {
1007 engine
1008 .active_runs()
1009 .keys()
1010 .next()
1011 .expect("expected at least one active run")
1012 .clone()
1013 }
1014
1015 #[test]
1018 fn match_manual_trigger() {
1019 let engine = engine_with_sops(vec![test_sop(
1020 "s1",
1021 SopExecutionMode::Auto,
1022 SopPriority::Normal,
1023 )]);
1024 let matches = engine.match_trigger(&manual_event());
1025 assert_eq!(matches.len(), 1);
1026 assert_eq!(matches[0].name, "s1");
1027 }
1028
1029 #[test]
1030 fn no_match_for_wrong_source() {
1031 let engine = engine_with_sops(vec![test_sop(
1032 "s1",
1033 SopExecutionMode::Auto,
1034 SopPriority::Normal,
1035 )]);
1036 let event = mqtt_event("sensors/temp", "{}");
1037 let matches = engine.match_trigger(&event);
1038 assert!(matches.is_empty());
1039 }
1040
1041 #[test]
1042 fn match_mqtt_trigger_exact() {
1043 let sop = Sop {
1044 triggers: vec![SopTrigger::Mqtt {
1045 topic: "plant/pump/pressure".into(),
1046 condition: None,
1047 }],
1048 ..test_sop(
1049 "pressure-sop",
1050 SopExecutionMode::Auto,
1051 SopPriority::Critical,
1052 )
1053 };
1054 let engine = engine_with_sops(vec![sop]);
1055 let matches = engine.match_trigger(&mqtt_event("plant/pump/pressure", "87.3"));
1056 assert_eq!(matches.len(), 1);
1057 }
1058
1059 #[test]
1060 fn match_mqtt_wildcard_plus() {
1061 let sop = Sop {
1062 triggers: vec![SopTrigger::Mqtt {
1063 topic: "plant/+/pressure".into(),
1064 condition: None,
1065 }],
1066 ..test_sop("wildcard-sop", SopExecutionMode::Auto, SopPriority::Normal)
1067 };
1068 let engine = engine_with_sops(vec![sop]);
1069 assert_eq!(
1070 engine
1071 .match_trigger(&mqtt_event("plant/pump_3/pressure", "87"))
1072 .len(),
1073 1
1074 );
1075 assert!(
1076 engine
1077 .match_trigger(&mqtt_event("plant/pump_3/temperature", "50"))
1078 .is_empty()
1079 );
1080 }
1081
1082 #[test]
1083 fn match_mqtt_wildcard_hash() {
1084 let sop = Sop {
1085 triggers: vec![SopTrigger::Mqtt {
1086 topic: "plant/#".into(),
1087 condition: None,
1088 }],
1089 ..test_sop("hash-sop", SopExecutionMode::Auto, SopPriority::Normal)
1090 };
1091 let engine = engine_with_sops(vec![sop]);
1092 assert_eq!(
1093 engine
1094 .match_trigger(&mqtt_event("plant/pump/pressure", "87"))
1095 .len(),
1096 1
1097 );
1098 assert_eq!(
1099 engine
1100 .match_trigger(&mqtt_event("plant/a/b/c/d", "x"))
1101 .len(),
1102 1
1103 );
1104 }
1105
1106 #[test]
1107 fn mqtt_topic_matching_edge_cases() {
1108 assert!(mqtt_topic_matches("a/b/c", "a/b/c"));
1109 assert!(!mqtt_topic_matches("a/b/c", "a/b/d"));
1110 assert!(!mqtt_topic_matches("a/b/c", "a/b"));
1111 assert!(!mqtt_topic_matches("a/b", "a/b/c"));
1112 assert!(mqtt_topic_matches("+/+/+", "a/b/c"));
1113 assert!(!mqtt_topic_matches("+/+", "a/b/c"));
1114 assert!(mqtt_topic_matches("#", "a/b/c"));
1115 assert!(mqtt_topic_matches("a/#", "a/b/c"));
1116 assert!(!mqtt_topic_matches("b/#", "a/b/c"));
1117 }
1118
1119 #[test]
1122 fn webhook_trigger_matches_exact_path() {
1123 let sop = Sop {
1124 triggers: vec![SopTrigger::Webhook {
1125 path: "/webhook".into(),
1126 }],
1127 ..test_sop("webhook-sop", SopExecutionMode::Auto, SopPriority::Normal)
1128 };
1129 let engine = engine_with_sops(vec![sop]);
1130
1131 let event = SopEvent {
1133 source: SopTriggerSource::Webhook,
1134 topic: Some("/webhook".into()),
1135 payload: None,
1136 timestamp: now_iso8601(),
1137 };
1138 assert_eq!(engine.match_trigger(&event).len(), 1);
1139 }
1140
1141 #[test]
1142 fn webhook_trigger_rejects_different_path() {
1143 let sop = Sop {
1144 triggers: vec![SopTrigger::Webhook {
1145 path: "/sop/deploy".into(),
1146 }],
1147 ..test_sop("deploy-sop", SopExecutionMode::Auto, SopPriority::Normal)
1148 };
1149 let engine = engine_with_sops(vec![sop]);
1150
1151 let event = SopEvent {
1153 source: SopTriggerSource::Webhook,
1154 topic: Some("/webhook".into()),
1155 payload: None,
1156 timestamp: now_iso8601(),
1157 };
1158 assert!(engine.match_trigger(&event).is_empty());
1159
1160 let event = SopEvent {
1162 source: SopTriggerSource::Webhook,
1163 topic: Some("/sop/deploy".into()),
1164 payload: None,
1165 timestamp: now_iso8601(),
1166 };
1167 assert_eq!(engine.match_trigger(&event).len(), 1);
1168 }
1169
1170 #[test]
1173 fn cron_trigger_matches_only_matching_expression() {
1174 let sop = Sop {
1175 triggers: vec![SopTrigger::Cron {
1176 expression: "0 */5 * * *".into(),
1177 }],
1178 ..test_sop("cron-sop", SopExecutionMode::Auto, SopPriority::Normal)
1179 };
1180 let engine = engine_with_sops(vec![sop]);
1181
1182 let event = SopEvent {
1184 source: SopTriggerSource::Cron,
1185 topic: Some("0 */5 * * *".into()),
1186 payload: None,
1187 timestamp: now_iso8601(),
1188 };
1189 assert_eq!(engine.match_trigger(&event).len(), 1);
1190
1191 let event = SopEvent {
1193 source: SopTriggerSource::Cron,
1194 topic: Some("0 */10 * * *".into()),
1195 payload: None,
1196 timestamp: now_iso8601(),
1197 };
1198 assert!(engine.match_trigger(&event).is_empty());
1199
1200 let event = SopEvent {
1202 source: SopTriggerSource::Cron,
1203 topic: None,
1204 payload: None,
1205 timestamp: now_iso8601(),
1206 };
1207 assert!(engine.match_trigger(&event).is_empty());
1208 }
1209
1210 #[test]
1213 fn mqtt_condition_filters_by_payload() {
1214 let sop = Sop {
1215 triggers: vec![SopTrigger::Mqtt {
1216 topic: "sensors/pressure".into(),
1217 condition: Some("$.value > 85".into()),
1218 }],
1219 ..test_sop("cond-sop", SopExecutionMode::Auto, SopPriority::Critical)
1220 };
1221 let engine = engine_with_sops(vec![sop]);
1222
1223 let matches = engine.match_trigger(&mqtt_event("sensors/pressure", r#"{"value": 90}"#));
1225 assert_eq!(matches.len(), 1);
1226
1227 let matches = engine.match_trigger(&mqtt_event("sensors/pressure", r#"{"value": 50}"#));
1229 assert!(matches.is_empty());
1230 }
1231
1232 #[test]
1233 fn mqtt_no_condition_matches_any_payload() {
1234 let sop = Sop {
1235 triggers: vec![SopTrigger::Mqtt {
1236 topic: "sensors/temp".into(),
1237 condition: None,
1238 }],
1239 ..test_sop("no-cond", SopExecutionMode::Auto, SopPriority::Normal)
1240 };
1241 let engine = engine_with_sops(vec![sop]);
1242
1243 let matches = engine.match_trigger(&mqtt_event("sensors/temp", "anything"));
1244 assert_eq!(matches.len(), 1);
1245 }
1246
1247 #[test]
1248 fn mqtt_condition_no_payload_fails_closed() {
1249 let sop = Sop {
1250 triggers: vec![SopTrigger::Mqtt {
1251 topic: "sensors/temp".into(),
1252 condition: Some("$.value > 0".into()),
1253 }],
1254 ..test_sop("no-payload", SopExecutionMode::Auto, SopPriority::Normal)
1255 };
1256 let engine = engine_with_sops(vec![sop]);
1257
1258 let event = SopEvent {
1260 source: SopTriggerSource::Mqtt,
1261 topic: Some("sensors/temp".into()),
1262 payload: None,
1263 timestamp: now_iso8601(),
1264 };
1265 assert!(engine.match_trigger(&event).is_empty());
1266 }
1267
1268 #[test]
1269 fn peripheral_condition_filters_by_payload() {
1270 let sop = Sop {
1271 triggers: vec![SopTrigger::Peripheral {
1272 board: "nucleo".into(),
1273 signal: "pin_3".into(),
1274 condition: Some("> 0".into()),
1275 }],
1276 ..test_sop("periph-cond", SopExecutionMode::Auto, SopPriority::High)
1277 };
1278 let engine = engine_with_sops(vec![sop]);
1279
1280 let event = SopEvent {
1282 source: SopTriggerSource::Peripheral,
1283 topic: Some("nucleo/pin_3".into()),
1284 payload: Some("1".into()),
1285 timestamp: now_iso8601(),
1286 };
1287 assert_eq!(engine.match_trigger(&event).len(), 1);
1288
1289 let event = SopEvent {
1291 source: SopTriggerSource::Peripheral,
1292 topic: Some("nucleo/pin_3".into()),
1293 payload: Some("0".into()),
1294 timestamp: now_iso8601(),
1295 };
1296 assert!(engine.match_trigger(&event).is_empty());
1297 }
1298
1299 #[test]
1300 fn peripheral_no_condition_matches_any() {
1301 let sop = Sop {
1302 triggers: vec![SopTrigger::Peripheral {
1303 board: "rpi".into(),
1304 signal: "gpio_5".into(),
1305 condition: None,
1306 }],
1307 ..test_sop("periph-nocond", SopExecutionMode::Auto, SopPriority::Normal)
1308 };
1309 let engine = engine_with_sops(vec![sop]);
1310
1311 let event = SopEvent {
1312 source: SopTriggerSource::Peripheral,
1313 topic: Some("rpi/gpio_5".into()),
1314 payload: Some("0".into()),
1315 timestamp: now_iso8601(),
1316 };
1317 assert_eq!(engine.match_trigger(&event).len(), 1);
1318 }
1319
1320 #[test]
1323 fn start_run_returns_first_step() {
1324 let mut engine = engine_with_sops(vec![test_sop(
1325 "s1",
1326 SopExecutionMode::Auto,
1327 SopPriority::Normal,
1328 )]);
1329 let action = engine.start_run("s1", manual_event()).unwrap();
1330 let run_id = extract_run_id(&action);
1331 assert!(run_id.starts_with("run-"));
1332 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1333 assert_eq!(engine.active_runs().len(), 1);
1334 }
1335
1336 #[test]
1337 fn start_run_unknown_sop_fails() {
1338 let mut engine = engine_with_sops(vec![]);
1339 assert!(engine.start_run("nonexistent", manual_event()).is_err());
1340 }
1341
1342 #[test]
1343 fn advance_step_to_completion() {
1344 let mut engine = engine_with_sops(vec![test_sop(
1345 "s1",
1346 SopExecutionMode::Auto,
1347 SopPriority::Normal,
1348 )]);
1349 let action = engine.start_run("s1", manual_event()).unwrap();
1350 let run_id = extract_run_id(&action).to_string();
1351
1352 let action = engine
1354 .advance_step(
1355 &run_id,
1356 SopStepResult {
1357 step_number: 1,
1358 status: SopStepStatus::Completed,
1359 output: "done".into(),
1360 started_at: now_iso8601(),
1361 completed_at: Some(now_iso8601()),
1362 },
1363 )
1364 .unwrap();
1365
1366 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1368
1369 let action = engine
1371 .advance_step(
1372 &run_id,
1373 SopStepResult {
1374 step_number: 2,
1375 status: SopStepStatus::Completed,
1376 output: "done".into(),
1377 started_at: now_iso8601(),
1378 completed_at: Some(now_iso8601()),
1379 },
1380 )
1381 .unwrap();
1382
1383 assert!(matches!(action, SopRunAction::Completed { .. }));
1384 assert!(engine.active_runs().is_empty());
1385 assert_eq!(engine.finished_runs(None).len(), 1);
1386 }
1387
1388 #[test]
1389 fn step_failure_ends_run() {
1390 let mut engine = engine_with_sops(vec![test_sop(
1391 "s1",
1392 SopExecutionMode::Auto,
1393 SopPriority::Normal,
1394 )]);
1395 let action = engine.start_run("s1", manual_event()).unwrap();
1396 let run_id = extract_run_id(&action).to_string();
1397
1398 let action = engine
1399 .advance_step(
1400 &run_id,
1401 SopStepResult {
1402 step_number: 1,
1403 status: SopStepStatus::Failed,
1404 output: "valve stuck".into(),
1405 started_at: now_iso8601(),
1406 completed_at: Some(now_iso8601()),
1407 },
1408 )
1409 .unwrap();
1410
1411 assert!(
1412 matches!(action, SopRunAction::Failed { ref reason, .. } if reason.contains("valve stuck"))
1413 );
1414 assert!(engine.active_runs().is_empty());
1415 }
1416
1417 #[test]
1418 fn cancel_run() {
1419 let mut engine = engine_with_sops(vec![test_sop(
1420 "s1",
1421 SopExecutionMode::Auto,
1422 SopPriority::Normal,
1423 )]);
1424 let action = engine.start_run("s1", manual_event()).unwrap();
1425 let run_id = extract_run_id(&action).to_string();
1426 engine.cancel_run(&run_id).unwrap();
1427 assert!(engine.active_runs().is_empty());
1428 let finished = engine.finished_runs(None);
1429 assert_eq!(finished[0].status, SopRunStatus::Cancelled);
1430 }
1431
1432 #[test]
1433 fn cancel_unknown_run_fails() {
1434 let mut engine = engine_with_sops(vec![]);
1435 assert!(engine.cancel_run("nonexistent").is_err());
1436 }
1437
1438 #[test]
1441 fn per_sop_concurrency_limit() {
1442 let mut engine = engine_with_sops(vec![test_sop(
1443 "s1",
1444 SopExecutionMode::Auto,
1445 SopPriority::Normal,
1446 )]);
1447 engine.start_run("s1", manual_event()).unwrap();
1449 assert!(!engine.can_start("s1"));
1450 assert!(engine.start_run("s1", manual_event()).is_err());
1451 }
1452
1453 #[test]
1454 fn global_concurrency_limit() {
1455 let sops = vec![
1456 test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal),
1457 test_sop("s2", SopExecutionMode::Auto, SopPriority::Normal),
1458 ];
1459 let mut engine = SopEngine::new(SopConfig {
1460 max_concurrent_total: 1,
1461 ..SopConfig::default()
1462 });
1463 engine.sops = sops;
1464
1465 engine.start_run("s1", manual_event()).unwrap();
1466 assert!(!engine.can_start("s2"));
1467 }
1468
1469 #[test]
1472 fn cooldown_blocks_immediate_restart() {
1473 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1474 sop.cooldown_secs = 3600; let mut engine = engine_with_sops(vec![sop]);
1476
1477 let action = engine.start_run("s1", manual_event()).unwrap();
1478 let run_id = extract_run_id(&action).to_string();
1479 engine
1481 .advance_step(
1482 &run_id,
1483 SopStepResult {
1484 step_number: 1,
1485 status: SopStepStatus::Completed,
1486 output: "ok".into(),
1487 started_at: now_iso8601(),
1488 completed_at: Some(now_iso8601()),
1489 },
1490 )
1491 .unwrap();
1492 engine
1493 .advance_step(
1494 &run_id,
1495 SopStepResult {
1496 step_number: 2,
1497 status: SopStepStatus::Completed,
1498 output: "ok".into(),
1499 started_at: now_iso8601(),
1500 completed_at: Some(now_iso8601()),
1501 },
1502 )
1503 .unwrap();
1504
1505 assert!(!engine.can_start("s1"));
1507 }
1508
1509 #[test]
1512 fn auto_mode_executes_immediately() {
1513 let mut engine = engine_with_sops(vec![test_sop(
1514 "s1",
1515 SopExecutionMode::Auto,
1516 SopPriority::Normal,
1517 )]);
1518 let action = engine.start_run("s1", manual_event()).unwrap();
1519 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1520 }
1521
1522 #[test]
1523 fn supervised_mode_waits_on_first_step() {
1524 let mut engine = engine_with_sops(vec![test_sop(
1525 "s1",
1526 SopExecutionMode::Supervised,
1527 SopPriority::Normal,
1528 )]);
1529 let action = engine.start_run("s1", manual_event()).unwrap();
1530 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1531 }
1532
1533 #[test]
1534 fn step_by_step_waits_on_every_step() {
1535 let mut engine = engine_with_sops(vec![test_sop(
1536 "s1",
1537 SopExecutionMode::StepByStep,
1538 SopPriority::Normal,
1539 )]);
1540
1541 let action = engine.start_run("s1", manual_event()).unwrap();
1543 let run_id = extract_run_id(&action).to_string();
1544 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1545
1546 let action = engine.approve_step(&run_id).unwrap();
1548 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1549
1550 let action = engine
1552 .advance_step(
1553 &run_id,
1554 SopStepResult {
1555 step_number: 1,
1556 status: SopStepStatus::Completed,
1557 output: "ok".into(),
1558 started_at: now_iso8601(),
1559 completed_at: Some(now_iso8601()),
1560 },
1561 )
1562 .unwrap();
1563 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1564 }
1565
1566 #[test]
1567 fn priority_based_critical_auto() {
1568 let mut engine = engine_with_sops(vec![test_sop(
1569 "s1",
1570 SopExecutionMode::PriorityBased,
1571 SopPriority::Critical,
1572 )]);
1573 let action = engine.start_run("s1", manual_event()).unwrap();
1574 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1575 }
1576
1577 #[test]
1578 fn priority_based_normal_supervised() {
1579 let mut engine = engine_with_sops(vec![test_sop(
1580 "s1",
1581 SopExecutionMode::PriorityBased,
1582 SopPriority::Normal,
1583 )]);
1584 let action = engine.start_run("s1", manual_event()).unwrap();
1585 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1587 }
1588
1589 #[test]
1590 fn requires_confirmation_overrides_auto() {
1591 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Critical);
1592 sop.steps[0].requires_confirmation = true;
1593 let mut engine = engine_with_sops(vec![sop]);
1594 let action = engine.start_run("s1", manual_event()).unwrap();
1595 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1597 }
1598
1599 #[test]
1602 fn approve_transitions_to_execute() {
1603 let mut engine = engine_with_sops(vec![test_sop(
1604 "s1",
1605 SopExecutionMode::Supervised,
1606 SopPriority::Normal,
1607 )]);
1608 let action = engine.start_run("s1", manual_event()).unwrap();
1609 let run_id = extract_run_id(&action).to_string();
1610
1611 let run = engine.active_runs().get(&run_id).unwrap();
1613 assert_eq!(run.status, SopRunStatus::WaitingApproval);
1614
1615 let action = engine.approve_step(&run_id).unwrap();
1617 assert!(matches!(action, SopRunAction::ExecuteStep { .. }));
1618
1619 let run = engine.active_runs().get(&run_id).unwrap();
1620 assert_eq!(run.status, SopRunStatus::Running);
1621 }
1622
1623 #[test]
1624 fn approve_non_waiting_fails() {
1625 let mut engine = engine_with_sops(vec![test_sop(
1626 "s1",
1627 SopExecutionMode::Auto,
1628 SopPriority::Normal,
1629 )]);
1630 let action = engine.start_run("s1", manual_event()).unwrap();
1631 let run_id = extract_run_id(&action).to_string();
1632 assert!(engine.approve_step(&run_id).is_err());
1633 }
1634
1635 #[test]
1638 fn step_context_includes_sop_name_and_step() {
1639 let sop = test_sop(
1640 "pump-shutdown",
1641 SopExecutionMode::Auto,
1642 SopPriority::Critical,
1643 );
1644 let run = SopRun {
1645 run_id: "run-001".into(),
1646 sop_name: "pump-shutdown".into(),
1647 trigger_event: manual_event(),
1648 status: SopRunStatus::Running,
1649 current_step: 1,
1650 total_steps: 2,
1651 started_at: now_iso8601(),
1652 completed_at: None,
1653 step_results: Vec::new(),
1654 waiting_since: None,
1655 llm_calls_saved: 0,
1656 };
1657 let ctx = format_step_context(&sop, &run, &sop.steps[0]);
1658 assert!(ctx.contains("pump-shutdown"));
1659 assert!(ctx.contains("Step 1 of 2"));
1660 assert!(ctx.contains("Step one"));
1661 }
1662
1663 #[test]
1666 fn get_run_finds_active_and_finished() {
1667 let mut engine = engine_with_sops(vec![test_sop(
1668 "s1",
1669 SopExecutionMode::Auto,
1670 SopPriority::Normal,
1671 )]);
1672 let action = engine.start_run("s1", manual_event()).unwrap();
1673 let run_id = extract_run_id(&action).to_string();
1674
1675 assert!(engine.get_run(&run_id).is_some());
1677 assert_eq!(
1678 engine.get_run(&run_id).unwrap().status,
1679 SopRunStatus::Running
1680 );
1681
1682 engine
1684 .advance_step(
1685 &run_id,
1686 SopStepResult {
1687 step_number: 1,
1688 status: SopStepStatus::Completed,
1689 output: "ok".into(),
1690 started_at: now_iso8601(),
1691 completed_at: Some(now_iso8601()),
1692 },
1693 )
1694 .unwrap();
1695 engine
1696 .advance_step(
1697 &run_id,
1698 SopStepResult {
1699 step_number: 2,
1700 status: SopStepStatus::Completed,
1701 output: "ok".into(),
1702 started_at: now_iso8601(),
1703 completed_at: Some(now_iso8601()),
1704 },
1705 )
1706 .unwrap();
1707
1708 assert!(engine.get_run(&run_id).is_some());
1710 assert_eq!(
1711 engine.get_run(&run_id).unwrap().status,
1712 SopRunStatus::Completed
1713 );
1714
1715 assert!(engine.get_run("nonexistent").is_none());
1717 }
1718
1719 #[test]
1722 fn iso8601_roundtrip() {
1723 let ts = now_iso8601();
1724 let secs = parse_iso8601_secs(&ts);
1725 assert!(secs.is_some());
1726 let now = std::time::SystemTime::now()
1728 .duration_since(std::time::UNIX_EPOCH)
1729 .unwrap()
1730 .as_secs();
1731 assert!(now.abs_diff(secs.unwrap()) < 2);
1732 }
1733
1734 #[test]
1735 fn parse_known_timestamp() {
1736 let secs = parse_iso8601_secs("2026-01-01T00:00:00Z").unwrap();
1738 assert_eq!(secs, 20454 * 86400);
1740 }
1741
1742 #[test]
1745 fn timeout_auto_approves_critical() {
1746 let mut engine = SopEngine::new(SopConfig {
1747 approval_timeout_secs: 1, ..SopConfig::default()
1749 });
1750 let mut sop = test_sop("s1", SopExecutionMode::Supervised, SopPriority::Critical);
1751 sop.execution_mode = SopExecutionMode::Supervised;
1753 engine.set_sops_for_test(vec![sop]);
1754
1755 let action = engine.start_run("s1", manual_event()).unwrap();
1756 let run_id = extract_run_id(&action).to_string();
1757 assert!(matches!(action, SopRunAction::WaitApproval { .. }));
1758
1759 let run = engine.active_runs.get_mut(&run_id).unwrap();
1761 run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1762
1763 let actions = engine.check_approval_timeouts();
1764 assert_eq!(actions.len(), 1);
1765 assert!(matches!(actions[0], SopRunAction::ExecuteStep { .. }));
1766 }
1767
1768 #[test]
1769 fn timeout_does_not_auto_approve_normal() {
1770 let mut engine = SopEngine::new(SopConfig {
1771 approval_timeout_secs: 1,
1772 ..SopConfig::default()
1773 });
1774 engine.set_sops_for_test(vec![test_sop(
1775 "s1",
1776 SopExecutionMode::Supervised,
1777 SopPriority::Normal,
1778 )]);
1779
1780 let action = engine.start_run("s1", manual_event()).unwrap();
1781 let run_id = extract_run_id(&action).to_string();
1782
1783 let run = engine.active_runs.get_mut(&run_id).unwrap();
1785 run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1786
1787 let actions = engine.check_approval_timeouts();
1789 assert!(actions.is_empty());
1790 assert_eq!(
1792 engine.get_run(&run_id).unwrap().status,
1793 SopRunStatus::WaitingApproval
1794 );
1795 }
1796
1797 #[test]
1798 fn timeout_zero_disables_check() {
1799 let mut engine = SopEngine::new(SopConfig {
1800 approval_timeout_secs: 0,
1801 ..SopConfig::default()
1802 });
1803 engine.set_sops_for_test(vec![test_sop(
1804 "s1",
1805 SopExecutionMode::Supervised,
1806 SopPriority::Critical,
1807 )]);
1808 let action = engine.start_run("s1", manual_event()).unwrap();
1809 let run_id = extract_run_id(&action).to_string();
1810
1811 let run = engine.active_runs.get_mut(&run_id).unwrap();
1812 run.waiting_since = Some("2020-01-01T00:00:00Z".into());
1813
1814 let actions = engine.check_approval_timeouts();
1815 assert!(actions.is_empty());
1816 }
1817
1818 #[test]
1819 fn waiting_since_set_on_wait_approval() {
1820 let mut engine = engine_with_sops(vec![test_sop(
1821 "s1",
1822 SopExecutionMode::Supervised,
1823 SopPriority::Normal,
1824 )]);
1825 let action = engine.start_run("s1", manual_event()).unwrap();
1826 let run_id = extract_run_id(&action).to_string();
1827
1828 let run = engine.get_run(&run_id).unwrap();
1829 assert_eq!(run.status, SopRunStatus::WaitingApproval);
1830 assert!(run.waiting_since.is_some());
1831 }
1832
1833 #[test]
1836 fn max_finished_runs_evicts_oldest() {
1837 let mut engine = SopEngine::new(SopConfig {
1838 max_finished_runs: 2,
1839 ..SopConfig::default()
1840 });
1841 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1843 sop.steps = vec![sop.steps[0].clone()];
1844 sop.max_concurrent = 10;
1845 engine.sops = vec![sop];
1846
1847 let mut finished_ids = Vec::new();
1849 for _ in 0..3 {
1850 let action = engine.start_run("s1", manual_event()).unwrap();
1851 let rid = extract_run_id(&action).to_string();
1852 engine
1853 .advance_step(
1854 &rid,
1855 SopStepResult {
1856 step_number: 1,
1857 status: SopStepStatus::Completed,
1858 output: "ok".into(),
1859 started_at: now_iso8601(),
1860 completed_at: Some(now_iso8601()),
1861 },
1862 )
1863 .unwrap();
1864 finished_ids.push(rid);
1865 }
1866
1867 let finished = engine.finished_runs(None);
1869 assert_eq!(
1870 finished.len(),
1871 2,
1872 "eviction should cap at max_finished_runs"
1873 );
1874 assert_eq!(finished[0].run_id, finished_ids[1]);
1876 assert_eq!(finished[1].run_id, finished_ids[2]);
1877 }
1878
1879 #[test]
1880 fn max_finished_runs_zero_means_unlimited() {
1881 let mut engine = SopEngine::new(SopConfig {
1882 max_finished_runs: 0,
1883 ..SopConfig::default()
1884 });
1885 let mut sop = test_sop("s1", SopExecutionMode::Auto, SopPriority::Normal);
1886 sop.steps = vec![sop.steps[0].clone()];
1887 sop.max_concurrent = 10;
1888 engine.sops = vec![sop];
1889
1890 for _ in 0..5 {
1891 let action = engine.start_run("s1", manual_event()).unwrap();
1892 let rid = extract_run_id(&action).to_string();
1893 engine
1894 .advance_step(
1895 &rid,
1896 SopStepResult {
1897 step_number: 1,
1898 status: SopStepStatus::Completed,
1899 output: "ok".into(),
1900 started_at: now_iso8601(),
1901 completed_at: Some(now_iso8601()),
1902 },
1903 )
1904 .unwrap();
1905 }
1906
1907 assert_eq!(engine.finished_runs(None).len(), 5, "zero means unlimited");
1908 }
1909
1910 #[test]
1911 fn waiting_since_cleared_on_approve() {
1912 let mut engine = engine_with_sops(vec![test_sop(
1913 "s1",
1914 SopExecutionMode::Supervised,
1915 SopPriority::Normal,
1916 )]);
1917 let action = engine.start_run("s1", manual_event()).unwrap();
1918 let run_id = extract_run_id(&action).to_string();
1919 engine.approve_step(&run_id).unwrap();
1920
1921 let run = engine.get_run(&run_id).unwrap();
1922 assert_eq!(run.status, SopRunStatus::Running);
1923 assert!(run.waiting_since.is_none());
1924 }
1925
1926 fn deterministic_sop(name: &str) -> Sop {
1929 Sop {
1930 name: name.into(),
1931 description: format!("Deterministic SOP: {name}"),
1932 version: "1.0.0".into(),
1933 priority: SopPriority::Normal,
1934 execution_mode: SopExecutionMode::Deterministic,
1935 triggers: vec![SopTrigger::Manual],
1936 steps: vec![
1937 SopStep {
1938 number: 1,
1939 title: "Step one".into(),
1940 body: "Do step one".into(),
1941 suggested_tools: vec![],
1942 requires_confirmation: false,
1943 kind: SopStepKind::Execute,
1944 schema: None,
1945 },
1946 SopStep {
1947 number: 2,
1948 title: "Checkpoint".into(),
1949 body: "Pause for approval".into(),
1950 suggested_tools: vec![],
1951 requires_confirmation: false,
1952 kind: SopStepKind::Checkpoint,
1953 schema: None,
1954 },
1955 SopStep {
1956 number: 3,
1957 title: "Step three".into(),
1958 body: "Final step".into(),
1959 suggested_tools: vec![],
1960 requires_confirmation: false,
1961 kind: SopStepKind::Execute,
1962 schema: None,
1963 },
1964 ],
1965 cooldown_secs: 0,
1966 max_concurrent: 1,
1967 location: None,
1968 deterministic: true,
1969 }
1970 }
1971
1972 #[test]
1973 fn deterministic_start_returns_deterministic_step() {
1974 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
1975 let action = engine.start_run("det-sop", manual_event()).unwrap();
1976 assert!(
1977 matches!(action, SopRunAction::DeterministicStep { ref step, .. } if step.number == 1),
1978 "First action should be DeterministicStep for step 1"
1979 );
1980 let run_id = extract_run_id(&action).to_string();
1981 assert!(run_id.starts_with("det-"));
1982 }
1983
1984 #[test]
1985 fn deterministic_start_routes_through_start_run() {
1986 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
1987 let action = engine.start_run("det-sop", manual_event()).unwrap();
1989 assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
1990 }
1991
1992 #[test]
1993 fn deterministic_advance_pipes_output() {
1994 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
1995 let action = engine.start_run("det-sop", manual_event()).unwrap();
1996 let run_id = extract_run_id(&action).to_string();
1997
1998 let output = serde_json::json!({"result": "step1_done"});
2000 let action = engine
2001 .advance_deterministic_step(&run_id, output.clone())
2002 .unwrap();
2003
2004 assert!(
2006 matches!(action, SopRunAction::CheckpointWait { ref step, .. } if step.number == 2),
2007 "Step 2 (checkpoint) should return CheckpointWait"
2008 );
2009 }
2010
2011 #[test]
2012 fn deterministic_checkpoint_pauses_run() {
2013 let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]);
2014 let action = engine.start_run("det-sop", manual_event()).unwrap();
2015 let run_id = extract_run_id(&action).to_string();
2016
2017 let action = engine
2019 .advance_deterministic_step(&run_id, serde_json::json!({"ok": true}))
2020 .unwrap();
2021
2022 assert!(matches!(action, SopRunAction::CheckpointWait { .. }));
2024
2025 let run = engine.get_run(&run_id).unwrap();
2027 assert_eq!(run.status, SopRunStatus::PausedCheckpoint);
2028 assert!(run.waiting_since.is_some());
2029 }
2030
2031 #[test]
2032 fn deterministic_completion_tracks_savings() {
2033 let mut sop = deterministic_sop("det-sop");
2034 sop.steps = vec![
2036 SopStep {
2037 number: 1,
2038 title: "Step one".into(),
2039 body: "Do it".into(),
2040 suggested_tools: vec![],
2041 requires_confirmation: false,
2042 kind: SopStepKind::Execute,
2043 schema: None,
2044 },
2045 SopStep {
2046 number: 2,
2047 title: "Step two".into(),
2048 body: "Do it too".into(),
2049 suggested_tools: vec![],
2050 requires_confirmation: false,
2051 kind: SopStepKind::Execute,
2052 schema: None,
2053 },
2054 ];
2055 let mut engine = engine_with_sops(vec![sop]);
2056
2057 let action = engine.start_run("det-sop", manual_event()).unwrap();
2058 let run_id = extract_run_id(&action).to_string();
2059
2060 let action = engine
2062 .advance_deterministic_step(&run_id, serde_json::json!("s1"))
2063 .unwrap();
2064 assert!(matches!(action, SopRunAction::DeterministicStep { .. }));
2065
2066 let action = engine
2068 .advance_deterministic_step(&run_id, serde_json::json!("s2"))
2069 .unwrap();
2070 assert!(matches!(action, SopRunAction::Completed { .. }));
2071
2072 let savings = engine.deterministic_savings();
2074 assert_eq!(savings.total_runs, 1);
2075 assert_eq!(savings.total_llm_calls_saved, 2);
2076 }
2077
2078 #[test]
2079 fn deterministic_non_deterministic_sop_rejected() {
2080 let mut engine = engine_with_sops(vec![test_sop(
2081 "s1",
2082 SopExecutionMode::Auto,
2083 SopPriority::Normal,
2084 )]);
2085 let result = engine.start_deterministic_run("s1", manual_event());
2086 assert!(result.is_err());
2087 assert!(
2088 result
2089 .unwrap_err()
2090 .to_string()
2091 .contains("not in deterministic mode")
2092 );
2093 }
2094}