1use std::sync::{Arc, Mutex};
8
9use tracing::{debug, info, warn};
10
11use super::audit::SopAuditLogger;
12use super::engine::{SopEngine, now_iso8601};
13use super::types::{SopEvent, SopRun, SopRunAction, SopTriggerSource};
14
15#[derive(Debug, Clone)]
19pub enum DispatchResult {
20 Started {
25 run_id: String,
26 sop_name: String,
27 action: Box<SopRunAction>,
28 },
29 Skipped { sop_name: String, reason: String },
31 NoMatch,
33}
34
35fn extract_run_id_from_action(action: &SopRunAction) -> &str {
39 match action {
40 SopRunAction::ExecuteStep { run_id, .. }
41 | SopRunAction::WaitApproval { run_id, .. }
42 | SopRunAction::DeterministicStep { run_id, .. }
43 | SopRunAction::CheckpointWait { run_id, .. }
44 | SopRunAction::Completed { run_id, .. }
45 | SopRunAction::Failed { run_id, .. } => run_id,
46 }
47}
48
49fn action_label(action: &SopRunAction) -> &'static str {
51 match action {
52 SopRunAction::ExecuteStep { .. } => "ExecuteStep",
53 SopRunAction::WaitApproval { .. } => "WaitApproval",
54 SopRunAction::DeterministicStep { .. } => "DeterministicStep",
55 SopRunAction::CheckpointWait { .. } => "CheckpointWait",
56 SopRunAction::Completed { .. } => "Completed",
57 SopRunAction::Failed { .. } => "Failed",
58 }
59}
60
61pub async fn dispatch_sop_event(
70 engine: &Arc<Mutex<SopEngine>>,
71 audit: &SopAuditLogger,
72 event: SopEvent,
73) -> Vec<DispatchResult> {
74 let matched_names: Vec<String> = match engine.lock() {
76 Ok(eng) => eng
77 .match_trigger(&event)
78 .iter()
79 .map(|s| s.name.clone())
80 .collect(),
81 Err(e) => {
82 crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
83 warn!("SOP dispatch: engine lock poisoned during match phase: {e}");
84 return vec![];
85 }
86 };
87
88 if matched_names.is_empty() {
89 debug!("SOP dispatch: no match for event");
90 return vec![DispatchResult::NoMatch];
91 }
92
93 info!(
94 "SOP dispatch: {} SOP(s) matched: {:?}",
95 matched_names.len(),
96 matched_names
97 );
98
99 let mut results = Vec::new();
101 let mut started_runs: Vec<SopRun> = Vec::new();
102
103 {
104 let mut eng = match engine.lock() {
105 Ok(e) => e,
106 Err(e) => {
107 crate::health::mark_component_error("sop_dispatch", format!("lock poisoned: {e}"));
108 warn!("SOP dispatch: engine lock poisoned during start phase: {e}");
109 return vec![];
110 }
111 };
112
113 for sop_name in &matched_names {
114 match eng.start_run(sop_name, event.clone()) {
115 Ok(action) => {
116 let run_id = extract_run_id_from_action(&action).to_string();
118 if let Some(run) = eng.active_runs().get(&run_id) {
120 started_runs.push(run.clone());
121 }
122 info!(
123 "SOP dispatch: started '{}' run {run_id} (action: {})",
124 sop_name,
125 action_label(&action),
126 );
127 results.push(DispatchResult::Started {
128 run_id,
129 sop_name: sop_name.clone(),
130 action: Box::new(action),
131 });
132 }
133 Err(e) => {
134 info!("SOP dispatch: skipped '{}': {e}", sop_name);
135 results.push(DispatchResult::Skipped {
136 sop_name: sop_name.clone(),
137 reason: e.to_string(),
138 });
139 }
140 }
141 }
142 } for run in &started_runs {
146 if let Err(e) = audit.log_run_start(run).await {
147 warn!("SOP dispatch: audit log failed for run {}: {e}", run.run_id);
148 }
149 }
150
151 crate::health::mark_component_ok("sop_dispatch");
152 results
153}
154
155pub fn process_headless_results(results: &[DispatchResult]) {
165 for result in results {
166 match result {
167 DispatchResult::Started {
168 run_id,
169 sop_name,
170 action,
171 } => match action.as_ref() {
172 SopRunAction::ExecuteStep { step, .. } => {
173 warn!(
174 "SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \
175 '{}' but no agent loop available to execute",
176 step.number, step.title,
177 );
178 }
179 SopRunAction::WaitApproval { step, .. } => {
180 info!(
181 "SOP headless dispatch: run {run_id} ('{sop_name}') waiting for approval \
182 on step {} '{}'. Timeout polling will handle progression",
183 step.number, step.title,
184 );
185 }
186 SopRunAction::DeterministicStep { step, .. } => {
187 info!(
188 "SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \
189 '{}'",
190 step.number, step.title,
191 );
192 }
193 SopRunAction::CheckpointWait {
194 step, state_file, ..
195 } => {
196 info!(
197 "SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \
198 '{}', state persisted to {}",
199 step.number,
200 step.title,
201 state_file.display(),
202 );
203 }
204 SopRunAction::Completed { .. } => {
205 info!(
206 "SOP headless dispatch: run {run_id} ('{sop_name}') completed immediately"
207 );
208 }
209 SopRunAction::Failed { reason, .. } => {
210 warn!("SOP headless dispatch: run {run_id} ('{sop_name}') failed: {reason}");
211 }
212 },
213 DispatchResult::Skipped { sop_name, reason } => {
214 info!("SOP headless dispatch: skipped '{sop_name}': {reason}");
215 }
216 DispatchResult::NoMatch => {}
217 }
218 }
219}
220
221pub async fn dispatch_peripheral_signal(
228 engine: &Arc<Mutex<SopEngine>>,
229 audit: &SopAuditLogger,
230 board: &str,
231 signal: &str,
232 payload: Option<&str>,
233) -> Vec<DispatchResult> {
234 let event = SopEvent {
235 source: SopTriggerSource::Peripheral,
236 topic: Some(format!("{board}/{signal}")),
237 payload: payload.map(String::from),
238 timestamp: now_iso8601(),
239 };
240 dispatch_sop_event(engine, audit, event).await
241}
242
243#[derive(Clone)]
250pub struct SopCronCache {
251 schedules: Vec<(String, String, cron::Schedule)>,
253}
254
255impl SopCronCache {
256 pub fn from_engine(engine: &Arc<Mutex<SopEngine>>) -> Self {
261 let mut schedules = Vec::new();
262 let eng = match engine.lock() {
263 Ok(e) => e,
264 Err(e) => {
265 warn!("SopCronCache: engine lock poisoned: {e}");
266 return Self { schedules };
267 }
268 };
269
270 for sop in eng.sops() {
271 for trigger in &sop.triggers {
272 if let super::types::SopTrigger::Cron { expression } = trigger {
273 let normalized = match crate::cron::normalize_expression(expression) {
275 Ok(n) => n,
276 Err(e) => {
277 warn!(
278 "SopCronCache: invalid cron expression '{}' in SOP '{}': {e}",
279 expression, sop.name
280 );
281 continue;
282 }
283 };
284 match normalized.parse::<cron::Schedule>() {
285 Ok(schedule) => {
286 schedules.push((sop.name.clone(), expression.clone(), schedule));
287 }
288 Err(e) => {
289 warn!(
290 "SopCronCache: failed to parse cron schedule '{}' for SOP '{}': {e}",
291 normalized, sop.name
292 );
293 }
294 }
295 }
296 }
297 }
298
299 info!("SopCronCache: cached {} cron schedule(s)", schedules.len());
300 Self { schedules }
301 }
302
303 #[cfg(test)]
305 pub fn schedules(&self) -> &[(String, String, cron::Schedule)] {
306 &self.schedules
307 }
308}
309
310pub async fn check_sop_cron_triggers(
315 engine: &Arc<Mutex<SopEngine>>,
316 audit: &SopAuditLogger,
317 cache: &SopCronCache,
318 last_check: &mut chrono::DateTime<chrono::Utc>,
319) -> Vec<DispatchResult> {
320 let now = chrono::Utc::now();
321 let mut all_results = Vec::new();
322
323 for (_sop_name, expression, schedule) in &cache.schedules {
324 let mut upcoming = schedule.after(last_check);
329 if let Some(next) = upcoming.next() {
330 if next <= now {
331 let event = SopEvent {
333 source: SopTriggerSource::Cron,
334 topic: Some(expression.clone()),
335 payload: None,
336 timestamp: now_iso8601(),
337 };
338 let results = dispatch_sop_event(engine, audit, event).await;
339 all_results.extend(results);
340 }
341 }
342 }
343
344 *last_check = now;
345 all_results
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SopConfig;
    use crate::memory::traits::Memory;
    use crate::sop::types::{
        Sop, SopExecutionMode, SopPriority, SopRunAction, SopStep, SopTrigger, SopTriggerSource,
    };

    /// Builds a single-step, auto-mode SOP with the given name and triggers.
    fn test_sop(name: &str, triggers: Vec<SopTrigger>) -> Sop {
        let only_step = SopStep {
            number: 1,
            title: "Step one".into(),
            body: "Do step one".into(),
            suggested_tools: vec![],
            requires_confirmation: false,
            kind: crate::sop::SopStepKind::default(),
            schema: None,
        };
        Sop {
            name: name.into(),
            description: format!("Test SOP: {name}"),
            version: "1.0.0".into(),
            priority: SopPriority::Normal,
            execution_mode: SopExecutionMode::Auto,
            triggers,
            steps: vec![only_step],
            cooldown_secs: 0,
            max_concurrent: 2,
            location: None,
            deterministic: false,
        }
    }

    /// Builds a test SOP whose only trigger is the given cron expression.
    fn cron_sop(name: &str, expression: &str) -> Sop {
        test_sop(
            name,
            vec![SopTrigger::Cron {
                expression: expression.into(),
            }],
        )
    }

    /// Wraps the given SOPs in a shared engine with the default config.
    fn test_engine(sops: Vec<Sop>) -> Arc<Mutex<SopEngine>> {
        let mut engine = SopEngine::new(SopConfig::default());
        engine.set_sops_for_test(sops);
        Arc::new(Mutex::new(engine))
    }

    /// Audit logger backed by the in-memory test store.
    fn test_audit() -> SopAuditLogger {
        let memory: Arc<dyn Memory> = Arc::new(crate::memory::test_memory::TestMemory::new());
        SopAuditLogger::new(memory)
    }

    /// Builds an event stamped with the current time.
    fn make_event(
        source: SopTriggerSource,
        topic: Option<&str>,
        payload: Option<&str>,
    ) -> SopEvent {
        SopEvent {
            source,
            topic: topic.map(String::from),
            payload: payload.map(String::from),
            timestamp: now_iso8601(),
        }
    }

    /// Number of `Started` entries in `results`.
    fn started_count(results: &[DispatchResult]) -> usize {
        results
            .iter()
            .filter(|r| matches!(r, DispatchResult::Started { .. }))
            .count()
    }

    #[tokio::test]
    async fn dispatch_starts_matching_sop() {
        let engine = test_engine(vec![test_sop(
            "mqtt-sop",
            vec![SopTrigger::Mqtt {
                topic: "sensors/temp".into(),
                condition: None,
            }],
        )]);
        let audit = test_audit();
        let event = make_event(
            SopTriggerSource::Mqtt,
            Some("sensors/temp"),
            Some(r#"{"value": 42}"#),
        );

        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(
            &results[0],
            DispatchResult::Started { sop_name, action, .. }
                if sop_name == "mqtt-sop"
                    && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. })
        ));
    }

    #[tokio::test]
    async fn dispatch_skips_when_cooldown_active() {
        let mut sop = test_sop("cooldown-sop", vec![SopTrigger::Manual]);
        sop.cooldown_secs = 3600;
        sop.max_concurrent = 1;
        let engine = test_engine(vec![sop]);
        let audit = test_audit();

        // Start a run directly on the engine and complete its only step, so
        // that the dispatch below is the second attempt for this SOP.
        {
            let mut eng = engine.lock().unwrap();
            let _action = eng
                .start_run(
                    "cooldown-sop",
                    make_event(SopTriggerSource::Manual, None, None),
                )
                .unwrap();
            let run_id = eng.active_runs().keys().next().unwrap().clone();
            let step_result = crate::sop::types::SopStepResult {
                step_number: 1,
                status: crate::sop::types::SopStepStatus::Completed,
                output: "done".into(),
                started_at: now_iso8601(),
                completed_at: Some(now_iso8601()),
            };
            eng.advance_step(&run_id, step_result).unwrap();
        }

        let event = make_event(SopTriggerSource::Manual, None, None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(
            &results[0],
            DispatchResult::Skipped { sop_name, .. } if sop_name == "cooldown-sop"
        ));
    }

    #[tokio::test]
    async fn dispatch_returns_no_match_for_unknown_event() {
        let engine = test_engine(vec![test_sop("manual-sop", vec![SopTrigger::Manual])]);
        let audit = test_audit();

        let event = make_event(SopTriggerSource::Mqtt, Some("some/topic"), None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(&results[0], DispatchResult::NoMatch));
    }

    #[tokio::test]
    async fn dispatch_batch_lock_starts_multiple_sops() {
        let deploy_hook = || {
            vec![SopTrigger::Webhook {
                path: "/api/deploy".into(),
            }]
        };
        let sop1 = test_sop("webhook-sop-1", deploy_hook());
        let sop2 = test_sop("webhook-sop-2", deploy_hook());
        let engine = test_engine(vec![sop1, sop2]);
        let audit = test_audit();

        let event = make_event(SopTriggerSource::Webhook, Some("/api/deploy"), None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(started_count(&results), 2);
    }

    #[tokio::test]
    async fn dispatch_captures_action_for_wait_approval() {
        let mut sop = test_sop(
            "supervised-sop",
            vec![SopTrigger::Mqtt {
                topic: "alert".into(),
                condition: None,
            }],
        );
        sop.execution_mode = SopExecutionMode::Supervised;
        let engine = test_engine(vec![sop]);
        let audit = test_audit();

        let event = make_event(SopTriggerSource::Mqtt, Some("alert"), None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        match &results[0] {
            DispatchResult::Started {
                run_id,
                sop_name,
                action,
            } => {
                assert_eq!(sop_name, "supervised-sop");
                assert!(!run_id.is_empty());
                assert!(
                    matches!(action.as_ref(), SopRunAction::WaitApproval { .. }),
                    "Supervised SOP must return WaitApproval, got {:?}",
                    action
                );
            }
            other => panic!("Expected Started, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dispatch_captures_action_for_execute_step() {
        let engine = test_engine(vec![test_sop("auto-sop", vec![SopTrigger::Manual])]);
        let audit = test_audit();

        let event = make_event(SopTriggerSource::Manual, None, None);
        let results = dispatch_sop_event(&engine, &audit, event).await;

        assert_eq!(results.len(), 1);
        match &results[0] {
            DispatchResult::Started { action, .. } => {
                assert!(
                    matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }),
                    "Auto SOP must return ExecuteStep, got {:?}",
                    action
                );
            }
            other => panic!("Expected Started, got {other:?}"),
        }
    }

    /// Engine holding one SOP triggered by signal `pin_3` on board `nucleo`.
    fn gpio_engine() -> Arc<Mutex<SopEngine>> {
        test_engine(vec![test_sop(
            "gpio-sop",
            vec![SopTrigger::Peripheral {
                board: "nucleo".into(),
                signal: "pin_3".into(),
                condition: None,
            }],
        )])
    }

    #[tokio::test]
    async fn peripheral_signal_dispatches_to_matching_sop() {
        let engine = gpio_engine();
        let audit = test_audit();

        let results =
            dispatch_peripheral_signal(&engine, &audit, "nucleo", "pin_3", Some("1")).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(
            &results[0],
            DispatchResult::Started { sop_name, .. } if sop_name == "gpio-sop"
        ));
    }

    // Despite the name, a non-matching signal yields a single `NoMatch`
    // entry rather than an empty vector.
    #[tokio::test]
    async fn peripheral_signal_no_match_returns_empty() {
        let engine = gpio_engine();
        let audit = test_audit();

        let results = dispatch_peripheral_signal(&engine, &audit, "rpi", "gpio_5", None).await;

        assert_eq!(results.len(), 1);
        assert!(matches!(&results[0], DispatchResult::NoMatch));
    }

    #[test]
    fn cron_cache_skips_invalid_expression() {
        let engine = test_engine(vec![cron_sop("bad-cron", "not a valid cron")]);
        let cache = SopCronCache::from_engine(&engine);
        assert!(cache.schedules().is_empty());
    }

    #[test]
    fn cron_cache_parses_valid_expression() {
        let engine = test_engine(vec![cron_sop("valid-cron", "0 */5 * * *")]);
        let cache = SopCronCache::from_engine(&engine);

        let cached = cache.schedules();
        assert_eq!(cached.len(), 1);
        assert_eq!(cached[0].0, "valid-cron");
        assert_eq!(cached[0].1, "0 */5 * * *");
    }

    #[tokio::test]
    async fn cron_sop_trigger_fires_on_schedule() {
        let engine = test_engine(vec![cron_sop("cron-sop", "* * * * *")]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        // An every-minute schedule has an occurrence in the last two minutes.
        let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(2);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        let started = started_count(&results);
        assert!(started >= 1, "Expected at least 1 started SOP from cron");
    }

    #[tokio::test]
    async fn cron_sop_only_matching_expression_fires() {
        let engine = test_engine(vec![
            cron_sop("every-min", "* * * * *"),
            cron_sop("yearly", "0 0 1 1 *"),
        ]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(2);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        let mut started_names: Vec<&str> = Vec::new();
        for result in &results {
            if let DispatchResult::Started { sop_name, .. } = result {
                started_names.push(sop_name.as_str());
            }
        }
        assert!(started_names.contains(&"every-min"));
        assert!(!started_names.contains(&"yearly"));
    }

    #[tokio::test]
    async fn cron_sop_window_check_does_not_miss_tick() {
        let engine = test_engine(vec![cron_sop("every-min", "* * * * *")]);
        let audit = test_audit();
        let cache = SopCronCache::from_engine(&engine);

        let mut last_check = chrono::Utc::now() - chrono::Duration::minutes(5);
        let results = check_sop_cron_triggers(&engine, &audit, &cache, &mut last_check).await;

        let started = started_count(&results);
        assert!(
            started >= 1,
            "Window-based check should catch ticks from 5 minutes ago"
        );

        // The call must have moved the window forward to (roughly) now.
        let now = chrono::Utc::now();
        assert!(
            (now - last_check).num_seconds() < 2,
            "last_check should be updated to now"
        );
    }
}