codetether_agent/session/persistence.rs
1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14 /// Load an existing session by its UUID.
15 ///
16 /// # Errors
17 ///
18 /// Returns an error if the session file does not exist, exceeds the
19 /// sanity size cap, or the JSON is malformed.
20 pub async fn load(id: &str) -> Result<Self> {
21 let path = Self::session_path(id)?;
22 super::helper::persistence_cap::ensure_session_file_size(&path).await?;
23 let mut session: Session = serde_json::from_str(&fs::read_to_string(&path).await?)?;
24 session.normalize_sidecars();
25 Ok(session)
26 }
27
28 /// Load the most recent session, optionally scoped to a workspace
29 /// directory.
30 ///
31 /// When `workspace` is [`Some`], only considers sessions created in that
32 /// directory. When [`None`], returns the most recent session globally.
33 ///
34 /// # Errors
35 ///
36 /// Returns an error if no sessions exist (or, with `workspace` set,
37 /// none match the requested directory).
38 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
39 Self::last_for_directory_tail(workspace, usize::MAX)
40 .await
41 .map(|t| t.session)
42 }
43
44 /// Like [`Self::last_for_directory`] but keeps only the last `window`
45 /// messages and tool uses in memory, returning a [`TailLoad`] with the
46 /// number of entries that were dropped. Use this when resuming very
47 /// large sessions where the full transcript would exhaust memory.
48 ///
49 /// Implementation: the entire scan runs on a single blocking thread.
50 /// For each candidate (newest-mtime first) we do a cheap
51 /// [`SessionHeader`] parse to compare `metadata.directory`; only the
52 /// matching file pays for a full tail parse.
53 pub async fn last_for_directory_tail(
54 workspace: Option<&std::path::Path>,
55 window: usize,
56 ) -> Result<TailLoad> {
57 let sessions_dir = Self::sessions_dir()?;
58 let canonical_workspace = workspace.map(|w| {
59 w.canonicalize().unwrap_or_else(|e| {
60 tracing::warn!(path = %w.display(), error = %e, "canonicalize failed; using raw path");
61 w.to_path_buf()
62 })
63 });
64 tokio::task::spawn_blocking(move || {
65 scan_with_index(&sessions_dir, canonical_workspace, window)
66 })
67 .await
68 .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
69 }
70
71 /// Load the most recent session globally (unscoped).
72 ///
73 /// Kept for legacy compatibility; prefer
74 /// [`Session::last_for_directory`].
75 pub async fn last() -> Result<Self> {
76 Self::last_for_directory(None).await
77 }
78
79 /// Persist the session to disk as JSON. Creates the sessions directory
80 /// on demand.
81 ///
82 /// Performance:
83 /// - Serialization runs on the blocking thread pool so a large session
84 /// doesn't stall the async reactor during `serde_json` formatting.
85 /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
86 /// serialize, ~30–40% smaller on disk, and correspondingly faster
87 /// to load and mmap-scan on resume. Session files are machine-owned
88 /// — humans never hand-edit them — so indentation is pure overhead.
89 /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
90 /// previous session intact, and the mmap prefilter in
91 /// [`file_contains_finder`] cannot observe a torn buffer.
92 pub async fn save(&self) -> Result<()> {
93 let path = Self::session_path(&self.id)?;
94 if let Some(parent) = path.parent() {
95 fs::create_dir_all(parent).await?;
96 }
97 let tmp = path.with_extension("json.tmp");
98 // Clone-and-serialize off the reactor. The clone is a Vec-copy
99 // (~memcpy speed) which is always cheaper than the JSON
100 // formatting we are about to avoid blocking the reactor on.
101 let mut snapshot = self.clone();
102 snapshot.normalize_sidecars();
103 let sink_config = snapshot.metadata.history_sink.clone().or_else(|| {
104 super::history_sink::HistorySinkConfig::from_env()
105 .ok()
106 .flatten()
107 });
108 let session_id_for_journal = snapshot.id.clone();
109 let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
110 .await
111 .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
112 fs::write(&tmp, content).await?;
113 // Atomic swap. On POSIX `rename` is atomic over existing files;
114 // on Windows we fall back to remove-then-rename.
115 if let Err(primary) = fs::rename(&tmp, &path).await {
116 let _ = fs::remove_file(&path).await;
117 if let Err(retry) = fs::rename(&tmp, &path).await {
118 let _ = fs::remove_file(&tmp).await;
119 return Err(anyhow::anyhow!(
120 "session rename failed: {primary} (retry: {retry})"
121 ));
122 }
123 }
124 let mut journal = super::journal::WritebackJournal::new(&session_id_for_journal);
125 let tx = journal.stage(super::journal::Op::Save);
126 if let Err(reason) = journal.commit(tx) {
127 journal.reject(tx, reason);
128 }
129 if let Err(err) =
130 super::journal::append_entries(&session_id_for_journal, journal.entries()).await
131 {
132 tracing::warn!(
133 %err,
134 session_id = %session_id_for_journal,
135 "save journal append failed (non-fatal)"
136 );
137 }
138 // Update the workspace index so the next resume is O(1). Best
139 // effort — a failed index write must not fail the session save.
140 if let Some(dir) = &self.metadata.directory {
141 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
142 let session_id = self.id.clone();
143 tokio::task::spawn_blocking(move || {
144 if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
145 tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
146 }
147 });
148 }
149 // Phase A history sink: stream pure history to MinIO/S3.
150 // Env-gated, fire-and-forget — never blocks the save, never
151 // fails it. See [`super::history_sink`] for the env variables.
152 //
153 // Coalesce concurrent saves per session: if an upload for this
154 // session is already in flight, skip this spawn — the next
155 // save will pick up the latest history. This prevents bursty
156 // save loops (e.g. during long tool chains) from queueing
157 // unbounded background uploads.
158 if let Some(sink_config) = sink_config {
159 use std::collections::HashSet;
160 use std::sync::{Mutex, OnceLock};
161 static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
162 let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
163
164 let inserted = in_flight
165 .lock()
166 .map(|mut s| s.insert(self.id.clone()))
167 .unwrap_or(false);
168
169 if inserted {
170 let session_id = self.id.clone();
171 let messages = self.messages.clone();
172 tokio::spawn(async move {
173 if let Err(err) = super::history_sink::upload_full_history(
174 &sink_config,
175 &session_id,
176 &messages,
177 )
178 .await
179 {
180 tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
181 }
182 if let Ok(mut s) = in_flight.lock() {
183 s.remove(&session_id);
184 }
185 });
186 } else {
187 tracing::debug!(
188 session_id = %self.id,
189 "history sink upload already in flight; skipping duplicate"
190 );
191 }
192 }
193 Ok(())
194 }
195
196 /// Delete a session file by ID. No-op if the file does not exist.
197 pub async fn delete(id: &str) -> Result<()> {
198 let path = Self::session_path(id)?;
199 match tokio::fs::remove_file(&path).await {
200 Ok(()) => Ok(()),
201 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
202 Err(e) => Err(e.into()),
203 }
204 }
205
206 /// Resolve the sessions directory (`<data_dir>/sessions`).
207 ///
208 /// Cached after first resolution: `Config::data_dir()` does env-var
209 /// lookups and filesystem checks which are cheap individually but add
210 /// up across hundreds of save/load calls over a session's lifetime.
211 pub(crate) fn sessions_dir() -> Result<PathBuf> {
212 use std::sync::OnceLock;
213 static CACHED: OnceLock<PathBuf> = OnceLock::new();
214 if let Some(dir) = CACHED.get() {
215 return Ok(dir.clone());
216 }
217 let dir = crate::config::Config::data_dir()
218 .map(|d| d.join("sessions"))
219 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
220 // Best-effort set; if another thread won the race we just use
221 // theirs.
222 let _ = CACHED.set(dir.clone());
223 Ok(dir)
224 }
225
226 /// Resolve the on-disk path for a session file.
227 pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
228 if id.is_empty()
229 || id.len() > 128
230 || id.contains(|c: char| !c.is_alphanumeric() && c != '-' && c != '_')
231 {
232 anyhow::bail!("Invalid session ID:rejecting path traversal risk");
233 }
234 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
235 }
236}
237
238/// Index-first scan. Tries the O(1) workspace index; on miss or stale
239/// entry, falls back to the byte-prefiltered directory scan and repairs
240/// the index with the winner so the next launch is O(1).
241fn scan_with_index(
242 sessions_dir: &Path,
243 canonical_workspace: Option<PathBuf>,
244 window: usize,
245) -> Result<TailLoad> {
246 // Fast path: check the sidecar index.
247 if let Some(ws) = canonical_workspace.as_ref() {
248 let index = super::workspace_index::WorkspaceIndex::load_sync();
249 if let Some(id) = index.get(ws) {
250 let candidate = sessions_dir.join(format!("{id}.json"));
251 if candidate.exists() {
252 if let Ok(load) = tail_load_sync(&candidate, window) {
253 // Confirm it still belongs to this workspace — the user
254 // could have edited the session's metadata.directory
255 // manually or moved a file.
256 let dir_ok = load
257 .session
258 .metadata
259 .directory
260 .as_ref()
261 .map(|d| {
262 let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
263 &canonical == ws
264 })
265 .unwrap_or(false);
266 if dir_ok {
267 return Ok(load);
268 }
269 tracing::warn!(
270 session_id = %id,
271 stored_dir = ?load.session.metadata.directory,
272 expected_dir = ?ws,
273 "Index hit but directory mismatch; falling back to scan"
274 );
275 } else {
276 tracing::warn!(
277 session_id = %id,
278 path = %candidate.display(),
279 "Index hit but session file failed to parse; falling back to scan"
280 );
281 }
282 }
283 }
284 }
285
286 // Slow path: scan everything.
287 let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
288
289 // Repair the index with whatever we found, so next time is O(1).
290 if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
291 let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
292 }
293
294 result
295}
296
297/// Tail-load a specific path synchronously (used by the index fast path).
298fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
299 use std::fs;
300 use std::io::BufReader;
301 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
302 let file = fs::File::open(path)?;
303 let reader = BufReader::with_capacity(64 * 1024, file);
304 let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
305 let mut session = parsed?;
306 session.normalize_sidecars();
307 Ok(TailLoad {
308 session,
309 dropped,
310 file_bytes,
311 })
312}
313
314/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
315///
316/// Flow:
317/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
318/// 2. Sort newest-first.
319/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
320/// `Vec<Message>` allocation. Because `metadata` is serialized before
321/// `messages`/`tool_uses` in new files, this is O(header bytes); older
322/// files still work but pay a lex-through cost.
323/// 4. On workspace match, re-open and do one tail-capped full parse.
324fn scan_sync(
325 sessions_dir: &Path,
326 canonical_workspace: Option<PathBuf>,
327 window: usize,
328) -> Result<TailLoad> {
329 use std::fs;
330 use std::io::BufReader;
331 use std::time::SystemTime;
332
333 if !sessions_dir.exists() {
334 anyhow::bail!("No sessions found");
335 }
336
337 let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
338 for entry in fs::read_dir(sessions_dir)? {
339 let entry = match entry {
340 Ok(e) => e,
341 Err(err) => {
342 tracing::warn!(error = %err, "skipping unreadable directory entry");
343 continue;
344 }
345 };
346 let path = entry.path();
347 if path.extension().and_then(|s| s.to_str()) != Some("json") {
348 continue;
349 }
350 let mtime = entry
351 .metadata()
352 .ok()
353 .and_then(|m| m.modified().ok())
354 .unwrap_or(SystemTime::UNIX_EPOCH);
355 candidates.push((path, mtime));
356 }
357 if candidates.is_empty() {
358 anyhow::bail!("No sessions found");
359 }
360 candidates.sort_by(|a, b| b.1.cmp(&a.1));
361
362 // Precompute a cheap byte-level needle for the workspace path. JSON
363 // serializes `metadata.directory` as `"directory":"<path>"`, so if
364 // the raw file bytes don't contain the path as a substring, we can
365 // skip it without invoking serde_json at all.
366 //
367 // This is the single biggest win for large workspaces: a 10 MB
368 // session that is *not* for this cwd gets ruled out in a few ms of
369 // byte scanning instead of a full JSON lex.
370 let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
371 // JSON-escape the path (backslashes on Windows become `\\`,
372 // quotes become `\"`). `serde_json::to_string` handles all the
373 // edge cases for us.
374 let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
375 // Strip the surrounding quotes; we want the inner bytes so we
376 // match whether the JSON has `"directory":"..."` or any other
377 // surrounding context.
378 let inner = quoted
379 .strip_prefix('"')
380 .and_then(|s| s.strip_suffix('"'))
381 .unwrap_or("ed);
382 inner.as_bytes().to_vec()
383 });
384 // Build the SIMD-accelerated substring finder once for the whole
385 // scan loop; reusing it across files is much faster than rebuilding.
386 let finder = needle
387 .as_ref()
388 .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
389
390 // Parallel byte-prefilter: for each candidate, compute whether the
391 // workspace path bytes appear in the file. This is the expensive
392 // O(file_bytes) step. Running it concurrently across CPU cores
393 // turns "sum of all file scan times" into ~"longest single file
394 // scan time" on a machine with multiple cores.
395 //
396 // We preserve mtime ordering by collecting results into a parallel
397 // Vec<bool> indexed the same as `candidates`, then iterating
398 // serially to find the first hit.
399 let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
400 (None, _) => vec![true; candidates.len()],
401 (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
402 (Some(finder), _) => {
403 let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
404 let results: std::sync::Mutex<Vec<Option<bool>>> =
405 std::sync::Mutex::new(vec![None; paths.len()]);
406 std::thread::scope(|scope| {
407 // Chunk candidates across available CPUs. For ~300 files
408 // a fan-out of 4-8 threads saturates I/O and CPU nicely
409 // without oversubscribing.
410 let threads = std::thread::available_parallelism()
411 .map(|n| n.get())
412 .unwrap_or(4)
413 .min(8);
414 let chunk_size = paths.len().div_ceil(threads);
415 for chunk_idx in 0..threads {
416 let start = chunk_idx * chunk_size;
417 if start >= paths.len() {
418 break;
419 }
420 let end = (start + chunk_size).min(paths.len());
421 let chunk_paths = &paths[start..end];
422 let results = &results;
423 scope.spawn(move || {
424 for (offset, p) in chunk_paths.iter().enumerate() {
425 let hit = file_contains_finder(p, finder).unwrap_or(false);
426 // Lock-per-entry is fine: contention is
427 // negligible vs. the ~ms file scan cost.
428 if let Ok(mut guard) = results.lock() {
429 guard[start + offset] = Some(hit);
430 }
431 }
432 });
433 }
434 });
435 results
436 .into_inner()
437 .unwrap_or_default()
438 .into_iter()
439 .map(|o| o.unwrap_or(false))
440 .collect()
441 }
442 };
443
444 for (idx, (path, _)) in candidates.iter().enumerate() {
445 // Fast path: byte-level substring prefilter (precomputed in parallel).
446 if !prefilter_hits.get(idx).copied().unwrap_or(false) {
447 continue;
448 }
449
450 // Slower path: full JSON header parse to confirm the match
451 // (the substring test has false positives: the path could
452 // appear in a chat message or tool output).
453 let header_ok = (|| -> Result<bool> {
454 let file = fs::File::open(path)?;
455 let reader = BufReader::with_capacity(16 * 1024, file);
456 let header: SessionHeader = match serde_json::from_reader(reader) {
457 Ok(h) => h,
458 Err(_) => return Ok(false),
459 };
460 if let Some(ref ws) = canonical_workspace {
461 let Some(dir) = header.metadata.directory.as_ref() else {
462 return Ok(false);
463 };
464 if dir == ws {
465 return Ok(true);
466 }
467 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
468 Ok(&canonical_dir == ws)
469 } else {
470 Ok(true)
471 }
472 })();
473
474 match header_ok {
475 Ok(true) => {}
476 Ok(false) => continue,
477 Err(err) => {
478 tracing::warn!(
479 path = %path.display(),
480 error = %err,
481 "skipping unreadable session file",
482 );
483 continue;
484 }
485 }
486
487 // Match found — do the tail-capped full parse.
488 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
489 let file = fs::File::open(path)?;
490 let reader = BufReader::with_capacity(64 * 1024, file);
491 let (parsed, dropped) =
492 with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
493 return Ok(TailLoad {
494 session: parsed?,
495 dropped,
496 file_bytes,
497 });
498 }
499
500 anyhow::bail!("No sessions found")
501}
502
503/// Memory-map `path` and return `true` if the SIMD finder matches.
504///
505/// mmap avoids the per-chunk `read` syscall overhead and the
506/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
507/// straight over the OS-provided virtual memory window, and the kernel
508/// pages in only what's actually touched. On a ~10 MB session file
509/// this is measurably faster than chunked `BufRead` + SIMD.
510///
511/// Falls back to returning `false` on any I/O error — the caller's
512/// mtime loop will simply try the next candidate.
513fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
514 use std::fs;
515
516 let needle_len = finder.needle().len();
517 if needle_len == 0 {
518 return Ok(true);
519 }
520 let file = fs::File::open(path)?;
521 let meta = file.metadata()?;
522 let len = meta.len();
523 if (len as usize) < needle_len {
524 return Ok(false);
525 }
526 const MAX_MMAP_SIZE: u64 = 64 * 1024 * 1024; // 64 MB
527 if len > MAX_MMAP_SIZE {
528 tracing::warn!(path = %path.display(), size = len, "Skipping oversized session file");
529 return Ok(false);
530 }
531 // SAFETY: We only read the mapping, we do not mutate it. The
532 // kernel COW-protects the mapping and the `Mmap` owns the lifetime
533 // tied to `file`, which we keep alive for the duration of `find`.
534 // Concurrent external modification of the file could theoretically
535 // tear the mapping, but session files are only ever rewritten
536 // wholesale via atomic rename — never in-place truncated.
537 let mmap = unsafe { memmap2::Mmap::map(&file)? };
538 Ok(finder.find(&mmap[..]).is_some())
539}