// ao_core/reaction_engine/mod.rs
1//! Slice 2 Phase D — Reaction dispatch.
2//!
3//! The reaction engine sits between `LifecycleManager` and the side-effect
4//! plugins (`Runtime::send_message`, `Scm::merge`). When the lifecycle loop
5//! observes a status transition into a "trigger state" like `CiFailed`, it
6//! asks the engine to fire the corresponding reaction. The engine looks the
7//! reaction up in `AoConfig::reactions`, tracks attempts, and runs the
8//! configured action — or escalates to `Notify` when retries are exhausted.
9//!
10//! Mirrors `executeReaction` / `ReactionTracker` from
11//! `packages/core/src/lifecycle-manager.ts` (lines ~570-710 in the reference).
12//!
13//! ## Why the engine is separate from LifecycleManager
14//!
15//! TS bundled everything into one big `createLifecycleManager` closure. In
16//! Rust we split them so:
17//!
18//! 1. Tests can exercise the engine without a polling loop — unit tests
19//!    build a `ReactionEngine` directly, call `dispatch`, and assert events.
20//! 2. Future CLI commands (`ao-rs react fire ci-failed <id>`) can reuse
21//!    the engine directly without going through lifecycle ticks.
22//! 3. The lifecycle loop stays a thin state machine; all "business logic"
23//!    about what an action *means* lives here.
24//!
25//! ## Tracker semantics
26//!
27//! The tracker is keyed on `(SessionId, reaction_key)`. One tracker per
28//! (session, reaction) pair, regardless of how many times the same status
29//! transition fires. A tracker is:
30//!
31//! - **Incremented** on every dispatch attempt (including the one that
32//!   ultimately escalates).
33//! - **Cleared** by `LifecycleManager` via `clear_tracker` when a session
34//!   *leaves* the triggering status — so a new CI failure after a fix
35//!   doesn't inherit the old failure's retry budget. This matches the TS
36//!   `clearReactionTracker` calls on transition reset.
37//!
38//! ## Phase F additions
39//!
40//! - `with_scm` attaches an `Arc<dyn Scm>` so `dispatch_auto_merge` can
41//!   actually call `Scm::merge`. Before merging the engine re-probes
42//!   `detect_pr` + `mergeability` — a stale-green observation (the PR
43//!   was ready when the lifecycle tick saw it, but CI just flipped red)
44//!   aborts without merging, and the next tick can retry.
45//!
46//! ## Phase H additions
47//!
48//! - Duration-based `escalate_after` is now honoured. `TrackerState`
49//!   gained a `first_triggered_at: Instant` set on first dispatch; the
50//!   duration gate compares `entry.first_triggered_at.elapsed()` against
51//!   `parse_duration(escalate_after)` and flips `should_escalate` when
52//!   over. Parsing uses the same `^\d+(s|m|h)$` contract as TS
53//!   `parseDuration`, returning `None` on garbage.
54//! - Garbage duration strings no longer panic and do not cause escalate
55//!   to fire. They trigger a one-shot `tracing::warn!` per
56//!   `(reaction_key, field)` pair via `warned_parse_failures` — a
57//!   process-local `HashSet` that bounds log noise to a single warn per
58//!   misconfigured field.
59//!
60//! ## Phase B additions (Slice 3)
61//!
62//! - `with_notifier_registry` attaches a `NotifierRegistry` so
63//!   `dispatch_notify` can fan out to real plugins. Without a registry
64//!   the engine falls back to Phase D behaviour (emit event, return
65//!   success). With a registry, each `Notify` dispatch resolves the
66//!   priority against the routing table and calls `Notifier::send` on
67//!   every matching plugin; failures are logged and recorded in
68//!   `ReactionOutcome { success: false, .. }` but never propagate.
69//! - Escalation now also routes through the registry so a retry-
70//!   exhausted `SendToAgent → Notify` fallback actually reaches
71//!   configured notifiers.
72//! - `resolve_priority` uses configured `priority:` when set, otherwise
73//!   [`default_priority_for_reaction_key`](crate::reactions::default_priority_for_reaction_key).
74
75pub mod actions;
76pub mod escalation;
77pub mod resolve;
78
79pub use escalation::parse_duration;
80pub use resolve::status_to_reaction_key;
81
82use crate::{
83    config::AoConfig,
84    error::Result,
85    events::OrchestratorEvent,
86    notifier::NotifierRegistry,
87    reactions::{EscalateAfter, ReactionAction, ReactionConfig, ReactionOutcome},
88    traits::{Runtime, Scm},
89    types::{Session, SessionId},
90};
91use escalation::TrackerState;
92use resolve::merge_reaction_config;
93use std::{
94    collections::{HashMap, HashSet},
95    sync::{Arc, Mutex},
96    time::Instant,
97};
98use tokio::sync::broadcast;
99
100/// The reaction dispatcher. Holds config, attempt trackers, and the
101/// Runtime handle needed to actually talk to the agent process.
102///
103/// `Arc<ReactionEngine>` is what gets wired into `LifecycleManager`.
104pub struct ReactionEngine {
105    /// Global reaction-key → config for [`ReactionEngine::new`] (tests and
106    /// back-compat). Ignored when [`Self::ao_config`] is set — then globals
107    /// are read from `ao_config.reactions`.
108    config: HashMap<String, ReactionConfig>,
109    /// Full orchestrator config from [`ReactionEngine::new_with_config`].
110    /// Enables per-session merges with `projects.<id>.reactions`.
111    ao_config: Option<Arc<AoConfig>>,
112    /// Runtime used for `SendToAgent`. Required because every reaction
113    /// configuration today could choose `send-to-agent` as its action.
114    runtime: Arc<dyn Runtime>,
115    /// Shared broadcast channel — cloned from `LifecycleManager::events_tx`.
116    /// The engine emits `ReactionTriggered` / `ReactionEscalated` here so
117    /// subscribers see them alongside lifecycle events.
118    events_tx: broadcast::Sender<OrchestratorEvent>,
119    /// Per-(session, reaction) attempt state. `Mutex` (not async) because
120    /// the critical sections are tiny map mutations — no awaiting.
121    trackers: Mutex<HashMap<(SessionId, String), TrackerState>>,
122    /// Process-local set of `"{reaction_key}.{field}"` keys that have
123    /// already emitted a one-shot `tracing::warn!` for a malformed
124    /// duration string (`threshold` or `escalate_after`). Subsequent
125    /// parse failures for the same key are silent. Reset on
126    /// `ao-rs watch` restart — same non-persistence trade-off as
127    /// `trackers` and `idle_since`. Size bounded by the number of
128    /// reaction keys in the user's config (≤ 10 in practice).
129    warned_parse_failures: Mutex<HashSet<String>>,
130    /// Optional Phase F SCM plugin. When set, `dispatch_auto_merge`
131    /// actually calls `Scm::merge` (after re-verifying readiness with a
132    /// fresh `mergeability` probe). When unset, `auto-merge` degrades to
133    /// the Phase D behaviour: emit intent, log, return success.
134    scm: Option<Arc<dyn Scm>>,
135    /// Optional Slice 3 Phase B notifier registry. When set,
136    /// `dispatch_notify` resolves the reaction's priority against the
137    /// routing table and calls `Notifier::send` on each target plugin.
138    /// When unset, `dispatch_notify` falls back to Phase D behaviour
139    /// (emit event, return success). Matches the `with_scm` opt-in
140    /// pattern: existing call sites that don't attach a registry keep
141    /// working unchanged.
142    notifier_registry: Option<NotifierRegistry>,
143}
144
145impl ReactionEngine {
146    /// Build an engine from a loaded config. The caller owns the runtime
147    /// and the broadcast channel — typically `LifecycleManager` hands its
148    /// own `events_tx` in via `clone()` so engine events share the channel.
149    pub fn new(
150        config: HashMap<String, ReactionConfig>,
151        runtime: Arc<dyn Runtime>,
152        events_tx: broadcast::Sender<OrchestratorEvent>,
153    ) -> Self {
154        Self {
155            config,
156            ao_config: None,
157            runtime,
158            events_tx,
159            trackers: Mutex::new(HashMap::new()),
160            warned_parse_failures: Mutex::new(HashSet::new()),
161            scm: None,
162            notifier_registry: None,
163        }
164    }
165
166    /// Build an engine with access to per-project `reactions` overrides.
167    ///
168    /// Global rules come from `ao_config.reactions`; for each session,
169    /// [`Self::resolve_reaction_config`] merges in
170    /// `ao_config.projects[session.project_id].reactions` when present.
171    pub fn new_with_config(
172        ao_config: Arc<AoConfig>,
173        runtime: Arc<dyn Runtime>,
174        events_tx: broadcast::Sender<OrchestratorEvent>,
175    ) -> Self {
176        Self {
177            config: HashMap::new(),
178            ao_config: Some(ao_config),
179            runtime,
180            events_tx,
181            trackers: Mutex::new(HashMap::new()),
182            warned_parse_failures: Mutex::new(HashSet::new()),
183            scm: None,
184            notifier_registry: None,
185        }
186    }
187
188    fn global_reactions(&self) -> &HashMap<String, ReactionConfig> {
189        self.ao_config
190            .as_ref()
191            .map(|c| &c.reactions)
192            .unwrap_or(&self.config)
193    }
194
195    /// Effective reaction config for `key` in `session`'s project context.
196    ///
197    /// - Both global and project define `key`: start from global, overlay
198    ///   project — booleans and `action` always from project; each
199    ///   [`Option`] field uses project when `Some`, otherwise keeps global.
200    /// - Only project: return the project entry.
201    /// - Only global: return the global entry.
202    pub fn resolve_reaction_config(&self, session: &Session, key: &str) -> Option<ReactionConfig> {
203        let global = self.global_reactions().get(key).cloned();
204        let project = self
205            .ao_config
206            .as_ref()
207            .and_then(|c| c.projects.get(&session.project_id))
208            .and_then(|p| p.reactions.get(key))
209            .cloned();
210
211        match (global, project) {
212            (Some(g), Some(p)) => Some(merge_reaction_config(g, p)),
213            (Some(g), None) => Some(g),
214            (None, Some(p)) => Some(p),
215            (None, None) => None,
216        }
217    }
218
219    /// Attach an SCM plugin so `auto-merge` can actually merge.
220    ///
221    /// Builder form to match `LifecycleManager::with_scm`. When unset,
222    /// `dispatch_auto_merge` falls back to Phase D's "log and emit intent
223    /// only" behaviour so existing callers that don't know about SCM
224    /// keep working.
225    pub fn with_scm(mut self, scm: Arc<dyn Scm>) -> Self {
226        self.scm = Some(scm);
227        self
228    }
229
230    /// Attach a notifier registry so `dispatch_notify` can fan out to
231    /// real notifier plugins. Without a registry the engine falls back
232    /// to Phase D behaviour (emit event, return success).
233    pub fn with_notifier_registry(mut self, registry: NotifierRegistry) -> Self {
234        self.notifier_registry = Some(registry);
235        self
236    }
237
238    /// Fire the reaction configured for `reaction_key` against `session`,
239    /// if any. Returns `None` when there's no matching config — the
240    /// caller (usually `LifecycleManager::transition`) treats that as
241    /// "silently do nothing" rather than an error.
242    ///
243    /// `session` is borrowed (not cloned) because dispatch only needs
244    /// the ID and runtime handle; nothing is persisted back.
245    pub async fn dispatch(
246        &self,
247        session: &Session,
248        reaction_key: &str,
249    ) -> Result<Option<ReactionOutcome>> {
250        let Some(cfg) = self.resolve_reaction_config(session, reaction_key) else {
251            tracing::debug!(
252                reaction = reaction_key,
253                session = %session.id,
254                "no reaction configured; skipping"
255            );
256            return Ok(None);
257        };
258        self.dispatch_with_cfg(session, reaction_key, cfg).await
259    }
260
261    /// Like [`Self::dispatch`] but overrides the configured `message` with
262    /// `message_override`. Used by call sites that build dynamic message
263    /// bodies (e.g. CI-failed formatting check names/URLs from live data)
264    /// without needing a static config entry.
265    ///
266    /// Returns `None` when no reaction is configured for the key (same
267    /// contract as `dispatch`).
268    pub async fn dispatch_with_message(
269        &self,
270        session: &Session,
271        reaction_key: &str,
272        message_override: String,
273    ) -> Result<Option<ReactionOutcome>> {
274        let Some(mut cfg) = self.resolve_reaction_config(session, reaction_key) else {
275            tracing::debug!(
276                reaction = reaction_key,
277                session = %session.id,
278                "no reaction configured; skipping"
279            );
280            return Ok(None);
281        };
282        cfg.message = Some(message_override);
283        self.dispatch_with_cfg(session, reaction_key, cfg).await
284    }
285
    /// Internal: run dispatch with a pre-resolved (possibly modified) config.
    /// Holds all the tracker / escalation logic that was previously inline in
    /// `dispatch`. Both `dispatch` and `dispatch_with_message` call this.
    ///
    /// Returns `Ok(Some(outcome))` for every path that runs an action
    /// (including the escalation path), and `Ok(None)` only for the
    /// `auto: false` non-notify skip.
    async fn dispatch_with_cfg(
        &self,
        session: &Session,
        reaction_key: &str,
        cfg: crate::reactions::ReactionConfig,
    ) -> Result<Option<ReactionOutcome>> {
        // `auto: false` means "the key exists so don't fall through to a
        // default, but don't actually do anything automatically". For
        // non-notify actions we skip entirely. For `Notify` we DO run it
        // (a disabled reaction still wants to surface to a human) but we
        // bypass the retry/escalation machinery — `auto: false` notify
        // has no budget, it just fires once per transition. Otherwise a
        // user who configured `auto: false` + `retries: 0` would see
        // spurious escalations on the first attempt.
        if !cfg.auto {
            if cfg.action == ReactionAction::Notify {
                let outcome = self
                    .dispatch_notify(session, reaction_key, &cfg, false)
                    .await;
                return Ok(Some(outcome));
            }
            tracing::debug!(
                reaction = reaction_key,
                session = %session.id,
                "reaction auto: false; skipping non-notify action"
            );
            return Ok(None);
        }

        // Resolve the duration-form `escalate_after` gate BEFORE the
        // tracker lock. `parse_duration` is pure and `warn_once_parse_failure`
        // takes its own independent lock — parsing outside avoids nested
        // locking. A `None` result here either means "no duration gate
        // configured" or "garbage string, already warned" — in both cases
        // the duration gate contributes nothing to escalation this dispatch.
        let duration_gate: Option<std::time::Duration> = match cfg.escalate_after {
            Some(EscalateAfter::Duration(ref s)) => match parse_duration(s) {
                Some(d) => Some(d),
                None => {
                    // Warn-once bookkeeping; see `warn_once_parse_failure`.
                    self.warn_once_parse_failure(reaction_key, "escalate_after", s);
                    None
                }
            },
            _ => None,
        };

        // Bump attempts under the lock and decide escalation inside the
        // same critical section so two concurrent dispatches can't both
        // escape the retry budget.
        let (attempts, should_escalate) = {
            let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
                tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
                e.into_inner()
            });
            // First dispatch for this (session, reaction) pair stamps
            // `first_triggered_at`; later dispatches must not reset it.
            let entry = trackers
                .entry((session.id.clone(), reaction_key.to_string()))
                .or_insert_with(|| TrackerState {
                    attempts: 0,
                    first_triggered_at: Instant::now(),
                });
            entry.attempts += 1;
            let attempts = entry.attempts;

            // Gate 1: `retries` budget. TS semantics — this is the MAX
            // number of attempts the engine will make before escalating.
            // Unset = infinite.
            let max_attempts = cfg.retries;
            let mut escalate = max_attempts.is_some_and(|n| attempts > n);

            // Gate 2: `escalate_after`. Either the attempts form (`N`)
            // with the same `>` comparison as retries, or the duration
            // form honoured via `first_triggered_at.elapsed()`. Only
            // one variant fires per dispatch because `escalate_after`
            // is a single `Option<enum>`.
            if let Some(EscalateAfter::Attempts(n)) = cfg.escalate_after {
                if attempts > n {
                    escalate = true;
                }
            } else if let Some(d) = duration_gate {
                // `>=` would fire on `0s` with zero elapsed too, but
                // that's not a sensible config and TS uses strict `>`.
                // We match TS: `elapsed > d` fires, `elapsed == d` doesn't.
                if entry.first_triggered_at.elapsed() > d {
                    escalate = true;
                }
            }

            (attempts, escalate)
        };

        if should_escalate {
            self.emit(OrchestratorEvent::ReactionEscalated {
                id: session.id.clone(),
                reaction_key: reaction_key.to_string(),
                attempts,
            });
            // Escalation ALWAYS reports as an executed `Notify`, regardless
            // of the originally configured action. Phase B routes through
            // the registry so a retry-exhausted escalation actually
            // reaches configured notifiers. `dispatch_notify` emits the
            // `ReactionTriggered(Notify)` event internally.
            let outcome = self
                .dispatch_notify(session, reaction_key, &cfg, true)
                .await;
            return Ok(Some(outcome));
        }

        // Within budget: run the configured action normally.
        let outcome = match cfg.action {
            ReactionAction::SendToAgent => {
                self.dispatch_send_to_agent(session, reaction_key, &cfg)
                    .await
            }
            ReactionAction::Notify => {
                self.dispatch_notify(session, reaction_key, &cfg, false)
                    .await
            }
            ReactionAction::AutoMerge => {
                self.dispatch_auto_merge(session, reaction_key, &cfg).await
            }
        };
        Ok(Some(outcome))
    }
411
412    /// Forget any tracker state for `(session, reaction_key)`. Called by
413    /// `LifecycleManager` on the tick that *leaves* a triggering status,
414    /// so the next time the session re-enters it, retries start from zero.
415    /// A lingering tracker would mean a session that failed CI, was fixed,
416    /// and failed again would start already half-way through the retry
417    /// budget — not what anyone wants.
418    pub fn clear_tracker(&self, session_id: &SessionId, reaction_key: &str) {
419        let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
420            tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
421            e.into_inner()
422        });
423        trackers.remove(&(session_id.clone(), reaction_key.to_string()));
424    }
425
426    /// Drop every tracker entry for `session_id`. Called by
427    /// `LifecycleManager::terminate` — without this, the map would grow
428    /// monotonically over a long-running `ao-rs watch` as terminated
429    /// sessions leave orphan entries behind. Cheap: one full-map walk
430    /// per termination, and the N is small (reaction-key count).
431    pub fn clear_all_for_session(&self, session_id: &SessionId) {
432        let mut trackers = self.trackers.lock().unwrap_or_else(|e| {
433            tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
434            e.into_inner()
435        });
436        trackers.retain(|(sid, _), _| sid != session_id);
437    }
438
439    /// Current attempt count for `(session, reaction_key)`. Returns 0 if
440    /// no tracker exists yet. Exposed for tests and for future CLI
441    /// debugging (`ao-rs react status <id>`).
442    pub fn attempts(&self, session_id: &SessionId, reaction_key: &str) -> u32 {
443        self.trackers
444            .lock()
445            .unwrap_or_else(|e| {
446                tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
447                e.into_inner()
448            })
449            .get(&(session_id.clone(), reaction_key.to_string()))
450            .map(|t| t.attempts)
451            .unwrap_or(0)
452    }
453
454    /// Return the `Instant` at which this `(session, reaction_key)` pair
455    /// was first triggered, or `None` if no tracker exists yet. Used by
456    /// tests to assert the timestamp survives multiple dispatches — in
457    /// production, `dispatch` reads the field inside the mutex-held
458    /// critical section directly, so no external accessor is needed
459    /// outside test code.
460    #[cfg(test)]
461    fn first_triggered_at(&self, session_id: &SessionId, reaction_key: &str) -> Option<Instant> {
462        self.trackers
463            .lock()
464            .unwrap_or_else(|e| {
465                tracing::error!("reaction tracker mutex poisoned; recovering inner state: {e}");
466                e.into_inner()
467            })
468            .get(&(session_id.clone(), reaction_key.to_string()))
469            .map(|t| t.first_triggered_at)
470    }
471
472    /// Emit a `tracing::warn!` exactly once per `(reaction_key, field)`
473    /// pair for a duration-parse failure, then remember we've warned so
474    /// subsequent parse failures for the same pair are silent.
475    ///
476    /// Used by two call sites:
477    ///
478    /// - `dispatch` for malformed `escalate_after` strings.
479    /// - `LifecycleManager::check_stuck` (via the engine's config
480    ///   accessor) for malformed `threshold` strings on the
481    ///   `agent-stuck` reaction.
482    ///
483    /// See Design Decision 9 in
484    /// `docs/ai/design/feature-agent-stuck-detection.md` and the
485    /// warn-once observability note in the non-functional requirements
486    /// section of the same doc.
487    pub(crate) fn warn_once_parse_failure(&self, reaction_key: &str, field: &str, raw: &str) {
488        let key = format!("{reaction_key}.{field}");
489        let mut warned = self.warned_parse_failures.lock().unwrap_or_else(|e| {
490            tracing::error!(
491                "reaction warned_parse_failures mutex poisoned; recovering inner state: {e}"
492            );
493            e.into_inner()
494        });
495        if warned.insert(key) {
496            tracing::warn!(
497                reaction = reaction_key,
498                field = field,
499                value = raw,
500                "ignoring unparseable duration string; expected `^\\d+(s|m|h)$`"
501            );
502        }
503    }
504
505    /// Broadcast an event. A send error means zero subscribers — the
506    /// same "not worth surfacing" case as `LifecycleManager::emit`.
507    fn emit(&self, event: OrchestratorEvent) {
508        let _ = self.events_tx.send(event);
509    }
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515    use crate::{
516        config::{AoConfig, ProjectConfig},
517        notifier::NotificationPayload,
518        reactions::{EscalateAfter, EventPriority, ReactionAction, ReactionConfig},
519        traits::Runtime,
520        types::{now_ms, ActivityState, Session, SessionId, SessionStatus},
521    };
522    use async_trait::async_trait;
523    use std::path::{Path, PathBuf};
524    use std::sync::atomic::Ordering;
525    use std::sync::Mutex as StdMutex;
526
527    // ---------- Helpers ---------- //
528
529    /// Mock runtime that records every send_message for assertions.
530    struct RecordingRuntime {
531        sends: StdMutex<Vec<(String, String)>>,
532        fail_send: std::sync::atomic::AtomicBool,
533    }
534
535    impl RecordingRuntime {
536        fn new() -> Self {
537            Self {
538                sends: StdMutex::new(Vec::new()),
539                fail_send: std::sync::atomic::AtomicBool::new(false),
540            }
541        }
542        fn sends(&self) -> Vec<(String, String)> {
543            self.sends.lock().unwrap().clone()
544        }
545    }
546
547    #[async_trait]
548    impl Runtime for RecordingRuntime {
549        async fn create(
550            &self,
551            _session_id: &str,
552            _cwd: &Path,
553            _launch_command: &str,
554            _env: &[(String, String)],
555        ) -> Result<String> {
556            Ok("mock-handle".into())
557        }
558        async fn send_message(&self, handle: &str, msg: &str) -> Result<()> {
559            if self.fail_send.load(Ordering::SeqCst) {
560                return Err(crate::error::AoError::Runtime("mock send failed".into()));
561            }
562            self.sends
563                .lock()
564                .unwrap()
565                .push((handle.to_string(), msg.to_string()));
566            Ok(())
567        }
568        async fn is_alive(&self, _handle: &str) -> Result<bool> {
569            Ok(true)
570        }
571        async fn destroy(&self, _handle: &str) -> Result<()> {
572            Ok(())
573        }
574    }
575
    /// Minimal `Session` fixture: project `demo`, status `CiFailed` (a
    /// trigger status), runtime handle `handle-{id}` so `SendToAgent`
    /// has something to address. Everything else is inert defaults.
    fn fake_session(id: &str) -> Session {
        Session {
            id: SessionId(id.into()),
            project_id: "demo".into(),
            // CiFailed is the canonical trigger status used across these tests.
            status: SessionStatus::CiFailed,
            agent: "claude-code".into(),
            agent_config: None,
            branch: format!("ao-{id}"),
            task: "t".into(),
            workspace_path: Some(PathBuf::from("/tmp/ws")),
            // Present so SendToAgent dispatch has a live-looking handle.
            runtime_handle: Some(format!("handle-{id}")),
            runtime: "tmux".into(),
            activity: Some(ActivityState::Ready),
            created_at: now_ms(),
            cost: None,
            issue_id: None,
            issue_url: None,
            claimed_pr_number: None,
            claimed_pr_url: None,
            initial_prompt_override: None,
            spawned_by: None,
            last_merge_conflict_dispatched: None,
            last_review_backlog_fingerprint: None,
        }
    }
601
    /// `ProjectConfig` fixture: every field empty/defaulted except the
    /// supplied per-project `reactions` map, which is what the
    /// resolve/merge tests exercise.
    fn minimal_project(reactions: HashMap<String, ReactionConfig>) -> ProjectConfig {
        ProjectConfig {
            name: None,
            repo: "test/test".into(),
            path: "/tmp/ao-test-project".into(),
            default_branch: "main".into(),
            session_prefix: None,
            branch_namespace: None,
            runtime: None,
            agent: None,
            workspace: None,
            tracker: None,
            scm: None,
            symlinks: vec![],
            post_create: vec![],
            agent_config: None,
            orchestrator: None,
            worker: None,
            // The only field under test.
            reactions,
            agent_rules: None,
            agent_rules_file: None,
            orchestrator_rules: None,
            orchestrator_session_strategy: None,
            opencode_issue_session_strategy: None,
        }
    }
628
629    fn build(
630        cfg_map: HashMap<String, ReactionConfig>,
631    ) -> (
632        Arc<ReactionEngine>,
633        Arc<RecordingRuntime>,
634        broadcast::Receiver<OrchestratorEvent>,
635    ) {
636        let runtime = Arc::new(RecordingRuntime::new());
637        let (tx, rx) = broadcast::channel(32);
638        let engine = Arc::new(ReactionEngine::new(
639            cfg_map,
640            runtime.clone() as Arc<dyn Runtime>,
641            tx,
642        ));
643        (engine, runtime, rx)
644    }
645
646    fn drain(rx: &mut broadcast::Receiver<OrchestratorEvent>) -> Vec<OrchestratorEvent> {
647        let mut out = Vec::new();
648        while let Ok(e) = rx.try_recv() {
649            out.push(e);
650        }
651        out
652    }
653
654    // ---------- Tests ---------- //
655
656    #[test]
657    fn resolve_reaction_config_merges_global_and_project() {
658        let mut global = ReactionConfig::new(ReactionAction::SendToAgent);
659        global.message = Some("global-msg".into());
660        global.retries = Some(3);
661        global.auto = true;
662        global.priority = Some(EventPriority::Warning);
663
664        let mut proj_cfg = ReactionConfig::new(ReactionAction::Notify);
665        proj_cfg.message = None;
666        proj_cfg.retries = None;
667        proj_cfg.auto = false;
668        proj_cfg.priority = Some(EventPriority::Urgent);
669
670        let mut reactions = HashMap::new();
671        reactions.insert("ci-failed".into(), global);
672
673        let mut proj_reactions = HashMap::new();
674        proj_reactions.insert("ci-failed".into(), proj_cfg);
675
676        let mut projects = HashMap::new();
677        projects.insert("demo".into(), minimal_project(proj_reactions));
678
679        let ao = AoConfig {
680            reactions,
681            projects,
682            ..Default::default()
683        };
684
685        let (tx, _rx) = broadcast::channel(4);
686        let engine = ReactionEngine::new_with_config(
687            Arc::new(ao),
688            Arc::new(RecordingRuntime::new()) as Arc<dyn Runtime>,
689            tx,
690        );
691        let session = fake_session("s1");
692
693        let resolved = engine
694            .resolve_reaction_config(&session, "ci-failed")
695            .expect("merged config");
696
697        assert_eq!(resolved.action, ReactionAction::Notify);
698        assert!(!resolved.auto);
699        assert_eq!(resolved.message.as_deref(), Some("global-msg"));
700        assert_eq!(resolved.retries, Some(3));
701        assert_eq!(resolved.priority, Some(EventPriority::Urgent));
702    }
703
704    #[test]
705    fn resolve_reaction_config_project_only_key() {
706        let mut proj_cfg = ReactionConfig::new(ReactionAction::Notify);
707        proj_cfg.message = Some("project-local".into());
708
709        let mut proj_reactions = HashMap::new();
710        proj_reactions.insert("only-in-project".into(), proj_cfg);
711
712        let mut projects = HashMap::new();
713        projects.insert("demo".into(), minimal_project(proj_reactions));
714
715        let ao = AoConfig {
716            projects,
717            ..Default::default()
718        };
719
720        let (tx, _rx) = broadcast::channel(4);
721        let engine = ReactionEngine::new_with_config(
722            Arc::new(ao),
723            Arc::new(RecordingRuntime::new()) as Arc<dyn Runtime>,
724            tx,
725        );
726        let session = fake_session("s1");
727
728        let resolved = engine
729            .resolve_reaction_config(&session, "only-in-project")
730            .expect("project-only reaction");
731        assert_eq!(resolved.action, ReactionAction::Notify);
732        assert_eq!(resolved.message.as_deref(), Some("project-local"));
733    }
734
735    #[tokio::test]
736    async fn tracker_first_triggered_at_persists_across_dispatches() {
737        // Invariant for Task 1.2: the first dispatch populates
738        // `first_triggered_at`; subsequent dispatches bump `attempts`
739        // but DO NOT reset the timestamp. This is what duration-based
740        // escalation will rely on (Task 2.1) — escalate_after: 10m
741        // must measure from the first trigger, not from the last
742        // attempt.
743        let mut config = ReactionConfig::new(ReactionAction::Notify);
744        config.message = Some("hi".into());
745        let mut map = HashMap::new();
746        map.insert("ci-failed".into(), config);
747
748        let (engine, _runtime, _rx) = build(map);
749        let session = fake_session("s1");
750
751        // Never-triggered: attempts = 0, no tracker entry.
752        assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
753        assert!(engine
754            .first_triggered_at(&session.id, "ci-failed")
755            .is_none());
756
757        // First dispatch populates both fields.
758        engine.dispatch(&session, "ci-failed").await.unwrap();
759        assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
760        let first = engine
761            .first_triggered_at(&session.id, "ci-failed")
762            .expect("first dispatch must populate first_triggered_at");
763
764        // Tiny sleep so a resetting bug would be observable — the
765        // second dispatch's `Instant::now()` is guaranteed later than
766        // `first`.
767        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
768
769        // Second dispatch increments attempts; timestamp unchanged.
770        engine.dispatch(&session, "ci-failed").await.unwrap();
771        assert_eq!(engine.attempts(&session.id, "ci-failed"), 2);
772        assert_eq!(
773            engine.first_triggered_at(&session.id, "ci-failed"),
774            Some(first),
775            "first_triggered_at must survive subsequent dispatches"
776        );
777    }
778
779    #[tokio::test]
780    async fn tracker_first_triggered_at_resets_after_clear() {
781        // Clearing the tracker (on status change away from the
782        // triggering status) drops the whole entry, so the next
783        // dispatch gets a fresh `first_triggered_at`. Protects the
784        // "second episode starts a fresh clock" property the doc
785        // comment promises.
786        let mut config = ReactionConfig::new(ReactionAction::Notify);
787        config.message = Some("hi".into());
788        let mut map = HashMap::new();
789        map.insert("ci-failed".into(), config);
790
791        let (engine, _runtime, _rx) = build(map);
792        let session = fake_session("s1");
793
794        engine.dispatch(&session, "ci-failed").await.unwrap();
795        let first = engine
796            .first_triggered_at(&session.id, "ci-failed")
797            .expect("populated");
798
799        // Simulate the lifecycle's "left the triggering status" hook.
800        engine.clear_tracker(&session.id, "ci-failed");
801        assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
802        assert!(engine
803            .first_triggered_at(&session.id, "ci-failed")
804            .is_none());
805
806        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
807
808        engine.dispatch(&session, "ci-failed").await.unwrap();
809        let second = engine
810            .first_triggered_at(&session.id, "ci-failed")
811            .expect("repopulated");
812        assert!(
813            second > first,
814            "second episode must start a fresh first_triggered_at"
815        );
816    }
817
818    #[tokio::test]
819    async fn dispatch_unconfigured_key_is_noop() {
820        let (engine, runtime, mut rx) = build(HashMap::new());
821        let session = fake_session("s1");
822        let result = engine.dispatch(&session, "ci-failed").await.unwrap();
823        assert!(result.is_none());
824        assert!(runtime.sends().is_empty());
825        assert!(drain(&mut rx).is_empty());
826    }
827
828    #[tokio::test]
829    async fn dispatch_send_to_agent_calls_runtime_and_emits_event() {
830        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
831        config.message = Some("CI broke — please fix.".into());
832        let mut map = HashMap::new();
833        map.insert("ci-failed".into(), config);
834
835        let (engine, runtime, mut rx) = build(map);
836        let session = fake_session("s1");
837
838        let result = engine
839            .dispatch(&session, "ci-failed")
840            .await
841            .unwrap()
842            .unwrap();
843
844        assert!(result.success);
845        assert!(!result.escalated);
846        assert_eq!(result.action, ReactionAction::SendToAgent);
847        assert_eq!(runtime.sends().len(), 1);
848        assert_eq!(runtime.sends()[0].0, "handle-s1");
849        assert_eq!(runtime.sends()[0].1, "CI broke — please fix.");
850
851        let events = drain(&mut rx);
852        assert_eq!(events.len(), 2, "got {events:?}");
853        assert!(events.iter().any(|e| matches!(
854            e,
855            OrchestratorEvent::ReactionTriggered {
856                reaction_key,
857                action: ReactionAction::SendToAgent,
858                ..
859            } if reaction_key == "ci-failed"
860        )));
861        assert!(events.iter().any(|e| matches!(
862            e,
863            OrchestratorEvent::UiNotification { notification } if notification.reaction_key == "ci-failed"
864        )));
865    }
866
867    #[tokio::test]
868    async fn dispatch_send_to_agent_without_message_fails_softly() {
869        let config = ReactionConfig::new(ReactionAction::SendToAgent); // no message
870        let mut map = HashMap::new();
871        map.insert("ci-failed".into(), config);
872
873        let (engine, runtime, mut rx) = build(map);
874        let session = fake_session("s1");
875        let result = engine
876            .dispatch(&session, "ci-failed")
877            .await
878            .unwrap()
879            .unwrap();
880
881        assert!(!result.success);
882        assert!(runtime.sends().is_empty());
883        // No event emitted on soft failure — subscribers shouldn't see a
884        // "triggered" event for a dispatch that never left the engine.
885        assert!(drain(&mut rx).is_empty());
886    }
887
888    #[tokio::test]
889    async fn dispatch_send_to_agent_propagates_runtime_send_failure_as_soft_failure() {
890        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
891        config.message = Some("fix it".into());
892        let mut map = HashMap::new();
893        map.insert("ci-failed".into(), config);
894
895        let (engine, runtime, mut rx) = build(map);
896        runtime.fail_send.store(true, Ordering::SeqCst);
897        let session = fake_session("s1");
898
899        let result = engine
900            .dispatch(&session, "ci-failed")
901            .await
902            .unwrap()
903            .unwrap();
904        assert!(!result.success);
905        // Attempt still counted — so the next tick's retry competes for
906        // the same retry budget. Guards against a pathological case where
907        // the engine would forget that it tried.
908        assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
909        assert!(drain(&mut rx).is_empty());
910    }
911
912    #[tokio::test]
913    async fn dispatch_notify_emits_event_and_succeeds() {
914        let mut config = ReactionConfig::new(ReactionAction::Notify);
915        config.message = Some("approved & green".into());
916        let mut map = HashMap::new();
917        map.insert("approved-and-green".into(), config);
918
919        let (engine, runtime, mut rx) = build(map);
920        let mut session = fake_session("s1");
921        session.status = SessionStatus::Mergeable;
922
923        let result = engine
924            .dispatch(&session, "approved-and-green")
925            .await
926            .unwrap()
927            .unwrap();
928
929        assert!(result.success);
930        assert_eq!(result.action, ReactionAction::Notify);
931        assert!(runtime.sends().is_empty());
932
933        let events = drain(&mut rx);
934        assert_eq!(events.len(), 2, "got {events:?}");
935        assert!(events.iter().any(|e| matches!(
936            e,
937            OrchestratorEvent::ReactionTriggered {
938                action: ReactionAction::Notify,
939                ..
940            }
941        )));
942        assert!(events.iter().any(|e| matches!(
943            e,
944            OrchestratorEvent::UiNotification { notification } if notification.action == ReactionAction::Notify
945        )));
946    }
947
948    #[tokio::test]
949    async fn dispatch_auto_merge_without_scm_falls_back_to_phase_d_behaviour() {
950        // Guard the backwards-compatible path: engines constructed
951        // without `.with_scm(...)` (e.g. the existing Phase D fixtures)
952        // must keep emitting intent + returning success without making
953        // any SCM calls. Breaking this test would silently regress
954        // every test that builds an engine the Phase D way.
955        let config = ReactionConfig::new(ReactionAction::AutoMerge);
956        let mut map = HashMap::new();
957        map.insert("approved-and-green".into(), config);
958
959        let (engine, _runtime, mut rx) = build(map);
960        let mut session = fake_session("s1");
961        session.status = SessionStatus::Mergeable;
962
963        let result = engine
964            .dispatch(&session, "approved-and-green")
965            .await
966            .unwrap()
967            .unwrap();
968        assert!(result.success);
969        assert_eq!(result.action, ReactionAction::AutoMerge);
970
971        let events = drain(&mut rx);
972        assert_eq!(events.len(), 1);
973        assert!(matches!(
974            &events[0],
975            OrchestratorEvent::ReactionTriggered {
976                action: ReactionAction::AutoMerge,
977                ..
978            }
979        ));
980    }
981
982    // ---------- Phase F: auto-merge with SCM plugin ---------- //
983
984    use crate::scm::{
985        CheckRun, CiStatus, MergeMethod, MergeReadiness, PrState, PullRequest, Review,
986        ReviewComment, ReviewDecision,
987    };
988
    /// Scripted SCM plugin. Each method reads from `Mutex<_>` cells so
    /// tests can pre-configure the responses `dispatch_auto_merge` will
    /// see on its re-probe.
    struct MergeMockScm {
        // PR returned by `detect_pr` (None ⇒ "no PR found").
        pr: StdMutex<Option<PullRequest>>,
        // Readiness returned by `mergeability` on the re-probe.
        readiness: StdMutex<MergeReadiness>,
        // Every successful `merge` call, as (PR number, requested method).
        merge_calls: StdMutex<Vec<(u32, Option<MergeMethod>)>>,
        // When true, `detect_pr` returns an error instead of the PR.
        detect_pr_errors: std::sync::atomic::AtomicBool,
        // When true, `merge` returns an error (and records nothing).
        merge_errors: std::sync::atomic::AtomicBool,
    }

    impl MergeMockScm {
        /// Build a mock that reports `pr` and `readiness`; both error
        /// flags start off and no merge calls are recorded yet.
        fn new(pr: Option<PullRequest>, readiness: MergeReadiness) -> Self {
            Self {
                pr: StdMutex::new(pr),
                readiness: StdMutex::new(readiness),
                merge_calls: StdMutex::new(Vec::new()),
                detect_pr_errors: std::sync::atomic::AtomicBool::new(false),
                merge_errors: std::sync::atomic::AtomicBool::new(false),
            }
        }
        /// Snapshot of the merge calls recorded so far.
        fn merges(&self) -> Vec<(u32, Option<MergeMethod>)> {
            self.merge_calls.lock().unwrap().clone()
        }
    }
1014
    #[async_trait]
    impl Scm for MergeMockScm {
        fn name(&self) -> &str {
            "merge-mock"
        }
        // Scripted: yields the configured PR, or an error when the
        // `detect_pr_errors` flag is set.
        async fn detect_pr(&self, _session: &Session) -> Result<Option<PullRequest>> {
            if self.detect_pr_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("detect_pr".into()));
            }
            Ok(self.pr.lock().unwrap().clone())
        }
        // The remaining probes return fixed "healthy" answers; only
        // `mergeability` (below) is configurable per-test.
        async fn pr_state(&self, _pr: &PullRequest) -> Result<PrState> {
            Ok(PrState::Open)
        }
        async fn ci_checks(&self, _pr: &PullRequest) -> Result<Vec<CheckRun>> {
            Ok(vec![])
        }
        async fn ci_status(&self, _pr: &PullRequest) -> Result<CiStatus> {
            Ok(CiStatus::Passing)
        }
        async fn reviews(&self, _pr: &PullRequest) -> Result<Vec<Review>> {
            Ok(vec![])
        }
        async fn review_decision(&self, _pr: &PullRequest) -> Result<ReviewDecision> {
            Ok(ReviewDecision::Approved)
        }
        async fn pending_comments(&self, _pr: &PullRequest) -> Result<Vec<ReviewComment>> {
            Ok(vec![])
        }
        // Returns whatever readiness the test pre-loaded via `new`.
        async fn mergeability(&self, _pr: &PullRequest) -> Result<MergeReadiness> {
            Ok(self.readiness.lock().unwrap().clone())
        }
        // Records (number, method) on success. When `merge_errors` is
        // set it errors WITHOUT recording, so `merges()` reflects only
        // merges that actually "happened".
        async fn merge(&self, pr: &PullRequest, method: Option<MergeMethod>) -> Result<()> {
            if self.merge_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("merge failed".into()));
            }
            self.merge_calls.lock().unwrap().push((pr.number, method));
            Ok(())
        }
    }
1055
1056    fn ready_readiness() -> MergeReadiness {
1057        MergeReadiness {
1058            mergeable: true,
1059            ci_passing: true,
1060            approved: true,
1061            no_conflicts: true,
1062            blockers: vec![],
1063        }
1064    }
1065
1066    fn fake_pr(number: u32) -> PullRequest {
1067        PullRequest {
1068            number,
1069            url: format!("https://github.com/acme/widgets/pull/{number}"),
1070            title: "fix the widgets".into(),
1071            owner: "acme".into(),
1072            repo: "widgets".into(),
1073            branch: "ao-s1".into(),
1074            base_branch: "main".into(),
1075            is_draft: false,
1076        }
1077    }
1078
1079    fn build_with_scm(
1080        cfg_map: HashMap<String, ReactionConfig>,
1081        scm: Arc<dyn Scm>,
1082    ) -> (
1083        Arc<ReactionEngine>,
1084        Arc<RecordingRuntime>,
1085        broadcast::Receiver<OrchestratorEvent>,
1086    ) {
1087        let runtime = Arc::new(RecordingRuntime::new());
1088        let (tx, rx) = broadcast::channel(32);
1089        let engine = Arc::new(
1090            ReactionEngine::new(cfg_map, runtime.clone() as Arc<dyn Runtime>, tx).with_scm(scm),
1091        );
1092        (engine, runtime, rx)
1093    }
1094
1095    #[tokio::test]
1096    async fn dispatch_auto_merge_with_ready_pr_calls_scm_merge() {
1097        // Happy path: observation still holds on re-probe, engine calls
1098        // `Scm::merge(pr, None)` with the default merge method.
1099        let config = ReactionConfig::new(ReactionAction::AutoMerge);
1100        let mut map = HashMap::new();
1101        map.insert("approved-and-green".into(), config);
1102
1103        let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
1104        let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1105
1106        let mut session = fake_session("s1");
1107        session.status = SessionStatus::Mergeable;
1108
1109        let result = engine
1110            .dispatch(&session, "approved-and-green")
1111            .await
1112            .unwrap()
1113            .unwrap();
1114
1115        assert!(result.success);
1116        assert_eq!(result.action, ReactionAction::AutoMerge);
1117        assert_eq!(scm.merges().len(), 1, "expected one merge call");
1118        assert_eq!(scm.merges()[0], (42, None));
1119        assert!(result.message.unwrap().contains("merged PR #42"));
1120
1121        let events = drain(&mut rx);
1122        assert_eq!(events.len(), 1);
1123        assert!(matches!(
1124            &events[0],
1125            OrchestratorEvent::ReactionTriggered {
1126                action: ReactionAction::AutoMerge,
1127                ..
1128            }
1129        ));
1130    }
1131
1132    #[tokio::test]
1133    async fn dispatch_auto_merge_with_stale_green_observation_does_not_merge() {
1134        // The lifecycle tick saw mergeable=true, but by the time the
1135        // engine ran (a few hundred ms later) CI flipped red. The
1136        // re-probe says not-ready → skip the merge, return soft failure,
1137        // and emit NO event (the commit-point emit happens only when
1138        // the engine actually calls `Scm::merge`).
1139        //
1140        // This is the whole reason for the re-probe: avoid merging on
1141        // observations that have gone stale since the transition fired.
1142        let config = ReactionConfig::new(ReactionAction::AutoMerge);
1143        let mut map = HashMap::new();
1144        map.insert("approved-and-green".into(), config);
1145
1146        let stale = MergeReadiness {
1147            mergeable: false,
1148            ci_passing: false,
1149            approved: true,
1150            no_conflicts: true,
1151            blockers: vec!["CI is failing".into()],
1152        };
1153        let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), stale));
1154        let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1155
1156        let mut session = fake_session("s1");
1157        session.status = SessionStatus::Mergeable;
1158
1159        let result = engine
1160            .dispatch(&session, "approved-and-green")
1161            .await
1162            .unwrap()
1163            .unwrap();
1164
1165        assert!(!result.success, "stale observation must not merge");
1166        assert!(scm.merges().is_empty(), "Scm::merge must not be called");
1167
1168        // No event emitted — a subscriber reading `ReactionTriggered`
1169        // should be able to trust that "triggered" means a merge was
1170        // actually attempted. Skip paths leave the stream silent.
1171        let events = drain(&mut rx);
1172        assert!(
1173            events.is_empty(),
1174            "stale-green skip must not emit events, got {events:?}"
1175        );
1176    }
1177
1178    #[tokio::test]
1179    async fn dispatch_auto_merge_with_no_pr_returns_soft_failure() {
1180        // `detect_pr` returns None (agent force-pushed, PR was closed
1181        // out-of-band). Nothing to merge → soft failure, no events.
1182        let config = ReactionConfig::new(ReactionAction::AutoMerge);
1183        let mut map = HashMap::new();
1184        map.insert("approved-and-green".into(), config);
1185
1186        let scm = Arc::new(MergeMockScm::new(None, ready_readiness()));
1187        let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1188
1189        let mut session = fake_session("s1");
1190        session.status = SessionStatus::Mergeable;
1191
1192        let result = engine
1193            .dispatch(&session, "approved-and-green")
1194            .await
1195            .unwrap()
1196            .unwrap();
1197
1198        assert!(!result.success);
1199        assert!(scm.merges().is_empty());
1200        // Pin the semantics: no triggered event on soft failure so a
1201        // future refactor can't accidentally leak a "we tried" event.
1202        let events = drain(&mut rx);
1203        assert!(
1204            events.is_empty(),
1205            "no-PR skip must not emit events, got {events:?}"
1206        );
1207    }
1208
1209    #[tokio::test]
1210    async fn dispatch_auto_merge_with_detect_pr_error_returns_soft_failure() {
1211        let config = ReactionConfig::new(ReactionAction::AutoMerge);
1212        let mut map = HashMap::new();
1213        map.insert("approved-and-green".into(), config);
1214
1215        let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
1216        scm.detect_pr_errors.store(true, Ordering::SeqCst);
1217        let (engine, _runtime, mut rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1218
1219        let mut session = fake_session("s1");
1220        session.status = SessionStatus::Mergeable;
1221
1222        let result = engine
1223            .dispatch(&session, "approved-and-green")
1224            .await
1225            .unwrap()
1226            .unwrap();
1227
1228        assert!(!result.success);
1229        assert!(scm.merges().is_empty(), "merge must not run on detect err");
1230        let events = drain(&mut rx);
1231        assert!(
1232            events.is_empty(),
1233            "detect_pr error must not emit events, got {events:?}"
1234        );
1235    }
1236
1237    #[tokio::test]
1238    async fn dispatch_auto_merge_propagates_merge_error_as_soft_failure() {
1239        // Scm::merge itself fails (branch protection, network, whatever).
1240        // Engine surfaces the error message in the outcome so the CLI
1241        // can print it. Tracker has still incremented — retry logic
1242        // applies on the next tick if the transition re-fires.
1243        let config = ReactionConfig::new(ReactionAction::AutoMerge);
1244        let mut map = HashMap::new();
1245        map.insert("approved-and-green".into(), config);
1246
1247        let scm = Arc::new(MergeMockScm::new(Some(fake_pr(42)), ready_readiness()));
1248        scm.merge_errors.store(true, Ordering::SeqCst);
1249        let (engine, _runtime, _rx) = build_with_scm(map, scm.clone() as Arc<dyn Scm>);
1250
1251        let mut session = fake_session("s1");
1252        session.status = SessionStatus::Mergeable;
1253
1254        let result = engine
1255            .dispatch(&session, "approved-and-green")
1256            .await
1257            .unwrap()
1258            .unwrap();
1259
1260        assert!(!result.success);
1261        assert!(
1262            result.message.unwrap().contains("merge failed"),
1263            "error message should surface"
1264        );
1265    }
1266
1267    #[tokio::test]
1268    async fn dispatch_auto_false_skips_active_actions_but_allows_notify() {
1269        // `auto: false` on SendToAgent → no-op.
1270        let mut sta = ReactionConfig::new(ReactionAction::SendToAgent);
1271        sta.auto = false;
1272        sta.message = Some("noop".into());
1273        let mut map = HashMap::new();
1274        map.insert("ci-failed".into(), sta);
1275
1276        // `auto: false` on Notify → still runs (notify is always a human
1277        // call, the disable flag doesn't gate it).
1278        let mut notify = ReactionConfig::new(ReactionAction::Notify);
1279        notify.auto = false;
1280        map.insert("approved-and-green".into(), notify);
1281
1282        let (engine, runtime, mut rx) = build(map);
1283
1284        // Active action should be skipped entirely — no outcome, no event.
1285        let s1 = fake_session("s1");
1286        assert!(engine.dispatch(&s1, "ci-failed").await.unwrap().is_none());
1287        assert!(runtime.sends().is_empty());
1288        assert!(drain(&mut rx).is_empty());
1289
1290        // Notify must still fire.
1291        let mut s2 = fake_session("s2");
1292        s2.status = SessionStatus::Mergeable;
1293        let result = engine
1294            .dispatch(&s2, "approved-and-green")
1295            .await
1296            .unwrap()
1297            .unwrap();
1298        assert!(result.success);
1299        assert_eq!(result.action, ReactionAction::Notify);
1300    }
1301
    #[tokio::test]
    async fn retries_exhausted_escalates_to_notify_and_emits_both_events() {
        // retries: 2 means the 3rd dispatch attempt is the one that escalates.
        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
        config.message = Some("fix".into());
        config.retries = Some(2);
        let mut map = HashMap::new();
        map.insert("ci-failed".into(), config);

        let (engine, runtime, mut rx) = build(map);
        let session = fake_session("s1");

        // Attempts 1 and 2: normal SendToAgent.
        let r1 = engine
            .dispatch(&session, "ci-failed")
            .await
            .unwrap()
            .unwrap();
        assert!(r1.success);
        assert!(!r1.escalated);
        let r2 = engine
            .dispatch(&session, "ci-failed")
            .await
            .unwrap()
            .unwrap();
        assert!(r2.success);
        assert!(!r2.escalated);
        assert_eq!(runtime.sends().len(), 2);

        // Attempt 3: escalate.
        let r3 = engine
            .dispatch(&session, "ci-failed")
            .await
            .unwrap()
            .unwrap();
        assert!(r3.escalated);
        assert_eq!(r3.action, ReactionAction::Notify);
        // Runtime NOT called on escalation — we only notify.
        assert_eq!(runtime.sends().len(), 2);

        // Events across all three dispatches, 2 + 2 + 3 = 7:
        // each successful SendToAgent emits ReactionTriggered +
        // UiNotification (see dispatch_send_to_agent_calls_runtime_and_
        // emits_event), and the escalating attempt emits
        // ReactionEscalated + ReactionTriggered(Notify) + UiNotification
        // — the counts below pin exactly one escalation, with the
        // UiNotification broadcast last.
        let events = drain(&mut rx);
        assert_eq!(events.len(), 7, "got {events:?}");
        let escalated_count = events
            .iter()
            .filter(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. }))
            .count();
        assert_eq!(escalated_count, 1);
        assert!(events.iter().any(|e| matches!(
            e,
            OrchestratorEvent::ReactionTriggered {
                action: ReactionAction::Notify,
                ..
            }
        )));
        assert!(matches!(
            events.last().unwrap(),
            OrchestratorEvent::UiNotification { .. }
        ));
    }
1363
1364    #[tokio::test]
1365    async fn escalate_after_attempts_escalates_independently_of_retries() {
1366        // No `retries` → infinite, but `escalate-after: 1` forces
1367        // escalation after the first attempt.
1368        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1369        config.message = Some("fix".into());
1370        config.escalate_after = Some(EscalateAfter::Attempts(1));
1371        let mut map = HashMap::new();
1372        map.insert("ci-failed".into(), config);
1373
1374        let (engine, runtime, _rx) = build(map);
1375        let session = fake_session("s1");
1376
1377        // Attempt 1: normal send.
1378        let r1 = engine
1379            .dispatch(&session, "ci-failed")
1380            .await
1381            .unwrap()
1382            .unwrap();
1383        assert!(!r1.escalated);
1384        assert_eq!(runtime.sends().len(), 1);
1385
1386        // Attempt 2: escalated (attempts=2 > 1).
1387        let r2 = engine
1388            .dispatch(&session, "ci-failed")
1389            .await
1390            .unwrap()
1391            .unwrap();
1392        assert!(r2.escalated);
1393        assert_eq!(runtime.sends().len(), 1);
1394    }
1395
1396    #[tokio::test]
1397    async fn escalate_after_duration_does_not_fire_before_elapsed() {
1398        // Phase H contract: duration gate is now honoured, but 5 rapid
1399        // back-to-back dispatches with a `10m` threshold are nowhere
1400        // near elapsed, so no escalation fires — the retries path is
1401        // unset, and the duration gate compares against `first_triggered_at
1402        // + 10m`, which is still in the future. Exercises the happy
1403        // "gate configured, not yet tripped" path.
1404        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1405        config.message = Some("fix".into());
1406        config.escalate_after = Some(EscalateAfter::Duration("10m".into()));
1407        let mut map = HashMap::new();
1408        map.insert("ci-failed".into(), config);
1409
1410        let (engine, runtime, _rx) = build(map);
1411        let session = fake_session("s1");
1412
1413        for _ in 0..5 {
1414            let r = engine
1415                .dispatch(&session, "ci-failed")
1416                .await
1417                .unwrap()
1418                .unwrap();
1419            assert!(!r.escalated);
1420        }
1421        assert_eq!(runtime.sends().len(), 5);
1422    }
1423
    #[tokio::test]
    async fn escalate_after_duration_fires_once_elapsed_exceeds_threshold() {
        // Phase H: duration-based escalation is real. We rewind
        // `first_triggered_at` by more than the configured threshold
        // instead of sleeping — the logic under test is a pure
        // comparison against `elapsed()`, and rewinding exercises the
        // same code path far faster than waiting.
        let mut config = ReactionConfig::new(ReactionAction::Notify);
        config.message = Some("stuck".into());
        config.retries = None; // no attempts gate — only duration gate
        config.escalate_after = Some(EscalateAfter::Duration("1s".into()));
        let mut map = HashMap::new();
        map.insert("agent-stuck".into(), config);

        let (engine, _runtime, mut rx) = build(map);
        let mut session = fake_session("s1");
        session.status = SessionStatus::Working;

        // First dispatch: tracker created, first_triggered_at = now,
        // elapsed ≈ 0, duration gate NOT tripped.
        let first = engine
            .dispatch(&session, "agent-stuck")
            .await
            .unwrap()
            .unwrap();
        assert!(!first.escalated);

        // Rewind so elapsed() > 1s on the next read. We access the
        // private tracker map directly because this test lives inside
        // the same module. `checked_sub` (not `-`) because Instant
        // subtraction can fail if the monotonic clock's origin is less
        // than 2s in the past.
        {
            let mut trackers = engine.trackers.lock().unwrap();
            let key = (session.id.clone(), "agent-stuck".to_string());
            let entry = trackers.get_mut(&key).expect("tracker populated");
            entry.first_triggered_at = Instant::now()
                .checked_sub(std::time::Duration::from_secs(2))
                .expect("monotonic clock has been running >2s");
        }

        // Second dispatch: elapsed ≈ 2s > threshold 1s → escalate.
        let second = engine
            .dispatch(&session, "agent-stuck")
            .await
            .unwrap()
            .unwrap();
        assert!(second.escalated, "duration gate should have fired");
        assert_eq!(second.action, ReactionAction::Notify);

        // Both a ReactionEscalated and a ReactionTriggered(Notify) are
        // emitted on escalation — matches the attempts-form path.
        let events = drain(&mut rx);
        assert!(
            events
                .iter()
                .any(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. })),
            "expected ReactionEscalated, got {events:?}"
        );
    }
1482
    #[tokio::test]
    async fn escalate_after_duration_with_garbage_string_logs_once_and_retries_gate_still_fires() {
        // A malformed `escalate_after` string must not panic or flip
        // `escalate = true`. The retries gate is independent and must
        // still fire after the configured number of attempts.
        // `warned_parse_failures` must record the key exactly once.
        let mut config = ReactionConfig::new(ReactionAction::Notify);
        config.message = Some("stuck".into());
        config.retries = Some(2);
        // "ten minutes" is deliberately unparseable as a duration.
        config.escalate_after = Some(EscalateAfter::Duration("ten minutes".into()));
        let mut map = HashMap::new();
        map.insert("agent-stuck".into(), config);

        let (engine, _runtime, _rx) = build(map);
        let mut session = fake_session("s1");
        session.status = SessionStatus::Working;

        // 3 dispatches: attempts 1, 2 no escalate; attempt 3 > retries=2 escalates.
        let r1 = engine
            .dispatch(&session, "agent-stuck")
            .await
            .unwrap()
            .unwrap();
        assert!(!r1.escalated);
        let r2 = engine
            .dispatch(&session, "agent-stuck")
            .await
            .unwrap()
            .unwrap();
        assert!(!r2.escalated);
        let r3 = engine
            .dispatch(&session, "agent-stuck")
            .await
            .unwrap()
            .unwrap();
        assert!(
            r3.escalated,
            "retries gate must still fire even when escalate_after is garbage"
        );

        // Warn-once set should contain the key exactly once (three
        // dispatches, but only the first parse failure emits a warn).
        // The set is private engine state; accessible because this
        // test lives inside the same module.
        let warned = engine.warned_parse_failures.lock().unwrap();
        assert!(warned.contains("agent-stuck.escalate_after"));
        assert_eq!(
            warned.len(),
            1,
            "only one warn should be recorded across 3 dispatches"
        );
    }
1533
1534    #[tokio::test]
1535    async fn warn_once_parse_failure_is_idempotent_per_key() {
1536        // Direct helper test: calling warn_once_parse_failure multiple
1537        // times with the same (reaction_key, field) pair inserts once
1538        // and is silently idempotent on subsequent calls. Calling with
1539        // a different field inserts a second entry — the two warnings
1540        // are independent so a reaction with BOTH threshold and
1541        // escalate_after broken gets warned about each.
1542        let (engine, _runtime, _rx) = build(HashMap::new());
1543
1544        engine.warn_once_parse_failure("agent-stuck", "threshold", "ten");
1545        engine.warn_once_parse_failure("agent-stuck", "threshold", "eleven");
1546        engine.warn_once_parse_failure("agent-stuck", "threshold", "twelve");
1547        engine.warn_once_parse_failure("agent-stuck", "escalate_after", "frob");
1548
1549        let warned = engine.warned_parse_failures.lock().unwrap();
1550        assert_eq!(warned.len(), 2);
1551        assert!(warned.contains("agent-stuck.threshold"));
1552        assert!(warned.contains("agent-stuck.escalate_after"));
1553    }
1554
1555    #[tokio::test]
1556    async fn clear_tracker_after_escalation_restores_real_action() {
1557        // Contract: once a session escalates and then *leaves* the
1558        // triggering status (lifecycle calls clear_tracker), re-entering
1559        // the same status must run the configured action again rather
1560        // than immediately re-escalating. This is the whole point of
1561        // clearing trackers on exit — without it, a session that
1562        // recovered and re-failed would see nothing but escalations.
1563        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1564        config.message = Some("fix".into());
1565        config.retries = Some(1);
1566        let mut map = HashMap::new();
1567        map.insert("ci-failed".into(), config);
1568
1569        let (engine, runtime, _rx) = build(map);
1570        let session = fake_session("s1");
1571
1572        // 1st attempt: SendToAgent runs.
1573        let r1 = engine
1574            .dispatch(&session, "ci-failed")
1575            .await
1576            .unwrap()
1577            .unwrap();
1578        assert!(!r1.escalated);
1579        // 2nd attempt: escalates (attempts=2 > retries=1).
1580        let r2 = engine
1581            .dispatch(&session, "ci-failed")
1582            .await
1583            .unwrap()
1584            .unwrap();
1585        assert!(r2.escalated);
1586        assert_eq!(runtime.sends().len(), 1);
1587
1588        // Lifecycle clears the tracker on exit from CiFailed.
1589        engine.clear_tracker(&session.id, "ci-failed");
1590
1591        // Re-entry: action runs again from a clean slate.
1592        let r3 = engine
1593            .dispatch(&session, "ci-failed")
1594            .await
1595            .unwrap()
1596            .unwrap();
1597        assert!(r3.success);
1598        assert!(!r3.escalated);
1599        assert_eq!(r3.action, ReactionAction::SendToAgent);
1600        assert_eq!(runtime.sends().len(), 2);
1601    }
1602
1603    #[tokio::test]
1604    async fn clear_all_for_session_drops_every_reaction_tracker() {
1605        // Covers the leak-guard added for `LifecycleManager::terminate`:
1606        // terminating a session must drop all its trackers regardless
1607        // of which reaction keys it touched.
1608        let mut ci = ReactionConfig::new(ReactionAction::SendToAgent);
1609        ci.message = Some("fix".into());
1610        let mut cr = ReactionConfig::new(ReactionAction::SendToAgent);
1611        cr.message = Some("review".into());
1612        let mut map = HashMap::new();
1613        map.insert("ci-failed".into(), ci);
1614        map.insert("changes-requested".into(), cr);
1615
1616        let (engine, _runtime, _rx) = build(map);
1617        let a = fake_session("a");
1618        let b = fake_session("b");
1619
1620        // Seed three trackers across two sessions.
1621        engine.dispatch(&a, "ci-failed").await.unwrap();
1622        engine.dispatch(&a, "changes-requested").await.unwrap();
1623        engine.dispatch(&b, "ci-failed").await.unwrap();
1624        assert_eq!(engine.attempts(&a.id, "ci-failed"), 1);
1625        assert_eq!(engine.attempts(&a.id, "changes-requested"), 1);
1626        assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
1627
1628        // Wipe session a only.
1629        engine.clear_all_for_session(&a.id);
1630
1631        assert_eq!(engine.attempts(&a.id, "ci-failed"), 0);
1632        assert_eq!(engine.attempts(&a.id, "changes-requested"), 0);
1633        // Session b's trackers survive.
1634        assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
1635    }
1636
1637    #[tokio::test]
1638    async fn auto_false_notify_fires_once_per_transition_and_does_not_escalate() {
1639        // Guards the `auto: false` + Notify edge case: a disabled
1640        // notify has no retry budget, so even `retries: Some(0)` cannot
1641        // trigger spurious escalations on it.
1642        let mut cfg = ReactionConfig::new(ReactionAction::Notify);
1643        cfg.auto = false;
1644        cfg.retries = Some(0); // would escalate if retry path ran
1645        let mut map = HashMap::new();
1646        map.insert("approved-and-green".into(), cfg);
1647
1648        let (engine, _runtime, mut rx) = build(map);
1649        let mut session = fake_session("s1");
1650        session.status = SessionStatus::Mergeable;
1651
1652        // Two consecutive dispatches both return a normal (non-escalated)
1653        // Notify outcome — neither increments the tracker.
1654        for _ in 0..2 {
1655            let r = engine
1656                .dispatch(&session, "approved-and-green")
1657                .await
1658                .unwrap()
1659                .unwrap();
1660            assert!(r.success);
1661            assert!(!r.escalated);
1662            assert_eq!(r.action, ReactionAction::Notify);
1663        }
1664        assert_eq!(engine.attempts(&session.id, "approved-and-green"), 0);
1665
1666        // No ReactionEscalated emitted on the channel.
1667        let events = drain(&mut rx);
1668        assert!(
1669            !events
1670                .iter()
1671                .any(|e| matches!(e, OrchestratorEvent::ReactionEscalated { .. })),
1672            "auto:false notify must not escalate, got {events:?}"
1673        );
1674    }
1675
1676    #[tokio::test]
1677    async fn clear_tracker_resets_attempts_for_next_transition() {
1678        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1679        config.message = Some("fix".into());
1680        config.retries = Some(1);
1681        let mut map = HashMap::new();
1682        map.insert("ci-failed".into(), config);
1683
1684        let (engine, _runtime, _rx) = build(map);
1685        let session = fake_session("s1");
1686
1687        // First attempt uses one retry.
1688        engine.dispatch(&session, "ci-failed").await.unwrap();
1689        assert_eq!(engine.attempts(&session.id, "ci-failed"), 1);
1690
1691        // CI goes green then red again → tracker cleared by lifecycle.
1692        engine.clear_tracker(&session.id, "ci-failed");
1693        assert_eq!(engine.attempts(&session.id, "ci-failed"), 0);
1694
1695        // Fresh attempt sees a full budget.
1696        let r = engine
1697            .dispatch(&session, "ci-failed")
1698            .await
1699            .unwrap()
1700            .unwrap();
1701        assert!(r.success);
1702        assert!(!r.escalated);
1703    }
1704
1705    #[tokio::test]
1706    async fn trackers_are_scoped_per_reaction_key() {
1707        // An attempt on `ci-failed` must not consume budget for
1708        // `changes-requested` on the same session.
1709        let mut ci = ReactionConfig::new(ReactionAction::SendToAgent);
1710        ci.message = Some("fix ci".into());
1711        let mut cr = ReactionConfig::new(ReactionAction::SendToAgent);
1712        cr.message = Some("address review".into());
1713
1714        let mut map = HashMap::new();
1715        map.insert("ci-failed".into(), ci);
1716        map.insert("changes-requested".into(), cr);
1717
1718        let (engine, _runtime, _rx) = build(map);
1719        let session = fake_session("s1");
1720
1721        engine.dispatch(&session, "ci-failed").await.unwrap();
1722        engine.dispatch(&session, "ci-failed").await.unwrap();
1723        engine
1724            .dispatch(&session, "changes-requested")
1725            .await
1726            .unwrap();
1727
1728        assert_eq!(engine.attempts(&session.id, "ci-failed"), 2);
1729        assert_eq!(engine.attempts(&session.id, "changes-requested"), 1);
1730    }
1731
1732    #[tokio::test]
1733    async fn trackers_are_scoped_per_session_id() {
1734        let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
1735        cfg.message = Some("fix".into());
1736        let mut map = HashMap::new();
1737        map.insert("ci-failed".into(), cfg);
1738
1739        let (engine, _runtime, _rx) = build(map);
1740        let a = fake_session("a");
1741        let b = fake_session("b");
1742
1743        engine.dispatch(&a, "ci-failed").await.unwrap();
1744        engine.dispatch(&a, "ci-failed").await.unwrap();
1745        engine.dispatch(&b, "ci-failed").await.unwrap();
1746
1747        assert_eq!(engine.attempts(&a.id, "ci-failed"), 2);
1748        assert_eq!(engine.attempts(&b.id, "ci-failed"), 1);
1749    }
1750
1751    // ---------- Phase B: notifier registry integration ---------- //
1752
1753    use crate::notifier::{tests::TestNotifier, NotificationRouting, NotifierRegistry};
1754
1755    /// Build helper with a notifier registry attached. Same as `build()`
1756    /// but chains `.with_notifier_registry(...)`.
1757    fn build_with_notifier(
1758        cfg_map: HashMap<String, ReactionConfig>,
1759        registry: NotifierRegistry,
1760    ) -> (
1761        Arc<ReactionEngine>,
1762        Arc<RecordingRuntime>,
1763        broadcast::Receiver<OrchestratorEvent>,
1764    ) {
1765        let runtime = Arc::new(RecordingRuntime::new());
1766        let (tx, rx) = broadcast::channel(32);
1767        let engine = Arc::new(
1768            ReactionEngine::new(cfg_map, runtime.clone() as Arc<dyn Runtime>, tx)
1769                .with_notifier_registry(registry),
1770        );
1771        (engine, runtime, rx)
1772    }
1773
1774    #[tokio::test]
1775    async fn dispatch_notify_without_registry_unchanged() {
1776        // Guard Phase D backwards compat: engines without a notifier
1777        // registry must keep emitting the event and returning success.
1778        let mut config = ReactionConfig::new(ReactionAction::Notify);
1779        config.message = Some("approved".into());
1780        let mut map = HashMap::new();
1781        map.insert("approved-and-green".into(), config);
1782
1783        let (engine, _runtime, mut rx) = build(map);
1784        let mut session = fake_session("s1");
1785        session.status = SessionStatus::Mergeable;
1786
1787        let result = engine
1788            .dispatch(&session, "approved-and-green")
1789            .await
1790            .unwrap()
1791            .unwrap();
1792        assert!(result.success);
1793        assert_eq!(result.action, ReactionAction::Notify);
1794        assert!(!result.escalated);
1795        assert_eq!(result.message.as_deref(), Some("approved"));
1796
1797        let events = drain(&mut rx);
1798        assert_eq!(events.len(), 2, "got {events:?}");
1799        assert!(events.iter().any(|e| matches!(
1800            e,
1801            OrchestratorEvent::ReactionTriggered {
1802                action: ReactionAction::Notify,
1803                ..
1804            }
1805        )));
1806        assert!(events.iter().any(|e| matches!(
1807            e,
1808            OrchestratorEvent::UiNotification { notification } if notification.action == ReactionAction::Notify
1809        )));
1810    }
1811
1812    #[tokio::test]
1813    async fn dispatch_notify_with_empty_routing_is_success() {
1814        // Registry attached but routing table empty → resolve returns
1815        // nothing → success true, no plugins called, event still emitted.
1816        let registry = NotifierRegistry::new(NotificationRouting::default());
1817        let config = ReactionConfig::new(ReactionAction::Notify);
1818        let mut map = HashMap::new();
1819        map.insert("approved-and-green".into(), config);
1820
1821        let (engine, _runtime, mut rx) = build_with_notifier(map, registry);
1822        let mut session = fake_session("s1");
1823        session.status = SessionStatus::Mergeable;
1824
1825        let result = engine
1826            .dispatch(&session, "approved-and-green")
1827            .await
1828            .unwrap()
1829            .unwrap();
1830        assert!(result.success);
1831        assert!(!result.escalated);
1832
1833        let events = drain(&mut rx);
1834        assert!(events
1835            .iter()
1836            .any(|e| matches!(e, OrchestratorEvent::ReactionTriggered { .. })));
1837    }
1838
1839    #[tokio::test]
1840    async fn dispatch_notify_routes_to_single_plugin() {
1841        // One plugin registered for the priority, one notification
1842        // delivered. Assert the payload has the right fields.
1843        let mut routing = HashMap::new();
1844        routing.insert(EventPriority::Action, vec!["test".to_string()]);
1845        let (tn, received) = TestNotifier::new("test");
1846        let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
1847        registry.register("test", Arc::new(tn));
1848
1849        let mut config = ReactionConfig::new(ReactionAction::Notify);
1850        config.message = Some("PR merged".into());
1851        let mut map = HashMap::new();
1852        map.insert("approved-and-green".into(), config);
1853
1854        let (engine, _runtime, _rx) = build_with_notifier(map, registry);
1855        let mut session = fake_session("s1");
1856        session.status = SessionStatus::Mergeable;
1857
1858        let result = engine
1859            .dispatch(&session, "approved-and-green")
1860            .await
1861            .unwrap()
1862            .unwrap();
1863        assert!(result.success);
1864        assert_eq!(result.message.as_deref(), Some("PR merged"));
1865
1866        let payloads = received.lock().unwrap();
1867        assert_eq!(payloads.len(), 1);
1868        assert_eq!(payloads[0].reaction_key, "approved-and-green");
1869        assert_eq!(payloads[0].priority, EventPriority::Action);
1870        assert_eq!(payloads[0].body, "PR merged");
1871        assert!(!payloads[0].escalated);
1872    }
1873
1874    #[tokio::test]
1875    async fn dispatch_notify_fan_out_reports_partial_failure() {
1876        // Two plugins: one succeeds, one fails. The outcome must be
1877        // success = false and message must name the failing plugin.
1878        use crate::notifier::NotifierError;
1879
1880        struct FailNotifier;
1881
1882        #[async_trait::async_trait]
1883        impl crate::notifier::Notifier for FailNotifier {
1884            fn name(&self) -> &str {
1885                "fail"
1886            }
1887            async fn send(
1888                &self,
1889                _payload: &NotificationPayload,
1890            ) -> std::result::Result<(), NotifierError> {
1891                Err(NotifierError::Unavailable("offline".into()))
1892            }
1893        }
1894
1895        let mut routing = HashMap::new();
1896        routing.insert(
1897            EventPriority::Urgent,
1898            vec!["ok-plugin".to_string(), "fail".to_string()],
1899        );
1900        let (tn, received) = TestNotifier::new("ok-plugin");
1901        let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
1902        registry.register("ok-plugin", Arc::new(tn));
1903        registry.register("fail", Arc::new(FailNotifier));
1904
1905        let mut config = ReactionConfig::new(ReactionAction::Notify);
1906        config.message = Some("something".into());
1907        let mut map = HashMap::new();
1908        // Default priority for `agent-stuck` is `urgent` (matches
1909        // `default_priority_for_reaction_key`).
1910        map.insert("agent-stuck".into(), config);
1911
1912        let (engine, _runtime, _rx) = build_with_notifier(map, registry);
1913        let mut session = fake_session("s1");
1914        session.status = SessionStatus::Stuck;
1915
1916        let result = engine
1917            .dispatch(&session, "agent-stuck")
1918            .await
1919            .unwrap()
1920            .unwrap();
1921        assert!(!result.success);
1922        let msg = result.message.unwrap();
1923        assert!(
1924            msg.contains("fail"),
1925            "error message should name the failing notifier, got: {msg}"
1926        );
1927
1928        // Successful plugin still received the payload.
1929        let payloads = received.lock().unwrap();
1930        assert_eq!(payloads.len(), 1);
1931        assert_eq!(payloads[0].reaction_key, "agent-stuck");
1932    }
1933
1934    #[tokio::test]
1935    async fn escalation_routes_through_notifier_registry() {
1936        // When retries exhaust and the engine escalates to Notify, the
1937        // registry is used to fan out the escalated notification.
1938        let mut routing = HashMap::new();
1939        routing.insert(EventPriority::Urgent, vec!["test".to_string()]);
1940        let (tn, received) = TestNotifier::new("test");
1941        let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
1942        registry.register("test", Arc::new(tn));
1943
1944        let mut config = ReactionConfig::new(ReactionAction::SendToAgent);
1945        config.message = Some("fix ci".into());
1946        config.retries = Some(1);
1947        let mut map = HashMap::new();
1948        map.insert("ci-failed".into(), config);
1949
1950        let (engine, _runtime, mut rx) = build_with_notifier(map, registry);
1951        let session = fake_session("s1");
1952
1953        // 1st attempt: SendToAgent (no escalation).
1954        let r1 = engine
1955            .dispatch(&session, "ci-failed")
1956            .await
1957            .unwrap()
1958            .unwrap();
1959        assert!(!r1.escalated);
1960
1961        // 2nd attempt: retries exhausted → escalation to Notify.
1962        let r2 = engine
1963            .dispatch(&session, "ci-failed")
1964            .await
1965            .unwrap()
1966            .unwrap();
1967        assert!(r2.escalated);
1968        assert_eq!(r2.action, ReactionAction::Notify);
1969
1970        // Notifier received an escalated payload.
1971        let payloads = received.lock().unwrap();
1972        assert_eq!(payloads.len(), 1);
1973        assert!(payloads[0].escalated);
1974        assert_eq!(payloads[0].reaction_key, "ci-failed");
1975        assert_eq!(payloads[0].priority, EventPriority::Urgent);
1976
1977        // Events: SendToAgent trigger, then ReactionEscalated + ReactionTriggered(Notify).
1978        let events = drain(&mut rx);
1979        assert!(events.iter().any(|e| matches!(
1980            e,
1981            OrchestratorEvent::ReactionEscalated {
1982                reaction_key,
1983                ..
1984            } if reaction_key == "ci-failed"
1985        )));
1986        assert!(events.iter().any(|e| matches!(
1987            e,
1988            OrchestratorEvent::ReactionTriggered {
1989                action: ReactionAction::Notify,
1990                ..
1991            }
1992        )));
1993    }
1994
1995    #[tokio::test]
1996    async fn auto_false_notify_still_routes_through_registry() {
1997        // `auto: false` + action: Notify → bypass retry budget but
1998        // still fan out through the registry.
1999        let mut routing = HashMap::new();
2000        routing.insert(EventPriority::Action, vec!["test".to_string()]);
2001        let (tn, received) = TestNotifier::new("test");
2002        let mut registry = NotifierRegistry::new(NotificationRouting::from_map(routing));
2003        registry.register("test", Arc::new(tn));
2004
2005        let mut config = ReactionConfig::new(ReactionAction::Notify);
2006        config.auto = false;
2007        config.message = Some("fyi".into());
2008        let mut map = HashMap::new();
2009        map.insert("approved-and-green".into(), config);
2010
2011        let (engine, _runtime, _rx) = build_with_notifier(map, registry);
2012        let mut session = fake_session("s1");
2013        session.status = SessionStatus::Mergeable;
2014
2015        let result = engine
2016            .dispatch(&session, "approved-and-green")
2017            .await
2018            .unwrap()
2019            .unwrap();
2020        assert!(result.success);
2021        assert!(!result.escalated);
2022
2023        let payloads = received.lock().unwrap();
2024        assert_eq!(payloads.len(), 1);
2025        assert_eq!(payloads[0].body, "fyi");
2026    }
2027}