1pub mod actions;
76pub mod escalation;
77pub mod resolve;
78
79pub use escalation::parse_duration;
80pub use resolve::status_to_reaction_key;
81
82use crate::{
83 config::AoConfig,
84 error::Result,
85 events::OrchestratorEvent,
86 notifier::NotifierRegistry,
87 reactions::{EscalateAfter, ReactionAction, ReactionConfig, ReactionOutcome},
88 traits::{Runtime, Scm},
89 types::{Session, SessionId},
90};
91use escalation::TrackerState;
92use resolve::merge_reaction_config;
93use std::{
94 collections::{HashMap, HashSet},
95 sync::{Arc, Mutex},
96 time::Instant,
97};
98use tokio::sync::broadcast;
99
100pub struct ReactionEngine {
105 config: HashMap<String, ReactionConfig>,
109 ao_config: Option<Arc<AoConfig>>,
112 runtime: Arc<dyn Runtime>,
115 events_tx: broadcast::Sender<OrchestratorEvent>,
119 trackers: Mutex<HashMap<(SessionId, String), TrackerState>>,
122 warned_parse_failures: Mutex<HashSet<String>>,
130 scm: Option<Arc<dyn Scm>>,
135 notifier_registry: Option<NotifierRegistry>,
143}
144
145impl ReactionEngine {
146 pub fn new(
150 config: HashMap<String, ReactionConfig>,
151 runtime: Arc<dyn Runtime>,
152 events_tx: broadcast::Sender<OrchestratorEvent>,
153 ) -> Self {
154 Self {
155 config,
156 ao_config: None,
157 runtime,
158 events_tx,
159 trackers: Mutex::new(HashMap::new()),
160 warned_parse_failures: Mutex::new(HashSet::new()),
161 scm: None,
162 notifier_registry: None,
163 }
164 }
165
166 pub fn new_with_config(
172 ao_config: Arc<AoConfig>,
173 runtime: Arc<dyn Runtime>,
174 events_tx: broadcast::Sender<OrchestratorEvent>,
175 ) -> Self {
176 Self {
177 config: HashMap::new(),
178 ao_config: Some(ao_config),
179 runtime,
180 events_tx,
181 trackers: Mutex::new(HashMap::new()),
182 warned_parse_failures: Mutex::new(HashSet::new()),
183 scm: None,
184 notifier_registry: None,
185 }
186 }
187
188 fn global_reactions(&self) -> &HashMap<String, ReactionConfig> {
189 self.ao_config
190 .as_ref()
191 .map(|c| &c.reactions)
192 .unwrap_or(&self.config)
193 }
194
195 pub fn resolve_reaction_config(&self, session: &Session, key: &str) -> Option<ReactionConfig> {
203 let global = self.global_reactions().get(key).cloned();
204 let project = self
205 .ao_config
206 .as_ref()
207 .and_then(|c| c.projects.get(&session.project_id))
208 .and_then(|p| p.reactions.get(key))
209 .cloned();
210
211 match (global, project) {
212 (Some(g), Some(p)) => Some(merge_reaction_config(g, p)),
213 (Some(g), None) => Some(g),
214 (None, Some(p)) => Some(p),
215 (None, None) => None,
216 }
217 }
218
219 pub fn with_scm(mut self, scm: Arc<dyn Scm>) -> Self {
226 self.scm = Some(scm);
227 self
228 }
229
230 pub fn with_notifier_registry(mut self, registry: NotifierRegistry) -> Self {
234 self.notifier_registry = Some(registry);
235 self
236 }
237
238 pub async fn dispatch(
246 &self,
247 session: &Session,
248 reaction_key: &str,
249 ) -> Result<Option<ReactionOutcome>> {
250 let Some(cfg) = self.resolve_reaction_config(session, reaction_key) else {
251 tracing::debug!(
252 reaction = reaction_key,
253 session = %session.id,
254 "no reaction configured; skipping"
255 );
256 return Ok(None);
257 };
258 self.dispatch_with_cfg(session, reaction_key, cfg).await
259 }
260
261 pub async fn dispatch_with_message(
269 &self,
270 session: &Session,
271 reaction_key: &str,
272 message_override: String,
273 ) -> Result<Option<ReactionOutcome>> {
274 let Some(mut cfg) = self.resolve_reaction_config(session, reaction_key) else {
275 tracing::debug!(
276 reaction = reaction_key,
277 session = %session.id,
278 "no reaction configured; skipping"
279 );
280 return Ok(None);
281 };
282 cfg.message = Some(message_override);
283 self.dispatch_with_cfg(session, reaction_key, cfg).await
284 }
285
286 async fn dispatch_with_cfg(
290 &self,
291 session: &Session,
292 reaction_key: &str,
293 cfg: crate::reactions::ReactionConfig,
294 ) -> Result<Option<ReactionOutcome>> {
295 if !cfg.auto {
304 if cfg.action == ReactionAction::Notify {
305 let outcome = self
306 .dispatch_notify(session, reaction_key, &cfg, false)
307 .await;
308 return Ok(Some(outcome));
309 }
310 tracing::debug!(
311 reaction = reaction_key,
312 session = %session.id,
313 "reaction auto: false; skipping non-notify action"
314 );
315 return Ok(None);
316 }
317
318 let duration_gate: Option<std::time::Duration> = match cfg.escalate_after {
325 Some(EscalateAfter::Duration(ref s)) => match parse_duration(s) {
326 Some(d) => Some(d),
327 None => {
328 self.warn_once_parse_failure(reaction_key, "escalate_after", s);
329 None
330 }
331 },
332 _ => None,
333 };
334
335 let (attempts, should_escalate) = {
339 let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
340 tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
341 e.into_inner()
342 });
343 let entry = trackers
344 .entry((session.id.clone(), reaction_key.to_string()))
345 .or_insert_with(|| TrackerState {
346 attempts: 0,
347 first_triggered_at: Instant::now(),
348 });
349 entry.attempts += 1;
350 let attempts = entry.attempts;
351
352 let max_attempts = cfg.retries;
356 let mut escalate = max_attempts.is_some_and(|n| attempts > n);
357
358 if let Some(EscalateAfter::Attempts(n)) = cfg.escalate_after {
364 if attempts > n {
365 escalate = true;
366 }
367 } else if let Some(d) = duration_gate {
368 if entry.first_triggered_at.elapsed() > d {
372 escalate = true;
373 }
374 }
375
376 (attempts, escalate)
377 };
378
379 if should_escalate {
380 self.emit(OrchestratorEvent::ReactionEscalated {
381 id: session.id.clone(),
382 reaction_key: reaction_key.to_string(),
383 attempts,
384 });
385 let outcome = self
391 .dispatch_notify(session, reaction_key, &cfg, true)
392 .await;
393 return Ok(Some(outcome));
394 }
395
396 let outcome = match cfg.action {
397 ReactionAction::SendToAgent => {
398 self.dispatch_send_to_agent(session, reaction_key, &cfg)
399 .await
400 }
401 ReactionAction::Notify => {
402 self.dispatch_notify(session, reaction_key, &cfg, false)
403 .await
404 }
405 ReactionAction::AutoMerge => {
406 self.dispatch_auto_merge(session, reaction_key, &cfg).await
407 }
408 };
409 Ok(Some(outcome))
410 }
411
412 pub fn clear_tracker(&self, session_id: &SessionId, reaction_key: &str) {
419 let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
420 tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
421 e.into_inner()
422 });
423 trackers.remove(&(session_id.clone(), reaction_key.to_string()));
424 }
425
426 pub fn clear_all_for_session(&self, session_id: &SessionId) {
432 let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
433 tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
434 e.into_inner()
435 });
436 trackers.retain(|(sid, _), _| sid != session_id);
437 }
438
439 pub fn attempts(&self, session_id: &SessionId, reaction_key: &str) -> u32 {
443 self.trackers
444 .lock()
445 .unwrap_or_else(|e| {
446 tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
447 e.into_inner()
448 })
449 .get(&(session_id.clone(), reaction_key.to_string()))
450 .map(|t| t.attempts)
451 .unwrap_or(0)
452 }
453
454 #[cfg(test)]
461 fn first_triggered_at(&self, session_id: &SessionId, reaction_key: &str) -> Option<Instant> {
462 self.trackers
463 .lock()
464 .unwrap_or_else(|e| {
465 tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
466 e.into_inner()
467 })
468 .get(&(session_id.clone(), reaction_key.to_string()))
469 .map(|t| t.first_triggered_at)
470 }
471
472 pub(crate) fn warn_once_parse_failure(&self, reaction_key: &str, field: &str, raw: &str) {
488 let key = format!("{reaction_key}.{field}");
489 let mut warned = self.warned_parse_failures.lock().unwrap_or_else(|e| {
490 tracing::error!(
491 "reaction warned_parse_failures mutex poisoned; recovering inner state: {e}"
492 );
493 e.into_inner()
494 });
495 if warned.insert(key) {
496 tracing::warn!(
497 reaction = reaction_key,
498 field = field,
499 value = raw,
500 "ignoring unparseable duration string; expected `^\\d+(s|m|h)$`"
501 );
502 }
503 }
504
505 fn emit(&self, event: OrchestratorEvent) {
508 let _ = self.events_tx.send(event);
509 }
510}
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515 use crate::{
516 config::{AoConfig, ProjectConfig},
517 notifier::NotificationPayload,
518 reactions::{EscalateAfter, EventPriority, ReactionAction, ReactionConfig},
519 traits::Runtime,
520 types::{now_ms, ActivityState, Session, SessionId, SessionStatus},
521 };
522 use async_trait::async_trait;
523 use std::path::{Path, PathBuf};
524 use std::sync::atomic::Ordering;
525 use std::sync::Mutex as StdMutex;
526
527 struct RecordingRuntime {
531 sends: StdMutex<Vec<(String, String)>>,
532 fail_send: std::sync::atomic::AtomicBool,
533 }
534
535 impl RecordingRuntime {
536 fn new() -> Self {
537 Self {
538 sends: StdMutex::new(Vec::new()),
539 fail_send: std::sync::atomic::AtomicBool::new(false),
540 }
541 }
542 fn sends(&self) -> Vec<(String, String)> {
543 self.sends.lock().unwrap().clone()
544 }
545 }
546
547 #[async_trait]
548 impl Runtime for RecordingRuntime {
549 async fn create(
550 &self,
551 _session_id: &str,
552 _cwd: &Path,
553 _launch_command: &str,
554 _env: &[(String, String)],
555 ) -> Result<String> {
556 Ok("mock-handle".into())
557 }
558 async fn send_message(&self, handle: &str, msg: &str) -> Result<()> {
559 if self.fail_send.load(Ordering::SeqCst) {
560 return Err(crate::error::AoError::Runtime("mock send failed".into()));
561 }
562 self.sends
563 .lock()
564 .unwrap()
565 .push((handle.to_string(), msg.to_string()));
566 Ok(())
567 }
568 async fn is_alive(&self, _handle: &str) -> Result<bool> {
569 Ok(true)
570 }
571 async fn destroy(&self, _handle: &str) -> Result<()> {
572 Ok(())
573 }
574 }
575
576 fn fake_session(id: &str) -> Session {
577 Session {
578 id: SessionId(id.into()),
579 project_id: "demo".into(),
580 status: SessionStatus::CiFailed,
581 agent: "claude-code".into(),
582 agent_config: None,
583 branch: format!("ao-{id}"),
584 task: "t".into(),
585 workspace_path: Some(PathBuf::from("/tmp/ws")),
586 runtime_handle: Some(format!("handle-{id}")),
587 runtime: "tmux".into(),
588 activity: Some(ActivityState::Ready),
589 created_at: now_ms(),
590 cost: None,
591 issue_id: None,
592 issue_url: None,
593 claimed_pr_number: None,
594 claimed_pr_url: None,
595 initial_prompt_override: None,
596 spawned_by: None,
597 last_merge_conflict_dispatched: None,
598 last_review_backlog_fingerprint: None,
599 }
600 }
601
602 fn minimal_project(reactions: HashMap<String, ReactionConfig>) -> ProjectConfig {
603 ProjectConfig {
604 name: None,
605 repo: "test/test".into(),
606 path: "/tmp/ao-test-project".into(),
607 default_branch: "main".into(),
608 session_prefix: None,
609 branch_namespace: None,
610 runtime: None,
611 agent: None,
612 workspace: None,
613 tracker: None,
614 scm: None,
615 symlinks: vec![],
616 post_create: vec![],
617 agent_config: None,
618 orchestrator: None,
619 worker: None,
620 reactions,
621 agent_rules: None,
622 agent_rules_file: None,
623 orchestrator_rules: None,
624 orchestrator_session_strategy: None,
625 opencode_issue_session_strategy: None,
626 }
627 }
628
629 fn build(
630 cfg_map: HashMap<String, ReactionConfig>,
631 ) -> (
632 Arc<ReactionEngine>,
633 Arc<RecordingRuntime>,
634 broadcast::Receiver<OrchestratorEvent>,
635 ) {
636 let runtime = Arc::new(RecordingRuntime::new());
637 let (tx, rx) = broadcast::channel(32);
638 let engine = Arc::new(ReactionEngine::new(
639 cfg_map,
640 runtime.clone() as Arc<dyn Runtime>,
641 tx,
642 ));
643 (engine, runtime, rx)
644 }
645
646 fn drain(rx: &mut broadcast::Receiver<OrchestratorEvent>) -> Vec<OrchestratorEvent> {
647 let mut out = Vec::new();
648 while let Ok(e) = rx.try_recv() {
649 out.push(e);
650 }
651 out
652 }
653
654 #[test]
657 fn resolve_reaction_config_merges_global_and_project() {
658 let mut global = ReactionConfig::new(ReactionAction::SendToAgent);
659 global.message = Some("global-msg".into());
660 global.retries = Some(3);
661 global.auto = true;
662 global.priority = Some(EventPriority::Warning);
663
664 let mut proj_cfg = ReactionConfig::new(ReactionAction::Notify);
665 proj_cfg.message = None;
666 proj_cfg.retries = None;
667 proj_cfg.auto = false;
668 proj_cfg.priority = Some(EventPriority::Urgent);
669
670 let mut reactions = HashMap::new();
671 reactions.insert("ci-failed".into(), global);
672
673 let mut proj_reactions = HashMap::new();
674 proj_reactions.insert("ci-failed".into(), proj_cfg);
675
676 let mut projects = HashMap::new();
677 projects.insert("demo".into(), minimal_project(proj_reactions));
678
679 let ao = AoConfig {
680 reactions,
681 projects,
682 ..Default::default()
683 };
684
685 let (tx, _rx) = broadcast::channel(4);
686 let engine = ReactionEngine::new_with_config(
687 Arc::new(ao),
688 Arc::new(RecordingRuntime::new()) as Arc<dyn Runtime>,
689 tx,
690 );
691 let session = fake_session("s1");
692
693 let resolved = engine
694 .resolve_reaction_config(&session, "ci-failed")
695 .expect("merged config");
696
697 assert_eq!(resolved.action, ReactionAction::Notify);
698 assert!(!resolved.auto);
699 assert_eq!(resolved.message.as_deref(), Some("global-msg"));
700 assert_eq!(resolved.retries, Some(3));
701 assert_eq!(resolved.priority, Some(EventPriority::Urgent));
702 }
703
704 #[test]
705 fn resolve_reaction_config_project_only_key() {
706 let mut proj_cfg = ReactionConfig::new(ReactionAction::Notify);
707 proj_cfg.message = Some("project-local".into());
708
709 let mut proj_reactions = HashMap::new();
710 proj_reactions.insert("only-in-project".into(), proj_cfg);
711
712 let mut projects = HashMap::new();
713 projects.insert("demo".into(), minimal_project(proj_reactions));
714
715 let ao = AoConfig {
716 projects,
717 ..Default::default()
718 };
719
720 let (tx, _rx) = broadcast::channel(4);
721 let engine = ReactionEngine::new_with_config(
722 Arc::new(ao),
723 Arc::new(RecordingRuntime::new()) as Arc<dyn Runtime>,
724 tx,
725 );
726 let session = fake_session("s1");
727
728 let resolved = engine
729 .resolve_reaction_config(&session, "only-in-project")
730 .expect("project-only reaction");
731 assert_eq!(resolved.action, ReactionAction::Notify);
732 assert_eq!(resolved.message.as_deref(), Some("project-local"));
733 }
734
735 #[tokio::test]
736 async fn tracker_first_triggered_at_persists_across_dispatches() {
737 let mut config = ReactionConfig::new(ReactionAction::Notify);
744 config.message = Some("hi".into());
745 let mut map = HashMap::new();
746 map.insert("ci-failed".into(), config);
747
748 let (engine, _runtime, _rx) = build(map);
749 let session = fake_session("s1");
750
751 assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
753 assert!(engine
754 .first_triggered_at(&session.id, "ci-failed")
755 .is_none());
756
757 engine.dispatch(&session, "ci-failed").await.unwrap();
759 assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
760 let first = engine
761 .first_triggered_at(&session.id, "ci-failed")
762 .expect("first dispatch must populate first_triggered_at");
763
764 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
768
769 engine.dispatch(&session, "ci-failed").await.unwrap();
771 assert_eq!(engine.attempts(&session.id, "ci-failed"), 2);
772 assert_eq!(
773 engine.first_triggered_at(&session.id, "ci-failed"),
774 Some(first),
775 "first_triggered_at must survive subsequent dispatches"
776 );
777 }
778
779 #[tokio::test]
780 async fn tracker_first_triggered_at_resets_after_clear() {
781 let mut config = ReactionConfig::new(ReactionAction::Notify);
787 config.message = Some("hi".into());
788 let mut map = HashMap::new();
789 map.insert("ci-failed".into(), config);
790
791 let (engine, _runtime, _rx) = build(map);
792 let session = fake_session("s1");
793
794 engine.dispatch(&session, "ci-failed").await.unwrap();
795 let first = engine
796 .first_triggered_at(&session.id, "ci-failed")
797 .expect("populated");
798
799 engine.clear_tracker(&session.id, "ci-failed");
801 assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
802 assert!(engine
803 .first_triggered_at(&session.id, "ci-failed")
804 .is_none());
805
806 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
807
808 engine.dispatch(&session, "ci-failed").await.unwrap();
809 let second = engine
810 .first_triggered_at(&session.id, "ci-failed")
811 .expect("repopulated");
812 assert!(
813 second > first,
814 "second episode must start a fresh first_triggered_at"
815 );
816 }
817
818 #[tokio::test]
819 async fn dispatch_unconfigured_key_is_noop() {
820 let (engine, runtime, mut rx) = build(HashMap::new());
821 let session = fake_session("s1");
822 let result = engine.dispatch(&session, "ci-failed").await.unwrap();
823 assert!(result.is_none());
824 assert!(runtime.sends().is_empty());
825 assert!(drain(&mut rx).is_empty());
826 }
827
828 #[tokio::test]
829 async fn dispatch_send_to_agent_calls_runtime_and_emits_event() {
830 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
831 config.message = Some("CI broke — please fix.".into());
832 let mut map = HashMap::new();
833 map.insert("ci-failed".into(), config);
834
835 let (engine, runtime, mut rx) = build(map);
836 let session = fake_session("s1");
837
838 let result = engine
839 .dispatch(&session, "ci-failed")
840 .await
841 .unwrap()
842 .unwrap();
843
844 assert!(result.success);
845 assert!(!result.escalated);
846 assert_eq!(result.action, ReactionAction::SendToAgent);
847 assert_eq!(runtime.sends().len(), 1);
848 assert_eq!(runtime.sends()[0].0, "handle-s1");
849 assert_eq!(runtime.sends()[0].1, "CI broke — please fix.");
850
851 let events = drain(&mut rx);
852 assert_eq!(events.len(), 2, "got {events:?}");
853 assert!(events.iter().any(|e| matches!(
854 e,
855 OrchestratorEvent::ReactionTriggered {
856 reaction_key,
857 action: ReactionAction::SendToAgent,
858 ..
859 } if reaction_key == "ci-failed"
860 )));
861 assert!(events.iter().any(|e| matches!(
862 e,
863 OrchestratorEvent::UiNotification { notification } if notification.reaction_key == "ci-failed"
864 )));
865 }
866
867 #[tokio::test]
868 async fn dispatch_send_to_agent_without_message_fails_softly() {
869 let config = ReactionConfig::new(ReactionAction::SendToAgent); let mut map = HashMap::new();
871 map.insert("ci-failed".into(), config);
872
873 let (engine, runtime, mut rx) = build(map);
874 let session = fake_session("s1");
875 let result = engine
876 .dispatch(&session, "ci-failed")
877 .await
878 .unwrap()
879 .unwrap();
880
881 assert!(!result.success);
882 assert!(runtime.sends().is_empty());
883 assert!(drain(&mut rx).is_empty());
886 }
887
888 #[tokio::test]
889 async fn dispatch_send_to_agent_propagates_runtime_send_failure_as_soft_failure() {
890 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
891 config.message = Some("fix it".into());
892 let mut map = HashMap::new();
893 map.insert("ci-failed".into(), config);
894
895 let (engine, runtime, mut rx) = build(map);
896 runtime.fail_send.store(true, Ordering::SeqCst);
897 let session = fake_session("s1");
898
899 let result = engine
900 .dispatch(&session, "ci-failed")
901 .await
902 .unwrap()
903 .unwrap();
904 assert!(!result.success);
905 assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
909 assert!(drain(&mut rx).is_empty());
910 }
911
912 #[tokio::test]
913 async fn dispatch_notify_emits_event_and_succeeds() {
914 let mut config = ReactionConfig::new(ReactionAction::Notify);
915 config.message = Some("approved & green".into());
916 let mut map = HashMap::new();
917 map.insert("approved-and-green".into(), config);
918
919 let (engine, runtime, mut rx) = build(map);
920 let mut session = fake_session("s1");
921 session.status = SessionStatus::Mergeable;
922
923 let result = engine
924 .dispatch(&session, "approved-and-green")
925 .await
926 .unwrap()
927 .unwrap();
928
929 assert!(result.success);
930 assert_eq!(result.action, ReactionAction::Notify);
931 assert!(runtime.sends().is_empty());
932
933 let events = drain(&mut rx);
934 assert_eq!(events.len(), 2, "got {events:?}");
935 assert!(events.iter().any(|e| matches!(
936 e,
937 OrchestratorEvent::ReactionTriggered {
938 action: ReactionAction::Notify,
939 ..
940 }
941 )));
942 assert!(events.iter().any(|e| matches!(
943 e,
944 OrchestratorEvent::UiNotification { notification } if notification.action == ReactionAction::Notify
945 )));
946 }
947
948 #[tokio::test]
949 async fn dispatch_auto_merge_without_scm_falls_back_to_phase_d_behaviour() {
950 let config = ReactionConfig::new(ReactionAction::AutoMerge);
956 let mut map = HashMap::new();
957 map.insert("approved-and-green".into(), config);
958
959 let (engine, _runtime, mut rx) = build(map);
960 let mut session = fake_session("s1");
961 session.status = SessionStatus::Mergeable;
962
963 let result = engine
964 .dispatch(&session, "approved-and-green")
965 .await
966 .unwrap()
967 .unwrap();
968 assert!(result.success);
969 assert_eq!(result.action, ReactionAction::AutoMerge);
970
971 let events = drain(&mut rx);
972 assert_eq!(events.len(), 1);
973 assert!(matches!(
974 &events[0],
975 OrchestratorEvent::ReactionTriggered {
976 action: ReactionAction::AutoMerge,
977 ..
978 }
979 ));
980 }
981
982 use crate::scm::{
985 CheckRun, CiStatus, MergeMethod, MergeReadiness, PrState, PullRequest, Review,
986 ReviewComment, ReviewDecision,
987 };
988
989 struct MergeMockScm {
993 pr: StdMutex<Option<PullRequest>>,
994 readiness: StdMutex<MergeReadiness>,
995 merge_calls: StdMutex<Vec<(u32, Option<MergeMethod>)>>,
996 detect_pr_errors: std::sync::atomic::AtomicBool,
997 merge_errors: std::sync::atomic::AtomicBool,
998 }
999
1000 impl MergeMockScm {
1001 fn new(pr: Option<PullRequest>, readiness: MergeReadiness) -> Self {
1002 Self {
1003 pr: StdMutex::new(pr),
1004 readiness: StdMutex::new(readiness),
1005 merge_calls: StdMutex::new(Vec::new()),
1006 detect_pr_errors: std::sync::atomic::AtomicBool::new(false),
1007 merge_errors: std::sync::atomic::AtomicBool::new(false),
1008 }
1009 }
1010 fn merges(&self) -> Vec<(u32, Option<MergeMethod>)> {
1011 self.merge_calls.lock().unwrap().clone()
1012 }
1013 }
1014
1015 #[async_trait]
1016 impl Scm for MergeMockScm {
1017 fn name(&self) -> &str {
1018 "merge-mock"
1019 }
1020 async fn detect_pr(&self, _session: &Session) -> Result<Option<PullRequest>> {
1021 if self.detect_pr_errors.load(Ordering::SeqCst) {
1022 return Err(crate::error::AoError::Runtime("detect_pr".into()));
1023 }
1024 Ok(self.pr.lock().unwrap().clone())
1025 }
1026 async fn pr_state(&self, _pr: &PullRequest) -> Result<PrState> {
1027 Ok(PrState::Open)
1028 }
1029 async fn ci_checks(&self, _pr: &PullRequest) -> Result<Vec<CheckRun>> {
1030 Ok(vec![])
1031 }
1032 async fn ci_status(&self, _pr: &PullRequest) -> Result<CiStatus> {
1033 Ok(CiStatus::Passing)
1034 }
1035 async fn reviews(&self, _pr: &PullRequest) -> Result<Vec<Review>> {
1036 Ok(vec![])
1037 }
1038 async fn review_decision(&self, _pr: &PullRequest) -> Result<ReviewDecision> {
1039 Ok(ReviewDecision::Approved)
1040 }
1041 async fn pending_comments(&self, _pr: &PullRequest) -> Result<Vec<ReviewComment>> {
1042 Ok(vec![])
1043 }
1044 async fn mergeability(&self, _pr: &PullRequest) -> Result<MergeReadiness> {
1045 Ok(self.readiness.lock().unwrap().clone())
1046 }
1047 async fn merge(&self, pr: &PullRequest, method: Option<MergeMethod>) -> Result<()> {
1048 if self.merge_errors.load(Ordering::SeqCst) {
1049 return Err(crate::error::AoError::Runtime("merge failed".into()));
1050 }
1051 self.merge_calls.lock().unwrap().push((pr.number, method));
1052 Ok(())
1053 }
1054 }
1055
1056 fn ready_readiness() -> MergeReadiness {
1057 MergeReadiness {
1058 mergeable: true,
1059 ci_passing: true,
1060 approved: true,
1061 no_conflicts: true,
1062 blockers: vec![],
1063 }
1064 }
1065
1066 fn fake_pr(number: u32) -> PullRequest {
1067 PullRequest {
1068 number,
1069 url: format!("https://github.com/acme/widgets/pull/{number}"),
1070 title: "fix the widgets".into(),
1071 owner: "acme".into(),
1072 repo: "widgets".into(),
1073 branch: "ao-s1".into(),
1074 base_branch: "main".into(),
1075 is_draft: false,
1076 }
1077 }
1078
1079 fn build_with_scm(
1080 cfg_map: HashMap<String, ReactionConfig>,
1081 scm: Arc<dyn Scm>,
1082 ) -> (
1083 Arc<ReactionEngine>,
1084 Arc<RecordingRuntime>,
1085 broadcast::Receiver<OrchestratorEvent>,
1086 ) {
1087 let runtime = Arc::new(RecordingRuntime::new());
1088 let (tx, rx) = broadcast::channel(32);
1089 let engine = Arc::new(
1090 ReactionEngine::new(cfg_map, runtime.clone() as Arc<dyn Runtime>, tx).with_scm(scm),
1091 );
1092 (engine, runtime, rx)
1093 }
1094
1095 #[tokio::test]
1096 async fn dispatch_auto_merge_with_ready_pr_calls_scm_merge() {
1097 let config = ReactionConfig::new(ReactionAction::AutoMerge);
1100 let mut map = HashMap::new();
1101 map.insert("approved-and-green".into(), config);
1102
1103 let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
1104 let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1105
1106 let mut session = fake_session("s1");
1107 session.status = SessionStatus::Mergeable;
1108
1109 let result = engine
1110 .dispatch(&session, "approved-and-green")
1111 .await
1112 .unwrap()
1113 .unwrap();
1114
1115 assert!(result.success);
1116 assert_eq!(result.action, ReactionAction::AutoMerge);
1117 assert_eq!(scm.merges().len(), 1, "expected one merge call");
1118 assert_eq!(scm.merges()[0], (42, None));
1119 assert!(result.message.unwrap().contains("merged PR #42"));
1120
1121 let events = drain(&mut rx);
1122 assert_eq!(events.len(), 1);
1123 assert!(matches!(
1124 &events[0],
1125 OrchestratorEvent::ReactionTriggered {
1126 action: ReactionAction::AutoMerge,
1127 ..
1128 }
1129 ));
1130 }
1131
1132 #[tokio::test]
1133 async fn dispatch_auto_merge_with_stale_green_observation_does_not_merge() {
1134 let config = ReactionConfig::new(ReactionAction::AutoMerge);
1143 let mut map = HashMap::new();
1144 map.insert("approved-and-green".into(), config);
1145
1146 let stale = MergeReadiness {
1147 mergeable: false,
1148 ci_passing: false,
1149 approved: true,
1150 no_conflicts: true,
1151 blockers: vec!["CI is failing".into()],
1152 };
1153 let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), stale));
1154 let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1155
1156 let mut session = fake_session("s1");
1157 session.status = SessionStatus::Mergeable;
1158
1159 let result = engine
1160 .dispatch(&session, "approved-and-green")
1161 .await
1162 .unwrap()
1163 .unwrap();
1164
1165 assert!(!result.success, "stale observation must not merge");
1166 assert!(scm.merges().is_empty(), "Scm::merge must not be called");
1167
1168 let events = drain(&mut rx);
1172 assert!(
1173 events.is_empty(),
1174 "stale-green skip must not emit events, got {events:?}"
1175 );
1176 }
1177
1178 #[tokio::test]
1179 async fn dispatch_auto_merge_with_no_pr_returns_soft_failure() {
1180 let config = ReactionConfig::new(ReactionAction::AutoMerge);
1183 let mut map = HashMap::new();
1184 map.insert("approved-and-green".into(), config);
1185
1186 let scm = Arc::new(MergeMockScm::new(None, ready_readiness()));
1187 let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1188
1189 let mut session = fake_session("s1");
1190 session.status = SessionStatus::Mergeable;
1191
1192 let result = engine
1193 .dispatch(&session, "approved-and-green")
1194 .await
1195 .unwrap()
1196 .unwrap();
1197
1198 assert!(!result.success);
1199 assert!(scm.merges().is_empty());
1200 let events = drain(&mut rx);
1203 assert!(
1204 events.is_empty(),
1205 "no-PR skip must not emit events, got {events:?}"
1206 );
1207 }
1208
1209 #[tokio::test]
1210 async fn dispatch_auto_merge_with_detect_pr_error_returns_soft_failure() {
1211 let config = ReactionConfig::new(ReactionAction::AutoMerge);
1212 let mut map = HashMap::new();
1213 map.insert("approved-and-green".into(), config);
1214
1215 let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
1216 scm.detect_pr_errors.store(true, Ordering::SeqCst);
1217 let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1218
1219 let mut session = fake_session("s1");
1220 session.status = SessionStatus::Mergeable;
1221
1222 let result = engine
1223 .dispatch(&session, "approved-and-green")
1224 .await
1225 .unwrap()
1226 .unwrap();
1227
1228 assert!(!result.success);
1229 assert!(scm.merges().is_empty(), "merge must not run on detect err");
1230 let events = drain(&mut rx);
1231 assert!(
1232 events.is_empty(),
1233 "detect_pr error must not emit events, got {events:?}"
1234 );
1235 }
1236
1237 #[tokio::test]
1238 async fn dispatch_auto_merge_propagates_merge_error_as_soft_failure() {
1239 let config = ReactionConfig::new(ReactionAction::AutoMerge);
1244 let mut map = HashMap::new();
1245 map.insert("approved-and-green".into(), config);
1246
1247 let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
1248 scm.merge_errors.store(true, Ordering::SeqCst);
1249 let (engine, _runtime, _rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1250
1251 let mut session = fake_session("s1");
1252 session.status = SessionStatus::Mergeable;
1253
1254 let result = engine
1255 .dispatch(&session, "approved-and-green")
1256 .await
1257 .unwrap()
1258 .unwrap();
1259
1260 assert!(!result.success);
1261 assert!(
1262 result.message.unwrap().contains("merge failed"),
1263 "error message should surface"
1264 );
1265 }
1266
1267 #[tokio::test]
1268 async fn dispatch_auto_false_skips_active_actions_but_allows_notify() {
1269 let mut sta = ReactionConfig::new(ReactionAction::SendToAgent);
1271 sta.auto = false;
1272 sta.message = Some("noop".into());
1273 let mut map = HashMap::new();
1274 map.insert("ci-failed".into(), sta);
1275
1276 let mut notify = ReactionConfig::new(ReactionAction::Notify);
1279 notify.auto = false;
1280 map.insert("approved-and-green".into(), notify);
1281
1282 let (engine, runtime, mut rx) = build(map);
1283
1284 let s1 = fake_session("s1");
1286 assert!(engine.dispatch(&s1, "ci-failed").await.unwrap().is_none());
1287 assert!(runtime.sends().is_empty());
1288 assert!(drain(&mut rx).is_empty());
1289
1290 let mut s2 = fake_session("s2");
1292 s2.status = SessionStatus::Mergeable;
1293 let result = engine
1294 .dispatch(&s2, "approved-and-green")
1295 .await
1296 .unwrap()
1297 .unwrap();
1298 assert!(result.success);
1299 assert_eq!(result.action, ReactionAction::Notify);
1300 }
1301
1302 #[tokio::test]
1303 async fn retries_exhausted_escalates_to_notify_and_emits_both_events() {
1304 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1306 config.message = Some("fix".into());
1307 config.retries = Some(2);
1308 let mut map = HashMap::new();
1309 map.insert("ci-failed".into(), config);
1310
1311 let (engine, runtime, mut rx) = build(map);
1312 let session = fake_session("s1");
1313
1314 let r1 = engine
1316 .dispatch(&session, "ci-failed")
1317 .await
1318 .unwrap()
1319 .unwrap();
1320 assert!(r1.success);
1321 assert!(!r1.escalated);
1322 let r2 = engine
1323 .dispatch(&session, "ci-failed")
1324 .await
1325 .unwrap()
1326 .unwrap();
1327 assert!(r2.success);
1328 assert!(!r2.escalated);
1329 assert_eq!(runtime.sends().len(), 2);
1330
1331 let r3 = engine
1333 .dispatch(&session, "ci-failed")
1334 .await
1335 .unwrap()
1336 .unwrap();
1337 assert!(r3.escalated);
1338 assert_eq!(r3.action, ReactionAction::Notify);
1339 assert_eq!(runtime.sends().len(), 2);
1341
1342 let events = drain(&mut rx);
1345 assert_eq!(events.len(), 7, "got {events:?}");
1346 let escalated_count = events
1347 .iter()
1348 .filter(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. }))
1349 .count();
1350 assert_eq!(escalated_count, 1);
1351 assert!(events.iter().any(|e| matches!(
1352 e,
1353 OrchestratorEvent::ReactionTriggered {
1354 action: ReactionAction::Notify,
1355 ..
1356 }
1357 )));
1358 assert!(matches!(
1359 events.last().unwrap(),
1360 OrchestratorEvent::UiNotification { .. }
1361 ));
1362 }
1363
1364 #[tokio::test]
1365 async fn escalate_after_attempts_escalates_independently_of_retries() {
1366 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1369 config.message = Some("fix".into());
1370 config.escalate_after = Some(EscalateAfter::Attempts(1));
1371 let mut map = HashMap::new();
1372 map.insert("ci-failed".into(), config);
1373
1374 let (engine, runtime, _rx) = build(map);
1375 let session = fake_session("s1");
1376
1377 let r1 = engine
1379 .dispatch(&session, "ci-failed")
1380 .await
1381 .unwrap()
1382 .unwrap();
1383 assert!(!r1.escalated);
1384 assert_eq!(runtime.sends().len(), 1);
1385
1386 let r2 = engine
1388 .dispatch(&session, "ci-failed")
1389 .await
1390 .unwrap()
1391 .unwrap();
1392 assert!(r2.escalated);
1393 assert_eq!(runtime.sends().len(), 1);
1394 }
1395
1396 #[tokio::test]
1397 async fn escalate_after_duration_does_not_fire_before_elapsed() {
1398 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1405 config.message = Some("fix".into());
1406 config.escalate_after = Some(EscalateAfter::Duration("10m".into()));
1407 let mut map = HashMap::new();
1408 map.insert("ci-failed".into(), config);
1409
1410 let (engine, runtime, _rx) = build(map);
1411 let session = fake_session("s1");
1412
1413 for _ in 0..5 {
1414 let r = engine
1415 .dispatch(&session, "ci-failed")
1416 .await
1417 .unwrap()
1418 .unwrap();
1419 assert!(!r.escalated);
1420 }
1421 assert_eq!(runtime.sends().len(), 5);
1422 }
1423
1424 #[tokio::test]
1425 async fn escalate_after_duration_fires_once_elapsed_exceeds_threshold() {
1426 let mut config = ReactionConfig::new(ReactionAction::Notify);
1432 config.message = Some("stuck".into());
1433 config.retries = None; config.escalate_after = Some(EscalateAfter::Duration("1s".into()));
1435 let mut map = HashMap::new();
1436 map.insert("agent-stuck".into(), config);
1437
1438 let (engine, _runtime, mut rx) = build(map);
1439 let mut session = fake_session("s1");
1440 session.status = SessionStatus::Working;
1441
1442 let first = engine
1445 .dispatch(&session, "agent-stuck")
1446 .await
1447 .unwrap()
1448 .unwrap();
1449 assert!(!first.escalated);
1450
1451 {
1455 let mut trackers = engine.trackers.lock().unwrap();
1456 let key = (session.id.clone(), "agent-stuck".to_string());
1457 let entry = trackers.get_mut(&key).expect("tracker populated");
1458 entry.first_triggered_at = Instant::now()
1459 .checked_sub(std::time::Duration::from_secs(2))
1460 .expect("monotonic clock has been running >2s");
1461 }
1462
1463 let second = engine
1465 .dispatch(&session, "agent-stuck")
1466 .await
1467 .unwrap()
1468 .unwrap();
1469 assert!(second.escalated, "duration gate should have fired");
1470 assert_eq!(second.action, ReactionAction::Notify);
1471
1472 let events = drain(&mut rx);
1475 assert!(
1476 events
1477 .iter()
1478 .any(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. })),
1479 "expected ReactionEscalated, got {events:?}"
1480 );
1481 }
1482
1483 #[tokio::test]
1484 async fn escalate_after_duration_with_garbage_string_logs_once_and_retries_gate_still_fires() {
1485 let mut config = ReactionConfig::new(ReactionAction::Notify);
1490 config.message = Some("stuck".into());
1491 config.retries = Some(2);
1492 config.escalate_after = Some(EscalateAfter::Duration("ten minutes".into()));
1493 let mut map = HashMap::new();
1494 map.insert("agent-stuck".into(), config);
1495
1496 let (engine, _runtime, _rx) = build(map);
1497 let mut session = fake_session("s1");
1498 session.status = SessionStatus::Working;
1499
1500 let r1 = engine
1502 .dispatch(&session, "agent-stuck")
1503 .await
1504 .unwrap()
1505 .unwrap();
1506 assert!(!r1.escalated);
1507 let r2 = engine
1508 .dispatch(&session, "agent-stuck")
1509 .await
1510 .unwrap()
1511 .unwrap();
1512 assert!(!r2.escalated);
1513 let r3 = engine
1514 .dispatch(&session, "agent-stuck")
1515 .await
1516 .unwrap()
1517 .unwrap();
1518 assert!(
1519 r3.escalated,
1520 "retries gate must still fire even when escalate_after is garbage"
1521 );
1522
1523 let warned = engine.warned_parse_failures.lock().unwrap();
1526 assert!(warned.contains("agent-stuck.escalate_after"));
1527 assert_eq!(
1528 warned.len(),
1529 1,
1530 "only one warn should be recorded across 3 dispatches"
1531 );
1532 }
1533
1534 #[tokio::test]
1535 async fn warn_once_parse_failure_is_idempotent_per_key() {
1536 let (engine, _runtime, _rx) = build(HashMap::new());
1543
1544 engine.warn_once_parse_failure("agent-stuck", "threshold", "ten");
1545 engine.warn_once_parse_failure("agent-stuck", "threshold", "eleven");
1546 engine.warn_once_parse_failure("agent-stuck", "threshold", "twelve");
1547 engine.warn_once_parse_failure("agent-stuck", "escalate_after", "frob");
1548
1549 let warned = engine.warned_parse_failures.lock().unwrap();
1550 assert_eq!(warned.len(), 2);
1551 assert!(warned.contains("agent-stuck.threshold"));
1552 assert!(warned.contains("agent-stuck.escalate_after"));
1553 }
1554
1555 #[tokio::test]
1556 async fn clear_tracker_after_escalation_restores_real_action() {
1557 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1564 config.message = Some("fix".into());
1565 config.retries = Some(1);
1566 let mut map = HashMap::new();
1567 map.insert("ci-failed".into(), config);
1568
1569 let (engine, runtime, _rx) = build(map);
1570 let session = fake_session("s1");
1571
1572 let r1 = engine
1574 .dispatch(&session, "ci-failed")
1575 .await
1576 .unwrap()
1577 .unwrap();
1578 assert!(!r1.escalated);
1579 let r2 = engine
1581 .dispatch(&session, "ci-failed")
1582 .await
1583 .unwrap()
1584 .unwrap();
1585 assert!(r2.escalated);
1586 assert_eq!(runtime.sends().len(), 1);
1587
1588 engine.clear_tracker(&session.id, "ci-failed");
1590
1591 let r3 = engine
1593 .dispatch(&session, "ci-failed")
1594 .await
1595 .unwrap()
1596 .unwrap();
1597 assert!(r3.success);
1598 assert!(!r3.escalated);
1599 assert_eq!(r3.action, ReactionAction::SendToAgent);
1600 assert_eq!(runtime.sends().len(), 2);
1601 }
1602
1603 #[tokio::test]
1604 async fn clear_all_for_session_drops_every_reaction_tracker() {
1605 let mut ci = ReactionConfig::new(ReactionAction::SendToAgent);
1609 ci.message = Some("fix".into());
1610 let mut cr = ReactionConfig::new(ReactionAction::SendToAgent);
1611 cr.message = Some("review".into());
1612 let mut map = HashMap::new();
1613 map.insert("ci-failed".into(), ci);
1614 map.insert("changes-requested".into(), cr);
1615
1616 let (engine, _runtime, _rx) = build(map);
1617 let a = fake_session("a");
1618 let b = fake_session("b");
1619
1620 engine.dispatch(&a, "ci-failed").await.unwrap();
1622 engine.dispatch(&a, "changes-requested").await.unwrap();
1623 engine.dispatch(&b, "ci-failed").await.unwrap();
1624 assert_eq!(engine.attempts(&a.id, "ci-failed"), 1);
1625 assert_eq!(engine.attempts(&a.id, "changes-requested"), 1);
1626 assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
1627
1628 engine.clear_all_for_session(&a.id);
1630
1631 assert_eq!(engine.attempts(&a.id, "ci-failed"), 0);
1632 assert_eq!(engine.attempts(&a.id, "changes-requested"), 0);
1633 assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
1635 }
1636
1637 #[tokio::test]
1638 async fn auto_false_notify_fires_once_per_transition_and_does_not_escalate() {
1639 let mut cfg = ReactionConfig::new(ReactionAction::Notify);
1643 cfg.auto = false;
1644 cfg.retries = Some(0); let mut map = HashMap::new();
1646 map.insert("approved-and-green".into(), cfg);
1647
1648 let (engine, _runtime, mut rx) = build(map);
1649 let mut session = fake_session("s1");
1650 session.status = SessionStatus::Mergeable;
1651
1652 for _ in 0..2 {
1655 let r = engine
1656 .dispatch(&session, "approved-and-green")
1657 .await
1658 .unwrap()
1659 .unwrap();
1660 assert!(r.success);
1661 assert!(!r.escalated);
1662 assert_eq!(r.action, ReactionAction::Notify);
1663 }
1664 assert_eq!(engine.attempts(&session.id, "approved-and-green"), 0);
1665
1666 let events = drain(&mut rx);
1668 assert!(
1669 !events
1670 .iter()
1671 .any(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. })),
1672 "auto:false notify must not escalate, got {events:?}"
1673 );
1674 }
1675
1676 #[tokio::test]
1677 async fn clear_tracker_resets_attempts_for_next_transition() {
1678 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1679 config.message = Some("fix".into());
1680 config.retries = Some(1);
1681 let mut map = HashMap::new();
1682 map.insert("ci-failed".into(), config);
1683
1684 let (engine, _runtime, _rx) = build(map);
1685 let session = fake_session("s1");
1686
1687 engine.dispatch(&session, "ci-failed").await.unwrap();
1689 assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
1690
1691 engine.clear_tracker(&session.id, "ci-failed");
1693 assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
1694
1695 let r = engine
1697 .dispatch(&session, "ci-failed")
1698 .await
1699 .unwrap()
1700 .unwrap();
1701 assert!(r.success);
1702 assert!(!r.escalated);
1703 }
1704
1705 #[tokio::test]
1706 async fn trackers_are_scoped_per_reaction_key() {
1707 let mut ci = ReactionConfig::new(ReactionAction::SendToAgent);
1710 ci.message = Some("fix ci".into());
1711 let mut cr = ReactionConfig::new(ReactionAction::SendToAgent);
1712 cr.message = Some("address review".into());
1713
1714 let mut map = HashMap::new();
1715 map.insert("ci-failed".into(), ci);
1716 map.insert("changes-requested".into(), cr);
1717
1718 let (engine, _runtime, _rx) = build(map);
1719 let session = fake_session("s1");
1720
1721 engine.dispatch(&session, "ci-failed").await.unwrap();
1722 engine.dispatch(&session, "ci-failed").await.unwrap();
1723 engine
1724 .dispatch(&session, "changes-requested")
1725 .await
1726 .unwrap();
1727
1728 assert_eq!(engine.attempts(&session.id, "ci-failed"), 2);
1729 assert_eq!(engine.attempts(&session.id, "changes-requested"), 1);
1730 }
1731
1732 #[tokio::test]
1733 async fn trackers_are_scoped_per_session_id() {
1734 let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
1735 cfg.message = Some("fix".into());
1736 let mut map = HashMap::new();
1737 map.insert("ci-failed".into(), cfg);
1738
1739 let (engine, _runtime, _rx) = build(map);
1740 let a = fake_session("a");
1741 let b = fake_session("b");
1742
1743 engine.dispatch(&a, "ci-failed").await.unwrap();
1744 engine.dispatch(&a, "ci-failed").await.unwrap();
1745 engine.dispatch(&b, "ci-failed").await.unwrap();
1746
1747 assert_eq!(engine.attempts(&a.id, "ci-failed"), 2);
1748 assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
1749 }
1750
1751 use crate::notifier::{tests::TestNotifier, NotificationRouting, NotifierRegistry};
1754
1755 fn build_with_notifier(
1758 cfg_map: HashMap<String, ReactionConfig>,
1759 registry: NotifierRegistry,
1760 ) -> (
1761 Arc<ReactionEngine>,
1762 Arc<RecordingRuntime>,
1763 broadcast::Receiver<OrchestratorEvent>,
1764 ) {
1765 let runtime = Arc::new(RecordingRuntime::new());
1766 let (tx, rx) = broadcast::channel(32);
1767 let engine = Arc::new(
1768 ReactionEngine::new(cfg_map, runtime.clone() as Arc<dyn Runtime>, tx)
1769 .with_notifier_registry(registry),
1770 );
1771 (engine, runtime, rx)
1772 }
1773
1774 #[tokio::test]
1775 async fn dispatch_notify_without_registry_unchanged() {
1776 let mut config = ReactionConfig::new(ReactionAction::Notify);
1779 config.message = Some("approved".into());
1780 let mut map = HashMap::new();
1781 map.insert("approved-and-green".into(), config);
1782
1783 let (engine, _runtime, mut rx) = build(map);
1784 let mut session = fake_session("s1");
1785 session.status = SessionStatus::Mergeable;
1786
1787 let result = engine
1788 .dispatch(&session, "approved-and-green")
1789 .await
1790 .unwrap()
1791 .unwrap();
1792 assert!(result.success);
1793 assert_eq!(result.action, ReactionAction::Notify);
1794 assert!(!result.escalated);
1795 assert_eq!(result.message.as_deref(), Some("approved"));
1796
1797 let events = drain(&mut rx);
1798 assert_eq!(events.len(), 2, "got {events:?}");
1799 assert!(events.iter().any(|e| matches!(
1800 e,
1801 OrchestratorEvent::ReactionTriggered {
1802 action: ReactionAction::Notify,
1803 ..
1804 }
1805 )));
1806 assert!(events.iter().any(|e| matches!(
1807 e,
1808 OrchestratorEvent::UiNotification { notification } if notification.action == ReactionAction::Notify
1809 )));
1810 }
1811
1812 #[tokio::test]
1813 async fn dispatch_notify_with_empty_routing_is_success() {
1814 let registry = NotifierRegistry::new(NotificationRouting::default());
1817 let config = ReactionConfig::new(ReactionAction::Notify);
1818 let mut map = HashMap::new();
1819 map.insert("approved-and-green".into(), config);
1820
1821 let (engine, _runtime, mut rx) = build_with_notifier(map, registry);
1822 let mut session = fake_session("s1");
1823 session.status = SessionStatus::Mergeable;
1824
1825 let result = engine
1826 .dispatch(&session, "approved-and-green")
1827 .await
1828 .unwrap()
1829 .unwrap();
1830 assert!(result.success);
1831 assert!(!result.escalated);
1832
1833 let events = drain(&mut rx);
1834 assert!(events
1835 .iter()
1836 .any(|e| matches!(e, OrchestratorEvent::ReactionTriggered { .. })));
1837 }
1838
1839 #[tokio::test]
1840 async fn dispatch_notify_routes_to_single_plugin() {
1841 let mut routing = HashMap::new();
1844 routing.insert(EventPriority::Action, vec!["test".to_string()]);
1845 let (tn, received) = TestNotifier::new("test");
1846 let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
1847 registry.register("test", Arc::new(tn));
1848
1849 let mut config = ReactionConfig::new(ReactionAction::Notify);
1850 config.message = Some("PR merged".into());
1851 let mut map = HashMap::new();
1852 map.insert("approved-and-green".into(), config);
1853
1854 let (engine, _runtime, _rx) = build_with_notifier(map, registry);
1855 let mut session = fake_session("s1");
1856 session.status = SessionStatus::Mergeable;
1857
1858 let result = engine
1859 .dispatch(&session, "approved-and-green")
1860 .await
1861 .unwrap()
1862 .unwrap();
1863 assert!(result.success);
1864 assert_eq!(result.message.as_deref(), Some("PR merged"));
1865
1866 let payloads = received.lock().unwrap();
1867 assert_eq!(payloads.len(), 1);
1868 assert_eq!(payloads[0].reaction_key, "approved-and-green");
1869 assert_eq!(payloads[0].priority, EventPriority::Action);
1870 assert_eq!(payloads[0].body, "PR merged");
1871 assert!(!payloads[0].escalated);
1872 }
1873
1874 #[tokio::test]
1875 async fn dispatch_notify_fan_out_reports_partial_failure() {
1876 use crate::notifier::NotifierError;
1879
1880 struct FailNotifier;
1881
1882 #[async_trait::async_trait]
1883 impl crate::notifier::Notifier for FailNotifier {
1884 fn name(&self) -> &str {
1885 "fail"
1886 }
1887 async fn send(
1888 &self,
1889 _payload: &NotificationPayload,
1890 ) -> std::result::Result<(), NotifierError> {
1891 Err(NotifierError::Unavailable("offline".into()))
1892 }
1893 }
1894
1895 let mut routing = HashMap::new();
1896 routing.insert(
1897 EventPriority::Urgent,
1898 vec!["ok-plugin".to_string(), "fail".to_string()],
1899 );
1900 let (tn, received) = TestNotifier::new("ok-plugin");
1901 let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
1902 registry.register("ok-plugin", Arc::new(tn));
1903 registry.register("fail", Arc::new(FailNotifier));
1904
1905 let mut config = ReactionConfig::new(ReactionAction::Notify);
1906 config.message = Some("something".into());
1907 let mut map = HashMap::new();
1908 map.insert("agent-stuck".into(), config);
1911
1912 let (engine, _runtime, _rx) = build_with_notifier(map, registry);
1913 let mut session = fake_session("s1");
1914 session.status = SessionStatus::Stuck;
1915
1916 let result = engine
1917 .dispatch(&session, "agent-stuck")
1918 .await
1919 .unwrap()
1920 .unwrap();
1921 assert!(!result.success);
1922 let msg = result.message.unwrap();
1923 assert!(
1924 msg.contains("fail"),
1925 "error message should name the failing notifier, got: {msg}"
1926 );
1927
1928 let payloads = received.lock().unwrap();
1930 assert_eq!(payloads.len(), 1);
1931 assert_eq!(payloads[0].reaction_key, "agent-stuck");
1932 }
1933
1934 #[tokio::test]
1935 async fn escalation_routes_through_notifier_registry() {
1936 let mut routing = HashMap::new();
1939 routing.insert(EventPriority::Urgent, vec!["test".to_string()]);
1940 let (tn, received) = TestNotifier::new("test");
1941 let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
1942 registry.register("test", Arc::new(tn));
1943
1944 let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1945 config.message = Some("fix ci".into());
1946 config.retries = Some(1);
1947 let mut map = HashMap::new();
1948 map.insert("ci-failed".into(), config);
1949
1950 let (engine, _runtime, mut rx) = build_with_notifier(map, registry);
1951 let session = fake_session("s1");
1952
1953 let r1 = engine
1955 .dispatch(&session, "ci-failed")
1956 .await
1957 .unwrap()
1958 .unwrap();
1959 assert!(!r1.escalated);
1960
1961 let r2 = engine
1963 .dispatch(&session, "ci-failed")
1964 .await
1965 .unwrap()
1966 .unwrap();
1967 assert!(r2.escalated);
1968 assert_eq!(r2.action, ReactionAction::Notify);
1969
1970 let payloads = received.lock().unwrap();
1972 assert_eq!(payloads.len(), 1);
1973 assert!(payloads[0].escalated);
1974 assert_eq!(payloads[0].reaction_key, "ci-failed");
1975 assert_eq!(payloads[0].priority, EventPriority::Urgent);
1976
1977 let events = drain(&mut rx);
1979 assert!(events.iter().any(|e| matches!(
1980 e,
1981 OrchestratorEvent::ReactionEscalated {
1982 reaction_key,
1983 ..
1984 } if reaction_key == "ci-failed"
1985 )));
1986 assert!(events.iter().any(|e| matches!(
1987 e,
1988 OrchestratorEvent::ReactionTriggered {
1989 action: ReactionAction::Notify,
1990 ..
1991 }
1992 )));
1993 }
1994
1995 #[tokio::test]
1996 async fn auto_false_notify_still_routes_through_registry() {
1997 let mut routing = HashMap::new();
2000 routing.insert(EventPriority::Action, vec!["test".to_string()]);
2001 let (tn, received) = TestNotifier::new("test");
2002 let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
2003 registry.register("test", Arc::new(tn));
2004
2005 let mut config = ReactionConfig::new(ReactionAction::Notify);
2006 config.auto = false;
2007 config.message = Some("fyi".into());
2008 let mut map = HashMap::new();
2009 map.insert("approved-and-green".into(), config);
2010
2011 let (engine, _runtime, _rx) = build_with_notifier(map, registry);
2012 let mut session = fake_session("s1");
2013 session.status = SessionStatus::Mergeable;
2014
2015 let result = engine
2016 .dispatch(&session, "approved-and-green")
2017 .await
2018 .unwrap()
2019 .unwrap();
2020 assert!(result.success);
2021 assert!(!result.escalated);
2022
2023 let payloads = received.lock().unwrap();
2024 assert_eq!(payloads.len(), 1);
2025 assert_eq!(payloads[0].body, "fyi");
2026 }
2027}