// codetether_agent/session/persistence.rs
//! On-disk persistence: save, load, delete, and directory lookup.

use std::path::{Path, PathBuf};

use anyhow::Result;
use tokio::fs;

use super::header::SessionHeader;
use super::tail_load::TailLoad;
use super::tail_seed::with_tail_cap;
use super::types::Session;
13impl Session {
14 /// Load an existing session by its UUID.
15 ///
16 /// # Errors
17 ///
18 /// Returns an error if the session file does not exist or the JSON is
19 /// malformed.
20 pub async fn load(id: &str) -> Result<Self> {
21 let path = Self::session_path(id)?;
22 let content = fs::read_to_string(&path).await?;
23 let session: Session = serde_json::from_str(&content)?;
24 Ok(session)
25 }
26
27 /// Load the most recent session, optionally scoped to a workspace
28 /// directory.
29 ///
30 /// When `workspace` is [`Some`], only considers sessions created in that
31 /// directory. When [`None`], returns the most recent session globally.
32 ///
33 /// # Errors
34 ///
35 /// Returns an error if no sessions exist (or, with `workspace` set,
36 /// none match the requested directory).
37 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
38 Self::last_for_directory_tail(workspace, usize::MAX)
39 .await
40 .map(|t| t.session)
41 }
42
43 /// Like [`Self::last_for_directory`] but keeps only the last `window`
44 /// messages and tool uses in memory, returning a [`TailLoad`] with the
45 /// number of entries that were dropped. Use this when resuming very
46 /// large sessions where the full transcript would exhaust memory.
47 ///
48 /// Implementation: the entire scan runs on a single blocking thread.
49 /// For each candidate (newest-mtime first) we do a cheap
50 /// [`SessionHeader`] parse to compare `metadata.directory`; only the
51 /// matching file pays for a full tail parse.
52 pub async fn last_for_directory_tail(
53 workspace: Option<&std::path::Path>,
54 window: usize,
55 ) -> Result<TailLoad> {
56 let sessions_dir = Self::sessions_dir()?;
57 let canonical_workspace =
58 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
59 tokio::task::spawn_blocking(move || {
60 scan_with_index(&sessions_dir, canonical_workspace, window)
61 })
62 .await
63 .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
64 }
65
66 /// Load the most recent session globally (unscoped).
67 ///
68 /// Kept for legacy compatibility; prefer
69 /// [`Session::last_for_directory`].
70 pub async fn last() -> Result<Self> {
71 Self::last_for_directory(None).await
72 }
73
74 /// Persist the session to disk as JSON. Creates the sessions directory
75 /// on demand.
76 ///
77 /// Performance:
78 /// - Serialization runs on the blocking thread pool so a large session
79 /// doesn't stall the async reactor during `serde_json` formatting.
80 /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
81 /// serialize, ~30–40% smaller on disk, and correspondingly faster
82 /// to load and mmap-scan on resume. Session files are machine-owned
83 /// — humans never hand-edit them — so indentation is pure overhead.
84 /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
85 /// previous session intact, and the mmap prefilter in
86 /// [`file_contains_finder`] cannot observe a torn buffer.
87 pub async fn save(&self) -> Result<()> {
88 let path = Self::session_path(&self.id)?;
89 if let Some(parent) = path.parent() {
90 fs::create_dir_all(parent).await?;
91 }
92 let tmp = path.with_extension("json.tmp");
93 // Clone-and-serialize off the reactor. The clone is a Vec-copy
94 // (~memcpy speed) which is always cheaper than the JSON
95 // formatting we are about to avoid blocking the reactor on.
96 let snapshot = self.clone();
97 let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
98 .await
99 .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
100 fs::write(&tmp, content).await?;
101 // Atomic swap. On POSIX `rename` is atomic over existing files;
102 // on Windows we fall back to remove-then-rename.
103 if let Err(primary) = fs::rename(&tmp, &path).await {
104 let _ = fs::remove_file(&path).await;
105 if let Err(retry) = fs::rename(&tmp, &path).await {
106 let _ = fs::remove_file(&tmp).await;
107 return Err(anyhow::anyhow!(
108 "session rename failed: {primary} (retry: {retry})"
109 ));
110 }
111 }
112 // Update the workspace index so the next resume is O(1). Best
113 // effort — a failed index write must not fail the session save.
114 if let Some(dir) = &self.metadata.directory {
115 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
116 let session_id = self.id.clone();
117 tokio::task::spawn_blocking(move || {
118 if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
119 tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
120 }
121 });
122 }
123 Ok(())
124 }
125
126 /// Delete a session file by ID. No-op if the file does not exist.
127 pub async fn delete(id: &str) -> Result<()> {
128 let path = Self::session_path(id)?;
129 if path.exists() {
130 tokio::fs::remove_file(&path).await?;
131 }
132 Ok(())
133 }
134
135 /// Resolve the sessions directory (`<data_dir>/sessions`).
136 ///
137 /// Cached after first resolution: `Config::data_dir()` does env-var
138 /// lookups and filesystem checks which are cheap individually but add
139 /// up across hundreds of save/load calls over a session's lifetime.
140 pub(crate) fn sessions_dir() -> Result<PathBuf> {
141 use std::sync::OnceLock;
142 static CACHED: OnceLock<PathBuf> = OnceLock::new();
143 if let Some(dir) = CACHED.get() {
144 return Ok(dir.clone());
145 }
146 let dir = crate::config::Config::data_dir()
147 .map(|d| d.join("sessions"))
148 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
149 // Best-effort set; if another thread won the race we just use
150 // theirs.
151 let _ = CACHED.set(dir.clone());
152 Ok(dir)
153 }
154
155 /// Resolve the on-disk path for a session file.
156 pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
157 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
158 }
159}
160
161/// Index-first scan. Tries the O(1) workspace index; on miss or stale
162/// entry, falls back to the byte-prefiltered directory scan and repairs
163/// the index with the winner so the next launch is O(1).
164fn scan_with_index(
165 sessions_dir: &Path,
166 canonical_workspace: Option<PathBuf>,
167 window: usize,
168) -> Result<TailLoad> {
169 // Fast path: check the sidecar index.
170 if let Some(ws) = canonical_workspace.as_ref() {
171 let index = super::workspace_index::WorkspaceIndex::load_sync();
172 if let Some(id) = index.get(ws) {
173 let candidate = sessions_dir.join(format!("{id}.json"));
174 if candidate.exists()
175 && let Ok(load) = tail_load_sync(&candidate, window)
176 {
177 // Confirm it still belongs to this workspace — the user
178 // could have edited the session's metadata.directory
179 // manually or moved a file.
180 let dir_ok = load
181 .session
182 .metadata
183 .directory
184 .as_ref()
185 .map(|d| {
186 let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
187 &canonical == ws
188 })
189 .unwrap_or(false);
190 if dir_ok {
191 return Ok(load);
192 }
193 }
194 }
195 }
196
197 // Slow path: scan everything.
198 let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
199
200 // Repair the index with whatever we found, so next time is O(1).
201 if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
202 let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
203 }
204
205 result
206}
207
208/// Tail-load a specific path synchronously (used by the index fast path).
209fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
210 use std::fs;
211 use std::io::BufReader;
212 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
213 let file = fs::File::open(path)?;
214 let reader = BufReader::with_capacity(64 * 1024, file);
215 let (parsed, dropped) = with_tail_cap(window, || {
216 serde_json::from_reader::<_, Session>(reader)
217 });
218 Ok(TailLoad {
219 session: parsed?,
220 dropped,
221 file_bytes,
222 })
223}
224
225/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
226///
227/// Flow:
228/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
229/// 2. Sort newest-first.
230/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
231/// `Vec<Message>` allocation. Because `metadata` is serialized before
232/// `messages`/`tool_uses` in new files, this is O(header bytes); older
233/// files still work but pay a lex-through cost.
234/// 4. On workspace match, re-open and do one tail-capped full parse.
235fn scan_sync(
236 sessions_dir: &Path,
237 canonical_workspace: Option<PathBuf>,
238 window: usize,
239) -> Result<TailLoad> {
240 use std::fs;
241 use std::io::BufReader;
242 use std::time::SystemTime;
243
244 if !sessions_dir.exists() {
245 anyhow::bail!("No sessions found");
246 }
247
248 let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
249 for entry in fs::read_dir(sessions_dir)? {
250 let entry = match entry {
251 Ok(e) => e,
252 Err(_) => continue,
253 };
254 let path = entry.path();
255 if path.extension().and_then(|s| s.to_str()) != Some("json") {
256 continue;
257 }
258 let mtime = entry
259 .metadata()
260 .ok()
261 .and_then(|m| m.modified().ok())
262 .unwrap_or(SystemTime::UNIX_EPOCH);
263 candidates.push((path, mtime));
264 }
265 if candidates.is_empty() {
266 anyhow::bail!("No sessions found");
267 }
268 candidates.sort_by(|a, b| b.1.cmp(&a.1));
269
270 // Precompute a cheap byte-level needle for the workspace path. JSON
271 // serializes `metadata.directory` as `"directory":"<path>"`, so if
272 // the raw file bytes don't contain the path as a substring, we can
273 // skip it without invoking serde_json at all.
274 //
275 // This is the single biggest win for large workspaces: a 10 MB
276 // session that is *not* for this cwd gets ruled out in a few ms of
277 // byte scanning instead of a full JSON lex.
278 let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
279 // JSON-escape the path (backslashes on Windows become `\\`,
280 // quotes become `\"`). `serde_json::to_string` handles all the
281 // edge cases for us.
282 let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
283 // Strip the surrounding quotes; we want the inner bytes so we
284 // match whether the JSON has `"directory":"..."` or any other
285 // surrounding context.
286 let inner = quoted
287 .strip_prefix('"')
288 .and_then(|s| s.strip_suffix('"'))
289 .unwrap_or("ed);
290 inner.as_bytes().to_vec()
291 });
292 // Build the SIMD-accelerated substring finder once for the whole
293 // scan loop; reusing it across files is much faster than rebuilding.
294 let finder = needle
295 .as_ref()
296 .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
297
298 // Parallel byte-prefilter: for each candidate, compute whether the
299 // workspace path bytes appear in the file. This is the expensive
300 // O(file_bytes) step. Running it concurrently across CPU cores
301 // turns "sum of all file scan times" into ~"longest single file
302 // scan time" on a machine with multiple cores.
303 //
304 // We preserve mtime ordering by collecting results into a parallel
305 // Vec<bool> indexed the same as `candidates`, then iterating
306 // serially to find the first hit.
307 let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
308 (None, _) => vec![true; candidates.len()],
309 (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
310 (Some(finder), _) => {
311 let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
312 let results: std::sync::Mutex<Vec<Option<bool>>> =
313 std::sync::Mutex::new(vec![None; paths.len()]);
314 std::thread::scope(|scope| {
315 // Chunk candidates across available CPUs. For ~300 files
316 // a fan-out of 4-8 threads saturates I/O and CPU nicely
317 // without oversubscribing.
318 let threads =
319 std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4).min(8);
320 let chunk_size = paths.len().div_ceil(threads);
321 for chunk_idx in 0..threads {
322 let start = chunk_idx * chunk_size;
323 if start >= paths.len() {
324 break;
325 }
326 let end = (start + chunk_size).min(paths.len());
327 let chunk_paths = &paths[start..end];
328 let results = &results;
329 scope.spawn(move || {
330 for (offset, p) in chunk_paths.iter().enumerate() {
331 let hit = file_contains_finder(p, finder).unwrap_or(false);
332 // Lock-per-entry is fine: contention is
333 // negligible vs. the ~ms file scan cost.
334 if let Ok(mut guard) = results.lock() {
335 guard[start + offset] = Some(hit);
336 }
337 }
338 });
339 }
340 });
341 results
342 .into_inner()
343 .unwrap_or_default()
344 .into_iter()
345 .map(|o| o.unwrap_or(false))
346 .collect()
347 }
348 };
349
350 for (idx, (path, _)) in candidates.iter().enumerate() {
351 // Fast path: byte-level substring prefilter (precomputed in parallel).
352 if !prefilter_hits.get(idx).copied().unwrap_or(false) {
353 continue;
354 }
355
356 // Slower path: full JSON header parse to confirm the match
357 // (the substring test has false positives: the path could
358 // appear in a chat message or tool output).
359 let header_ok = (|| -> Result<bool> {
360 let file = fs::File::open(path)?;
361 let reader = BufReader::with_capacity(16 * 1024, file);
362 let header: SessionHeader = match serde_json::from_reader(reader) {
363 Ok(h) => h,
364 Err(_) => return Ok(false),
365 };
366 if let Some(ref ws) = canonical_workspace {
367 let Some(dir) = header.metadata.directory.as_ref() else {
368 return Ok(false);
369 };
370 if dir == ws {
371 return Ok(true);
372 }
373 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
374 Ok(&canonical_dir == ws)
375 } else {
376 Ok(true)
377 }
378 })();
379
380 match header_ok {
381 Ok(true) => {}
382 Ok(false) => continue,
383 Err(err) => {
384 tracing::warn!(
385 path = %path.display(),
386 error = %err,
387 "skipping unreadable session file",
388 );
389 continue;
390 }
391 }
392
393 // Match found — do the tail-capped full parse.
394 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
395 let file = fs::File::open(path)?;
396 let reader = BufReader::with_capacity(64 * 1024, file);
397 let (parsed, dropped) =
398 with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
399 return Ok(TailLoad {
400 session: parsed?,
401 dropped,
402 file_bytes,
403 });
404 }
405
406 anyhow::bail!("No sessions found")
407}
408
/// Memory-map `path` and return `true` if the SIMD finder matches.
///
/// mmap avoids the per-chunk `read` syscall overhead and the
/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
/// straight over the OS-provided virtual memory window, and the kernel
/// pages in only what's actually touched. On a ~10 MB session file
/// this is measurably faster than chunked `BufRead` + SIMD.
///
/// An empty needle trivially matches; a file shorter than the needle
/// trivially does not (and skips the mmap entirely).
///
/// # Errors
///
/// Propagates I/O errors (open, stat, mmap) to the caller. Note: this
/// function does *not* swallow errors itself — the scan loop's
/// `unwrap_or(false)` maps any error to "no match" so the mtime loop
/// simply tries the next candidate.
fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
    use std::fs;

    let needle_len = finder.needle().len();
    if needle_len == 0 {
        return Ok(true);
    }
    let file = fs::File::open(path)?;
    let meta = file.metadata()?;
    let len = meta.len();
    if (len as usize) < needle_len {
        return Ok(false);
    }
    // SAFETY: We only read the mapping, we do not mutate it. The
    // kernel COW-protects the mapping and the `Mmap` owns the lifetime
    // tied to `file`, which we keep alive for the duration of `find`.
    // Concurrent external modification of the file could theoretically
    // tear the mapping, but session files are only ever rewritten
    // wholesale via atomic rename — never in-place truncated.
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    Ok(finder.find(&mmap[..]).is_some())
}