codetether_agent/session/persistence.rs
1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14 /// Load an existing session by its UUID.
15 ///
16 /// # Errors
17 ///
18 /// Returns an error if the session file does not exist or the JSON is
19 /// malformed.
20 pub async fn load(id: &str) -> Result<Self> {
21 let path = Self::session_path(id)?;
22 let content = fs::read_to_string(&path).await?;
23 let session: Session = serde_json::from_str(&content)?;
24 Ok(session)
25 }
26
27 /// Load the most recent session, optionally scoped to a workspace
28 /// directory.
29 ///
30 /// When `workspace` is [`Some`], only considers sessions created in that
31 /// directory. When [`None`], returns the most recent session globally.
32 ///
33 /// # Errors
34 ///
35 /// Returns an error if no sessions exist (or, with `workspace` set,
36 /// none match the requested directory).
37 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
38 Self::last_for_directory_tail(workspace, usize::MAX)
39 .await
40 .map(|t| t.session)
41 }
42
43 /// Like [`Self::last_for_directory`] but keeps only the last `window`
44 /// messages and tool uses in memory, returning a [`TailLoad`] with the
45 /// number of entries that were dropped. Use this when resuming very
46 /// large sessions where the full transcript would exhaust memory.
47 ///
48 /// Implementation: the entire scan runs on a single blocking thread.
49 /// For each candidate (newest-mtime first) we do a cheap
50 /// [`SessionHeader`] parse to compare `metadata.directory`; only the
51 /// matching file pays for a full tail parse.
52 pub async fn last_for_directory_tail(
53 workspace: Option<&std::path::Path>,
54 window: usize,
55 ) -> Result<TailLoad> {
56 let sessions_dir = Self::sessions_dir()?;
57 let canonical_workspace =
58 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
59 tokio::task::spawn_blocking(move || {
60 scan_with_index(&sessions_dir, canonical_workspace, window)
61 })
62 .await
63 .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
64 }
65
66 /// Load the most recent session globally (unscoped).
67 ///
68 /// Kept for legacy compatibility; prefer
69 /// [`Session::last_for_directory`].
70 pub async fn last() -> Result<Self> {
71 Self::last_for_directory(None).await
72 }
73
74 /// Persist the session to disk as JSON. Creates the sessions directory
75 /// on demand.
76 ///
77 /// Performance:
78 /// - Serialization runs on the blocking thread pool so a large session
79 /// doesn't stall the async reactor during `serde_json` formatting.
80 /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
81 /// serialize, ~30–40% smaller on disk, and correspondingly faster
82 /// to load and mmap-scan on resume. Session files are machine-owned
83 /// — humans never hand-edit them — so indentation is pure overhead.
84 /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
85 /// previous session intact, and the mmap prefilter in
86 /// [`file_contains_finder`] cannot observe a torn buffer.
87 pub async fn save(&self) -> Result<()> {
88 let path = Self::session_path(&self.id)?;
89 if let Some(parent) = path.parent() {
90 fs::create_dir_all(parent).await?;
91 }
92 let tmp = path.with_extension("json.tmp");
93 // Clone-and-serialize off the reactor. The clone is a Vec-copy
94 // (~memcpy speed) which is always cheaper than the JSON
95 // formatting we are about to avoid blocking the reactor on.
96 let snapshot = self.clone();
97 let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
98 .await
99 .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
100 fs::write(&tmp, content).await?;
101 // Atomic swap. On POSIX `rename` is atomic over existing files;
102 // on Windows we fall back to remove-then-rename.
103 if let Err(primary) = fs::rename(&tmp, &path).await {
104 let _ = fs::remove_file(&path).await;
105 if let Err(retry) = fs::rename(&tmp, &path).await {
106 let _ = fs::remove_file(&tmp).await;
107 return Err(anyhow::anyhow!(
108 "session rename failed: {primary} (retry: {retry})"
109 ));
110 }
111 }
112 // Update the workspace index so the next resume is O(1). Best
113 // effort — a failed index write must not fail the session save.
114 if let Some(dir) = &self.metadata.directory {
115 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
116 let session_id = self.id.clone();
117 tokio::task::spawn_blocking(move || {
118 if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
119 tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
120 }
121 });
122 }
123 // Phase A history sink: stream pure history to MinIO/S3.
124 // Env-gated, fire-and-forget — never blocks the save, never
125 // fails it. See [`super::history_sink`] for the env variables.
126 //
127 // Coalesce concurrent saves per session: if an upload for this
128 // session is already in flight, skip this spawn — the next
129 // save will pick up the latest history. This prevents bursty
130 // save loops (e.g. during long tool chains) from queueing
131 // unbounded background uploads.
132 if let Ok(Some(sink_config)) = super::history_sink::HistorySinkConfig::from_env() {
133 use std::collections::HashSet;
134 use std::sync::{Mutex, OnceLock};
135 static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
136 let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
137
138 let inserted = in_flight
139 .lock()
140 .map(|mut s| s.insert(self.id.clone()))
141 .unwrap_or(false);
142
143 if inserted {
144 let session_id = self.id.clone();
145 let messages = self.messages.clone();
146 tokio::spawn(async move {
147 if let Err(err) = super::history_sink::upload_full_history(
148 &sink_config,
149 &session_id,
150 &messages,
151 )
152 .await
153 {
154 tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
155 }
156 if let Ok(mut s) = in_flight.lock() {
157 s.remove(&session_id);
158 }
159 });
160 } else {
161 tracing::debug!(
162 session_id = %self.id,
163 "history sink upload already in flight; skipping duplicate"
164 );
165 }
166 }
167 Ok(())
168 }
169
170 /// Delete a session file by ID. No-op if the file does not exist.
171 pub async fn delete(id: &str) -> Result<()> {
172 let path = Self::session_path(id)?;
173 if path.exists() {
174 tokio::fs::remove_file(&path).await?;
175 }
176 Ok(())
177 }
178
179 /// Resolve the sessions directory (`<data_dir>/sessions`).
180 ///
181 /// Cached after first resolution: `Config::data_dir()` does env-var
182 /// lookups and filesystem checks which are cheap individually but add
183 /// up across hundreds of save/load calls over a session's lifetime.
184 pub(crate) fn sessions_dir() -> Result<PathBuf> {
185 use std::sync::OnceLock;
186 static CACHED: OnceLock<PathBuf> = OnceLock::new();
187 if let Some(dir) = CACHED.get() {
188 return Ok(dir.clone());
189 }
190 let dir = crate::config::Config::data_dir()
191 .map(|d| d.join("sessions"))
192 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
193 // Best-effort set; if another thread won the race we just use
194 // theirs.
195 let _ = CACHED.set(dir.clone());
196 Ok(dir)
197 }
198
199 /// Resolve the on-disk path for a session file.
200 pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
201 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
202 }
203}
204
205/// Index-first scan. Tries the O(1) workspace index; on miss or stale
206/// entry, falls back to the byte-prefiltered directory scan and repairs
207/// the index with the winner so the next launch is O(1).
208fn scan_with_index(
209 sessions_dir: &Path,
210 canonical_workspace: Option<PathBuf>,
211 window: usize,
212) -> Result<TailLoad> {
213 // Fast path: check the sidecar index.
214 if let Some(ws) = canonical_workspace.as_ref() {
215 let index = super::workspace_index::WorkspaceIndex::load_sync();
216 if let Some(id) = index.get(ws) {
217 let candidate = sessions_dir.join(format!("{id}.json"));
218 if candidate.exists()
219 && let Ok(load) = tail_load_sync(&candidate, window)
220 {
221 // Confirm it still belongs to this workspace — the user
222 // could have edited the session's metadata.directory
223 // manually or moved a file.
224 let dir_ok = load
225 .session
226 .metadata
227 .directory
228 .as_ref()
229 .map(|d| {
230 let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
231 &canonical == ws
232 })
233 .unwrap_or(false);
234 if dir_ok {
235 return Ok(load);
236 }
237 }
238 }
239 }
240
241 // Slow path: scan everything.
242 let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
243
244 // Repair the index with whatever we found, so next time is O(1).
245 if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
246 let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
247 }
248
249 result
250}
251
252/// Tail-load a specific path synchronously (used by the index fast path).
253fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
254 use std::fs;
255 use std::io::BufReader;
256 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
257 let file = fs::File::open(path)?;
258 let reader = BufReader::with_capacity(64 * 1024, file);
259 let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
260 Ok(TailLoad {
261 session: parsed?,
262 dropped,
263 file_bytes,
264 })
265}
266
267/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
268///
269/// Flow:
270/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
271/// 2. Sort newest-first.
272/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
273/// `Vec<Message>` allocation. Because `metadata` is serialized before
274/// `messages`/`tool_uses` in new files, this is O(header bytes); older
275/// files still work but pay a lex-through cost.
276/// 4. On workspace match, re-open and do one tail-capped full parse.
277fn scan_sync(
278 sessions_dir: &Path,
279 canonical_workspace: Option<PathBuf>,
280 window: usize,
281) -> Result<TailLoad> {
282 use std::fs;
283 use std::io::BufReader;
284 use std::time::SystemTime;
285
286 if !sessions_dir.exists() {
287 anyhow::bail!("No sessions found");
288 }
289
290 let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
291 for entry in fs::read_dir(sessions_dir)? {
292 let entry = match entry {
293 Ok(e) => e,
294 Err(_) => continue,
295 };
296 let path = entry.path();
297 if path.extension().and_then(|s| s.to_str()) != Some("json") {
298 continue;
299 }
300 let mtime = entry
301 .metadata()
302 .ok()
303 .and_then(|m| m.modified().ok())
304 .unwrap_or(SystemTime::UNIX_EPOCH);
305 candidates.push((path, mtime));
306 }
307 if candidates.is_empty() {
308 anyhow::bail!("No sessions found");
309 }
310 candidates.sort_by(|a, b| b.1.cmp(&a.1));
311
312 // Precompute a cheap byte-level needle for the workspace path. JSON
313 // serializes `metadata.directory` as `"directory":"<path>"`, so if
314 // the raw file bytes don't contain the path as a substring, we can
315 // skip it without invoking serde_json at all.
316 //
317 // This is the single biggest win for large workspaces: a 10 MB
318 // session that is *not* for this cwd gets ruled out in a few ms of
319 // byte scanning instead of a full JSON lex.
320 let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
321 // JSON-escape the path (backslashes on Windows become `\\`,
322 // quotes become `\"`). `serde_json::to_string` handles all the
323 // edge cases for us.
324 let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
325 // Strip the surrounding quotes; we want the inner bytes so we
326 // match whether the JSON has `"directory":"..."` or any other
327 // surrounding context.
328 let inner = quoted
329 .strip_prefix('"')
330 .and_then(|s| s.strip_suffix('"'))
331 .unwrap_or("ed);
332 inner.as_bytes().to_vec()
333 });
334 // Build the SIMD-accelerated substring finder once for the whole
335 // scan loop; reusing it across files is much faster than rebuilding.
336 let finder = needle
337 .as_ref()
338 .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
339
340 // Parallel byte-prefilter: for each candidate, compute whether the
341 // workspace path bytes appear in the file. This is the expensive
342 // O(file_bytes) step. Running it concurrently across CPU cores
343 // turns "sum of all file scan times" into ~"longest single file
344 // scan time" on a machine with multiple cores.
345 //
346 // We preserve mtime ordering by collecting results into a parallel
347 // Vec<bool> indexed the same as `candidates`, then iterating
348 // serially to find the first hit.
349 let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
350 (None, _) => vec![true; candidates.len()],
351 (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
352 (Some(finder), _) => {
353 let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
354 let results: std::sync::Mutex<Vec<Option<bool>>> =
355 std::sync::Mutex::new(vec![None; paths.len()]);
356 std::thread::scope(|scope| {
357 // Chunk candidates across available CPUs. For ~300 files
358 // a fan-out of 4-8 threads saturates I/O and CPU nicely
359 // without oversubscribing.
360 let threads = std::thread::available_parallelism()
361 .map(|n| n.get())
362 .unwrap_or(4)
363 .min(8);
364 let chunk_size = paths.len().div_ceil(threads);
365 for chunk_idx in 0..threads {
366 let start = chunk_idx * chunk_size;
367 if start >= paths.len() {
368 break;
369 }
370 let end = (start + chunk_size).min(paths.len());
371 let chunk_paths = &paths[start..end];
372 let results = &results;
373 scope.spawn(move || {
374 for (offset, p) in chunk_paths.iter().enumerate() {
375 let hit = file_contains_finder(p, finder).unwrap_or(false);
376 // Lock-per-entry is fine: contention is
377 // negligible vs. the ~ms file scan cost.
378 if let Ok(mut guard) = results.lock() {
379 guard[start + offset] = Some(hit);
380 }
381 }
382 });
383 }
384 });
385 results
386 .into_inner()
387 .unwrap_or_default()
388 .into_iter()
389 .map(|o| o.unwrap_or(false))
390 .collect()
391 }
392 };
393
394 for (idx, (path, _)) in candidates.iter().enumerate() {
395 // Fast path: byte-level substring prefilter (precomputed in parallel).
396 if !prefilter_hits.get(idx).copied().unwrap_or(false) {
397 continue;
398 }
399
400 // Slower path: full JSON header parse to confirm the match
401 // (the substring test has false positives: the path could
402 // appear in a chat message or tool output).
403 let header_ok = (|| -> Result<bool> {
404 let file = fs::File::open(path)?;
405 let reader = BufReader::with_capacity(16 * 1024, file);
406 let header: SessionHeader = match serde_json::from_reader(reader) {
407 Ok(h) => h,
408 Err(_) => return Ok(false),
409 };
410 if let Some(ref ws) = canonical_workspace {
411 let Some(dir) = header.metadata.directory.as_ref() else {
412 return Ok(false);
413 };
414 if dir == ws {
415 return Ok(true);
416 }
417 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
418 Ok(&canonical_dir == ws)
419 } else {
420 Ok(true)
421 }
422 })();
423
424 match header_ok {
425 Ok(true) => {}
426 Ok(false) => continue,
427 Err(err) => {
428 tracing::warn!(
429 path = %path.display(),
430 error = %err,
431 "skipping unreadable session file",
432 );
433 continue;
434 }
435 }
436
437 // Match found — do the tail-capped full parse.
438 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
439 let file = fs::File::open(path)?;
440 let reader = BufReader::with_capacity(64 * 1024, file);
441 let (parsed, dropped) =
442 with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
443 return Ok(TailLoad {
444 session: parsed?,
445 dropped,
446 file_bytes,
447 });
448 }
449
450 anyhow::bail!("No sessions found")
451}
452
453/// Memory-map `path` and return `true` if the SIMD finder matches.
454///
455/// mmap avoids the per-chunk `read` syscall overhead and the
456/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
457/// straight over the OS-provided virtual memory window, and the kernel
458/// pages in only what's actually touched. On a ~10 MB session file
459/// this is measurably faster than chunked `BufRead` + SIMD.
460///
461/// Falls back to returning `false` on any I/O error — the caller's
462/// mtime loop will simply try the next candidate.
463fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
464 use std::fs;
465
466 let needle_len = finder.needle().len();
467 if needle_len == 0 {
468 return Ok(true);
469 }
470 let file = fs::File::open(path)?;
471 let meta = file.metadata()?;
472 let len = meta.len();
473 if (len as usize) < needle_len {
474 return Ok(false);
475 }
476 // SAFETY: We only read the mapping, we do not mutate it. The
477 // kernel COW-protects the mapping and the `Mmap` owns the lifetime
478 // tied to `file`, which we keep alive for the duration of `find`.
479 // Concurrent external modification of the file could theoretically
480 // tear the mapping, but session files are only ever rewritten
481 // wholesale via atomic rename — never in-place truncated.
482 let mmap = unsafe { memmap2::Mmap::map(&file)? };
483 Ok(finder.find(&mmap[..]).is_some())
484}