// ao_core/lifecycle/mod.rs

1//! Background polling loop that keeps `Session` state in sync with reality.
2//!
3//! Corresponds to `packages/core/src/lifecycle-manager.ts` in the reference
4//! repo, trimmed to what Slice 1 Phase C actually needs:
5//!
6//! 1. Every `poll_interval`, list all non-terminal sessions from disk.
7//! 2. For each one, probe `Runtime::is_alive` and `Agent::detect_activity`.
8//! 3. Apply state transitions and persist the new `Session` atomically.
9//! 4. Broadcast `OrchestratorEvent`s so subscribers (CLI, reaction engine,
10//!    notifiers, …) can react without polling themselves.
11//!
12//! Design notes:
13//!
14//! - **Trait objects, not generics.** The manager owns `Arc<dyn Runtime>`
15//!   etc. so the same `LifecycleManager` type can be used in tests (with
16//!   mocks) and in the real CLI (with tmux/claude-code). Generic parameters
17//!   would have leaked through every consumer.
18//!
19//! - **Disk is the source of truth.** The loop re-reads from
20//!   `SessionManager::list` each tick rather than holding state in memory.
21//!   This matches the Slice 1 design principle established in Phase A, and
22//!   means `ao-rs spawn` running in a separate process is immediately
23//!   visible on the next tick. (A future Slice 2+ may add an in-memory
24//!   cache + file-watcher for efficiency.)
25//!
26//! - **Per-session errors don't stop the loop.** If one session's runtime
27//!   probe fails, we emit `TickError` and continue. Only fatal `SessionManager::list`
28//!   errors bubble up (and even then we log and keep looping).
29//!
30//! - **Event channel lag.** We use `tokio::sync::broadcast`, which drops
31//!   old events when a slow subscriber can't keep up. That's fine for
32//!   observability — a reaction engine that misses a tick just picks up
33//!   the next one. Anyone needing lossless delivery should snapshot via
34//!   `SessionManager::list` on startup and then subscribe.
35
36use crate::{
37    error::Result,
38    events::{OrchestratorEvent, TerminationReason},
39    reaction_engine::{parse_duration, status_to_reaction_key, ReactionEngine},
40    reactions::{ReactionAction, ReactionOutcome},
41    scm::{CheckStatus, CiStatus, MergeReadiness, PrState, PullRequest, ReviewDecision},
42    scm_transitions::{derive_scm_status, ScmObservation},
43    session_manager::SessionManager,
44    traits::{Agent, Runtime, Scm, Workspace},
45    types::{ActivityState, Session, SessionId, SessionStatus},
46};
47use std::{
48    collections::{HashMap, HashSet},
49    hash::{Hash, Hasher},
50    sync::{
51        atomic::{AtomicBool, AtomicU64, Ordering},
52        Arc, Mutex,
53    },
54    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
55};
56use tokio::sync::broadcast;
57use tokio_util::sync::CancellationToken;
58
// The lifecycle implementation is split across sibling submodules, named
// after the concern each carries (SCM polling, stuck detection, per-tick
// work, status transitions).
mod scm_poll;
mod stuck;
mod tick;
mod transition;

/// How many events the broadcast channel buffers before dropping the oldest.
/// 1024 is generous — a healthy loop emits at most ~N events per tick where
/// N is the session count, and slow subscribers will lag at most a handful
/// of ticks before catching up.
const EVENT_CHANNEL_CAPACITY: usize = 1024;

/// Default poll interval. Increased from the TS reference's 5 s to 10 s
/// to further reduce GitHub API pressure now that batch enrichment +
/// ETag guards handle the hot path.
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(10);

/// Minimum time between review-backlog API calls per session (2 min).
/// Mirrors `REVIEW_BACKLOG_THROTTLE_MS = 120_000` in the TS reference.
/// Applies when the batch enrichment cache has no hit and the session is
/// already in a review-related state — avoids hammering the REST API for
/// review data that rarely changes within a 2-minute window.
const REVIEW_BACKLOG_THROTTLE: Duration = Duration::from_secs(120);
81
/// Owner of the background polling loop described in the module docs:
/// re-reads sessions from disk every `poll_interval`, probes runtime /
/// agent / SCM state, persists transitions, and broadcasts
/// `OrchestratorEvent`s. Constructed via [`LifecycleManager::new`] plus
/// the `with_*` builder methods.
pub struct LifecycleManager {
    /// Disk-backed session store. Re-read every tick — disk is the
    /// source of truth (see module docs), not an in-memory cache.
    pub(super) sessions: Arc<SessionManager>,
    /// Probed each tick via `Runtime::is_alive` (see module docs, step 2).
    pub(super) runtime: Arc<dyn Runtime>,
    /// Probed each tick via `Agent::detect_activity` (see module docs, step 2).
    pub(super) agent: Arc<dyn Agent>,
    /// Broadcast channel all lifecycle events go out on. Slow subscribers
    /// lag (oldest events dropped) rather than block the loop.
    pub(super) events_tx: broadcast::Sender<OrchestratorEvent>,
    /// Tick period for `run_loop`. Defaults to `DEFAULT_POLL_INTERVAL`;
    /// overridable via `with_poll_interval` (tests use short intervals).
    poll_interval: Duration,
    /// Optional Slice 2 Phase D reaction engine. When set, every status
    /// transition into a reaction-triggering state (see
    /// `status_to_reaction_key`) calls `engine.dispatch(...)`. When unset,
    /// the lifecycle loop behaves exactly as it did in Phase C.
    pub(super) reaction_engine: Option<Arc<ReactionEngine>>,
    /// Optional Slice 2 Phase F SCM plugin. When set, every tick calls
    /// `detect_pr` on each non-terminal session; a fresh PR observation
    /// is folded through `derive_scm_status` to produce PR-driven status
    /// transitions (`Working → PrOpen/CiFailed/ChangesRequested/…`).
    /// When unset, the lifecycle loop is exactly the Phase C/D behaviour
    /// — SCM polling is off. This matches how tests and `ao-rs watch`
    /// without a configured plugin should behave.
    pub(super) scm: Option<Arc<dyn Scm>>,
    /// Optional workspace plugin. When set, sessions that transition to
    /// `Merged` automatically have their worktree destroyed so disk space
    /// is reclaimed without a manual `ao-rs cleanup`.
    pub(super) workspace: Option<Arc<dyn Workspace>>,
    /// Slice 2 Phase H bookkeeping for agent-stuck detection.
    ///
    /// Records the `Instant` at which each session first entered an idle
    /// activity state (`Idle` / `Blocked`). `check_stuck` reads this map
    /// to decide whether the session has been idle longer than the
    /// configured `agent-stuck.threshold`.
    ///
    /// - An entry is **inserted** the first tick activity flips into
    ///   `Idle`/`Blocked` and **preserved** across subsequent idle ticks
    ///   so `elapsed()` grows monotonically.
    /// - An entry is **removed** as soon as activity flips back to any
    ///   non-idle state, so the next idle streak restarts the clock.
    /// - `terminate` also clears the entry to bound memory for long-
    ///   running watch loops (Task 2.7).
    ///
    /// `Mutex<HashMap>` mirrors how `ReactionEngine::trackers` is stored —
    /// short critical sections around pure read/modify/write, no nested
    /// locking.
    pub(super) idle_since: Mutex<HashMap<SessionId, Instant>>,
    /// Per-tick cache of batch-enriched PR observations.
    ///
    /// Populated once at the start of each `tick()` call via
    /// `Scm::enrich_prs_batch()`. Individual `poll_scm` calls check this
    /// cache first and skip the 4× REST fan-out when they find a hit.
    /// Cleared at the start of the next tick.
    ///
    /// Key format: `"{owner}/{repo}#{number}"`.
    pub(super) pr_enrichment_cache: Mutex<HashMap<String, ScmObservation>>,
    /// Per-session timestamp of the last review backlog API check.
    /// Throttles `pending_comments` calls to at most once per 2 minutes.
    pub(super) last_review_backlog_check: Mutex<HashMap<SessionId, Instant>>,
    /// Per-tick cache of detected PRs from `detect_pr`. Populated in
    /// `tick()` Pass 1 so `poll_scm` reuses the result instead of
    /// calling `detect_pr` a second time.
    pub(super) detected_prs_cache: Mutex<HashMap<SessionId, Option<PullRequest>>>,
    /// Unix-ms when `run_loop` started. `0` means "not yet started"
    /// (e.g. tests driving `tick` directly). Used by `tick` to
    /// classify first-seen sessions as `SessionRestored` (created
    /// before startup) vs. `Spawned` (created after).
    pub(super) startup_ms: AtomicU64,
    /// Set to `true` once `all-complete` has been dispatched for the
    /// current drain cycle (all active sessions reached terminal state).
    /// Reset to `false` on the first tick that observes a new non-terminal
    /// session, so a fresh batch of sessions gets its own `all-complete`.
    pub(super) all_complete_fired: AtomicBool,
}
151
impl LifecycleManager {
    /// Construct a manager with the default poll interval and no optional
    /// plugins (no reaction engine, no SCM, no workspace). The broadcast
    /// channel is created here; the initial receiver is dropped — callers
    /// obtain receivers via `subscribe()`.
    pub fn new(
        sessions: Arc<SessionManager>,
        runtime: Arc<dyn Runtime>,
        agent: Arc<dyn Agent>,
    ) -> Self {
        let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
        Self {
            sessions,
            runtime,
            agent,
            events_tx,
            poll_interval: DEFAULT_POLL_INTERVAL,
            reaction_engine: None,
            scm: None,
            workspace: None,
            idle_since: Mutex::new(HashMap::new()),
            pr_enrichment_cache: Mutex::new(HashMap::new()),
            last_review_backlog_check: Mutex::new(HashMap::new()),
            detected_prs_cache: Mutex::new(HashMap::new()),
            startup_ms: AtomicU64::new(0),
            all_complete_fired: AtomicBool::new(false),
        }
    }

    /// Override the default poll interval (builder form). Tests use short
    /// intervals; production uses `DEFAULT_POLL_INTERVAL`.
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Attach a reaction engine. `ReactionEngine::new` should be given
    /// this `LifecycleManager`'s `events_tx` (via `events_sender()`) so
    /// reaction events land on the same broadcast channel as lifecycle
    /// events. Builder form for test ergonomics.
    pub fn with_reaction_engine(mut self, engine: Arc<ReactionEngine>) -> Self {
        self.reaction_engine = Some(engine);
        self
    }

    /// Attach an SCM plugin. When present, `poll_one` fans out a
    /// `detect_pr` + four parallel `pr_state`/`ci_status`/`review_decision`/
    /// `mergeability` probes per session per tick, then routes the result
    /// through `derive_scm_status` for PR-driven status transitions.
    ///
    /// Builder form mirrors `with_reaction_engine` — call sites that don't
    /// care about SCM polling leave it unset and get Phase C/D behaviour.
    pub fn with_scm(mut self, scm: Arc<dyn Scm>) -> Self {
        self.scm = Some(scm);
        self
    }

    /// Attach a workspace plugin. When present, sessions that transition to
    /// `Merged` automatically have their worktree destroyed within the same
    /// poll cycle. Sessions with `workspace_path: None` are unaffected.
    pub fn with_workspace(mut self, workspace: Arc<dyn Workspace>) -> Self {
        self.workspace = Some(workspace);
        self
    }

    /// Borrow the underlying broadcast sender so a `ReactionEngine`
    /// constructed separately can publish events on the same channel
    /// `LifecycleManager` uses. Cheap clone — `broadcast::Sender` is
    /// internally ref-counted.
    pub fn events_sender(&self) -> broadcast::Sender<OrchestratorEvent> {
        self.events_tx.clone()
    }

    /// Get a fresh subscriber. Each `recv()` call sees events from the
    /// point of subscription onward — history is not replayed.
    pub fn subscribe(&self) -> broadcast::Receiver<OrchestratorEvent> {
        self.events_tx.subscribe()
    }

    /// Spawn the background polling loop. Returns a handle that can be
    /// used to stop it cleanly.
    ///
    /// We use `tokio_util::sync::CancellationToken` rather than a oneshot
    /// because cancellation tokens are cheap to clone and can be passed
    /// into future sub-tasks (e.g. a reaction engine that shares this
    /// manager's shutdown signal).
    pub fn spawn(self: Arc<Self>) -> LifecycleHandle {
        let token = CancellationToken::new();
        let child_token = token.clone();
        let this = self.clone();
        let join = tokio::spawn(async move {
            this.run_loop(child_token).await;
        });
        LifecycleHandle { join, token }
    }

    /// The loop body. Ticks on `poll_interval`, exits cleanly when the
    /// cancellation token fires.
    async fn run_loop(self: Arc<Self>, token: CancellationToken) {
        // Per-loop memory of which session IDs we've already announced via
        // `Spawned`, so we emit it exactly once per session observed.
        let mut seen: HashSet<SessionId> = HashSet::new();

        // Record startup time so `tick` can distinguish sessions that
        // predate this loop (emitted as `SessionRestored`) from sessions
        // created after startup (emitted as `Spawned`). Set *before* the
        // sweep so any session whose `created_at` equals or exceeds this
        // moment is classified as new.
        let startup_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        // `0` is the "not started" sentinel — guard against clock skew
        // that would stamp it as such.
        self.startup_ms.store(startup_ms.max(1), Ordering::Relaxed);

        // Crash-recovery sweep: if the process restarted after a session was
        // persisted as `Merged` but before its worktree was destroyed (e.g.
        // the daemon was killed mid-tick), the transition tick will never fire
        // again because terminal sessions are skipped by `poll_one`. We scan
        // once at startup and clean up any leftover worktrees.
        self.sweep_merged_worktrees().await;

        let mut ticker = tokio::time::interval(self.poll_interval);
        // NOTE(review): `tokio::time::interval` completes its *first* tick
        // immediately, so this loop actually runs "start, tick, wait, tick".
        // `MissedTickBehavior::Skip` only governs catch-up after a delayed
        // tick — it does NOT suppress the immediate first fire. If
        // "start, wait, tick" is the intended behaviour (as the previous
        // comment claimed), `tokio::time::interval_at(Instant::now() +
        // self.poll_interval, self.poll_interval)` would be needed —
        // confirm which behaviour is wanted before changing.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    tracing::debug!("lifecycle loop received cancel");
                    return;
                }
                _ = ticker.tick() => {
                    if let Err(e) = self.tick(&mut seen).await {
                        // Fatal disk read error — log and keep going.
                        // A transient `~/.ao-rs/sessions` permission issue
                        // shouldn't permanently kill the loop.
                        tracing::error!("lifecycle tick failed: {e}");
                    }
                }
            }
        }
    }

    /// One pass over every non-terminal session. Public so tests can
    /// drive the state machine deterministically without `sleep`ing.
    ///
    /// All mutex guards taken in this method are confined to non-async
    /// scopes — no `std::sync` guard is held across an `.await`.
    pub async fn tick(&self, seen: &mut HashSet<SessionId>) -> Result<()> {
        let sessions = self.sessions.list().await?;

        // ---- Batch PR enrichment (rate-limit optimization) ----
        // Two-pass approach:
        //   Pass 1: detect_pr for each non-terminal session → collect PRs
        //   Batch: enrich_prs_batch for all collected PRs → populate cache
        //   Pass 2: poll_one (which calls poll_scm) consumes from cache,
        //           skipping the 4× REST fan-out when a batch hit exists.
        //
        // detect_pr runs once per session per tick (same as before); the
        // savings come from replacing 4× REST per session with 1 GraphQL
        // batch for all sessions.

        // Clear the previous tick's cache.
        {
            let mut cache = self.pr_enrichment_cache.lock().unwrap_or_else(|e| {
                tracing::error!("pr_enrichment_cache mutex poisoned; recovering inner state: {e}");
                e.into_inner()
            });
            cache.clear();
        }

        // Pass 1: detect PRs (only when SCM is configured).
        // Store detected PRs keyed by session ID so poll_scm can reuse them.
        let mut detected_prs: HashMap<SessionId, Option<PullRequest>> = HashMap::new();
        if let Some(scm) = self.scm.as_ref() {
            let mut prs_for_batch = Vec::new();
            for session in &sessions {
                if session.is_terminal() {
                    continue;
                }
                let id = session.id.clone();
                match scm.detect_pr(session).await {
                    Ok(pr) => {
                        if let Some(ref p) = pr {
                            prs_for_batch.push(p.clone());
                        }
                        detected_prs.insert(id, pr);
                    }
                    Err(e) => {
                        // Per-session errors never abort the tick (module
                        // docs): record `None` and surface via TickError.
                        self.emit(OrchestratorEvent::TickError {
                            id: id.clone(),
                            message: format!("scm.detect_pr: {e}"),
                        });
                        detected_prs.insert(id, None);
                    }
                }
            }

            // Batch enrichment
            if !prs_for_batch.is_empty() {
                match scm.enrich_prs_batch(&prs_for_batch).await {
                    Ok(enrichment) => {
                        if !enrichment.is_empty() {
                            tracing::debug!(
                                "[batch enrichment] cached {} PR observations",
                                enrichment.len()
                            );
                            let mut cache = self
                                .pr_enrichment_cache
                                .lock()
                                .unwrap_or_else(|e| {
                                    tracing::error!("pr_enrichment_cache mutex poisoned; recovering inner state: {e}");
                                    e.into_inner()
                                });
                            *cache = enrichment;
                        }
                    }
                    Err(e) => {
                        // Batch failure is non-fatal: poll_scm falls back to
                        // its per-session probes when the cache is empty.
                        tracing::warn!("[batch enrichment] failed: {e}");
                    }
                }
            }
        }

        // Store detected PRs so poll_scm can consume them.
        {
            let mut cache = self.detected_prs_cache.lock().unwrap_or_else(|e| {
                tracing::error!("detected_prs_cache mutex poisoned; recovering inner state: {e}");
                e.into_inner()
            });
            *cache = detected_prs;
        }

        // Pass 2: poll each session.
        let startup_ms = self.startup_ms.load(Ordering::Relaxed);
        let mut any_active = false;
        for session in sessions {
            let id = session.id.clone();
            if seen.insert(id.clone()) {
                // Sessions that predate loop startup are restored from disk,
                // not newly spawned. When `startup_ms == 0` (tests driving
                // `tick` directly, no `run_loop`), preserve the original
                // behaviour and emit `Spawned` for everything.
                if startup_ms != 0 && session.created_at < startup_ms {
                    self.emit(OrchestratorEvent::SessionRestored {
                        id: id.clone(),
                        project_id: session.project_id.clone(),
                        status: session.status,
                    });
                } else {
                    self.emit(OrchestratorEvent::Spawned {
                        id,
                        project_id: session.project_id.clone(),
                    });
                }
            }

            if session.is_terminal() {
                continue;
            }

            any_active = true;
            // A fresh non-terminal session re-arms the all-complete gate so
            // a subsequent drain fires a new `all-complete`.
            self.all_complete_fired.store(false, Ordering::Relaxed);

            if let Err(e) = self.poll_one(session).await {
                tracing::warn!("poll_one failed: {e}");
            }
        }

        // ---- all-complete (issue #195 H3) ----
        // When all seen sessions are terminal and we have seen at least one,
        // dispatch `all-complete` exactly once per drain cycle.
        if !any_active && !seen.is_empty() && !self.all_complete_fired.load(Ordering::Relaxed) {
            if let Some(engine) = self.reaction_engine.as_ref() {
                // `all-complete` has no session context — we use a synthetic
                // sentinel session so the engine can look up the reaction
                // config. This mirrors how TS fires a summary-level event.
                let sentinel = all_complete_sentinel();
                match engine.dispatch(&sentinel, "all-complete").await {
                    Ok(_) => {
                        // Latch only on success so a failed dispatch is
                        // retried on the next drained tick.
                        self.all_complete_fired.store(true, Ordering::Relaxed);
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "all-complete dispatch failed");
                    }
                }
            }
        }

        Ok(())
    }

    /// Maintain the `idle_since` map in response to a fresh activity
    /// reading.
    ///
    /// - `Idle` or `Blocked` → insert `Instant::now()` **only if** the
    ///   session isn't already in the map. Preserving the older timestamp
    ///   means an entry that has been idle for three ticks is still
    ///   three-ticks-old on the fourth tick, so `elapsed()` grows
    ///   monotonically across the idle streak.
    /// - Any other activity state → remove the entry so the next idle
    ///   streak restarts the clock from zero.
    ///
    /// Called unconditionally from `poll_one` after the persist-activity
    /// block. Terminal activities (`Exited`) never reach this helper —
    /// `poll_one` short-circuits to `terminate` beforehand, which clears
    /// the entry via `idle_since.remove` (Task 2.7).
    pub(super) fn update_idle_since(&self, session_id: &SessionId, activity: ActivityState) {
        let mut map = self.idle_since.lock().unwrap_or_else(|e| {
            tracing::error!("lifecycle idle_since mutex poisoned; recovering inner state: {e}");
            e.into_inner()
        });
        match activity {
            ActivityState::Idle | ActivityState::Blocked => {
                // `or_insert_with` keeps the oldest timestamp on repeat
                // idle ticks — the whole point of the monotonic streak.
                map.entry(session_id.clone()).or_insert_with(Instant::now);
            }
            _ => {
                map.remove(session_id);
            }
        }
    }

    /// Fire an event into the broadcast channel. A send error only means
    /// there are currently zero subscribers — that's expected during CLI
    /// startup and not worth surfacing.
    pub(super) fn emit(&self, event: OrchestratorEvent) {
        let _ = self.events_tx.send(event);
    }
}
478
479/// Fold four `Result<_, AoError>` probe results into a single
480/// `ScmObservation`, or produce a `"probe_name: error; …"` diagnostic
481/// string listing every probe that failed.
482///
483/// The `poll_scm` tick refuses to transition on a partial observation —
484/// it's all or nothing — so the caller's path after this helper is a
485/// single `match` between "we have all four fields" and "emit TickError".
486///
487/// Free function rather than a method because the decision has no
488/// `&self` dependencies; extracting it keeps the async fan-out in
489/// `poll_scm` readable at one level of abstraction.
490pub(super) fn assemble_observation(
491    state: Result<PrState>,
492    ci: Result<CiStatus>,
493    review: Result<ReviewDecision>,
494    readiness: Result<MergeReadiness>,
495) -> std::result::Result<ScmObservation, String> {
496    match (state, ci, review, readiness) {
497        (Ok(state), Ok(ci), Ok(review), Ok(readiness)) => Ok(ScmObservation {
498            state,
499            ci,
500            review,
501            readiness,
502        }),
503        (state, ci, review, readiness) => {
504            // Join whichever errors fired into one human-readable
505            // message. Each slot contributes `"<slot>: <err>"` or
506            // nothing; empty output is impossible because we only hit
507            // this arm when at least one was Err.
508            let parts: Vec<String> = [
509                state.err().map(|e| format!("pr_state: {e}")),
510                ci.err().map(|e| format!("ci_status: {e}")),
511                review.err().map(|e| format!("review_decision: {e}")),
512                readiness.err().map(|e| format!("mergeability: {e}")),
513            ]
514            .into_iter()
515            .flatten()
516            .collect();
517            Err(parts.join("; "))
518        }
519    }
520}
521
522/// Should the lifecycle park this session in `MergeFailed` after
523/// dispatching `approved-and-green`?
524///
525/// Yes iff we *just* entered `Mergeable`, `reactions.approved-and-green`
526/// is configured with `action: auto-merge`, and the dispatch soft-failed
527/// *without* escalating. Parking is keyed off the **configured** action,
528/// not `ReactionOutcome::action`, so a mismatched or escalated outcome
529/// cannot trap a `notify` / `send-to-agent` rule in the merge retry
530/// loop.
531///
532/// Escalated outcomes are deliberately **not** parked: once the retry
533/// budget is exhausted the human has been notified, and bouncing the
534/// session into `MergeFailed → Mergeable → escalate → Notify → ...`
535/// on every tick would spam the notification channel. Leaving the
536/// session in `Mergeable` visually says "ready, but auto-merge gave
537/// up" — any subsequent observation change (CI flake, reviewer
538/// dismissal, branch deletion) will naturally flip it off the ready
539/// path via the normal ladder.
540pub(super) fn should_park_in_merge_failed(
541    engine: &ReactionEngine,
542    session: &Session,
543    to: SessionStatus,
544    reaction_key: &str,
545    outcome: &ReactionOutcome,
546) -> bool {
547    to == SessionStatus::Mergeable
548        && reaction_key == "approved-and-green"
549        && engine
550            .resolve_reaction_config(session, reaction_key)
551            .is_some_and(|c| c.action == ReactionAction::AutoMerge)
552        && !outcome.success
553        && !outcome.escalated
554}
555
556/// Clear reaction trackers on status transitions, with a carve-out
557/// for the `Mergeable ↔ MergeFailed` parking loop.
558///
559/// The default rule is simple: on exit from a reaction-triggering
560/// status, clear that reaction's tracker so a future re-entry starts
561/// with a full retry budget. Phase G needs a carve-out because the
562/// parking loop repeatedly re-enters `Mergeable` on purpose, and the
563/// retry budget is supposed to *accumulate* across those re-entries
564/// — clearing the tracker on `Mergeable → MergeFailed` or
565/// `MergeFailed → Mergeable` would reset attempts to zero and the
566/// retry cap would never fire.
567///
568/// The exit case `MergeFailed → anything_but_Mergeable` (CI flipped
569/// red, reviewer dismissed, PR closed) is subtle: the parking loop
570/// is over, so a later re-entry from `PrOpen → Mergeable` should
571/// start fresh. `status_to_reaction_key(MergeFailed) == None`, so
572/// the default-rule branch below wouldn't clear anything — we need
573/// an explicit `clear_tracker("approved-and-green")` for this case.
574pub(super) fn clear_tracker_on_transition(
575    engine: &ReactionEngine,
576    session_id: &SessionId,
577    from: SessionStatus,
578    to: SessionStatus,
579) {
580    // Parking-loop edges: preserve the `approved-and-green` tracker
581    // so retry accounting accumulates across the loop.
582    let parking_loop_edge = matches!(
583        (from, to),
584        (SessionStatus::Mergeable, SessionStatus::MergeFailed)
585            | (SessionStatus::MergeFailed, SessionStatus::Mergeable)
586    );
587    if parking_loop_edge {
588        return;
589    }
590
591    // Leaving `MergeFailed` to a non-`Mergeable` state: the retry
592    // loop is over (observation moved off the ready path), so clear
593    // the parked tracker. The default-rule branch below would miss
594    // this because `status_to_reaction_key(MergeFailed) == None`.
595    if from == SessionStatus::MergeFailed {
596        engine.clear_tracker(session_id, "approved-and-green");
597        return;
598    }
599
600    // `CiFailed` is excluded from `status_to_reaction_key` (issue #195 H3)
601    // because its dispatch is handled by `check_ci_failed`, not `transition`.
602    // Explicitly clear the tracker here so a second CI failure episode
603    // after a fix starts with a fresh retry budget.
604    if from == SessionStatus::CiFailed {
605        engine.clear_tracker(session_id, "ci-failed");
606        return;
607    }
608
609    // Default rule: clear the `from` reaction's tracker on exit.
610    if let Some(prev_key) = status_to_reaction_key(from) {
611        engine.clear_tracker(session_id, prev_key);
612    }
613}
614
615/// Is `status` eligible for stuck detection? I.e., if a session in
616/// this status has been observed with `Idle`/`Blocked` activity for
617/// longer than the `agent-stuck` reaction's `threshold`, should it be
618/// flipped to `Stuck`?
619///
620/// Phase H. The set matches the "work in progress" statuses where a
621/// silent agent is genuinely unexpected:
622///
623/// - `Working`: happy path coder lost its train of thought.
624/// - `PrOpen` / `CiFailed` / `ReviewPending` / `ChangesRequested`
625///   / `Approved` / `Mergeable`: PR-track states where the agent is
626///   waiting on or reacting to CI / a reviewer, and should be
627///   actively working (applying review comments, re-running tests,
628///   responding to CI failures).
629///
630/// Excluded statuses — and why each is excluded — are enumerated
631/// exhaustively in the match below. The match has **no wildcard**: a
632/// future `SessionStatus` variant will fail the build here until
633/// stuck-eligibility is decided for it. Same discipline as the
634/// `ALL_SESSION_STATUSES` exhaustiveness test in `scm_transitions.rs`.
635/// Returns `true` for session statuses where the review observation is
636/// unlikely to change within a few seconds (the session is waiting on a
637/// human review or has changes requested). Used by the review backlog
638/// throttle to skip redundant REST API calls.
639pub(super) const fn is_review_stable(status: SessionStatus) -> bool {
640    matches!(
641        status,
642        SessionStatus::ChangesRequested | SessionStatus::ReviewPending | SessionStatus::Approved
643    )
644}
645
646/// Transitions that warrant notifying the parent orchestrator (issue #169).
647///
648/// Chosen so the orchestrator only gets messages that require a decision
649/// or at least a "status FYI" — not every intermediate tick. Noisy or
650/// purely-informational states (e.g. `Working`, `Spawning`, intermediate
651/// review flips) are intentionally excluded; the human/CLI can still see
652/// them via `ao-rs status` or the SSE event stream.
653pub(super) const fn is_orchestrator_notifiable(status: SessionStatus) -> bool {
654    matches!(
655        status,
656        SessionStatus::PrOpen
657            | SessionStatus::ReviewPending
658            | SessionStatus::CiFailed
659            | SessionStatus::ChangesRequested
660            | SessionStatus::Approved
661            | SessionStatus::Merged
662            | SessionStatus::MergeFailed
663            | SessionStatus::Killed
664            | SessionStatus::Terminated
665            | SessionStatus::Errored
666            | SessionStatus::NeedsInput
667            | SessionStatus::Stuck
668    )
669}
670
671/// Render the notification message the orchestrator sees in its
672/// terminal. Kept short and parseable — the orchestrator is usually an
673/// LLM agent that re-reads its scrollback.
674pub(super) fn format_orchestrator_notification(worker: &Session, to: SessionStatus) -> String {
675    let short: String = worker.id.0.chars().take(8).collect();
676    let pr = worker
677        .claimed_pr_url
678        .as_deref()
679        .or(worker.issue_url.as_deref())
680        .unwrap_or("none");
681    format!(
682        "[ao-rs] worker {short} is now {to} — branch: {branch}, url: {pr}",
683        branch = worker.branch,
684    )
685}
686
/// Returns `true` for statuses where prolonged agent inactivity should be
/// treated as "stuck": the session is doing (or is expected to resume)
/// real work, so a long idle stretch is a problem rather than business as
/// usual.
///
/// The match is deliberately wildcard-free: a new `SessionStatus` variant
/// fails the build here until its stuck-eligibility has been decided.
pub(super) const fn is_stuck_eligible(status: SessionStatus) -> bool {
    match status {
        // Stuck-eligible: active work or PR-track where progress is expected.
        SessionStatus::Working
        | SessionStatus::PrOpen
        | SessionStatus::CiFailed
        | SessionStatus::ReviewPending
        | SessionStatus::ChangesRequested
        | SessionStatus::Approved
        | SessionStatus::Mergeable => true,

        // Not stuck-eligible:
        //
        // - Spawning: agent hasn't had its first activity poll yet;
        //   idle_since would never populate for this state anyway.
        // - Idle: the dedicated "no task assigned / waiting for work"
        //   status, distinct from "currently working and momentarily
        //   gone idle". A session in `Idle` is idle by design.
        // - NeedsInput: already a known-blocked-on-human state with
        //   its own (future) `agent-needs-input` reaction.
        // - Stuck: already stuck. Re-entry is handled by the
        //   `Stuck → Working` exit branch in `poll_one` step 4.
        // - MergeFailed: Phase G parking state with its own retry
        //   budget via the `approved-and-green` tracker. Conflating
        //   with stuck would double-charge retries and confuse the
        //   parking-loop accounting.
        // - Terminal states (`Killed`, `Terminated`, `Done`,
        //   `Cleanup`, `Errored`, `Merged`): filtered out by the
        //   `tick()` pre-filter long before `check_stuck` is called.
        //   Listed here so the exhaustive match stays exhaustive.
        SessionStatus::Spawning
        | SessionStatus::Idle
        | SessionStatus::NeedsInput
        | SessionStatus::Stuck
        | SessionStatus::MergeFailed
        | SessionStatus::Killed
        | SessionStatus::Terminated
        | SessionStatus::Done
        | SessionStatus::Cleanup
        | SessionStatus::Errored
        | SessionStatus::Merged => false,
    }
}
730
/// Handle returned by `LifecycleManager::spawn`. Dropping it does **not**
/// stop the loop — the caller must `.stop().await` explicitly, so a
/// CLI handler that accidentally drops the handle doesn't silently kill
/// the background worker.
pub struct LifecycleHandle {
    // Join handle for the spawned polling task; awaited in `stop`.
    join: tokio::task::JoinHandle<()>,
    // Shutdown signal shared with the loop (and with any sub-tasks that
    // clone it via `cancellation_token`).
    token: CancellationToken,
}
739
740impl LifecycleHandle {
741    /// Signal the loop to stop and wait for it to finish the current tick.
742    pub async fn stop(self) {
743        self.token.cancel();
744        let _ = self.join.await;
745    }
746
747    /// Clone the cancellation token so sub-tasks can share shutdown.
748    pub fn cancellation_token(&self) -> CancellationToken {
749        self.token.clone()
750    }
751}
752
753/// Stable hash fingerprint of a `ReviewComment` slice.
754///
755/// Sorts by `(author, body, url)` for determinism (API order can vary),
756/// then folds through `DefaultHasher`. Used by `check_review_backlog` to
757/// detect when the pending-comments set has changed between ticks.
758pub(super) fn fingerprint_comments(comments: &[crate::scm::ReviewComment]) -> u64 {
759    use std::collections::hash_map::DefaultHasher;
760    let mut keys: Vec<(&str, &str, &str)> = comments
761        .iter()
762        .map(|c| (c.author.as_str(), c.body.as_str(), c.url.as_str()))
763        .collect();
764    keys.sort_unstable();
765    let mut h = DefaultHasher::new();
766    keys.hash(&mut h);
767    h.finish()
768}
769
770/// A minimal dummy `Session` used as the dispatch target for `all-complete`.
771///
772/// `all-complete` is a drain-level event — it has no per-session context.
773/// The engine needs a session to look up the project's reaction config, but
774/// since `all-complete` is a global reaction, any project id works here. We
775/// use an empty project id so config lookup falls back to the global entry.
776pub(super) fn all_complete_sentinel() -> Session {
777    use crate::types::{now_ms, SessionId};
778    Session {
779        id: SessionId("__all_complete__".into()),
780        project_id: String::new(),
781        status: SessionStatus::Done,
782        agent: String::new(),
783        agent_config: None,
784        branch: String::new(),
785        task: String::new(),
786        workspace_path: None,
787        runtime_handle: None,
788        runtime: String::new(),
789        activity: None,
790        created_at: now_ms(),
791        cost: None,
792        issue_id: None,
793        issue_url: None,
794        claimed_pr_number: None,
795        claimed_pr_url: None,
796        initial_prompt_override: None,
797        spawned_by: None,
798        last_merge_conflict_dispatched: None,
799        last_review_backlog_fingerprint: None,
800    }
801}
802
803#[cfg(test)]
804pub(crate) mod tests {
805    use super::*;
806    use crate::scm::{
807        CheckRun, CiStatus, MergeMethod, MergeReadiness, PrState, PullRequest, Review,
808        ReviewComment, ReviewDecision,
809    };
810    use crate::traits::Workspace;
811    use crate::types::{now_ms, SessionId, WorkspaceCreateConfig};
812    use async_trait::async_trait;
813    use std::path::{Path, PathBuf};
814    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
815    use std::sync::Mutex;
816    use std::time::{SystemTime, UNIX_EPOCH};
817
818    pub(crate) fn unique_temp_dir(label: &str) -> PathBuf {
819        static COUNTER: AtomicUsize = AtomicUsize::new(0);
820        let nanos = SystemTime::now()
821            .duration_since(UNIX_EPOCH)
822            .unwrap()
823            .as_nanos();
824        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
825        std::env::temp_dir().join(format!("ao-rs-lifecycle-{label}-{nanos}-{n}"))
826    }
827
828    pub(crate) fn fake_session(id: &str, project: &str) -> Session {
829        Session {
830            id: SessionId(id.into()),
831            project_id: project.into(),
832            status: SessionStatus::Spawning,
833            agent: "claude-code".into(),
834            agent_config: None,
835            branch: format!("ao-{id}"),
836            task: "test task".into(),
837            workspace_path: Some(PathBuf::from("/tmp/ws")),
838            runtime_handle: Some(format!("runtime-{id}")),
839            runtime: "tmux".into(),
840            activity: None,
841            created_at: now_ms(),
842            cost: None,
843            issue_id: None,
844            issue_url: None,
845            claimed_pr_number: None,
846            claimed_pr_url: None,
847            initial_prompt_override: None,
848            spawned_by: None,
849            last_merge_conflict_dispatched: None,
850            last_review_backlog_fingerprint: None,
851        }
852    }
853
854    // ---------- Mock plugins ---------- //
855
    /// Runtime mock with a toggleable `alive` flag and a recorder for
    /// `send_message` calls. Tests that care about delivery can read
    /// the recorder via `MockRuntime::sends()`; others ignore it.
    pub(crate) struct MockRuntime {
        // Value `is_alive` reports; tests flip it mid-run via `store`.
        pub(crate) alive: AtomicBool,
        // Every `(handle, message)` pair passed to `send_message`, in order.
        pub(crate) sends: Mutex<Vec<(String, String)>>,
        // Handles passed to `destroy`, in call order.
        pub(crate) destroys: Mutex<Vec<String>>,
    }
864
865    impl MockRuntime {
866        pub(crate) fn new(alive: bool) -> Self {
867            Self {
868                alive: AtomicBool::new(alive),
869                sends: Mutex::new(Vec::new()),
870                destroys: Mutex::new(Vec::new()),
871            }
872        }
873
874        /// Snapshot of all `(handle, message)` pairs received by
875        /// `send_message` in the order they were called.
876        pub(crate) fn sends(&self) -> Vec<(String, String)> {
877            self.sends.lock().unwrap().clone()
878        }
879
880        #[allow(dead_code)]
881        pub(crate) fn destroyed_handles(&self) -> Vec<String> {
882            self.destroys.lock().unwrap().clone()
883        }
884    }
885
    #[async_trait]
    impl Runtime for MockRuntime {
        // "Creation" always succeeds with a fixed handle; no real process
        // or tmux pane is spawned.
        async fn create(
            &self,
            _session_id: &str,
            _cwd: &Path,
            _launch_command: &str,
            _env: &[(String, String)],
        ) -> Result<String> {
            Ok("mock-handle".into())
        }
        // Record the delivery instead of sending; tests assert via `sends()`.
        async fn send_message(&self, handle: &str, msg: &str) -> Result<()> {
            self.sends
                .lock()
                .unwrap()
                .push((handle.to_string(), msg.to_string()));
            Ok(())
        }
        // Report whatever the test scripted into the `alive` flag.
        async fn is_alive(&self, _handle: &str) -> Result<bool> {
            Ok(self.alive.load(Ordering::SeqCst))
        }
        // Record the handle; tests assert via `destroyed_handles()`.
        async fn destroy(&self, handle: &str) -> Result<()> {
            self.destroys.lock().unwrap().push(handle.to_string());
            Ok(())
        }
    }
912
    /// Agent mock that returns a scripted activity state each call.
    pub(crate) struct MockAgent {
        // The `ActivityState` the next `detect_activity` call will report.
        next: Mutex<ActivityState>,
    }

    impl MockAgent {
        /// Start with `initial` as the scripted activity state.
        pub(crate) fn new(initial: ActivityState) -> Self {
            Self {
                next: Mutex::new(initial),
            }
        }
        /// Re-script the state subsequent `detect_activity` calls report.
        pub(crate) fn set(&self, state: ActivityState) {
            *self.next.lock().unwrap() = state;
        }
    }
928
    #[async_trait]
    impl Agent for MockAgent {
        // Fixed launch command; never actually executed by the mock runtime.
        fn launch_command(&self, _session: &Session) -> String {
            "mock".into()
        }
        // No environment variables needed for mock launches.
        fn environment(&self, _session: &Session) -> Vec<(String, String)> {
            vec![]
        }
        // Empty prompt — lifecycle tests don't exercise prompt delivery.
        fn initial_prompt(&self, _session: &Session) -> String {
            "".into()
        }
        // Report whatever the test last scripted via `MockAgent::set`.
        async fn detect_activity(&self, _session: &Session) -> Result<ActivityState> {
            Ok(*self.next.lock().unwrap())
        }
    }
944
    /// Workspace mock: fabricates a fixed path on `create` and records
    /// every `destroy` call instead of touching the filesystem.
    #[allow(dead_code)]
    pub(crate) struct MockWorkspace {
        // Paths passed to `destroy`, in call order.
        destroyed: Mutex<Vec<PathBuf>>,
    }

    #[allow(dead_code)]
    impl MockWorkspace {
        pub(crate) fn new() -> Self {
            Self {
                destroyed: Mutex::new(Vec::new()),
            }
        }

        /// Snapshot of every path passed to `destroy`, in call order.
        pub(crate) fn destroyed_paths(&self) -> Vec<PathBuf> {
            self.destroyed.lock().unwrap().clone()
        }
    }
962
    #[async_trait]
    impl Workspace for MockWorkspace {
        // Always "create" the same fixed path; nothing touches the disk.
        async fn create(&self, _cfg: &WorkspaceCreateConfig) -> Result<PathBuf> {
            Ok(PathBuf::from("/tmp/ws"))
        }
        // Record the path instead of deleting anything.
        async fn destroy(&self, workspace_path: &Path) -> Result<()> {
            self.destroyed
                .lock()
                .unwrap()
                .push(workspace_path.to_path_buf());
            Ok(())
        }
    }
976
    /// Scriptable SCM mock. Every method returns a value pre-set by the
    /// test. `detect_pr` returns `None` until `set_pr(Some(_))`. Each field
    /// probe (`pr_state`, `ci_status`, …) can be toggled to emit an error
    /// so tests cover the `TickError` branch of `poll_scm`.
    ///
    /// Kept intentionally minimal — we only test the handful of methods
    /// `poll_scm` calls. `pending_comments`, `reviews`, `ci_checks`, `merge`
    /// return empty/default values (enough to satisfy the trait).
    pub(crate) struct MockScm {
        // Scripted return value of `detect_pr` (starts as `None`).
        pub(crate) pr: Mutex<Option<PullRequest>>,
        // Scripted return value of `pr_state`.
        pub(crate) state: Mutex<PrState>,
        // Scripted return value of `ci_status`.
        pub(crate) ci: Mutex<CiStatus>,
        // Scripted return value of `review_decision`.
        pub(crate) review: Mutex<ReviewDecision>,
        // Scripted return value of `mergeability`.
        pub(crate) readiness: Mutex<MergeReadiness>,
        // Counter: incremented on every detect_pr call, so tests can assert
        // "the loop actually called the plugin".
        pub(crate) detect_calls: AtomicUsize,
        // Error toggles. One per probe that `poll_scm` fans out to, so
        // individual tests can force exactly one slot to fail and
        // verify the error-aggregation path reports that slot by name.
        pub(crate) detect_pr_errors: AtomicBool,
        pub(crate) pr_state_errors: AtomicBool,
        pub(crate) ci_status_errors: AtomicBool,
        pub(crate) review_decision_errors: AtomicBool,
        pub(crate) mergeability_errors: AtomicBool,
        // Phase G: `merge()` error toggle + call recorder so parking-loop
        // tests can script "fail first, succeed later" and assert that
        // the engine actually called the plugin the expected number of
        // times.
        pub(crate) merge_errors: AtomicBool,
        pub(crate) merge_calls: Mutex<Vec<(u32, Option<MergeMethod>)>>,
        // Issue #195: scriptable pending_comments and ci_checks for H2/H3 tests.
        pub(crate) pending_comments_result: Mutex<Vec<ReviewComment>>,
        pub(crate) ci_checks_result: Mutex<Vec<CheckRun>>,
    }
1012
    impl MockScm {
        /// Start scripted to "no PR detected, everything pending, no
        /// errors" — individual tests override only the probes they use.
        pub(crate) fn new() -> Self {
            Self {
                pr: Mutex::new(None),
                state: Mutex::new(PrState::Open),
                ci: Mutex::new(CiStatus::Pending),
                review: Mutex::new(ReviewDecision::None),
                readiness: Mutex::new(MergeReadiness {
                    mergeable: false,
                    ci_passing: false,
                    approved: false,
                    no_conflicts: true,
                    blockers: vec!["pending".into()],
                }),
                detect_calls: AtomicUsize::new(0),
                detect_pr_errors: AtomicBool::new(false),
                pr_state_errors: AtomicBool::new(false),
                ci_status_errors: AtomicBool::new(false),
                review_decision_errors: AtomicBool::new(false),
                mergeability_errors: AtomicBool::new(false),
                merge_errors: AtomicBool::new(false),
                merge_calls: Mutex::new(Vec::new()),
                pending_comments_result: Mutex::new(vec![]),
                ci_checks_result: Mutex::new(vec![]),
            }
        }
        /// Snapshot of `(pr_number, method)` pairs recorded by successful
        /// `merge` calls, in call order.
        pub(crate) fn merges(&self) -> Vec<(u32, Option<MergeMethod>)> {
            self.merge_calls.lock().unwrap().clone()
        }
        /// Script what `pending_comments` returns.
        pub(crate) fn set_pending_comments(&self, comments: Vec<ReviewComment>) {
            *self.pending_comments_result.lock().unwrap() = comments;
        }
        /// Script what `ci_checks` returns.
        pub(crate) fn set_ci_checks(&self, checks: Vec<CheckRun>) {
            *self.ci_checks_result.lock().unwrap() = checks;
        }
        /// Script what `detect_pr` returns.
        pub(crate) fn set_pr(&self, pr: Option<PullRequest>) {
            *self.pr.lock().unwrap() = pr;
        }
        /// Script what `pr_state` returns.
        pub(crate) fn set_state(&self, s: PrState) {
            *self.state.lock().unwrap() = s;
        }
        /// Script what `ci_status` returns.
        pub(crate) fn set_ci(&self, c: CiStatus) {
            *self.ci.lock().unwrap() = c;
        }
        /// Script what `review_decision` returns.
        pub(crate) fn set_review(&self, r: ReviewDecision) {
            *self.review.lock().unwrap() = r;
        }
        /// Script what `mergeability` returns.
        pub(crate) fn set_readiness(&self, r: MergeReadiness) {
            *self.readiness.lock().unwrap() = r;
        }
    }
1064
    #[async_trait]
    impl Scm for MockScm {
        fn name(&self) -> &str {
            "mock"
        }
        // Counts every call — even scripted failures — so tests can assert
        // the loop reached the plugin, then fails or returns the scripted PR.
        async fn detect_pr(&self, _session: &Session) -> Result<Option<PullRequest>> {
            self.detect_calls.fetch_add(1, Ordering::SeqCst);
            if self.detect_pr_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock detect_pr".into()));
            }
            Ok(self.pr.lock().unwrap().clone())
        }
        async fn pr_state(&self, _pr: &PullRequest) -> Result<PrState> {
            if self.pr_state_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock pr_state".into()));
            }
            Ok(*self.state.lock().unwrap())
        }
        // No error toggle: H2/H3 tests only need a scriptable value here.
        async fn ci_checks(&self, _pr: &PullRequest) -> Result<Vec<CheckRun>> {
            Ok(self.ci_checks_result.lock().unwrap().clone())
        }
        async fn ci_status(&self, _pr: &PullRequest) -> Result<CiStatus> {
            if self.ci_status_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock ci_status".into()));
            }
            Ok(*self.ci.lock().unwrap())
        }
        // Always empty — `poll_scm` doesn't consume individual reviews.
        async fn reviews(&self, _pr: &PullRequest) -> Result<Vec<Review>> {
            Ok(vec![])
        }
        async fn review_decision(&self, _pr: &PullRequest) -> Result<ReviewDecision> {
            if self.review_decision_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime(
                    "mock review_decision".into(),
                ));
            }
            Ok(*self.review.lock().unwrap())
        }
        async fn pending_comments(&self, _pr: &PullRequest) -> Result<Vec<ReviewComment>> {
            Ok(self.pending_comments_result.lock().unwrap().clone())
        }
        async fn mergeability(&self, _pr: &PullRequest) -> Result<MergeReadiness> {
            if self.mergeability_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock mergeability".into()));
            }
            Ok(self.readiness.lock().unwrap().clone())
        }
        async fn merge(&self, pr: &PullRequest, method: Option<MergeMethod>) -> Result<()> {
            if self.merge_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock merge".into()));
            }
            // NOTE(review): only *successful* merges are recorded — while
            // `merge_errors` is set, an attempt leaves no trace in
            // `merge_calls`. The parking-loop tests appear to count
            // successes; confirm before changing this to count attempts.
            self.merge_calls.lock().unwrap().push((pr.number, method));
            Ok(())
        }
    }
1120
1121    pub(crate) fn fake_pr(number: u32, branch: &str) -> PullRequest {
1122        PullRequest {
1123            number,
1124            url: format!("https://github.com/acme/widgets/pull/{number}"),
1125            title: "fix the widgets".into(),
1126            owner: "acme".into(),
1127            repo: "widgets".into(),
1128            branch: branch.into(),
1129            base_branch: "main".into(),
1130            is_draft: false,
1131        }
1132    }
1133
1134    // ---------- Test helpers ---------- //
1135
    /// Wire a `LifecycleManager` to fresh mocks — runtime alive, agent
    /// scripted to `initial_activity` — backed by an on-disk
    /// `SessionManager` rooted in a unique temp dir. The mocks are
    /// returned so tests can re-script them mid-run; the `PathBuf` is the
    /// temp dir (for cleanup).
    pub(crate) async fn setup(
        label: &str,
        initial_activity: ActivityState,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockRuntime>,
        Arc<MockAgent>,
        PathBuf,
    ) {
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent = Arc::new(MockAgent::new(initial_activity));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent.clone() as Arc<dyn Agent>,
        ));
        (lifecycle, sessions, runtime, agent, base)
    }
1158
1159    pub(crate) async fn recv_timeout(
1160        rx: &mut broadcast::Receiver<OrchestratorEvent>,
1161    ) -> Option<OrchestratorEvent> {
1162        tokio::time::timeout(Duration::from_millis(100), rx.recv())
1163            .await
1164            .ok()
1165            .and_then(|r| r.ok())
1166    }
1167
1168    pub(crate) async fn drain_events(
1169        rx: &mut broadcast::Receiver<OrchestratorEvent>,
1170    ) -> Vec<OrchestratorEvent> {
1171        let mut out = Vec::new();
1172        while let Some(e) = recv_timeout(rx).await {
1173            out.push(e);
1174        }
1175        out
1176    }
1177
    /// Backdate `session_id`'s `idle_since` entry by `by`, so
    /// stuck-threshold tests don't have to sleep in real time.
    pub(crate) fn rewind_idle_since(
        lifecycle: &LifecycleManager,
        session_id: &SessionId,
        by: Duration,
    ) {
        // A panic in another test could have poisoned the mutex; recover
        // the inner map rather than cascading the failure.
        let mut map = lifecycle.idle_since.lock().unwrap_or_else(|e| {
            tracing::error!("idle_since mutex poisoned; recovering inner state: {e}");
            e.into_inner()
        });
        // `Instant` can't represent times before its platform-dependent
        // epoch; a huge `by` would underflow, so fail loudly instead of
        // silently clamping.
        let rewound = Instant::now()
            .checked_sub(by)
            .expect("test clock rewind underflowed Instant");
        map.insert(session_id.clone(), rewound);
    }
1192
    /// Like `setup`, but attaches a scriptable `MockScm` and hides the
    /// runtime/agent mocks (runtime alive, agent scripted to `Ready`).
    pub(crate) async fn setup_with_scm(
        label: &str,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockScm>,
        PathBuf,
    ) {
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());
        let lifecycle = Arc::new(
            LifecycleManager::new(sessions.clone(), runtime, agent)
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );
        (lifecycle, sessions, scm, base)
    }
1213
    /// Script the mock so PR `pr_number` (branch `ao-s1`) looks fully
    /// ready to merge: open, CI passing, approved, and no blockers.
    pub(crate) fn script_ready_pr(scm: &MockScm, pr_number: u32) {
        scm.set_pr(Some(fake_pr(pr_number, "ao-s1")));
        scm.set_state(PrState::Open);
        scm.set_ci(CiStatus::Passing);
        scm.set_review(ReviewDecision::Approved);
        scm.set_readiness(MergeReadiness {
            mergeable: true,
            ci_passing: true,
            approved: true,
            no_conflicts: true,
            blockers: vec![],
        });
    }
1227
    /// `setup_with_scm` plus a `ReactionEngine` configured with a single
    /// `approved-and-green` → `AutoMerge` reaction carrying the given
    /// retry budget. Used by the Phase G parking-loop tests.
    pub(crate) async fn setup_with_scm_and_auto_merge_engine(
        label: &str,
        retries: Option<u32>,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockScm>,
        Arc<ReactionEngine>,
        PathBuf,
    ) {
        use crate::reactions::ReactionConfig;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle = LifecycleManager::new(sessions.clone(), runtime, agent);

        let mut cfg = ReactionConfig::new(ReactionAction::AutoMerge);
        cfg.retries = retries;
        let mut map = std::collections::HashMap::new();
        map.insert("approved-and-green".into(), cfg);

        // The engine gets its own runtime mock — these tests never assert
        // on engine-side message delivery.
        let engine_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let engine = Arc::new(
            ReactionEngine::new(map, engine_runtime, lifecycle.events_sender())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );

        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );
        (lifecycle, sessions, scm, engine, base)
    }
1266
    /// SCM-enabled setup with a `merge-conflicts` → `SendToAgent`
    /// ("please rebase") reaction. The same `MockRuntime` backs both the
    /// lifecycle and the engine and is returned so tests can assert the
    /// rebase message was actually delivered to the agent.
    pub(crate) async fn setup_with_merge_conflicts_engine(
        label: &str,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockScm>,
        Arc<MockRuntime>,
        Arc<ReactionEngine>,
        PathBuf,
    ) {
        use crate::reactions::ReactionConfig;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle =
            LifecycleManager::new(sessions.clone(), runtime.clone() as Arc<dyn Runtime>, agent);

        let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
        cfg.message = Some("please rebase".into());
        let mut map = std::collections::HashMap::new();
        map.insert("merge-conflicts".into(), cfg);

        // Shared with the lifecycle's runtime so `sends()` sees the
        // engine's deliveries too.
        let engine_runtime: Arc<dyn Runtime> = runtime.clone() as Arc<dyn Runtime>;
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime,
            lifecycle.events_sender(),
        ));

        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );
        (lifecycle, sessions, scm, runtime, engine, base)
    }
1307
    /// Setup for stuck-detection tests: agent scripted to `Idle`, plus a
    /// `ReactionEngine` with a single `agent-stuck` → `Notify` reaction.
    /// `threshold` is passed through verbatim to the reaction config
    /// (`None` leaves the engine's default in effect).
    pub(crate) async fn setup_stuck(
        label: &str,
        threshold: Option<&str>,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockAgent>,
        PathBuf,
    ) {
        use crate::reactions::ReactionConfig;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent = Arc::new(MockAgent::new(ActivityState::Idle));

        let lifecycle =
            LifecycleManager::new(sessions.clone(), runtime, agent.clone() as Arc<dyn Agent>);

        let mut cfg = ReactionConfig::new(ReactionAction::Notify);
        cfg.message = Some("stuck!".into());
        cfg.threshold = threshold.map(String::from);
        let mut map = std::collections::HashMap::new();
        map.insert("agent-stuck".into(), cfg);
        let engine_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime,
            lifecycle.events_sender(),
        ));
        let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine));
        (lifecycle, sessions, agent, base)
    }
1341
    /// Build a standalone `ReactionEngine` whose only reaction is
    /// `ci-failed` → `SendToAgent` with `message`, sharing `lifecycle`'s
    /// event channel so dispatches surface on the same broadcast stream.
    pub(crate) fn build_engine_with_ci_failed(
        lifecycle: &LifecycleManager,
        message: &str,
    ) -> Arc<ReactionEngine> {
        use crate::reactions::ReactionConfig;
        let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
        cfg.message = Some(message.into());
        let mut map = std::collections::HashMap::new();
        map.insert("ci-failed".into(), cfg);

        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        Arc::new(ReactionEngine::new(map, runtime, lifecycle.events_sender()))
    }
1355
1356    // Actual test functions live in the per-submodule test modules:
1357    // - tick::tests (tick.rs)
1358    // - scm_poll::tests (scm_poll.rs)
1359    // - transition::tests (transition.rs)
1360    // - stuck::tests (stuck.rs)
1361    //
1362    // The helpers above (fake_session, setup, recv_timeout, etc.) are
1363    // shared via `pub(crate)` visibility and imported by those modules.
1364}