Skip to main content

ao_core/
session_manager.rs

//! Disk-backed session store.
//!
//! Each session is one yaml file at:
//!     `<base>/<project_id>/<session_uuid>.yaml`
//!
//! Writes are atomic (write to `.tmp`, then rename). Reads scan all project
//! subdirectories — fine for Slice 1 since N is small (tens of sessions).
//!
//! There's intentionally **no in-memory cache**. The disk is the source of
//! truth, and Slice 1's `ao-rs status` is happy to do a full directory walk
//! per invocation. Slice 2+ may add caching for the daemon polling loop.
12
13use crate::{
14    error::{AoError, Result},
15    paths,
16    types::{Session, SessionId},
17};
18use std::path::{Path, PathBuf};
19use tokio::fs;
20
/// Handle to the disk-backed session store.
///
/// Holds only the root directory; it carries no cache or open file handles,
/// so cloning it is cheap and every clone observes the same on-disk state.
#[derive(Debug, Clone)]
pub struct SessionManager {
    /// Root directory; each project gets its own subdirectory beneath it.
    base_dir: PathBuf,
}
24
25impl SessionManager {
26    pub fn new(base_dir: PathBuf) -> Self {
27        Self { base_dir }
28    }
29
30    /// Use the default `~/.ao-rs/sessions` location.
31    pub fn with_default() -> Self {
32        Self::new(paths::default_sessions_dir())
33    }
34
35    pub fn base_dir(&self) -> &Path {
36        &self.base_dir
37    }
38
39    fn project_dir(&self, project_id: &str) -> PathBuf {
40        self.base_dir.join(project_id)
41    }
42
43    fn session_path(&self, project_id: &str, id: &SessionId) -> PathBuf {
44        self.project_dir(project_id).join(format!("{}.yaml", id.0))
45    }
46
47    /// Atomically persist a session. Creates parent dirs as needed.
48    pub async fn save(&self, session: &Session) -> Result<()> {
49        let project_dir = self.project_dir(&session.project_id);
50        fs::create_dir_all(&project_dir).await?;
51
52        let target = self.session_path(&session.project_id, &session.id);
53        // Write to a sibling temp file first, then rename — rename is atomic
54        // on the same filesystem so a reader never sees a half-written yaml.
55        let temp = target.with_extension("yaml.tmp");
56
57        let yaml =
58            serde_yaml::to_string(session).map_err(|e| AoError::Yaml(format!("serialize: {e}")))?;
59
60        fs::write(&temp, yaml).await?;
61        fs::rename(&temp, &target).await?;
62        Ok(())
63    }
64
65    /// Read every session across all projects, sorted newest-first.
66    ///
67    /// `.archive/` subdirectories inside each project dir are safe because
68    /// the inner `read_dir` is non-recursive — only direct children of the
69    /// project directory are inspected, and `.archive` (a directory) is
70    /// skipped by the `.yaml` extension filter.
71    pub async fn list(&self) -> Result<Vec<Session>> {
72        let mut result = Vec::new();
73        if !self.base_dir.exists() {
74            return Ok(result);
75        }
76
77        let mut projects = fs::read_dir(&self.base_dir).await?;
78        while let Some(entry) = projects.next_entry().await? {
79            if !entry.file_type().await?.is_dir() {
80                continue;
81            }
82            let mut sessions = fs::read_dir(entry.path()).await?;
83            while let Some(file) = sessions.next_entry().await? {
84                let path = file.path();
85                if path.extension().and_then(|s| s.to_str()) != Some("yaml") {
86                    continue;
87                }
88                match load_file(&path).await {
89                    Ok(session) => result.push(session),
90                    Err(e) => {
91                        // Skip unreadable files instead of failing the whole list.
92                        // A half-written tmp file (extremely rare given atomic writes)
93                        // shouldn't break `ao-rs status`.
94                        tracing::warn!("skipping unreadable session {path:?}: {e}");
95                    }
96                }
97            }
98        }
99        result.sort_by_key(|b| std::cmp::Reverse(b.created_at));
100        Ok(result)
101    }
102
103    /// Same as `list` but filtered to one project.
104    pub async fn list_for_project(&self, project_id: &str) -> Result<Vec<Session>> {
105        let project_dir = self.project_dir(project_id);
106        if !project_dir.exists() {
107            return Ok(Vec::new());
108        }
109        let mut result = Vec::new();
110        let mut sessions = fs::read_dir(&project_dir).await?;
111        while let Some(file) = sessions.next_entry().await? {
112            let path = file.path();
113            if path.extension().and_then(|s| s.to_str()) != Some("yaml") {
114                continue;
115            }
116            match load_file(&path).await {
117                Ok(session) => result.push(session),
118                Err(e) => tracing::warn!("skipping {path:?}: {e}"),
119            }
120        }
121        result.sort_by_key(|b| std::cmp::Reverse(b.created_at));
122        Ok(result)
123    }
124
125    /// Find a session by full uuid or any unambiguous prefix.
126    ///
127    /// `starts_with` semantics — the 8-char short id used by the CLI
128    /// (`ao-rs status`, `ao-rs send <short>`) is a valid lookup key, as is
129    /// the full uuid. Returns `SessionNotFound` on no match and
130    /// `AoError::Runtime` on more than one. Shared by `restore_session`,
131    /// `ao-rs send`, `ao-rs pr`, so the CLI's "resolve a session" idiom
132    /// lives in one place.
133    pub async fn find_by_prefix(&self, id_or_prefix: &str) -> Result<Session> {
134        if id_or_prefix.is_empty() {
135            return Err(AoError::SessionNotFound(String::new()));
136        }
137        let all = self.list().await?;
138        let mut matches = all.into_iter().filter(|s| s.id.0.starts_with(id_or_prefix));
139        let first = matches
140            .next()
141            .ok_or_else(|| AoError::SessionNotFound(id_or_prefix.to_string()))?;
142        if matches.next().is_some() {
143            // We've consumed two (`first` + the one that made this branch
144            // fire); anything still in the iterator is `extra`. Avoids
145            // collecting into a Vec in the common (unique-match) path.
146            let extra = matches.count();
147            return Err(AoError::Runtime(format!(
148                "ambiguous session id \"{id_or_prefix}\": {} matches",
149                2 + extra
150            )));
151        }
152        Ok(first)
153    }
154
155    /// Find all non-terminal sessions with a matching `issue_id`.
156    ///
157    /// Used for duplicate detection before `ao-rs spawn --issue` — if another
158    /// active session is already working on the same issue, the user should
159    /// either wait or use `--force`.
160    pub async fn find_by_issue_id(&self, issue_id: &str) -> Result<Vec<Session>> {
161        let all = self.list().await?;
162        Ok(all
163            .into_iter()
164            .filter(|s| !s.is_terminal() && s.issue_id.as_deref() == Some(issue_id))
165            .collect())
166    }
167
168    /// Remove a session's yaml file. No-op if it doesn't exist.
169    pub async fn delete(&self, project_id: &str, id: &SessionId) -> Result<()> {
170        let path = self.session_path(project_id, id);
171        if path.exists() {
172            fs::remove_file(&path).await?;
173        }
174        Ok(())
175    }
176
177    /// Archive a session: move its YAML from the active directory into
178    /// `sessions/<project>/.archive/<uuid>.yaml`. Archiving removes the
179    /// session from `list()` results while preserving it on disk for
180    /// historical reference. No-op if the source file doesn't exist
181    /// (already archived or never persisted).
182    pub async fn archive(&self, session: &Session) -> Result<()> {
183        let source = self.session_path(&session.project_id, &session.id);
184        let archive_dir = self.project_dir(&session.project_id).join(".archive");
185        fs::create_dir_all(&archive_dir).await?;
186        let target = archive_dir.join(format!("{}.yaml", session.id.0));
187        // Attempt the rename directly — treat NotFound as success (already
188        // archived or never persisted) to avoid a TOCTOU race with concurrent
189        // callers.
190        match fs::rename(&source, &target).await {
191            Ok(()) => Ok(()),
192            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
193            Err(e) => Err(e.into()),
194        }
195    }
196
197    /// List archived sessions for a project, sorted newest-first.
198    pub async fn list_archived(&self, project_id: &str) -> Result<Vec<Session>> {
199        let archive_dir = self.project_dir(project_id).join(".archive");
200        if !archive_dir.exists() {
201            return Ok(Vec::new());
202        }
203        let mut result = Vec::new();
204        let mut entries = fs::read_dir(&archive_dir).await?;
205        while let Some(file) = entries.next_entry().await? {
206            let path = file.path();
207            if path.extension().and_then(|s| s.to_str()) != Some("yaml") {
208                continue;
209            }
210            match load_file(&path).await {
211                Ok(session) => result.push(session),
212                Err(e) => tracing::warn!("skipping archived {path:?}: {e}"),
213            }
214        }
215        result.sort_by_key(|b| std::cmp::Reverse(b.created_at));
216        Ok(result)
217    }
218}
219
220async fn load_file(path: &Path) -> Result<Session> {
221    let bytes = fs::read(path).await?;
222    serde_yaml::from_slice::<Session>(&bytes)
223        .map_err(|e| AoError::Yaml(format!("parse {}: {e}", path.display())))
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{now_ms, SessionStatus};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Removes the wrapped directory tree when dropped.
    ///
    /// Cleanup in `Drop` runs on panic too, so a failing assertion no longer
    /// leaves fixture directories behind in the system temp dir.
    struct TempDirGuard(PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            // Best-effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Build a fresh, not-yet-created path under the system temp dir. The
    /// nanosecond timestamp keeps repeated runs from colliding; `label`
    /// keeps tests within one run apart.
    fn unique_temp_dir(label: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("ao-rs-sm-{label}-{nanos}"))
    }

    /// A manager rooted at a unique temp dir plus the guard that deletes it.
    /// Bind the guard to a named variable (`_guard`, not `_`) so it lives
    /// until the end of the test.
    fn temp_manager(label: &str) -> (SessionManager, TempDirGuard) {
        let base = unique_temp_dir(label);
        (SessionManager::new(base.clone()), TempDirGuard(base))
    }

    /// Minimal session fixture: freshly spawning, no optional data set.
    fn fake_session(id: &str, project: &str, task: &str) -> Session {
        Session {
            id: SessionId(id.into()),
            project_id: project.into(),
            status: SessionStatus::Spawning,
            agent: "claude-code".into(),
            agent_config: None,
            branch: format!("ao-{id}"),
            task: task.into(),
            workspace_path: None,
            runtime_handle: None,
            runtime: "tmux".into(),
            activity: None,
            created_at: now_ms(),
            cost: None,
            issue_id: None,
            issue_url: None,
            claimed_pr_number: None,
            claimed_pr_url: None,
            initial_prompt_override: None,
            spawned_by: None,
            last_merge_conflict_dispatched: None,
            last_review_backlog_fingerprint: None,
        }
    }

    #[tokio::test]
    async fn save_and_list_roundtrip() {
        let (manager, _guard) = temp_manager("roundtrip");

        let s1 = fake_session("uuid-1", "demo", "first task");
        let s2 = fake_session("uuid-2", "demo", "second task");
        let s3 = fake_session("uuid-3", "other", "third task");

        manager.save(&s1).await.unwrap();
        manager.save(&s2).await.unwrap();
        manager.save(&s3).await.unwrap();

        let all = manager.list().await.unwrap();
        assert_eq!(all.len(), 3);

        let demo_only = manager.list_for_project("demo").await.unwrap();
        assert_eq!(demo_only.len(), 2);
        assert!(demo_only.iter().all(|s| s.project_id == "demo"));
    }

    #[tokio::test]
    async fn list_returns_empty_when_dir_missing() {
        let (manager, _guard) = temp_manager("missing");
        assert!(manager.list().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn find_by_issue_id_returns_active_matches_only() {
        let (manager, _guard) = temp_manager("find-issue");

        // Active session on issue 42.
        let mut active = fake_session("uuid-active", "demo", "fix it");
        active.issue_id = Some("42".into());
        active.status = SessionStatus::Working;
        manager.save(&active).await.unwrap();

        // Terminal session on same issue (should not match).
        let mut killed = fake_session("uuid-killed", "demo", "old attempt");
        killed.issue_id = Some("42".into());
        killed.status = SessionStatus::Killed;
        manager.save(&killed).await.unwrap();

        // Active session on different issue (should not match).
        let mut other = fake_session("uuid-other", "demo", "other thing");
        other.issue_id = Some("99".into());
        other.status = SessionStatus::Working;
        manager.save(&other).await.unwrap();

        let matches = manager.find_by_issue_id("42").await.unwrap();
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].id.0, "uuid-active");

        // No match for unknown issue.
        let empty = manager.find_by_issue_id("999").await.unwrap();
        assert!(empty.is_empty());
    }

    #[tokio::test]
    async fn delete_removes_file() {
        let (manager, _guard) = temp_manager("delete");
        let s = fake_session("uuid-x", "demo", "doomed");
        manager.save(&s).await.unwrap();
        assert_eq!(manager.list().await.unwrap().len(), 1);

        manager.delete("demo", &s.id).await.unwrap();
        assert_eq!(manager.list().await.unwrap().len(), 0);
    }

    #[tokio::test]
    async fn find_by_prefix_resolves_unique_short_id() {
        let (manager, _guard) = temp_manager("find-unique");
        manager
            .save(&fake_session("deadbeef-aaaa-bbbb", "demo", "only one"))
            .await
            .unwrap();

        let hit = manager.find_by_prefix("deadbeef").await.unwrap();
        assert_eq!(hit.id.0, "deadbeef-aaaa-bbbb");

        // Full uuid also works via starts_with.
        let hit_full = manager.find_by_prefix("deadbeef-aaaa-bbbb").await.unwrap();
        assert_eq!(hit_full.id.0, "deadbeef-aaaa-bbbb");
    }

    #[tokio::test]
    async fn find_by_prefix_unknown_returns_session_not_found() {
        let (manager, _guard) = temp_manager("find-missing");
        let err = manager.find_by_prefix("no-such-session").await.unwrap_err();
        assert!(
            matches!(err, AoError::SessionNotFound(ref s) if s == "no-such-session"),
            "unexpected error: {err:?}"
        );
    }

    #[tokio::test]
    async fn find_by_prefix_empty_string_is_session_not_found() {
        // Empty prefix would otherwise match every session via `starts_with`,
        // so the CLI would surface the *ambiguous* branch and the message
        // would talk about N matches instead of "did you forget the id?".
        // Short-circuit explicitly.
        let (manager, _guard) = temp_manager("find-empty");
        manager
            .save(&fake_session("anything", "demo", "task"))
            .await
            .unwrap();
        let err = manager.find_by_prefix("").await.unwrap_err();
        assert!(matches!(err, AoError::SessionNotFound(_)));
    }

    #[tokio::test]
    async fn find_by_prefix_ambiguous_lists_match_count() {
        let (manager, _guard) = temp_manager("find-ambig");
        manager
            .save(&fake_session("abc-111", "demo", "one"))
            .await
            .unwrap();
        manager
            .save(&fake_session("abc-222", "demo", "two"))
            .await
            .unwrap();
        manager
            .save(&fake_session("abc-333", "demo", "three"))
            .await
            .unwrap();

        let err = manager.find_by_prefix("abc").await.unwrap_err();
        let msg = format!("{err}");
        assert!(msg.contains("ambiguous"), "got: {msg}");
        assert!(msg.contains("3 matches"), "got: {msg}");
    }

    #[tokio::test]
    async fn archive_moves_yaml_to_dot_archive_dir() {
        let (manager, _guard) = temp_manager("archive");
        let s = fake_session("uuid-arc", "demo", "archivable");
        manager.save(&s).await.unwrap();
        assert_eq!(manager.list().await.unwrap().len(), 1);

        manager.archive(&s).await.unwrap();

        // No longer in active list.
        assert_eq!(manager.list().await.unwrap().len(), 0);
        // Present in archived list.
        let archived = manager.list_archived("demo").await.unwrap();
        assert_eq!(archived.len(), 1);
        assert_eq!(archived[0].id.0, "uuid-arc");
    }

    #[tokio::test]
    async fn archive_is_noop_when_source_missing() {
        let (manager, _guard) = temp_manager("archive-noop");
        let s = fake_session("uuid-gone", "demo", "already gone");
        // Don't save — source doesn't exist on disk.
        manager.archive(&s).await.unwrap(); // should not error
        let archived = manager.list_archived("demo").await.unwrap();
        assert!(archived.is_empty());
    }

    #[tokio::test]
    async fn list_archived_returns_empty_when_no_archive() {
        let (manager, _guard) = temp_manager("archive-empty");
        let archived = manager.list_archived("nonexistent").await.unwrap();
        assert!(archived.is_empty());
    }

    #[tokio::test]
    async fn list_sorts_newest_first() {
        let (manager, _guard) = temp_manager("sort");

        let mut a = fake_session("a", "demo", "older");
        a.created_at = 1000;
        let mut b = fake_session("b", "demo", "newest");
        b.created_at = 3000;
        let mut c = fake_session("c", "demo", "middle");
        c.created_at = 2000;

        manager.save(&a).await.unwrap();
        manager.save(&b).await.unwrap();
        manager.save(&c).await.unwrap();

        let all = manager.list().await.unwrap();
        assert_eq!(all[0].id.0, "b");
        assert_eq!(all[1].id.0, "c");
        assert_eq!(all[2].id.0, "a");
    }

    #[tokio::test]
    async fn list_skips_corrupted_yaml_among_many() {
        let (manager, _guard) = temp_manager("corrupt");

        // One valid session.
        let ok = fake_session("uuid-ok", "demo", "good");
        manager.save(&ok).await.unwrap();

        // One corrupted YAML file in the same project dir.
        let project_dir = manager.base_dir().join("demo");
        std::fs::create_dir_all(&project_dir).unwrap();
        let bad_path = project_dir.join("uuid-bad.yaml");
        std::fs::write(&bad_path, "this: is: not: valid: yaml: [").unwrap();

        let all = manager.list().await.unwrap();
        assert_eq!(all.len(), 1, "expected only the valid session to load");
        assert_eq!(all[0].id.0, "uuid-ok");
    }
}