Skip to main content

ao_core/lifecycle/
transition.rs

1use super::*;
2
3impl LifecycleManager {
4    /// Flip a session to a terminal state, persist, and emit both the
5    /// `StatusChanged` (to the chosen terminal `SessionStatus`) and the
6    /// `Terminated` event.
7    ///
8    /// Also clears any reaction trackers the engine was holding for this
9    /// session. Without this, a long-running `ao-rs watch` would slowly
10    /// leak tracker entries for every terminated session.
11    pub(super) async fn terminate(
12        &self,
13        session: &mut Session,
14        reason: TerminationReason,
15    ) -> Result<()> {
16        // ao-ts parity: runtime/agent exit is `killed`. Rust keeps `terminated`
17        // for "exited but not explicitly killed" elsewhere, but lifecycle
18        // liveness probes map to `killed` just like ao-ts.
19        let terminal_status = match reason {
20            TerminationReason::RuntimeGone
21            | TerminationReason::AgentExited
22            | TerminationReason::NoHandle => SessionStatus::Killed,
23        };
24        if session.status != terminal_status {
25            self.transition(session, terminal_status).await?;
26        }
27        if let Some(engine) = self.reaction_engine.as_ref() {
28            engine.clear_all_for_session(&session.id);
29        }
30        // Purge per-session bookkeeping so a long-running watch loop
31        // doesn't accumulate entries for every session it has ever seen.
32        self.idle_since
33            .lock()
34            .unwrap_or_else(|e| {
35                tracing::error!("lifecycle idle_since mutex poisoned; recovering inner state: {e}");
36                e.into_inner()
37            })
38            .remove(&session.id);
39        self.last_review_backlog_check
40            .lock()
41            .unwrap_or_else(|e| {
42                tracing::error!(
43                    "last_review_backlog_check mutex poisoned; recovering inner state: {e}"
44                );
45                e.into_inner()
46            })
47            .remove(&session.id);
48        self.emit(OrchestratorEvent::Terminated {
49            id: session.id.clone(),
50            reason,
51        });
52        Ok(())
53    }
54
55    /// Transition status, persist, emit `StatusChanged`, and (if a
56    /// reaction engine is attached) dispatch any reaction associated
57    /// with the new status.
58    ///
59    /// Ordering matters: normally we save + emit `StatusChanged` *before*
60    /// calling the engine, so subscribers see the transition event in the
61    /// right order and so a panicking engine doesn't lose the state change.
62    ///
63    /// **Phase G parking hook.** When the reaction is `auto-merge` and
64    /// the engine reports a non-escalated failure, `transition` persists
65    /// the session as `MergeFailed` (instead of `Mergeable`) so the next
66    /// SCM tick's `derive_scm_status` can decide whether to retry
67    /// (still-ready observation re-promotes to `Mergeable`) or abandon
68    /// (flake / closed PR drops off the PR track). Escalated outcomes are
69    /// left in `Mergeable` so the retry loop stops and the human
70    /// notification stands — see the doc on `should_park_in_merge_failed`.
71    pub async fn transition(&self, session: &mut Session, to: SessionStatus) -> Result<()> {
72        if session.status == to {
73            return Ok(());
74        }
75        let from = session.status;
76        session.status = to;
77
78        // Poll cost on every status change (not every tick). Only
79        // overwrite when the agent returns Some — a None keeps the
80        // existing cost intact so we never lose data.
81        //
82        // `cost_estimate` may do blocking file I/O (JSONL parsing),
83        // and `record_cost` writes to disk — both are wrapped in
84        // spawn_blocking to avoid starving the Tokio executor.
85        match self.agent.cost_estimate(session).await {
86            Ok(Some(cost)) => {
87                // Best-effort ledger write — don't fail the transition.
88                let sid = session.id.0.clone();
89                let pid = session.project_id.clone();
90                let br = session.branch.clone();
91                let c = cost.clone();
92                let ca = session.created_at;
93                let ledger_result = tokio::task::spawn_blocking(move || {
94                    crate::cost_ledger::record_cost(&sid, &pid, &br, &c, ca)
95                })
96                .await;
97                match ledger_result {
98                    Ok(Err(e)) => {
99                        tracing::warn!(session = %session.id, "cost ledger write failed: {e}");
100                    }
101                    Err(e) => {
102                        tracing::warn!(session = %session.id, "cost ledger task panicked: {e}");
103                    }
104                    Ok(Ok(())) => {}
105                }
106                session.cost = Some(cost);
107            }
108            Ok(None) => {}
109            Err(e) => {
110                tracing::debug!(session = %session.id, "cost_estimate failed: {e}");
111            }
112        }
113
114        // Phase 1 invariant: **one status transition per session per tick**.
115        //
116        // The Phase G auto-merge retry loop needs to "park" a just-entered
117        // `Mergeable` session in `MergeFailed` when the auto-merge action
118        // fails without escalating. Historically this produced two
119        // transitions/events in one tick (`… → Mergeable` then
120        // `Mergeable → MergeFailed`). To preserve the invariant while
121        // keeping reaction dispatch semantics, we decide the *final*
122        // persisted status before emitting `StatusChanged`.
123        let mut persisted_to = to;
124        if let Some(engine) = self.reaction_engine.as_ref() {
125            if let Some(next_key) = status_to_reaction_key(to) {
126                match engine.dispatch(session, next_key).await {
127                    Ok(Some(outcome))
128                        if should_park_in_merge_failed(engine, session, to, next_key, &outcome) =>
129                    {
130                        persisted_to = SessionStatus::MergeFailed;
131                        session.status = persisted_to;
132                    }
133                    Ok(_) => {}
134                    Err(e) => {
135                        tracing::warn!(
136                            session = %session.id,
137                            reaction = next_key,
138                            error = %e,
139                            "reaction dispatch failed; lifecycle loop continues"
140                        );
141                    }
142                }
143            }
144        }
145
146        // If the "parking" rewrite lands us back in the original status,
147        // this tick should not persist or emit a no-op transition. (The
148        // reaction attempt has already been recorded by the engine.)
149        if persisted_to == from {
150            session.status = from;
151            return Ok(());
152        }
153
154        self.sessions.save(session).await?;
155        self.emit(OrchestratorEvent::StatusChanged {
156            id: session.id.clone(),
157            from,
158            to: persisted_to,
159        });
160
161        // Issue #169: notify the parent orchestrator (if any) so it can
162        // react to worker state changes without manual human prodding.
163        // Only fires for transitions the orchestrator actually needs to
164        // see; best-effort, never fails the transition.
165        if is_orchestrator_notifiable(persisted_to) {
166            self.notify_orchestrator(session, persisted_to).await;
167        }
168
169        if let Some(engine) = self.reaction_engine.as_ref() {
170            // Leaving a reaction-triggering status? Clear its tracker so
171            // the next entry (e.g. new CI failure after a fix) gets a
172            // fresh retry budget. Parking-loop transitions
173            // (`Mergeable ↔ MergeFailed`) are the exception — see
174            // `clear_tracker_on_transition` for the rationale.
175            clear_tracker_on_transition(engine, &session.id, from, persisted_to);
176        }
177
178        Ok(())
179    }
180
181    /// Best-effort delivery of a worker state-change notification to the
182    /// parent orchestrator via `Runtime::send_message`. Silent when the
183    /// worker has no `spawned_by`, the parent yaml is gone, or the
184    /// parent has no live runtime handle — any of those mean "no one to
185    /// tell", not "error".
186    pub(super) async fn notify_orchestrator(&self, worker: &Session, to: SessionStatus) {
187        let Some(orch_id) = worker.spawned_by.as_ref() else {
188            return;
189        };
190        let parent = match self.sessions.find_by_prefix(&orch_id.0).await {
191            Ok(p) => p,
192            Err(e) => {
193                tracing::debug!(
194                    session = %worker.id,
195                    parent = %orch_id.0,
196                    "orchestrator session lookup failed: {e}"
197                );
198                return;
199            }
200        };
201        let Some(handle) = parent.runtime_handle.as_deref() else {
202            return;
203        };
204        let msg = format_orchestrator_notification(worker, to);
205        if let Err(e) = self.runtime.send_message(handle, &msg).await {
206            tracing::warn!(
207                session = %worker.id,
208                parent = %parent.id,
209                "failed to deliver orchestrator notification: {e}"
210            );
211        }
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::tests::{
        build_engine_with_ci_failed, fake_pr, fake_session, recv_timeout, unique_temp_dir,
        MockAgent, MockRuntime, MockScm,
    };
    use crate::reactions::{ReactionAction, ReactionConfig};
    use crate::scm::{CiStatus, PrState, ReviewDecision};
    use crate::session_manager::SessionManager;
    use std::collections::HashSet;

    /// Removes a test's scratch directory when dropped.
    ///
    /// Bind it right after creating the directory (`let _cleanup = …`, not
    /// `let _ = …`, which would drop it immediately). Cleanup then also
    /// runs when an assertion fails and the test unwinds, instead of
    /// leaking the directory.
    struct TempDirCleanup(std::path::PathBuf);

    impl TempDirCleanup {
        fn new(dir: impl AsRef<std::path::Path>) -> Self {
            Self(dir.as_ref().to_path_buf())
        }
    }

    impl Drop for TempDirCleanup {
        fn drop(&mut self) {
            // Best-effort: a missing directory is not a test failure.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Attaches a reaction engine whose only configured reaction is
    /// `all-complete → Notify`, wired to `lifecycle`'s event channel.
    fn with_all_complete_engine(lifecycle: LifecycleManager) -> Arc<LifecycleManager> {
        let engine_runtime = Arc::new(MockRuntime::new(true));
        let mut map = std::collections::HashMap::new();
        map.insert(
            "all-complete".into(),
            ReactionConfig::new(ReactionAction::Notify),
        );
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime as Arc<dyn Runtime>,
            lifecycle.events_sender(),
        ));
        Arc::new(lifecycle.with_reaction_engine(engine))
    }

    // ---------- Reaction engine integration (Phase D) ---------- //

    #[tokio::test]
    async fn transition_into_ci_failed_dispatches_reaction_on_shared_channel() {
        // ci-failed is dispatched via check_ci_failed (poll_scm step 6), not
        // through the generic status_to_reaction_key path in transition. This
        // test exercises the full SCM-driven tick so the wiring is end-to-end.
        let base = unique_temp_dir("reaction-transition");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle = LifecycleManager::new(sessions.clone(), lifecycle_runtime, agent);
        let engine = build_engine_with_ci_failed(&lifecycle, "fix CI please");
        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );

        let mut rx = lifecycle.subscribe();

        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Working;
        sessions.save(&s).await.unwrap();

        scm.set_pr(Some(fake_pr(7, "ao-s1")));
        scm.set_state(PrState::Open);
        scm.set_ci(CiStatus::Failing);
        scm.set_review(ReviewDecision::None);

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::StatusChanged {
                    to: SessionStatus::CiFailed,
                    ..
                }
            )),
            "expected StatusChanged to CiFailed, got {events:?}"
        );
        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    action: ReactionAction::SendToAgent,
                    ..
                }
            )),
            "expected ReactionTriggered(SendToAgent) from engine, got {events:?}"
        );

        assert_eq!(engine.attempts(&s.id, "ci-failed"), 1);
    }

    #[tokio::test]
    async fn leaving_reaction_status_clears_tracker() {
        let base = unique_temp_dir("reaction-clear");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle = LifecycleManager::new(sessions.clone(), lifecycle_runtime, agent);
        let engine = build_engine_with_ci_failed(&lifecycle, "fix");
        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );

        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Working;
        sessions.save(&s).await.unwrap();

        scm.set_pr(Some(fake_pr(8, "ao-s1")));
        scm.set_state(PrState::Open);
        scm.set_ci(CiStatus::Failing);
        scm.set_review(ReviewDecision::None);
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();
        assert_eq!(engine.attempts(&s.id, "ci-failed"), 1);

        let s_updated = sessions.find_by_prefix("s1").await.unwrap();
        assert_eq!(s_updated.status, SessionStatus::CiFailed);

        let mut s2 = s_updated;
        lifecycle
            .transition(&mut s2, SessionStatus::PrOpen)
            .await
            .unwrap();
        assert_eq!(
            engine.attempts(&s2.id, "ci-failed"),
            0,
            "tracker should be cleared on exit from CiFailed"
        );
    }

    #[tokio::test]
    async fn unrelated_transition_does_not_touch_reaction_engine() {
        let base = unique_temp_dir("no-react");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));

        let lifecycle = LifecycleManager::new(sessions.clone(), runtime, agent);
        let engine = build_engine_with_ci_failed(&lifecycle, "never fires");
        let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine.clone()));

        let mut rx = lifecycle.subscribe();
        sessions.save(&fake_session("s1", "demo")).await.unwrap();
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert!(events.iter().any(|e| matches!(
            e,
            OrchestratorEvent::StatusChanged {
                to: SessionStatus::Working,
                ..
            }
        )));
        assert!(
            !events
                .iter()
                .any(|e| matches!(e, OrchestratorEvent::ReactionTriggered { .. })),
            "unexpected ReactionTriggered on Working transition: {events:?}"
        );
    }

    // ---------- Orchestrator notification (issue #169) ---------- //

    #[tokio::test]
    async fn transition_notifies_parent_orchestrator_via_runtime() {
        let base = unique_temp_dir("orchestrator-notify");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent,
        ));

        let mut parent = fake_session("orch1", "demo");
        parent.runtime_handle = Some("orch-handle".into());
        sessions.save(&parent).await.unwrap();

        let mut worker = fake_session("work1", "demo");
        worker.status = SessionStatus::Working;
        worker.spawned_by = Some(parent.id.clone());
        sessions.save(&worker).await.unwrap();

        lifecycle
            .transition(&mut worker, SessionStatus::PrOpen)
            .await
            .unwrap();

        let sends = runtime.sends();
        assert_eq!(
            sends.len(),
            1,
            "expected one notification to parent, got {sends:?}"
        );
        assert_eq!(sends[0].0, "orch-handle");
        assert!(
            sends[0].1.contains("pr_open"),
            "message should mention new status, got {:?}",
            sends[0].1
        );
    }

    #[tokio::test]
    async fn transition_without_spawned_by_sends_no_message() {
        let base = unique_temp_dir("orchestrator-notify-none");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent,
        ));

        let mut worker = fake_session("lone1", "demo");
        worker.status = SessionStatus::Working;
        assert!(worker.spawned_by.is_none());
        sessions.save(&worker).await.unwrap();

        lifecycle
            .transition(&mut worker, SessionStatus::PrOpen)
            .await
            .unwrap();

        assert!(
            runtime.sends().is_empty(),
            "workers without spawned_by must not trigger a send"
        );
    }

    #[tokio::test]
    async fn transition_into_non_notifiable_status_sends_no_message() {
        let base = unique_temp_dir("orchestrator-notify-filter");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent,
        ));

        let mut parent = fake_session("orch2", "demo");
        parent.runtime_handle = Some("orch-handle".into());
        sessions.save(&parent).await.unwrap();

        let mut worker = fake_session("work2", "demo");
        worker.status = SessionStatus::Spawning;
        worker.spawned_by = Some(parent.id.clone());
        sessions.save(&worker).await.unwrap();

        lifecycle
            .transition(&mut worker, SessionStatus::Working)
            .await
            .unwrap();

        assert!(
            runtime.sends().is_empty(),
            "transition to Working should not notify orchestrator"
        );
    }

    // ---------- Issue #195 H3: all-complete dispatch ---------- //

    #[tokio::test]
    async fn all_complete_fires_once_when_last_session_terminates() {
        let base = unique_temp_dir("all-complete");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));

        let lifecycle = with_all_complete_engine(LifecycleManager::new(
            sessions.clone(),
            lifecycle_runtime,
            agent,
        ));

        let mut rx = lifecycle.subscribe();

        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Done;
        sessions.save(&s).await.unwrap();

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    reaction_key,
                    ..
                } if reaction_key == "all-complete"
            )),
            "expected all-complete ReactionTriggered, got {events:?}"
        );

        lifecycle.tick(&mut seen).await.unwrap();
        let mut events2 = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events2.push(e);
        }
        assert!(
            !events2.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    reaction_key,
                    ..
                } if reaction_key == "all-complete"
            )),
            "all-complete must NOT re-fire on second tick: {events2:?}"
        );
    }

    #[tokio::test]
    async fn all_complete_resets_on_new_session() {
        use std::sync::atomic::Ordering;
        let base = unique_temp_dir("all-complete-reset");
        let _cleanup = TempDirCleanup::new(&base);
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));

        let lifecycle = with_all_complete_engine(LifecycleManager::new(
            sessions.clone(),
            lifecycle_runtime,
            agent,
        ));
        let mut rx = lifecycle.subscribe();

        let mut s1 = fake_session("s1", "demo");
        s1.status = SessionStatus::Done;
        sessions.save(&s1).await.unwrap();
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();
        while recv_timeout(&mut rx).await.is_some() {}

        let s2 = fake_session("s2", "demo");
        sessions.save(&s2).await.unwrap();
        lifecycle.tick(&mut seen).await.unwrap();
        while recv_timeout(&mut rx).await.is_some() {}
        assert!(
            !lifecycle.all_complete_fired.load(Ordering::Relaxed),
            "flag must be reset when a non-terminal session appears"
        );

        let mut s2_done = sessions.find_by_prefix("s2").await.unwrap();
        s2_done.status = SessionStatus::Done;
        sessions.save(&s2_done).await.unwrap();
        lifecycle.tick(&mut seen).await.unwrap();
        let mut events3 = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events3.push(e);
        }
        assert!(
            events3.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    reaction_key,
                    ..
                } if reaction_key == "all-complete"
            )),
            "all-complete must re-fire after a new drain: {events3:?}"
        );
    }
}