Skip to main content

capo_agent/session/
lookup.rs

1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3//! `--continue` and `--session <prefix-or-path>` resolution.
4
5use std::path::PathBuf;
6
7use motosan_agent_loop::{FileSessionStore, SessionStore};
8
9use crate::error::{AppError, Result};
10use crate::session::{paths::SessionPaths, SessionId};
11
12/// Stateless namespace for session resolution.
13pub struct SessionLookup;
14
15impl SessionLookup {
16    /// `--continue / -c` — most recent session in `paths.bucket_dir`,
17    /// ranked by `motosan SessionMeta::updated_at_ms` descending.
18    ///
19    /// Errors if the bucket is empty or doesn't exist.
20    pub async fn most_recent(paths: &SessionPaths) -> Result<SessionId> {
21        if !paths.bucket_dir.exists() {
22            return Err(AppError::Config(format!(
23                "no sessions yet for this directory (looked in {})",
24                paths.bucket_dir.display()
25            )));
26        }
27        let store = FileSessionStore::new(paths.bucket_dir.clone());
28        let metas = store
29            .list_meta()
30            .await
31            .map_err(|err| AppError::Config(format!("list_meta failed: {err}")))?;
32        // `FileSessionStore::list_meta` already sorts by updated_at_ms DESC.
33        let first = metas
34            .into_iter()
35            .next()
36            .ok_or_else(|| AppError::Config("no sessions found".into()))?;
37        Ok(SessionId::from_string(first.session_id))
38    }
39
40    /// `--session <prefix-or-path>` — disambiguate:
41    /// - Starts with `/`, `~`, or `.` → treat as a path to a `.jsonl` file.
42    /// - Else → ULID prefix. Looks in `paths.bucket_dir` first; if no
43    ///   match there, scans all sibling buckets under `paths.sessions_root()`.
44    ///   Errors on 0 or >1 matches.
45    pub async fn resolve(paths: &SessionPaths, prefix_or_path: &str) -> Result<SessionId> {
46        if prefix_or_path.starts_with('/')
47            || prefix_or_path.starts_with('~')
48            || prefix_or_path.starts_with('.')
49        {
50            return resolve_path(prefix_or_path);
51        }
52        resolve_prefix(paths, prefix_or_path).await
53    }
54}
55
56fn resolve_path(input: &str) -> Result<SessionId> {
57    let expanded = expand_tilde(input);
58    if !expanded.exists() {
59        return Err(AppError::Config(format!(
60            "session file not found: {}",
61            expanded.display()
62        )));
63    }
64    let stem = expanded
65        .file_stem()
66        .and_then(|s| s.to_str())
67        .ok_or_else(|| {
68            AppError::Config(format!("session path has no stem: {}", expanded.display()))
69        })?;
70    Ok(SessionId::from_string(stem.to_string()))
71}
72
/// Expands a leading `~` to the current user's home directory.
///
/// Two forms are recognised: the bare `~` and `~/rest`. The home directory is
/// read from the `HOME` environment variable. Every other input — including
/// `~user/...` — is returned unchanged, and so is any input when `HOME` is
/// unset or empty.
fn expand_tilde(input: &str) -> PathBuf {
    let tail = if input == "~" {
        Some("")
    } else {
        input.strip_prefix("~/")
    };

    if let Some(tail) = tail {
        // `var_os` instead of `var`: a non-UTF-8 `HOME` is still a usable path.
        // An empty `HOME` is ignored, otherwise `~/x` would silently turn into
        // the relative path `x`.
        if let Some(home) = std::env::var_os("HOME").filter(|h| !h.is_empty()) {
            let home = PathBuf::from(home);
            return if tail.is_empty() { home } else { home.join(tail) };
        }
    }
    PathBuf::from(input)
}
81
82async fn resolve_prefix(paths: &SessionPaths, prefix: &str) -> Result<SessionId> {
83    // First pass: current bucket only.
84    if paths.bucket_dir.exists() {
85        let store = FileSessionStore::new(paths.bucket_dir.clone());
86        let ids = store
87            .list()
88            .await
89            .map_err(|err| AppError::Config(format!("list failed: {err}")))?;
90        let matches: Vec<String> = ids
91            .into_iter()
92            .filter(|id| id.starts_with(prefix))
93            .collect();
94        match matches.len() {
95            0 => {} // fall through to all-buckets scan
96            1 => {
97                let only = matches
98                    .into_iter()
99                    .next()
100                    .ok_or_else(|| AppError::Config("internal: vec drained".into()))?;
101                return Ok(SessionId::from_string(only));
102            }
103            n => {
104                return Err(AppError::Config(format!(
105                    "session prefix {prefix:?} matches {n} sessions in this directory"
106                )));
107            }
108        }
109    }
110
111    // Second pass: scan every bucket under sessions/.
112    let sessions_root = paths.sessions_root();
113    if !sessions_root.exists() {
114        return Err(AppError::Config(format!(
115            "session prefix {prefix:?}: no sessions exist (root {} missing)",
116            sessions_root.display()
117        )));
118    }
119
120    let mut all_matches: Vec<String> = Vec::new();
121    let iter = std::fs::read_dir(&sessions_root).map_err(|err| {
122        AppError::Config(format!(
123            "failed to read sessions root {}: {err}",
124            sessions_root.display()
125        ))
126    })?;
127    for entry in iter {
128        let entry = entry.map_err(|err| AppError::Config(format!("readdir error: {err}")))?;
129        let path = entry.path();
130        if !path.is_dir() {
131            continue;
132        }
133        let store = FileSessionStore::new(path);
134        let ids = store
135            .list()
136            .await
137            .map_err(|err| AppError::Config(format!("list failed: {err}")))?;
138        for id in ids {
139            if id.starts_with(prefix) {
140                all_matches.push(id);
141            }
142        }
143    }
144
145    match all_matches.len() {
146        0 => Err(AppError::Config(format!(
147            "session prefix {prefix:?} matches no sessions"
148        ))),
149        1 => {
150            let only = all_matches
151                .into_iter()
152                .next()
153                .ok_or_else(|| AppError::Config("internal: vec drained".into()))?;
154            Ok(SessionId::from_string(only))
155        }
156        n => Err(AppError::Config(format!(
157            "session prefix {prefix:?} matches {n} sessions across all buckets"
158        ))),
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use motosan_agent_loop::{Message, SessionEntry};
    use tempfile::tempdir;

    const OLD_ID: &str = "01OLD000000000000000000000";
    const NEW_ID: &str = "01NEW000000000000000000000";

    /// Builds `SessionPaths` with `root` as the agent directory and `cwd` as
    /// the working directory the bucket is derived from.
    fn paths_under(root: &std::path::Path, cwd: &str) -> SessionPaths {
        SessionPaths::for_cwd(root.to_path_buf(), &PathBuf::from(cwd))
    }

    /// Ensures the bucket exists, then appends one user message under `id`
    /// and flushes it.
    async fn seed_session(paths: &SessionPaths, id: &str) {
        paths.ensure_bucket().unwrap();
        let store = FileSessionStore::new(paths.bucket_dir.clone());
        let entry = SessionEntry::message(Message::user("hi"));
        store.append_entry(id, &entry).await.unwrap();
        store.flush(id).await.unwrap();
    }

    #[tokio::test]
    async fn most_recent_returns_latest_by_updated_at() {
        let tmp = tempdir().unwrap();
        let paths = paths_under(tmp.path(), "/tmp/x");

        seed_session(&paths, OLD_ID).await;
        // Short pause between the two writes — presumably so the sessions get
        // distinct update timestamps.
        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        seed_session(&paths, NEW_ID).await;

        let newest = SessionLookup::most_recent(&paths).await.unwrap();
        assert_eq!(newest.as_str(), NEW_ID);
    }

    #[tokio::test]
    async fn most_recent_errors_on_empty_bucket() {
        let tmp = tempdir().unwrap();
        let paths = paths_under(tmp.path(), "/x");

        // Nothing is seeded, so the lookup has to fail.
        let err = SessionLookup::most_recent(&paths).await.unwrap_err();
        let rendered = format!("{err}");
        assert!(rendered.contains("no sessions"));
    }

    #[tokio::test]
    async fn resolve_prefix_unique_match_in_bucket_succeeds() {
        let tmp = tempdir().unwrap();
        let paths = paths_under(tmp.path(), "/x");
        let full_id = "01ABCDEFGHIJKLMNOPQRSTUVWX";
        seed_session(&paths, full_id).await;

        let resolved = SessionLookup::resolve(&paths, "01ABC").await.unwrap();
        assert_eq!(resolved.as_str(), full_id);
    }

    #[tokio::test]
    async fn resolve_prefix_ambiguous_match_errors() {
        let tmp = tempdir().unwrap();
        let paths = paths_under(tmp.path(), "/x");
        // Two ids sharing the `01ABCD` prefix.
        for id in ["01ABCD0000000000000000000A", "01ABCD0000000000000000000B"] {
            seed_session(&paths, id).await;
        }

        let err = SessionLookup::resolve(&paths, "01ABCD").await.unwrap_err();
        let rendered = format!("{err}");
        assert!(rendered.contains("matches 2 sessions"));
    }

    #[tokio::test]
    async fn resolve_path_starting_with_slash_reads_from_filesystem() {
        let tmp = tempdir().unwrap();
        let paths = paths_under(tmp.path(), "/x");
        let full_id = "01XYZWVUTSRQPONMLKJIHGFEDC";
        seed_session(&paths, full_id).await;

        let file = paths.bucket_dir.join("01XYZWVUTSRQPONMLKJIHGFEDC.jsonl");
        assert!(file.exists());

        let as_text = file.to_str().unwrap();
        let resolved = SessionLookup::resolve(&paths, as_text).await.unwrap();
        assert_eq!(resolved.as_str(), full_id);
    }
}