ao-core 0.1.0

Core traits and types for the ao-rs agent orchestrator framework
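
A minimal sketch of how a consumer might drive the lifecycle loop, based only on the constructors and methods exercised by this module's tests. The `MockRuntime`/`MockAgent` test doubles stand in for real `Runtime`/`Agent` plugins, the `#[tokio::main]` wrapper and the `/tmp/ao-demo` path are illustrative assumptions, and error handling is elided.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

// Assumes LifecycleManager, SessionManager, Runtime, Agent, ActivityState,
// OrchestratorEvent, MockRuntime and MockAgent are in scope, as they are via
// the imports inside this module.
#[tokio::main]
async fn main() {
    // Session store rooted at an example directory (PathBuf argument assumed).
    let sessions = Arc::new(SessionManager::new(PathBuf::from("/tmp/ao-demo")));

    // Wire the lifecycle manager with runtime/agent plugins. The mock test
    // doubles are used here; a real deployment would plug in concrete
    // Runtime and Agent implementations.
    let lifecycle = Arc::new(
        LifecycleManager::new(
            sessions.clone(),
            Arc::new(MockRuntime::new(true)) as Arc<dyn Runtime>,
            Arc::new(MockAgent::new(ActivityState::Ready)) as Arc<dyn Agent>,
        )
        .with_poll_interval(Duration::from_millis(500)),
    );

    // Subscribe before spawning the background loop so no events are missed.
    let mut rx = lifecycle.subscribe();
    let handle = lifecycle.spawn();

    // Consume lifecycle events until some session terminates.
    while let Ok(event) = rx.recv().await {
        match event {
            OrchestratorEvent::StatusChanged { from, to, .. } => {
                println!("status changed: {from:?} -> {to:?}");
            }
            OrchestratorEvent::Terminated { .. } => break,
            _ => {}
        }
    }

    handle.stop().await;
}
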
use super::*;

impl LifecycleManager {
    /// Probe one session and apply any resulting transitions.
    pub(super) async fn poll_one(&self, mut session: Session) -> Result<()> {
        let id = session.id.clone();
        // ---- 1. Runtime liveness ----
        let alive = match &session.runtime_handle {
            Some(handle) => match self.runtime.is_alive(handle).await {
                Ok(a) => a,
                Err(e) => {
                    // Runtime probe itself errored — treat as unknown,
                    // emit TickError, and don't transition.
                    self.emit(OrchestratorEvent::TickError {
                        id: id.clone(),
                        message: format!("is_alive: {e}"),
                    });
                    return Ok(());
                }
            },
            // No handle means we never got far enough to spawn a runtime,
            // or it was intentionally cleared. Consider the session dead.
            None => {
                self.terminate(&mut session, TerminationReason::NoHandle)
                    .await?;
                return Ok(());
            }
        };

        if !alive {
            self.terminate(&mut session, TerminationReason::RuntimeGone)
                .await?;
            return Ok(());
        }

        // ---- 2. Agent activity detection ----
        let activity = match self.agent.detect_activity(&session).await {
            Ok(a) => a,
            Err(e) => {
                self.emit(OrchestratorEvent::TickError {
                    id: id.clone(),
                    message: format!("detect_activity: {e}"),
                });
                return Ok(());
            }
        };

        // Agent says the process exited — treat the same as runtime gone,
        // but attribute the reason to the agent so observers can distinguish.
        if activity.is_terminal() {
            self.terminate(&mut session, TerminationReason::AgentExited)
                .await?;
            return Ok(());
        }

        // ao-ts parity: `waiting_input` is a first-class lifecycle status.
        // This must win early so a session doesn't stay `Working` while the
        // agent is blocked on a prompt.
        if activity == ActivityState::WaitingInput && session.status != SessionStatus::NeedsInput {
            self.transition(&mut session, SessionStatus::NeedsInput)
                .await?;
        }

        // ---- 3. Persist any activity transition ----
        if session.activity != Some(activity) {
            let prev = session.activity;
            session.activity = Some(activity);
            self.sessions.save(&session).await?;
            self.emit(OrchestratorEvent::ActivityChanged {
                id: id.clone(),
                prev,
                next: activity,
            });
        }

        // Phase H: maintain the idle-since timestamp used by
        // `check_stuck`. This runs unconditionally every tick — the helper
        // itself decides whether to insert, preserve, or remove the entry.
        self.update_idle_since(&session.id, activity);

        // Snapshot the status BEFORE the transition steps below (4 and 5)
        // run, so step 6 (`check_stuck`) can yield the tick if either of
        // them already mutated `session.status`. Matches the TS reference's
        // `determineStatus` contract of one transition per call — see
        // Design Decision 8 in docs/ai/design/feature-agent-stuck-detection.md
        // and the "one transition per tick" entry in memory.
        let pre_transition_status = session.status;

        // ---- 4. Status transitions driven by activity ----
        // Slice 1 Phase C handles the happy-path Spawning → Working flip.
        // Phase F layers SCM-driven transitions on top (see step 5).
        // Phase H extends this with the symmetric `Stuck → Working`
        // recovery: a session that went idle long enough to park in
        // `Stuck` should exit the moment the agent starts producing
        // activity again. `transition` auto-clears the `agent-stuck`
        // tracker via `status_to_reaction_key(Stuck) = Some("agent-stuck")`
        // so there's no bespoke cleanup needed here.
        if matches!(
            session.status,
            SessionStatus::Spawning | SessionStatus::Stuck | SessionStatus::NeedsInput
        ) && matches!(activity, ActivityState::Active | ActivityState::Ready)
        {
            self.transition(&mut session, SessionStatus::Working)
                .await?;
        }

        // ---- 5. PR-driven status transitions (Phase F) ----
        // Only runs when a `Scm` plugin is wired in (via `with_scm`). A
        // failing probe inside `poll_scm` emits `TickError` on the shared
        // channel and returns `Ok(())` so one bad `gh` shell-out doesn't
        // kill the whole tick.
        if self.scm.is_some() {
            self.poll_scm(&mut session).await?;
        }

        // ---- 5b. Worktree cleanup on Merged ----
        // When a workspace plugin is wired in and the session just landed in
        // `Merged`, remove its worktree. The session YAML stays on disk for
        // history; only the working-directory folder is deleted.
        if session.status == SessionStatus::Merged {
            // Kill the runtime (tmux window) — best-effort.
            if let Some(ref handle) = session.runtime_handle {
                match self.runtime.destroy(handle).await {
                    Ok(()) => tracing::info!(session = %session.id, "→ killed runtime on merge"),
                    Err(e) => {
                        tracing::warn!(session = %session.id, error = %e, "runtime destroy on merge failed")
                    }
                }
            }

            if let Some(ref workspace) = self.workspace {
                if let Some(ref ws_path) = session.workspace_path {
                    match workspace.destroy(ws_path).await {
                        Ok(()) => {
                            tracing::info!(
                                session = %session.id,
                                path = %ws_path.display(),
                                "→ removed worktree"
                            );
                        }
                        Err(e) => {
                            tracing::warn!(
                                session = %session.id,
                                path = %ws_path.display(),
                                error = %e,
                                "worktree cleanup failed"
                            );
                        }
                    }
                }
            }
        }

        // ---- 6. Agent-stuck detection (Phase H) ----
        // Gated on the pre-transition snapshot: if step 4 or 5 already
        // mutated `session.status` this tick, we yield and let the next
        // tick decide whether stuck still applies. Also gated on a
        // reaction engine being configured — without one, there's no
        // `agent-stuck` config to read and no way to emit the tracker
        // event, so the early return in `check_stuck` would fire anyway,
        // but checking here keeps the happy path one branch shorter.
        if self.reaction_engine.is_some() && session.status == pre_transition_status {
            self.check_stuck(&mut session).await?;
        }

        Ok(())
    }

    /// Crash-recovery sweep run once at startup.
    ///
    /// Scans all sessions for those already in `Merged` status that still
    /// have a `workspace_path` on disk. This handles the race where the
    /// process was killed after persisting `Merged` but before the per-tick
    /// `destroy()` call completed — terminal sessions are skipped by
    /// `poll_one`, so without this sweep the worktree would live forever.
    pub(super) async fn sweep_merged_worktrees(&self) {
        let Some(ref workspace) = self.workspace else {
            return;
        };

        let sessions = match self.sessions.list().await {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("startup worktree sweep: failed to list sessions: {e}");
                return;
            }
        };

        for session in sessions {
            if session.status != SessionStatus::Merged {
                continue;
            }
            let Some(ref ws_path) = session.workspace_path else {
                continue;
            };
            if !ws_path.exists() {
                continue;
            }
            match workspace.destroy(ws_path).await {
                Ok(()) => {
                    tracing::info!(
                        session = %session.id,
                        path = %ws_path.display(),
                        "→ removed worktree (startup sweep)"
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        session = %session.id,
                        path = %ws_path.display(),
                        error = %e,
                        "startup worktree sweep: cleanup failed"
                    );
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::tests::{fake_session, recv_timeout, setup, MockAgent, MockRuntime};
    use std::collections::HashSet;
    use std::sync::atomic::Ordering;

    #[tokio::test]
    async fn first_tick_emits_spawned_and_transitions_to_working() {
        let (lifecycle, sessions, _rt, _agent, base) = setup("spawned", ActivityState::Ready).await;
        let mut rx = lifecycle.subscribe();
        sessions.save(&fake_session("s1", "demo")).await.unwrap();

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert!(
            events
                .iter()
                .any(|e| matches!(e, OrchestratorEvent::Spawned { .. })),
            "expected Spawned event, got {events:?}"
        );
        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ActivityChanged {
                    next: ActivityState::Ready,
                    ..
                }
            )),
            "expected ActivityChanged → Ready, got {events:?}"
        );
        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::StatusChanged {
                    from: SessionStatus::Spawning,
                    to: SessionStatus::Working,
                    ..
                }
            )),
            "expected StatusChanged Spawning → Working, got {events:?}"
        );

        let persisted = sessions.list().await.unwrap();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].status, SessionStatus::Working);
        assert_eq!(persisted[0].activity, Some(ActivityState::Ready));

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn dead_runtime_terminates_session() {
        let (lifecycle, sessions, rt, _agent, base) = setup("dead", ActivityState::Ready).await;
        let mut rx = lifecycle.subscribe();
        sessions.save(&fake_session("s1", "demo")).await.unwrap();

        rt.alive.store(false, Ordering::SeqCst);

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::Terminated {
                    reason: TerminationReason::RuntimeGone,
                    ..
                }
            )),
            "expected Terminated(RuntimeGone), got {events:?}"
        );

        let persisted = sessions.list().await.unwrap();
        assert_eq!(persisted[0].status, SessionStatus::Killed);
        assert!(persisted[0].is_terminal());

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn exited_activity_terminates_with_agent_reason() {
        let (lifecycle, sessions, _rt, agent, base) = setup("exited", ActivityState::Ready).await;
        let mut rx = lifecycle.subscribe();
        sessions.save(&fake_session("s1", "demo")).await.unwrap();
        agent.set(ActivityState::Exited);

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::Terminated {
                    reason: TerminationReason::AgentExited,
                    ..
                }
            )),
            "expected Terminated(AgentExited), got {events:?}"
        );

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn terminal_sessions_are_skipped_on_subsequent_ticks() {
        let (lifecycle, sessions, _rt, _agent, base) = setup("skip", ActivityState::Ready).await;
        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Done;
        sessions.save(&s).await.unwrap();

        let mut rx = lifecycle.subscribe();
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut events = Vec::new();
        while let Some(e) = recv_timeout(&mut rx).await {
            events.push(e);
        }

        assert_eq!(events.len(), 1, "got {events:?}");
        assert!(matches!(&events[0], OrchestratorEvent::Spawned { .. }));

        let persisted = sessions.list().await.unwrap();
        assert_eq!(persisted[0].status, SessionStatus::Done);

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn spawned_is_emitted_only_once_per_session() {
        let (lifecycle, sessions, _rt, _agent, base) = setup("once", ActivityState::Ready).await;
        sessions.save(&fake_session("s1", "demo")).await.unwrap();
        let mut rx = lifecycle.subscribe();

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();
        lifecycle.tick(&mut seen).await.unwrap();
        lifecycle.tick(&mut seen).await.unwrap();

        let mut spawned_count = 0;
        while let Some(e) = recv_timeout(&mut rx).await {
            if matches!(e, OrchestratorEvent::Spawned { .. }) {
                spawned_count += 1;
            }
        }
        assert_eq!(spawned_count, 1);

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn session_restored_emitted_for_preexisting_sessions_on_first_tick() {
        use crate::lifecycle::tests::unique_temp_dir;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir("restored");
        let sessions = Arc::new(SessionManager::new(base.clone()));
        let mut old = fake_session("old", "demo");
        old.created_at = 1;
        old.status = SessionStatus::Working;
        sessions.save(&old).await.unwrap();

        let lifecycle = Arc::new(
            LifecycleManager::new(
                sessions.clone(),
                Arc::new(MockRuntime::new(true)) as Arc<dyn Runtime>,
                Arc::new(MockAgent::new(ActivityState::Ready)) as Arc<dyn Agent>,
            )
            .with_poll_interval(Duration::from_millis(20)),
        );

        let mut rx = lifecycle.subscribe();
        let handle = lifecycle.spawn();

        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        let mut saw_restored = None;
        let mut saw_spawned = false;
        while tokio::time::Instant::now() < deadline {
            match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await {
                Ok(Ok(OrchestratorEvent::SessionRestored {
                    id,
                    project_id,
                    status,
                })) => {
                    saw_restored = Some((id, project_id, status));
                    break;
                }
                Ok(Ok(OrchestratorEvent::Spawned { .. })) => {
                    saw_spawned = true;
                }
                _ => {}
            }
        }

        handle.stop().await;
        assert!(
            !saw_spawned,
            "pre-existing session must not surface as Spawned"
        );
        let (id, project_id, status) = saw_restored.expect("SessionRestored was never emitted");
        assert_eq!(id.0, "old");
        assert_eq!(project_id, "demo");
        assert_eq!(status, SessionStatus::Working);

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn spawned_emitted_for_sessions_created_after_loop_startup() {
        use crate::lifecycle::tests::unique_temp_dir;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir("post-startup-spawn");
        let sessions = Arc::new(SessionManager::new(base.clone()));

        let lifecycle = Arc::new(
            LifecycleManager::new(
                sessions.clone(),
                Arc::new(MockRuntime::new(true)) as Arc<dyn Runtime>,
                Arc::new(MockAgent::new(ActivityState::Ready)) as Arc<dyn Agent>,
            )
            .with_poll_interval(Duration::from_millis(20)),
        );

        let mut rx = lifecycle.subscribe();
        let handle = lifecycle.spawn();

        tokio::time::sleep(Duration::from_millis(5)).await;
        sessions.save(&fake_session("fresh", "demo")).await.unwrap();

        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        let mut saw_spawned = false;
        let mut saw_restored = false;
        while tokio::time::Instant::now() < deadline {
            match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await {
                Ok(Ok(OrchestratorEvent::Spawned { id, .. })) if id.0 == "fresh" => {
                    saw_spawned = true;
                    break;
                }
                Ok(Ok(OrchestratorEvent::SessionRestored { id, .. })) if id.0 == "fresh" => {
                    saw_restored = true;
                }
                _ => {}
            }
        }

        handle.stop().await;
        assert!(!saw_restored, "fresh session must not surface as restored");
        assert!(saw_spawned, "fresh session never surfaced as Spawned");

        let _ = std::fs::remove_dir_all(&base);
    }

    #[tokio::test]
    async fn background_loop_starts_and_stops_cleanly() {
        use crate::lifecycle::tests::unique_temp_dir;
        use crate::session_manager::SessionManager;
        let base = unique_temp_dir("loop");
        let sessions = Arc::new(SessionManager::new(base.clone()));
        sessions.save(&fake_session("s1", "demo")).await.unwrap();

        let lifecycle = Arc::new(
            LifecycleManager::new(
                sessions.clone(),
                Arc::new(MockRuntime::new(true)) as Arc<dyn Runtime>,
                Arc::new(MockAgent::new(ActivityState::Ready)) as Arc<dyn Agent>,
            )
            .with_poll_interval(Duration::from_millis(20)),
        );

        let mut rx = lifecycle.subscribe();
        let handle = lifecycle.spawn();

        let mut saw_status_change = false;
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        while tokio::time::Instant::now() < deadline {
            match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await {
                Ok(Ok(OrchestratorEvent::StatusChanged { .. })) => {
                    saw_status_change = true;
                    break;
                }
                Ok(Ok(_)) => {}
                _ => {}
            }
        }

        handle.stop().await;
        assert!(
            saw_status_change,
            "background loop never emitted StatusChanged"
        );

        let _ = std::fs::remove_dir_all(&base);
    }
}