// ao_core/lifecycle/mod.rs
1//! Background polling loop that keeps `Session` state in sync with reality.
2//!
3//! Corresponds to `packages/core/src/lifecycle-manager.ts` in the reference
4//! repo, trimmed to what Slice 1 Phase C actually needs:
5//!
6//! 1. Every `poll_interval`, list all non-terminal sessions from disk.
7//! 2. For each one, probe `Runtime::is_alive` and `Agent::detect_activity`.
8//! 3. Apply state transitions and persist the new `Session` atomically.
9//! 4. Broadcast `OrchestratorEvent`s so subscribers (CLI, reaction engine,
10//! notifiers, …) can react without polling themselves.
11//!
12//! Design notes:
13//!
14//! - **Trait objects, not generics.** The manager owns `Arc<dyn Runtime>`
15//! etc. so the same `LifecycleManager` type can be used in tests (with
16//! mocks) and in the real CLI (with tmux/claude-code). Generic parameters
17//! would have leaked through every consumer.
18//!
19//! - **Disk is the source of truth.** The loop re-reads from
20//! `SessionManager::list` each tick rather than holding state in memory.
21//! This matches the Slice 1 design principle established in Phase A, and
22//! means `ao-rs spawn` running in a separate process is immediately
23//! visible on the next tick. (A future Slice 2+ may add an in-memory
24//! cache + file-watcher for efficiency.)
25//!
26//! - **Per-session errors don't stop the loop.** If one session's runtime
27//! probe fails, we emit `TickError` and continue. Only fatal `SessionManager::list`
28//! errors bubble up (and even then we log and keep looping).
29//!
30//! - **Event channel lag.** We use `tokio::sync::broadcast`, which drops
31//! old events when a slow subscriber can't keep up. That's fine for
32//! observability — a reaction engine that misses a tick just picks up
33//! the next one. Anyone needing lossless delivery should snapshot via
34//! `SessionManager::list` on startup and then subscribe.
35
36use crate::{
37 error::Result,
38 events::{OrchestratorEvent, TerminationReason},
39 reaction_engine::{parse_duration, status_to_reaction_key, ReactionEngine},
40 reactions::{ReactionAction, ReactionOutcome},
41 scm::{CheckStatus, CiStatus, MergeReadiness, PrState, PullRequest, ReviewDecision},
42 scm_transitions::{derive_scm_status, ScmObservation},
43 session_manager::SessionManager,
44 traits::{Agent, Runtime, Scm, Workspace},
45 types::{ActivityState, Session, SessionId, SessionStatus},
46};
47use std::{
48 collections::{HashMap, HashSet},
49 hash::{Hash, Hasher},
50 sync::{
51 atomic::{AtomicBool, AtomicU64, Ordering},
52 Arc, Mutex,
53 },
54 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
55};
56use tokio::sync::broadcast;
57use tokio_util::sync::CancellationToken;
58
59mod scm_poll;
60mod stuck;
61mod tick;
62mod transition;
63
/// How many events the broadcast channel buffers before dropping the oldest.
/// 1024 is generous — a healthy loop emits at most ~N events per tick where
/// N is the session count, and slow subscribers will lag at most a handful
/// of ticks before catching up.
const EVENT_CHANNEL_CAPACITY: usize = 1024;

/// Default poll interval. Increased from the TS reference's 5 s to 10 s
/// to further reduce GitHub API pressure now that batch enrichment +
/// ETag guards handle the hot path. Override per-manager via
/// `LifecycleManager::with_poll_interval`.
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(10);

/// Minimum time between review-backlog API calls per session (2 min).
/// Mirrors `REVIEW_BACKLOG_THROTTLE_MS = 120_000` in the TS reference.
/// Applies when the batch enrichment cache has no hit and the session is
/// already in a review-related state — avoids hammering the REST API for
/// review data that rarely changes within a 2-minute window.
const REVIEW_BACKLOG_THROTTLE: Duration = Duration::from_secs(120);
81
pub struct LifecycleManager {
    /// Disk-backed session store. Re-read in full on every tick — disk is
    /// the source of truth (see module docs).
    pub(super) sessions: Arc<SessionManager>,
    /// Runtime plugin; probed via `Runtime::is_alive` each tick.
    pub(super) runtime: Arc<dyn Runtime>,
    /// Agent plugin; probed via `Agent::detect_activity` each tick.
    pub(super) agent: Arc<dyn Agent>,
    /// Broadcast channel all lifecycle events are published on. Slow
    /// subscribers lose old events rather than blocking the loop.
    pub(super) events_tx: broadcast::Sender<OrchestratorEvent>,
    /// Time between ticks; defaults to `DEFAULT_POLL_INTERVAL`.
    poll_interval: Duration,
    /// Optional Slice 2 Phase D reaction engine. When set, every status
    /// transition into a reaction-triggering state (see
    /// `status_to_reaction_key`) calls `engine.dispatch(...)`. When unset,
    /// the lifecycle loop behaves exactly as it did in Phase C.
    pub(super) reaction_engine: Option<Arc<ReactionEngine>>,
    /// Optional Slice 2 Phase F SCM plugin. When set, every tick calls
    /// `detect_pr` on each non-terminal session; a fresh PR observation
    /// is folded through `derive_scm_status` to produce PR-driven status
    /// transitions (`Working → PrOpen/CiFailed/ChangesRequested/…`).
    /// When unset, the lifecycle loop is exactly the Phase C/D behaviour
    /// — SCM polling is off. This matches how tests and `ao-rs watch`
    /// without a configured plugin should behave.
    pub(super) scm: Option<Arc<dyn Scm>>,
    /// Optional workspace plugin. When set, sessions that transition to
    /// `Merged` automatically have their worktree destroyed so disk space
    /// is reclaimed without a manual `ao-rs cleanup`.
    pub(super) workspace: Option<Arc<dyn Workspace>>,
    /// Slice 2 Phase H bookkeeping for agent-stuck detection.
    ///
    /// Records the `Instant` at which each session first entered an idle
    /// activity state (`Idle` / `Blocked`). `check_stuck` reads this map
    /// to decide whether the session has been idle longer than the
    /// configured `agent-stuck.threshold`.
    ///
    /// - An entry is **inserted** the first tick activity flips into
    ///   `Idle`/`Blocked` and **preserved** across subsequent idle ticks
    ///   so `elapsed()` grows monotonically.
    /// - An entry is **removed** as soon as activity flips back to any
    ///   non-idle state, so the next idle streak restarts the clock.
    /// - `terminate` also clears the entry to bound memory for long-
    ///   running watch loops (Task 2.7).
    ///
    /// `Mutex<HashMap>` mirrors how `ReactionEngine::trackers` is stored —
    /// short critical sections around pure read/modify/write, no nested
    /// locking.
    pub(super) idle_since: Mutex<HashMap<SessionId, Instant>>,
    /// Per-tick cache of batch-enriched PR observations.
    ///
    /// Populated once at the start of each `tick()` call via
    /// `Scm::enrich_prs_batch()`. Individual `poll_scm` calls check this
    /// cache first and skip the 4× REST fan-out when they find a hit.
    /// Cleared at the start of the next tick.
    ///
    /// Key format: `"{owner}/{repo}#{number}"`.
    pub(super) pr_enrichment_cache: Mutex<HashMap<String, ScmObservation>>,
    /// Per-session timestamp of the last review backlog API check.
    /// Throttles `pending_comments` calls to at most once per
    /// `REVIEW_BACKLOG_THROTTLE` (2 minutes).
    pub(super) last_review_backlog_check: Mutex<HashMap<SessionId, Instant>>,
    /// Per-tick cache of detected PRs from `detect_pr`. Populated in
    /// `tick()` Pass 1 so `poll_scm` reuses the result instead of
    /// calling `detect_pr` a second time.
    pub(super) detected_prs_cache: Mutex<HashMap<SessionId, Option<PullRequest>>>,
    /// Unix-ms when `run_loop` started. `0` means "not yet started"
    /// (e.g. tests driving `tick` directly). Used by `tick` to
    /// classify first-seen sessions as `SessionRestored` (created
    /// before startup) vs. `Spawned` (created after).
    pub(super) startup_ms: AtomicU64,
    /// Set to `true` once `all-complete` has been dispatched for the
    /// current drain cycle (all active sessions reached terminal state).
    /// Reset to `false` on the first tick that observes a new non-terminal
    /// session, so a fresh batch of sessions gets its own `all-complete`.
    pub(super) all_complete_fired: AtomicBool,
}
151
152impl LifecycleManager {
153 pub fn new(
154 sessions: Arc<SessionManager>,
155 runtime: Arc<dyn Runtime>,
156 agent: Arc<dyn Agent>,
157 ) -> Self {
158 let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
159 Self {
160 sessions,
161 runtime,
162 agent,
163 events_tx,
164 poll_interval: DEFAULT_POLL_INTERVAL,
165 reaction_engine: None,
166 scm: None,
167 workspace: None,
168 idle_since: Mutex::new(HashMap::new()),
169 pr_enrichment_cache: Mutex::new(HashMap::new()),
170 last_review_backlog_check: Mutex::new(HashMap::new()),
171 detected_prs_cache: Mutex::new(HashMap::new()),
172 startup_ms: AtomicU64::new(0),
173 all_complete_fired: AtomicBool::new(false),
174 }
175 }
176
177 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
178 self.poll_interval = interval;
179 self
180 }
181
182 /// Attach a reaction engine. `ReactionEngine::new` should be given
183 /// this `LifecycleManager`'s `events_tx` (via `events_sender()`) so
184 /// reaction events land on the same broadcast channel as lifecycle
185 /// events. Builder form for test ergonomics.
186 pub fn with_reaction_engine(mut self, engine: Arc<ReactionEngine>) -> Self {
187 self.reaction_engine = Some(engine);
188 self
189 }
190
191 /// Attach an SCM plugin. When present, `poll_one` fans out a
192 /// `detect_pr` + four parallel `pr_state`/`ci_status`/`review_decision`/
193 /// `mergeability` probes per session per tick, then routes the result
194 /// through `derive_scm_status` for PR-driven status transitions.
195 ///
196 /// Builder form mirrors `with_reaction_engine` — call sites that don't
197 /// care about SCM polling leave it unset and get Phase C/D behaviour.
198 pub fn with_scm(mut self, scm: Arc<dyn Scm>) -> Self {
199 self.scm = Some(scm);
200 self
201 }
202
203 /// Attach a workspace plugin. When present, sessions that transition to
204 /// `Merged` automatically have their worktree destroyed within the same
205 /// poll cycle. Sessions with `workspace_path: None` are unaffected.
206 pub fn with_workspace(mut self, workspace: Arc<dyn Workspace>) -> Self {
207 self.workspace = Some(workspace);
208 self
209 }
210
211 /// Borrow the underlying broadcast sender so a `ReactionEngine`
212 /// constructed separately can publish events on the same channel
213 /// `LifecycleManager` uses. Cheap clone — `broadcast::Sender` is
214 /// internally ref-counted.
215 pub fn events_sender(&self) -> broadcast::Sender<OrchestratorEvent> {
216 self.events_tx.clone()
217 }
218
219 /// Get a fresh subscriber. Each `recv()` call sees events from the
220 /// point of subscription onward — history is not replayed.
221 pub fn subscribe(&self) -> broadcast::Receiver<OrchestratorEvent> {
222 self.events_tx.subscribe()
223 }
224
225 /// Spawn the background polling loop. Returns a handle that can be
226 /// used to stop it cleanly.
227 ///
228 /// We use `tokio_util::sync::CancellationToken` rather than a oneshot
229 /// because cancellation tokens are cheap to clone and can be passed
230 /// into future sub-tasks (e.g. a reaction engine that shares this
231 /// manager's shutdown signal).
232 pub fn spawn(self: Arc<Self>) -> LifecycleHandle {
233 let token = CancellationToken::new();
234 let child_token = token.clone();
235 let this = self.clone();
236 let join = tokio::spawn(async move {
237 this.run_loop(child_token).await;
238 });
239 LifecycleHandle { join, token }
240 }
241
242 /// The loop body. Ticks on `poll_interval`, exits cleanly when the
243 /// cancellation token fires.
244 async fn run_loop(self: Arc<Self>, token: CancellationToken) {
245 // Per-loop memory of which session IDs we've already announced via
246 // `Spawned`, so we emit it exactly once per session observed.
247 let mut seen: HashSet<SessionId> = HashSet::new();
248
249 // Record startup time so `tick` can distinguish sessions that
250 // predate this loop (emitted as `SessionRestored`) from sessions
251 // created after startup (emitted as `Spawned`). Set *before* the
252 // sweep so any session whose `created_at` equals or exceeds this
253 // moment is classified as new.
254 let startup_ms = SystemTime::now()
255 .duration_since(UNIX_EPOCH)
256 .map(|d| d.as_millis() as u64)
257 .unwrap_or(0);
258 // `0` is the "not started" sentinel — guard against clock skew
259 // that would stamp it as such.
260 self.startup_ms.store(startup_ms.max(1), Ordering::Relaxed);
261
262 // Crash-recovery sweep: if the process restarted after a session was
263 // persisted as `Merged` but before its worktree was destroyed (e.g.
264 // the daemon was killed mid-tick), the transition tick will never fire
265 // again because terminal sessions are skipped by `poll_one`. We scan
266 // once at startup and clean up any leftover worktrees.
267 self.sweep_merged_worktrees().await;
268
269 let mut ticker = tokio::time::interval(self.poll_interval);
270 // Skip the immediate-fire behaviour of `interval` — users expect
271 // "start, wait, tick" not "start, tick, wait". (The TS loop
272 // behaves the same way.)
273 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
274
275 loop {
276 tokio::select! {
277 _ = token.cancelled() => {
278 tracing::debug!("lifecycle loop received cancel");
279 return;
280 }
281 _ = ticker.tick() => {
282 if let Err(e) = self.tick(&mut seen).await {
283 // Fatal disk read error — log and keep going.
284 // A transient `~/.ao-rs/sessions` permission issue
285 // shouldn't permanently kill the loop.
286 tracing::error!("lifecycle tick failed: {e}");
287 }
288 }
289 }
290 }
291 }
292
293 /// One pass over every non-terminal session. Public so tests can
294 /// drive the state machine deterministically without `sleep`ing.
295 pub async fn tick(&self, seen: &mut HashSet<SessionId>) -> Result<()> {
296 let sessions = self.sessions.list().await?;
297
298 // ---- Batch PR enrichment (rate-limit optimization) ----
299 // Two-pass approach:
300 // Pass 1: detect_pr for each non-terminal session → collect PRs
301 // Batch: enrich_prs_batch for all collected PRs → populate cache
302 // Pass 2: poll_one (which calls poll_scm) consumes from cache,
303 // skipping the 4× REST fan-out when a batch hit exists.
304 //
305 // detect_pr runs once per session per tick (same as before); the
306 // savings come from replacing 4× REST per session with 1 GraphQL
307 // batch for all sessions.
308
309 // Clear the previous tick's cache.
310 {
311 let mut cache = self.pr_enrichment_cache.lock().unwrap_or_else(|e| {
312 tracing::error!("pr_enrichment_cache mutex poisoned; recovering inner state: {e}");
313 e.into_inner()
314 });
315 cache.clear();
316 }
317
318 // Pass 1: detect PRs (only when SCM is configured).
319 // Store detected PRs keyed by session ID so poll_scm can reuse them.
320 let mut detected_prs: HashMap<SessionId, Option<PullRequest>> = HashMap::new();
321 if let Some(scm) = self.scm.as_ref() {
322 let mut prs_for_batch = Vec::new();
323 for session in &sessions {
324 if session.is_terminal() {
325 continue;
326 }
327 let id = session.id.clone();
328 match scm.detect_pr(session).await {
329 Ok(pr) => {
330 if let Some(ref p) = pr {
331 prs_for_batch.push(p.clone());
332 }
333 detected_prs.insert(id, pr);
334 }
335 Err(e) => {
336 self.emit(OrchestratorEvent::TickError {
337 id: id.clone(),
338 message: format!("scm.detect_pr: {e}"),
339 });
340 detected_prs.insert(id, None);
341 }
342 }
343 }
344
345 // Batch enrichment
346 if !prs_for_batch.is_empty() {
347 match scm.enrich_prs_batch(&prs_for_batch).await {
348 Ok(enrichment) => {
349 if !enrichment.is_empty() {
350 tracing::debug!(
351 "[batch enrichment] cached {} PR observations",
352 enrichment.len()
353 );
354 let mut cache = self
355 .pr_enrichment_cache
356 .lock()
357 .unwrap_or_else(|e| {
358 tracing::error!("pr_enrichment_cache mutex poisoned; recovering inner state: {e}");
359 e.into_inner()
360 });
361 *cache = enrichment;
362 }
363 }
364 Err(e) => {
365 tracing::warn!("[batch enrichment] failed: {e}");
366 }
367 }
368 }
369 }
370
371 // Store detected PRs so poll_scm can consume them.
372 {
373 let mut cache = self.detected_prs_cache.lock().unwrap_or_else(|e| {
374 tracing::error!("detected_prs_cache mutex poisoned; recovering inner state: {e}");
375 e.into_inner()
376 });
377 *cache = detected_prs;
378 }
379
380 // Pass 2: poll each session.
381 let startup_ms = self.startup_ms.load(Ordering::Relaxed);
382 let mut any_active = false;
383 for session in sessions {
384 let id = session.id.clone();
385 if seen.insert(id.clone()) {
386 // Sessions that predate loop startup are restored from disk,
387 // not newly spawned. When `startup_ms == 0` (tests driving
388 // `tick` directly, no `run_loop`), preserve the original
389 // behaviour and emit `Spawned` for everything.
390 if startup_ms != 0 && session.created_at < startup_ms {
391 self.emit(OrchestratorEvent::SessionRestored {
392 id: id.clone(),
393 project_id: session.project_id.clone(),
394 status: session.status,
395 });
396 } else {
397 self.emit(OrchestratorEvent::Spawned {
398 id,
399 project_id: session.project_id.clone(),
400 });
401 }
402 }
403
404 if session.is_terminal() {
405 continue;
406 }
407
408 any_active = true;
409 // A fresh non-terminal session re-arms the all-complete gate so
410 // a subsequent drain fires a new `all-complete`.
411 self.all_complete_fired.store(false, Ordering::Relaxed);
412
413 if let Err(e) = self.poll_one(session).await {
414 tracing::warn!("poll_one failed: {e}");
415 }
416 }
417
418 // ---- all-complete (issue #195 H3) ----
419 // When all seen sessions are terminal and we have seen at least one,
420 // dispatch `all-complete` exactly once per drain cycle.
421 if !any_active && !seen.is_empty() && !self.all_complete_fired.load(Ordering::Relaxed) {
422 if let Some(engine) = self.reaction_engine.as_ref() {
423 // `all-complete` has no session context — we use a synthetic
424 // sentinel session so the engine can look up the reaction
425 // config. This mirrors how TS fires a summary-level event.
426 let sentinel = all_complete_sentinel();
427 match engine.dispatch(&sentinel, "all-complete").await {
428 Ok(_) => {
429 self.all_complete_fired.store(true, Ordering::Relaxed);
430 }
431 Err(e) => {
432 tracing::warn!(error = %e, "all-complete dispatch failed");
433 }
434 }
435 }
436 }
437
438 Ok(())
439 }
440
441 /// Maintain the `idle_since` map in response to a fresh activity
442 /// reading.
443 ///
444 /// - `Idle` or `Blocked` → insert `Instant::now()` **only if** the
445 /// session isn't already in the map. Preserving the older timestamp
446 /// means an entry that has been idle for three ticks is still
447 /// three-ticks-old on the fourth tick, so `elapsed()` grows
448 /// monotonically across the idle streak.
449 /// - Any other activity state → remove the entry so the next idle
450 /// streak restarts the clock from zero.
451 ///
452 /// Called unconditionally from `poll_one` after the persist-activity
453 /// block. Terminal activities (`Exited`) never reach this helper —
454 /// `poll_one` short-circuits to `terminate` beforehand, which clears
455 /// the entry via `idle_since.remove` (Task 2.7).
456 pub(super) fn update_idle_since(&self, session_id: &SessionId, activity: ActivityState) {
457 let mut map = self.idle_since.lock().unwrap_or_else(|e| {
458 tracing::error!("lifecycle idle_since mutex poisoned; recovering inner state: {e}");
459 e.into_inner()
460 });
461 match activity {
462 ActivityState::Idle | ActivityState::Blocked => {
463 map.entry(session_id.clone()).or_insert_with(Instant::now);
464 }
465 _ => {
466 map.remove(session_id);
467 }
468 }
469 }
470
471 /// Fire an event into the broadcast channel. A send error only means
472 /// there are currently zero subscribers — that's expected during CLI
473 /// startup and not worth surfacing.
474 pub(super) fn emit(&self, event: OrchestratorEvent) {
475 let _ = self.events_tx.send(event);
476 }
477}
478
479/// Fold four `Result<_, AoError>` probe results into a single
480/// `ScmObservation`, or produce a `"probe_name: error; …"` diagnostic
481/// string listing every probe that failed.
482///
483/// The `poll_scm` tick refuses to transition on a partial observation —
484/// it's all or nothing — so the caller's path after this helper is a
485/// single `match` between "we have all four fields" and "emit TickError".
486///
487/// Free function rather than a method because the decision has no
488/// `&self` dependencies; extracting it keeps the async fan-out in
489/// `poll_scm` readable at one level of abstraction.
490pub(super) fn assemble_observation(
491 state: Result<PrState>,
492 ci: Result<CiStatus>,
493 review: Result<ReviewDecision>,
494 readiness: Result<MergeReadiness>,
495) -> std::result::Result<ScmObservation, String> {
496 match (state, ci, review, readiness) {
497 (Ok(state), Ok(ci), Ok(review), Ok(readiness)) => Ok(ScmObservation {
498 state,
499 ci,
500 review,
501 readiness,
502 }),
503 (state, ci, review, readiness) => {
504 // Join whichever errors fired into one human-readable
505 // message. Each slot contributes `"<slot>: <err>"` or
506 // nothing; empty output is impossible because we only hit
507 // this arm when at least one was Err.
508 let parts: Vec<String> = [
509 state.err().map(|e| format!("pr_state: {e}")),
510 ci.err().map(|e| format!("ci_status: {e}")),
511 review.err().map(|e| format!("review_decision: {e}")),
512 readiness.err().map(|e| format!("mergeability: {e}")),
513 ]
514 .into_iter()
515 .flatten()
516 .collect();
517 Err(parts.join("; "))
518 }
519 }
520}
521
522/// Should the lifecycle park this session in `MergeFailed` after
523/// dispatching `approved-and-green`?
524///
525/// Yes iff we *just* entered `Mergeable`, `reactions.approved-and-green`
526/// is configured with `action: auto-merge`, and the dispatch soft-failed
527/// *without* escalating. Parking is keyed off the **configured** action,
528/// not `ReactionOutcome::action`, so a mismatched or escalated outcome
529/// cannot trap a `notify` / `send-to-agent` rule in the merge retry
530/// loop.
531///
532/// Escalated outcomes are deliberately **not** parked: once the retry
533/// budget is exhausted the human has been notified, and bouncing the
534/// session into `MergeFailed → Mergeable → escalate → Notify → ...`
535/// on every tick would spam the notification channel. Leaving the
536/// session in `Mergeable` visually says "ready, but auto-merge gave
537/// up" — any subsequent observation change (CI flake, reviewer
538/// dismissal, branch deletion) will naturally flip it off the ready
539/// path via the normal ladder.
540pub(super) fn should_park_in_merge_failed(
541 engine: &ReactionEngine,
542 session: &Session,
543 to: SessionStatus,
544 reaction_key: &str,
545 outcome: &ReactionOutcome,
546) -> bool {
547 to == SessionStatus::Mergeable
548 && reaction_key == "approved-and-green"
549 && engine
550 .resolve_reaction_config(session, reaction_key)
551 .is_some_and(|c| c.action == ReactionAction::AutoMerge)
552 && !outcome.success
553 && !outcome.escalated
554}
555
556/// Clear reaction trackers on status transitions, with a carve-out
557/// for the `Mergeable ↔ MergeFailed` parking loop.
558///
559/// The default rule is simple: on exit from a reaction-triggering
560/// status, clear that reaction's tracker so a future re-entry starts
561/// with a full retry budget. Phase G needs a carve-out because the
562/// parking loop repeatedly re-enters `Mergeable` on purpose, and the
563/// retry budget is supposed to *accumulate* across those re-entries
564/// — clearing the tracker on `Mergeable → MergeFailed` or
565/// `MergeFailed → Mergeable` would reset attempts to zero and the
566/// retry cap would never fire.
567///
568/// The exit case `MergeFailed → anything_but_Mergeable` (CI flipped
569/// red, reviewer dismissed, PR closed) is subtle: the parking loop
570/// is over, so a later re-entry from `PrOpen → Mergeable` should
571/// start fresh. `status_to_reaction_key(MergeFailed) == None`, so
572/// the default-rule branch below wouldn't clear anything — we need
573/// an explicit `clear_tracker("approved-and-green")` for this case.
574pub(super) fn clear_tracker_on_transition(
575 engine: &ReactionEngine,
576 session_id: &SessionId,
577 from: SessionStatus,
578 to: SessionStatus,
579) {
580 // Parking-loop edges: preserve the `approved-and-green` tracker
581 // so retry accounting accumulates across the loop.
582 let parking_loop_edge = matches!(
583 (from, to),
584 (SessionStatus::Mergeable, SessionStatus::MergeFailed)
585 | (SessionStatus::MergeFailed, SessionStatus::Mergeable)
586 );
587 if parking_loop_edge {
588 return;
589 }
590
591 // Leaving `MergeFailed` to a non-`Mergeable` state: the retry
592 // loop is over (observation moved off the ready path), so clear
593 // the parked tracker. The default-rule branch below would miss
594 // this because `status_to_reaction_key(MergeFailed) == None`.
595 if from == SessionStatus::MergeFailed {
596 engine.clear_tracker(session_id, "approved-and-green");
597 return;
598 }
599
600 // `CiFailed` is excluded from `status_to_reaction_key` (issue #195 H3)
601 // because its dispatch is handled by `check_ci_failed`, not `transition`.
602 // Explicitly clear the tracker here so a second CI failure episode
603 // after a fix starts with a fresh retry budget.
604 if from == SessionStatus::CiFailed {
605 engine.clear_tracker(session_id, "ci-failed");
606 return;
607 }
608
609 // Default rule: clear the `from` reaction's tracker on exit.
610 if let Some(prev_key) = status_to_reaction_key(from) {
611 engine.clear_tracker(session_id, prev_key);
612 }
613}
614
/// Returns `true` for session statuses where the review observation is
/// unlikely to change within a few seconds (the session is waiting on a
/// human review or has changes requested). Used by the review backlog
/// throttle to skip redundant REST API calls.
///
/// NOTE(review): the doc block previously attached here described
/// stuck-detection eligibility — that contract belongs to
/// `is_stuck_eligible` below, not to this predicate.
pub(super) const fn is_review_stable(status: SessionStatus) -> bool {
    matches!(
        status,
        SessionStatus::ChangesRequested | SessionStatus::ReviewPending | SessionStatus::Approved
    )
}
645
646/// Transitions that warrant notifying the parent orchestrator (issue #169).
647///
648/// Chosen so the orchestrator only gets messages that require a decision
649/// or at least a "status FYI" — not every intermediate tick. Noisy or
650/// purely-informational states (e.g. `Working`, `Spawning`, intermediate
651/// review flips) are intentionally excluded; the human/CLI can still see
652/// them via `ao-rs status` or the SSE event stream.
653pub(super) const fn is_orchestrator_notifiable(status: SessionStatus) -> bool {
654 matches!(
655 status,
656 SessionStatus::PrOpen
657 | SessionStatus::ReviewPending
658 | SessionStatus::CiFailed
659 | SessionStatus::ChangesRequested
660 | SessionStatus::Approved
661 | SessionStatus::Merged
662 | SessionStatus::MergeFailed
663 | SessionStatus::Killed
664 | SessionStatus::Terminated
665 | SessionStatus::Errored
666 | SessionStatus::NeedsInput
667 | SessionStatus::Stuck
668 )
669}
670
671/// Render the notification message the orchestrator sees in its
672/// terminal. Kept short and parseable — the orchestrator is usually an
673/// LLM agent that re-reads its scrollback.
674pub(super) fn format_orchestrator_notification(worker: &Session, to: SessionStatus) -> String {
675 let short: String = worker.id.0.chars().take(8).collect();
676 let pr = worker
677 .claimed_pr_url
678 .as_deref()
679 .or(worker.issue_url.as_deref())
680 .unwrap_or("none");
681 format!(
682 "[ao-rs] worker {short} is now {to} — branch: {branch}, url: {pr}",
683 branch = worker.branch,
684 )
685}
686
/// Is `status` eligible for stuck detection? I.e., if a session in
/// this status has been observed with `Idle`/`Blocked` activity for
/// longer than the `agent-stuck` reaction's `threshold`, should it be
/// flipped to `Stuck`?
///
/// Phase H. The eligible set matches the "work in progress" statuses
/// where a silent agent is genuinely unexpected. The match has **no
/// wildcard**: a future `SessionStatus` variant will fail the build here
/// until stuck-eligibility is decided for it — same discipline as the
/// `ALL_SESSION_STATUSES` exhaustiveness test in `scm_transitions.rs`.
pub(super) const fn is_stuck_eligible(status: SessionStatus) -> bool {
    match status {
        // Stuck-eligible: active work or PR-track where progress is expected.
        //
        // - `Working`: happy path coder lost its train of thought.
        // - PR-track states: the agent should be actively applying review
        //   comments, re-running tests, or responding to CI failures.
        SessionStatus::Working
        | SessionStatus::PrOpen
        | SessionStatus::CiFailed
        | SessionStatus::ReviewPending
        | SessionStatus::ChangesRequested
        | SessionStatus::Approved
        | SessionStatus::Mergeable => true,

        // Not stuck-eligible:
        //
        // - Spawning: agent hasn't had its first activity poll yet;
        //   idle_since would never populate for this state anyway.
        // - Idle: the dedicated "no task assigned / waiting for work"
        //   status, distinct from "currently working and momentarily
        //   gone idle". A session in `Idle` is idle by design.
        // - NeedsInput: already a known-blocked-on-human state with
        //   its own (future) `agent-needs-input` reaction.
        // - Stuck: already stuck. Re-entry is handled by the
        //   `Stuck → Working` exit branch in `poll_one` step 4.
        // - MergeFailed: Phase G parking state with its own retry
        //   budget via the `approved-and-green` tracker. Conflating
        //   with stuck would double-charge retries and confuse the
        //   parking-loop accounting.
        // - Terminal states (`Killed`, `Terminated`, `Done`,
        //   `Cleanup`, `Errored`, `Merged`): filtered out by the
        //   `tick()` pre-filter long before `check_stuck` is called.
        //   Listed here so the exhaustive match stays exhaustive.
        SessionStatus::Spawning
        | SessionStatus::Idle
        | SessionStatus::NeedsInput
        | SessionStatus::Stuck
        | SessionStatus::MergeFailed
        | SessionStatus::Killed
        | SessionStatus::Terminated
        | SessionStatus::Done
        | SessionStatus::Cleanup
        | SessionStatus::Errored
        | SessionStatus::Merged => false,
    }
}
730
/// Handle returned by `LifecycleManager::spawn`. Dropping it does **not**
/// stop the loop — the caller must `.stop().await` explicitly, so a
/// CLI handler that accidentally drops the handle doesn't silently kill
/// the background worker.
pub struct LifecycleHandle {
    // Join handle for the spawned `run_loop` task; awaited by `stop`.
    join: tokio::task::JoinHandle<()>,
    // Shutdown signal shared with the loop (and clonable for sub-tasks).
    token: CancellationToken,
}
739
740impl LifecycleHandle {
741 /// Signal the loop to stop and wait for it to finish the current tick.
742 pub async fn stop(self) {
743 self.token.cancel();
744 let _ = self.join.await;
745 }
746
747 /// Clone the cancellation token so sub-tasks can share shutdown.
748 pub fn cancellation_token(&self) -> CancellationToken {
749 self.token.clone()
750 }
751}
752
753/// Stable hash fingerprint of a `ReviewComment` slice.
754///
755/// Sorts by `(author, body, url)` for determinism (API order can vary),
756/// then folds through `DefaultHasher`. Used by `check_review_backlog` to
757/// detect when the pending-comments set has changed between ticks.
758pub(super) fn fingerprint_comments(comments: &[crate::scm::ReviewComment]) -> u64 {
759 use std::collections::hash_map::DefaultHasher;
760 let mut keys: Vec<(&str, &str, &str)> = comments
761 .iter()
762 .map(|c| (c.author.as_str(), c.body.as_str(), c.url.as_str()))
763 .collect();
764 keys.sort_unstable();
765 let mut h = DefaultHasher::new();
766 keys.hash(&mut h);
767 h.finish()
768}
769
770/// A minimal dummy `Session` used as the dispatch target for `all-complete`.
771///
772/// `all-complete` is a drain-level event — it has no per-session context.
773/// The engine needs a session to look up the project's reaction config, but
774/// since `all-complete` is a global reaction, any project id works here. We
775/// use an empty project id so config lookup falls back to the global entry.
776pub(super) fn all_complete_sentinel() -> Session {
777 use crate::types::{now_ms, SessionId};
778 Session {
779 id: SessionId("__all_complete__".into()),
780 project_id: String::new(),
781 status: SessionStatus::Done,
782 agent: String::new(),
783 agent_config: None,
784 branch: String::new(),
785 task: String::new(),
786 workspace_path: None,
787 runtime_handle: None,
788 runtime: String::new(),
789 activity: None,
790 created_at: now_ms(),
791 cost: None,
792 issue_id: None,
793 issue_url: None,
794 claimed_pr_number: None,
795 claimed_pr_url: None,
796 initial_prompt_override: None,
797 spawned_by: None,
798 last_merge_conflict_dispatched: None,
799 last_review_backlog_fingerprint: None,
800 }
801}
802
803#[cfg(test)]
804pub(crate) mod tests {
805 use super::*;
806 use crate::scm::{
807 CheckRun, CiStatus, MergeMethod, MergeReadiness, PrState, PullRequest, Review,
808 ReviewComment, ReviewDecision,
809 };
810 use crate::traits::Workspace;
811 use crate::types::{now_ms, SessionId, WorkspaceCreateConfig};
812 use async_trait::async_trait;
813 use std::path::{Path, PathBuf};
814 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
815 use std::sync::Mutex;
816 use std::time::{SystemTime, UNIX_EPOCH};
817
818 pub(crate) fn unique_temp_dir(label: &str) -> PathBuf {
819 static COUNTER: AtomicUsize = AtomicUsize::new(0);
820 let nanos = SystemTime::now()
821 .duration_since(UNIX_EPOCH)
822 .unwrap()
823 .as_nanos();
824 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
825 std::env::temp_dir().join(format!("ao-rs-lifecycle-{label}-{nanos}-{n}"))
826 }
827
828 pub(crate) fn fake_session(id: &str, project: &str) -> Session {
829 Session {
830 id: SessionId(id.into()),
831 project_id: project.into(),
832 status: SessionStatus::Spawning,
833 agent: "claude-code".into(),
834 agent_config: None,
835 branch: format!("ao-{id}"),
836 task: "test task".into(),
837 workspace_path: Some(PathBuf::from("/tmp/ws")),
838 runtime_handle: Some(format!("runtime-{id}")),
839 runtime: "tmux".into(),
840 activity: None,
841 created_at: now_ms(),
842 cost: None,
843 issue_id: None,
844 issue_url: None,
845 claimed_pr_number: None,
846 claimed_pr_url: None,
847 initial_prompt_override: None,
848 spawned_by: None,
849 last_merge_conflict_dispatched: None,
850 last_review_backlog_fingerprint: None,
851 }
852 }
853
854 // ---------- Mock plugins ---------- //
855
    /// Runtime mock with a toggleable `alive` flag and a recorder for
    /// `send_message` calls. Tests that care about delivery can read
    /// the recorder via `MockRuntime::sends()`; others ignore it.
    pub(crate) struct MockRuntime {
        // Value reported by `is_alive`; tests flip it to simulate death.
        pub(crate) alive: AtomicBool,
        // `(handle, message)` pairs recorded by `send_message`, in call order.
        pub(crate) sends: Mutex<Vec<(String, String)>>,
        // Handles recorded by `destroy`, in call order.
        pub(crate) destroys: Mutex<Vec<String>>,
    }
864
865 impl MockRuntime {
866 pub(crate) fn new(alive: bool) -> Self {
867 Self {
868 alive: AtomicBool::new(alive),
869 sends: Mutex::new(Vec::new()),
870 destroys: Mutex::new(Vec::new()),
871 }
872 }
873
874 /// Snapshot of all `(handle, message)` pairs received by
875 /// `send_message` in the order they were called.
876 pub(crate) fn sends(&self) -> Vec<(String, String)> {
877 self.sends.lock().unwrap().clone()
878 }
879
880 #[allow(dead_code)]
881 pub(crate) fn destroyed_handles(&self) -> Vec<String> {
882 self.destroys.lock().unwrap().clone()
883 }
884 }
885
886 #[async_trait]
887 impl Runtime for MockRuntime {
888 async fn create(
889 &self,
890 _session_id: &str,
891 _cwd: &Path,
892 _launch_command: &str,
893 _env: &[(String, String)],
894 ) -> Result<String> {
895 Ok("mock-handle".into())
896 }
897 async fn send_message(&self, handle: &str, msg: &str) -> Result<()> {
898 self.sends
899 .lock()
900 .unwrap()
901 .push((handle.to_string(), msg.to_string()));
902 Ok(())
903 }
904 async fn is_alive(&self, _handle: &str) -> Result<bool> {
905 Ok(self.alive.load(Ordering::SeqCst))
906 }
907 async fn destroy(&self, handle: &str) -> Result<()> {
908 self.destroys.lock().unwrap().push(handle.to_string());
909 Ok(())
910 }
911 }
912
    /// Agent mock that returns a scripted activity state each call.
    pub(crate) struct MockAgent {
        // State the next `detect_activity` call reports; replaced via `set`.
        next: Mutex<ActivityState>,
    }
917
918 impl MockAgent {
919 pub(crate) fn new(initial: ActivityState) -> Self {
920 Self {
921 next: Mutex::new(initial),
922 }
923 }
924 pub(crate) fn set(&self, state: ActivityState) {
925 *self.next.lock().unwrap() = state;
926 }
927 }
928
929 #[async_trait]
930 impl Agent for MockAgent {
931 fn launch_command(&self, _session: &Session) -> String {
932 "mock".into()
933 }
934 fn environment(&self, _session: &Session) -> Vec<(String, String)> {
935 vec![]
936 }
937 fn initial_prompt(&self, _session: &Session) -> String {
938 "".into()
939 }
940 async fn detect_activity(&self, _session: &Session) -> Result<ActivityState> {
941 Ok(*self.next.lock().unwrap())
942 }
943 }
944
    // Workspace mock; records which paths `destroy` was called with.
    #[allow(dead_code)]
    pub(crate) struct MockWorkspace {
        // Paths recorded by `Workspace::destroy`, in call order.
        destroyed: Mutex<Vec<PathBuf>>,
    }
949
950 #[allow(dead_code)]
951 impl MockWorkspace {
952 pub(crate) fn new() -> Self {
953 Self {
954 destroyed: Mutex::new(Vec::new()),
955 }
956 }
957
958 pub(crate) fn destroyed_paths(&self) -> Vec<PathBuf> {
959 self.destroyed.lock().unwrap().clone()
960 }
961 }
962
963 #[async_trait]
964 impl Workspace for MockWorkspace {
965 async fn create(&self, _cfg: &WorkspaceCreateConfig) -> Result<PathBuf> {
966 Ok(PathBuf::from("/tmp/ws"))
967 }
968 async fn destroy(&self, workspace_path: &Path) -> Result<()> {
969 self.destroyed
970 .lock()
971 .unwrap()
972 .push(workspace_path.to_path_buf());
973 Ok(())
974 }
975 }
976
    /// Scriptable SCM mock. Every method returns a value pre-set by the
    /// test. `detect_pr` returns `None` until `set_pr(Some(_))`. Each field
    /// probe (`pr_state`, `ci_status`, …) can be toggled to emit an error
    /// so tests cover the `TickError` branch of `poll_scm`.
    ///
    /// Kept intentionally minimal — we only test the handful of methods
    /// `poll_scm` calls. `pending_comments`, `reviews`, `ci_checks`, `merge`
    /// return empty/default values (enough to satisfy the trait).
    pub(crate) struct MockScm {
        // Scripted return values for the corresponding `Scm` probes.
        pub(crate) pr: Mutex<Option<PullRequest>>,
        pub(crate) state: Mutex<PrState>,
        pub(crate) ci: Mutex<CiStatus>,
        pub(crate) review: Mutex<ReviewDecision>,
        pub(crate) readiness: Mutex<MergeReadiness>,
        // Counter: incremented on every detect_pr call, so tests can assert
        // "the loop actually called the plugin".
        pub(crate) detect_calls: AtomicUsize,
        // Error toggles. One per probe that `poll_scm` fans out to, so
        // individual tests can force exactly one slot to fail and
        // verify the error-aggregation path reports that slot by name.
        pub(crate) detect_pr_errors: AtomicBool,
        pub(crate) pr_state_errors: AtomicBool,
        pub(crate) ci_status_errors: AtomicBool,
        pub(crate) review_decision_errors: AtomicBool,
        pub(crate) mergeability_errors: AtomicBool,
        // Phase G: `merge()` error toggle + call recorder so parking-loop
        // tests can script "fail first, succeed later" and assert that
        // the engine actually called the plugin the expected number of
        // times.
        pub(crate) merge_errors: AtomicBool,
        pub(crate) merge_calls: Mutex<Vec<(u32, Option<MergeMethod>)>>,
        // Issue #195: scriptable pending_comments and ci_checks for H2/H3 tests.
        pub(crate) pending_comments_result: Mutex<Vec<ReviewComment>>,
        pub(crate) ci_checks_result: Mutex<Vec<CheckRun>>,
    }
1012
1013 impl MockScm {
1014 pub(crate) fn new() -> Self {
1015 Self {
1016 pr: Mutex::new(None),
1017 state: Mutex::new(PrState::Open),
1018 ci: Mutex::new(CiStatus::Pending),
1019 review: Mutex::new(ReviewDecision::None),
1020 readiness: Mutex::new(MergeReadiness {
1021 mergeable: false,
1022 ci_passing: false,
1023 approved: false,
1024 no_conflicts: true,
1025 blockers: vec!["pending".into()],
1026 }),
1027 detect_calls: AtomicUsize::new(0),
1028 detect_pr_errors: AtomicBool::new(false),
1029 pr_state_errors: AtomicBool::new(false),
1030 ci_status_errors: AtomicBool::new(false),
1031 review_decision_errors: AtomicBool::new(false),
1032 mergeability_errors: AtomicBool::new(false),
1033 merge_errors: AtomicBool::new(false),
1034 merge_calls: Mutex::new(Vec::new()),
1035 pending_comments_result: Mutex::new(vec![]),
1036 ci_checks_result: Mutex::new(vec![]),
1037 }
1038 }
1039 pub(crate) fn merges(&self) -> Vec<(u32, Option<MergeMethod>)> {
1040 self.merge_calls.lock().unwrap().clone()
1041 }
1042 pub(crate) fn set_pending_comments(&self, comments: Vec<ReviewComment>) {
1043 *self.pending_comments_result.lock().unwrap() = comments;
1044 }
1045 pub(crate) fn set_ci_checks(&self, checks: Vec<CheckRun>) {
1046 *self.ci_checks_result.lock().unwrap() = checks;
1047 }
1048 pub(crate) fn set_pr(&self, pr: Option<PullRequest>) {
1049 *self.pr.lock().unwrap() = pr;
1050 }
1051 pub(crate) fn set_state(&self, s: PrState) {
1052 *self.state.lock().unwrap() = s;
1053 }
1054 pub(crate) fn set_ci(&self, c: CiStatus) {
1055 *self.ci.lock().unwrap() = c;
1056 }
1057 pub(crate) fn set_review(&self, r: ReviewDecision) {
1058 *self.review.lock().unwrap() = r;
1059 }
1060 pub(crate) fn set_readiness(&self, r: MergeReadiness) {
1061 *self.readiness.lock().unwrap() = r;
1062 }
1063 }
1064
    #[async_trait]
    impl Scm for MockScm {
        fn name(&self) -> &str {
            "mock"
        }
        // Bumps `detect_calls` before checking the error toggle, so the
        // counter includes failing calls — tests can assert the loop
        // reached the plugin either way.
        async fn detect_pr(&self, _session: &Session) -> Result<Option<PullRequest>> {
            self.detect_calls.fetch_add(1, Ordering::SeqCst);
            if self.detect_pr_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock detect_pr".into()));
            }
            Ok(self.pr.lock().unwrap().clone())
        }
        async fn pr_state(&self, _pr: &PullRequest) -> Result<PrState> {
            if self.pr_state_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock pr_state".into()));
            }
            Ok(*self.state.lock().unwrap())
        }
        // No error toggle for this probe; tests only script the payload.
        async fn ci_checks(&self, _pr: &PullRequest) -> Result<Vec<CheckRun>> {
            Ok(self.ci_checks_result.lock().unwrap().clone())
        }
        async fn ci_status(&self, _pr: &PullRequest) -> Result<CiStatus> {
            if self.ci_status_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock ci_status".into()));
            }
            Ok(*self.ci.lock().unwrap())
        }
        // Always empty — `poll_scm` relies on `review_decision` instead.
        async fn reviews(&self, _pr: &PullRequest) -> Result<Vec<Review>> {
            Ok(vec![])
        }
        async fn review_decision(&self, _pr: &PullRequest) -> Result<ReviewDecision> {
            if self.review_decision_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime(
                    "mock review_decision".into(),
                ));
            }
            Ok(*self.review.lock().unwrap())
        }
        async fn pending_comments(&self, _pr: &PullRequest) -> Result<Vec<ReviewComment>> {
            Ok(self.pending_comments_result.lock().unwrap().clone())
        }
        async fn mergeability(&self, _pr: &PullRequest) -> Result<MergeReadiness> {
            if self.mergeability_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock mergeability".into()));
            }
            Ok(self.readiness.lock().unwrap().clone())
        }
        // NOTE(review): when `merge_errors` is set, the failing call is NOT
        // pushed onto `merge_calls`, so `merges()` counts *successful*
        // attempts only. The struct comment talks about asserting "the
        // expected number of times" for fail-then-succeed scripts — confirm
        // the parking-loop tests really want failures excluded.
        async fn merge(&self, pr: &PullRequest, method: Option<MergeMethod>) -> Result<()> {
            if self.merge_errors.load(Ordering::SeqCst) {
                return Err(crate::error::AoError::Runtime("mock merge".into()));
            }
            self.merge_calls.lock().unwrap().push((pr.number, method));
            Ok(())
        }
    }
1120
1121 pub(crate) fn fake_pr(number: u32, branch: &str) -> PullRequest {
1122 PullRequest {
1123 number,
1124 url: format!("https://github.com/acme/widgets/pull/{number}"),
1125 title: "fix the widgets".into(),
1126 owner: "acme".into(),
1127 repo: "widgets".into(),
1128 branch: branch.into(),
1129 base_branch: "main".into(),
1130 is_draft: false,
1131 }
1132 }
1133
1134 // ---------- Test helpers ---------- //
1135
1136 pub(crate) async fn setup(
1137 label: &str,
1138 initial_activity: ActivityState,
1139 ) -> (
1140 Arc<LifecycleManager>,
1141 Arc<SessionManager>,
1142 Arc<MockRuntime>,
1143 Arc<MockAgent>,
1144 PathBuf,
1145 ) {
1146 use crate::session_manager::SessionManager;
1147 let base = unique_temp_dir(label);
1148 let sessions = Arc::new(SessionManager::new(base.clone()));
1149 let runtime = Arc::new(MockRuntime::new(true));
1150 let agent = Arc::new(MockAgent::new(initial_activity));
1151 let lifecycle = Arc::new(LifecycleManager::new(
1152 sessions.clone(),
1153 runtime.clone() as Arc<dyn Runtime>,
1154 agent.clone() as Arc<dyn Agent>,
1155 ));
1156 (lifecycle, sessions, runtime, agent, base)
1157 }
1158
1159 pub(crate) async fn recv_timeout(
1160 rx: &mut broadcast::Receiver<OrchestratorEvent>,
1161 ) -> Option<OrchestratorEvent> {
1162 tokio::time::timeout(Duration::from_millis(100), rx.recv())
1163 .await
1164 .ok()
1165 .and_then(|r| r.ok())
1166 }
1167
1168 pub(crate) async fn drain_events(
1169 rx: &mut broadcast::Receiver<OrchestratorEvent>,
1170 ) -> Vec<OrchestratorEvent> {
1171 let mut out = Vec::new();
1172 while let Some(e) = recv_timeout(rx).await {
1173 out.push(e);
1174 }
1175 out
1176 }
1177
1178 pub(crate) fn rewind_idle_since(
1179 lifecycle: &LifecycleManager,
1180 session_id: &SessionId,
1181 by: Duration,
1182 ) {
1183 let mut map = lifecycle.idle_since.lock().unwrap_or_else(|e| {
1184 tracing::error!("idle_since mutex poisoned; recovering inner state: {e}");
1185 e.into_inner()
1186 });
1187 let rewound = Instant::now()
1188 .checked_sub(by)
1189 .expect("test clock rewind underflowed Instant");
1190 map.insert(session_id.clone(), rewound);
1191 }
1192
1193 pub(crate) async fn setup_with_scm(
1194 label: &str,
1195 ) -> (
1196 Arc<LifecycleManager>,
1197 Arc<SessionManager>,
1198 Arc<MockScm>,
1199 PathBuf,
1200 ) {
1201 use crate::session_manager::SessionManager;
1202 let base = unique_temp_dir(label);
1203 let sessions = Arc::new(SessionManager::new(base.clone()));
1204 let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
1205 let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
1206 let scm = Arc::new(MockScm::new());
1207 let lifecycle = Arc::new(
1208 LifecycleManager::new(sessions.clone(), runtime, agent)
1209 .with_scm(scm.clone() as Arc<dyn Scm>),
1210 );
1211 (lifecycle, sessions, scm, base)
1212 }
1213
1214 pub(crate) fn script_ready_pr(scm: &MockScm, pr_number: u32) {
1215 scm.set_pr(Some(fake_pr(pr_number, "ao-s1")));
1216 scm.set_state(PrState::Open);
1217 scm.set_ci(CiStatus::Passing);
1218 scm.set_review(ReviewDecision::Approved);
1219 scm.set_readiness(MergeReadiness {
1220 mergeable: true,
1221 ci_passing: true,
1222 approved: true,
1223 no_conflicts: true,
1224 blockers: vec![],
1225 });
1226 }
1227
    /// Build a `LifecycleManager` wired to a `ReactionEngine` whose only
    /// reaction is `approved-and-green` → `AutoMerge` with the given retry
    /// budget. Both sides share the same `MockScm`, so a tick's SCM
    /// snapshot and the engine's `merge()` call hit identical scripted
    /// state.
    pub(crate) async fn setup_with_scm_and_auto_merge_engine(
        label: &str,
        retries: Option<u32>,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockScm>,
        Arc<ReactionEngine>,
        PathBuf,
    ) {
        use crate::reactions::ReactionConfig;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle = LifecycleManager::new(sessions.clone(), runtime, agent);

        let mut cfg = ReactionConfig::new(ReactionAction::AutoMerge);
        cfg.retries = retries;
        let mut map = std::collections::HashMap::new();
        map.insert("approved-and-green".into(), cfg);

        // The engine gets its own runtime mock — auto-merge tests don't
        // send agent messages through it, so it's never inspected.
        let engine_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        // `events_sender()` is taken from the not-yet-wrapped manager so
        // engine and manager share one broadcast channel.
        let engine = Arc::new(
            ReactionEngine::new(map, engine_runtime, lifecycle.events_sender())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );

        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );
        (lifecycle, sessions, scm, engine, base)
    }
1266
    /// Build a `LifecycleManager` wired to a `ReactionEngine` whose only
    /// reaction is `merge-conflicts` → `SendToAgent("please rebase")`.
    /// Unlike the auto-merge setup, lifecycle AND engine share the same
    /// `MockRuntime`, so tests can observe the dispatched message via
    /// `runtime.sends()`.
    pub(crate) async fn setup_with_merge_conflicts_engine(
        label: &str,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockScm>,
        Arc<MockRuntime>,
        Arc<ReactionEngine>,
        PathBuf,
    ) {
        use crate::reactions::ReactionConfig;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle =
            LifecycleManager::new(sessions.clone(), runtime.clone() as Arc<dyn Runtime>, agent);

        let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
        cfg.message = Some("please rebase".into());
        let mut map = std::collections::HashMap::new();
        map.insert("merge-conflicts".into(), cfg);

        // Same mock as the lifecycle's runtime — this is what makes the
        // engine's SendToAgent visible to `runtime.sends()`.
        let engine_runtime: Arc<dyn Runtime> = runtime.clone() as Arc<dyn Runtime>;
        // `events_sender()` is taken from the not-yet-wrapped manager so
        // engine and manager share one broadcast channel.
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime,
            lifecycle.events_sender(),
        ));

        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );
        (lifecycle, sessions, scm, runtime, engine, base)
    }
1307
    /// Build a `LifecycleManager` wired to a `ReactionEngine` whose only
    /// reaction is `agent-stuck` → `Notify("stuck!")` with an optional
    /// `threshold` string. The agent mock starts `Idle` so stuck-detection
    /// tests can rewind `idle_since` and tick.
    pub(crate) async fn setup_stuck(
        label: &str,
        threshold: Option<&str>,
    ) -> (
        Arc<LifecycleManager>,
        Arc<SessionManager>,
        Arc<MockAgent>,
        PathBuf,
    ) {
        use crate::reactions::ReactionConfig;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir(label);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent = Arc::new(MockAgent::new(ActivityState::Idle));

        let lifecycle =
            LifecycleManager::new(sessions.clone(), runtime, agent.clone() as Arc<dyn Agent>);

        let mut cfg = ReactionConfig::new(ReactionAction::Notify);
        cfg.message = Some("stuck!".into());
        cfg.threshold = threshold.map(String::from);
        let mut map = std::collections::HashMap::new();
        map.insert("agent-stuck".into(), cfg);
        // Notify never touches the runtime, so the engine gets its own
        // throwaway mock; `events_sender()` is taken before wrapping so
        // engine and manager share one broadcast channel.
        let engine_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime,
            lifecycle.events_sender(),
        ));
        let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine));
        (lifecycle, sessions, agent, base)
    }
1341
1342 pub(crate) fn build_engine_with_ci_failed(
1343 lifecycle: &LifecycleManager,
1344 message: &str,
1345 ) -> Arc<ReactionEngine> {
1346 use crate::reactions::ReactionConfig;
1347 let mut cfg = ReactionConfig::new(ReactionAction::SendToAgent);
1348 cfg.message = Some(message.into());
1349 let mut map = std::collections::HashMap::new();
1350 map.insert("ci-failed".into(), cfg);
1351
1352 let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
1353 Arc::new(ReactionEngine::new(map, runtime, lifecycle.events_sender()))
1354 }
1355
1356 // Actual test functions live in the per-submodule test modules:
1357 // - tick::tests (tick.rs)
1358 // - scm_poll::tests (scm_poll.rs)
1359 // - transition::tests (transition.rs)
1360 // - stuck::tests (stuck.rs)
1361 //
1362 // The helpers above (fake_session, setup, recv_timeout, etc.) are
1363 // shared via `pub(crate)` visibility and imported by those modules.
1364}