// codetether_agent/session/persistence.rs
//! On-disk persistence: save, load, delete, and directory lookup.

use std::io::ErrorKind;
use std::path::{Path, PathBuf};

use anyhow::Result;
use tokio::fs;

use super::header::SessionHeader;
use super::tail_load::TailLoad;
use super::tail_seed::with_tail_cap;
use super::types::Session;
12
13impl Session {
14    /// Load an existing session by its UUID.
15    ///
16    /// # Errors
17    ///
18    /// Returns an error if the session file does not exist or the JSON is
19    /// malformed.
20    pub async fn load(id: &str) -> Result<Self> {
21        let path = Self::session_path(id)?;
22        let content = fs::read_to_string(&path).await?;
23        let session: Session = serde_json::from_str(&content)?;
24        Ok(session)
25    }
26
27    /// Load the most recent session, optionally scoped to a workspace
28    /// directory.
29    ///
30    /// When `workspace` is [`Some`], only considers sessions created in that
31    /// directory. When [`None`], returns the most recent session globally.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if no sessions exist (or, with `workspace` set,
36    /// none match the requested directory).
37    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
38        Self::last_for_directory_tail(workspace, usize::MAX)
39            .await
40            .map(|t| t.session)
41    }
42
43    /// Like [`Self::last_for_directory`] but keeps only the last `window`
44    /// messages and tool uses in memory, returning a [`TailLoad`] with the
45    /// number of entries that were dropped. Use this when resuming very
46    /// large sessions where the full transcript would exhaust memory.
47    ///
48    /// Implementation: the entire scan runs on a single blocking thread.
49    /// For each candidate (newest-mtime first) we do a cheap
50    /// [`SessionHeader`] parse to compare `metadata.directory`; only the
51    /// matching file pays for a full tail parse.
52    pub async fn last_for_directory_tail(
53        workspace: Option<&std::path::Path>,
54        window: usize,
55    ) -> Result<TailLoad> {
56        let sessions_dir = Self::sessions_dir()?;
57        let canonical_workspace =
58            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
59        tokio::task::spawn_blocking(move || {
60            scan_with_index(&sessions_dir, canonical_workspace, window)
61        })
62            .await
63            .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
64    }
65
66    /// Load the most recent session globally (unscoped).
67    ///
68    /// Kept for legacy compatibility; prefer
69    /// [`Session::last_for_directory`].
70    pub async fn last() -> Result<Self> {
71        Self::last_for_directory(None).await
72    }
73
74    /// Persist the session to disk as JSON. Creates the sessions directory
75    /// on demand.
76    ///
77    /// Performance:
78    /// - Serialization runs on the blocking thread pool so a large session
79    ///   doesn't stall the async reactor during `serde_json` formatting.
80    /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
81    ///   serialize, ~30–40% smaller on disk, and correspondingly faster
82    ///   to load and mmap-scan on resume. Session files are machine-owned
83    ///   — humans never hand-edit them — so indentation is pure overhead.
84    /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
85    ///   previous session intact, and the mmap prefilter in
86    ///   [`file_contains_finder`] cannot observe a torn buffer.
87    pub async fn save(&self) -> Result<()> {
88        let path = Self::session_path(&self.id)?;
89        if let Some(parent) = path.parent() {
90            fs::create_dir_all(parent).await?;
91        }
92        let tmp = path.with_extension("json.tmp");
93        // Clone-and-serialize off the reactor. The clone is a Vec-copy
94        // (~memcpy speed) which is always cheaper than the JSON
95        // formatting we are about to avoid blocking the reactor on.
96        let snapshot = self.clone();
97        let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
98            .await
99            .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
100        fs::write(&tmp, content).await?;
101        // Atomic swap. On POSIX `rename` is atomic over existing files;
102        // on Windows we fall back to remove-then-rename.
103        if let Err(primary) = fs::rename(&tmp, &path).await {
104            let _ = fs::remove_file(&path).await;
105            if let Err(retry) = fs::rename(&tmp, &path).await {
106                let _ = fs::remove_file(&tmp).await;
107                return Err(anyhow::anyhow!(
108                    "session rename failed: {primary} (retry: {retry})"
109                ));
110            }
111        }
112        // Update the workspace index so the next resume is O(1). Best
113        // effort — a failed index write must not fail the session save.
114        if let Some(dir) = &self.metadata.directory {
115            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
116            let session_id = self.id.clone();
117            tokio::task::spawn_blocking(move || {
118                if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
119                    tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
120                }
121            });
122        }
123        Ok(())
124    }
125
126    /// Delete a session file by ID. No-op if the file does not exist.
127    pub async fn delete(id: &str) -> Result<()> {
128        let path = Self::session_path(id)?;
129        if path.exists() {
130            tokio::fs::remove_file(&path).await?;
131        }
132        Ok(())
133    }
134
135    /// Resolve the sessions directory (`<data_dir>/sessions`).
136    ///
137    /// Cached after first resolution: `Config::data_dir()` does env-var
138    /// lookups and filesystem checks which are cheap individually but add
139    /// up across hundreds of save/load calls over a session's lifetime.
140    pub(crate) fn sessions_dir() -> Result<PathBuf> {
141        use std::sync::OnceLock;
142        static CACHED: OnceLock<PathBuf> = OnceLock::new();
143        if let Some(dir) = CACHED.get() {
144            return Ok(dir.clone());
145        }
146        let dir = crate::config::Config::data_dir()
147            .map(|d| d.join("sessions"))
148            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
149        // Best-effort set; if another thread won the race we just use
150        // theirs.
151        let _ = CACHED.set(dir.clone());
152        Ok(dir)
153    }
154
155    /// Resolve the on-disk path for a session file.
156    pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
157        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
158    }
159}
160
161/// Index-first scan. Tries the O(1) workspace index; on miss or stale
162/// entry, falls back to the byte-prefiltered directory scan and repairs
163/// the index with the winner so the next launch is O(1).
164fn scan_with_index(
165    sessions_dir: &Path,
166    canonical_workspace: Option<PathBuf>,
167    window: usize,
168) -> Result<TailLoad> {
169    // Fast path: check the sidecar index.
170    if let Some(ws) = canonical_workspace.as_ref() {
171        let index = super::workspace_index::WorkspaceIndex::load_sync();
172        if let Some(id) = index.get(ws) {
173            let candidate = sessions_dir.join(format!("{id}.json"));
174            if candidate.exists()
175                && let Ok(load) = tail_load_sync(&candidate, window)
176            {
177                // Confirm it still belongs to this workspace — the user
178                // could have edited the session's metadata.directory
179                // manually or moved a file.
180                let dir_ok = load
181                    .session
182                    .metadata
183                    .directory
184                    .as_ref()
185                    .map(|d| {
186                        let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
187                        &canonical == ws
188                    })
189                    .unwrap_or(false);
190                if dir_ok {
191                    return Ok(load);
192                }
193            }
194        }
195    }
196
197    // Slow path: scan everything.
198    let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
199
200    // Repair the index with whatever we found, so next time is O(1).
201    if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
202        let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
203    }
204
205    result
206}
207
208/// Tail-load a specific path synchronously (used by the index fast path).
209fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
210    use std::fs;
211    use std::io::BufReader;
212    let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
213    let file = fs::File::open(path)?;
214    let reader = BufReader::with_capacity(64 * 1024, file);
215    let (parsed, dropped) = with_tail_cap(window, || {
216        serde_json::from_reader::<_, Session>(reader)
217    });
218    Ok(TailLoad {
219        session: parsed?,
220        dropped,
221        file_bytes,
222    })
223}
224
225/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
226///
227/// Flow:
228/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
229/// 2. Sort newest-first.
230/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
231///    `Vec<Message>` allocation. Because `metadata` is serialized before
232///    `messages`/`tool_uses` in new files, this is O(header bytes); older
233///    files still work but pay a lex-through cost.
234/// 4. On workspace match, re-open and do one tail-capped full parse.
235fn scan_sync(
236    sessions_dir: &Path,
237    canonical_workspace: Option<PathBuf>,
238    window: usize,
239) -> Result<TailLoad> {
240    use std::fs;
241    use std::io::BufReader;
242    use std::time::SystemTime;
243
244    if !sessions_dir.exists() {
245        anyhow::bail!("No sessions found");
246    }
247
248    let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
249    for entry in fs::read_dir(sessions_dir)? {
250        let entry = match entry {
251            Ok(e) => e,
252            Err(_) => continue,
253        };
254        let path = entry.path();
255        if path.extension().and_then(|s| s.to_str()) != Some("json") {
256            continue;
257        }
258        let mtime = entry
259            .metadata()
260            .ok()
261            .and_then(|m| m.modified().ok())
262            .unwrap_or(SystemTime::UNIX_EPOCH);
263        candidates.push((path, mtime));
264    }
265    if candidates.is_empty() {
266        anyhow::bail!("No sessions found");
267    }
268    candidates.sort_by(|a, b| b.1.cmp(&a.1));
269
270    // Precompute a cheap byte-level needle for the workspace path. JSON
271    // serializes `metadata.directory` as `"directory":"<path>"`, so if
272    // the raw file bytes don't contain the path as a substring, we can
273    // skip it without invoking serde_json at all.
274    //
275    // This is the single biggest win for large workspaces: a 10 MB
276    // session that is *not* for this cwd gets ruled out in a few ms of
277    // byte scanning instead of a full JSON lex.
278    let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
279        // JSON-escape the path (backslashes on Windows become `\\`,
280        // quotes become `\"`). `serde_json::to_string` handles all the
281        // edge cases for us.
282        let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
283        // Strip the surrounding quotes; we want the inner bytes so we
284        // match whether the JSON has `"directory":"..."` or any other
285        // surrounding context.
286        let inner = quoted
287            .strip_prefix('"')
288            .and_then(|s| s.strip_suffix('"'))
289            .unwrap_or(&quoted);
290        inner.as_bytes().to_vec()
291    });
292    // Build the SIMD-accelerated substring finder once for the whole
293    // scan loop; reusing it across files is much faster than rebuilding.
294    let finder = needle
295        .as_ref()
296        .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
297
298    // Parallel byte-prefilter: for each candidate, compute whether the
299    // workspace path bytes appear in the file. This is the expensive
300    // O(file_bytes) step. Running it concurrently across CPU cores
301    // turns "sum of all file scan times" into ~"longest single file
302    // scan time" on a machine with multiple cores.
303    //
304    // We preserve mtime ordering by collecting results into a parallel
305    // Vec<bool> indexed the same as `candidates`, then iterating
306    // serially to find the first hit.
307    let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
308        (None, _) => vec![true; candidates.len()],
309        (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
310        (Some(finder), _) => {
311            let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
312            let results: std::sync::Mutex<Vec<Option<bool>>> =
313                std::sync::Mutex::new(vec![None; paths.len()]);
314            std::thread::scope(|scope| {
315                // Chunk candidates across available CPUs. For ~300 files
316                // a fan-out of 4-8 threads saturates I/O and CPU nicely
317                // without oversubscribing.
318                let threads =
319                    std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4).min(8);
320                let chunk_size = paths.len().div_ceil(threads);
321                for chunk_idx in 0..threads {
322                    let start = chunk_idx * chunk_size;
323                    if start >= paths.len() {
324                        break;
325                    }
326                    let end = (start + chunk_size).min(paths.len());
327                    let chunk_paths = &paths[start..end];
328                    let results = &results;
329                    scope.spawn(move || {
330                        for (offset, p) in chunk_paths.iter().enumerate() {
331                            let hit = file_contains_finder(p, finder).unwrap_or(false);
332                            // Lock-per-entry is fine: contention is
333                            // negligible vs. the ~ms file scan cost.
334                            if let Ok(mut guard) = results.lock() {
335                                guard[start + offset] = Some(hit);
336                            }
337                        }
338                    });
339                }
340            });
341            results
342                .into_inner()
343                .unwrap_or_default()
344                .into_iter()
345                .map(|o| o.unwrap_or(false))
346                .collect()
347        }
348    };
349
350    for (idx, (path, _)) in candidates.iter().enumerate() {
351        // Fast path: byte-level substring prefilter (precomputed in parallel).
352        if !prefilter_hits.get(idx).copied().unwrap_or(false) {
353            continue;
354        }
355
356        // Slower path: full JSON header parse to confirm the match
357        // (the substring test has false positives: the path could
358        // appear in a chat message or tool output).
359        let header_ok = (|| -> Result<bool> {
360            let file = fs::File::open(path)?;
361            let reader = BufReader::with_capacity(16 * 1024, file);
362            let header: SessionHeader = match serde_json::from_reader(reader) {
363                Ok(h) => h,
364                Err(_) => return Ok(false),
365            };
366            if let Some(ref ws) = canonical_workspace {
367                let Some(dir) = header.metadata.directory.as_ref() else {
368                    return Ok(false);
369                };
370                if dir == ws {
371                    return Ok(true);
372                }
373                let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
374                Ok(&canonical_dir == ws)
375            } else {
376                Ok(true)
377            }
378        })();
379
380        match header_ok {
381            Ok(true) => {}
382            Ok(false) => continue,
383            Err(err) => {
384                tracing::warn!(
385                    path = %path.display(),
386                    error = %err,
387                    "skipping unreadable session file",
388                );
389                continue;
390            }
391        }
392
393        // Match found — do the tail-capped full parse.
394        let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
395        let file = fs::File::open(path)?;
396        let reader = BufReader::with_capacity(64 * 1024, file);
397        let (parsed, dropped) =
398            with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
399        return Ok(TailLoad {
400            session: parsed?,
401            dropped,
402            file_bytes,
403        });
404    }
405
406    anyhow::bail!("No sessions found")
407}
408
409/// Memory-map `path` and return `true` if the SIMD finder matches.
410///
411/// mmap avoids the per-chunk `read` syscall overhead and the
412/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
413/// straight over the OS-provided virtual memory window, and the kernel
414/// pages in only what's actually touched. On a ~10 MB session file
415/// this is measurably faster than chunked `BufRead` + SIMD.
416///
417/// Falls back to returning `false` on any I/O error — the caller's
418/// mtime loop will simply try the next candidate.
419fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
420    use std::fs;
421
422    let needle_len = finder.needle().len();
423    if needle_len == 0 {
424        return Ok(true);
425    }
426    let file = fs::File::open(path)?;
427    let meta = file.metadata()?;
428    let len = meta.len();
429    if (len as usize) < needle_len {
430        return Ok(false);
431    }
432    // SAFETY: We only read the mapping, we do not mutate it. The
433    // kernel COW-protects the mapping and the `Mmap` owns the lifetime
434    // tied to `file`, which we keep alive for the duration of `find`.
435    // Concurrent external modification of the file could theoretically
436    // tear the mapping, but session files are only ever rewritten
437    // wholesale via atomic rename — never in-place truncated.
438    let mmap = unsafe { memmap2::Mmap::map(&file)? };
439    Ok(finder.find(&mmap[..]).is_some())
440}