codetether_agent/session/persistence.rs
1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14 /// Load an existing session by its UUID.
15 ///
16 /// # Errors
17 ///
18 /// Returns an error if the session file does not exist, exceeds the
19 /// sanity size cap, or the JSON is malformed.
20 pub async fn load(id: &str) -> Result<Self> {
21 let path = Self::session_path(id)?;
22 super::helper::persistence_cap::ensure_session_file_size(&path).await?;
23 let mut session: Session = serde_json::from_str(&fs::read_to_string(&path).await?)?;
24 session.normalize_sidecars();
25 Ok(session)
26 }
27
28 /// Load the most recent session, optionally scoped to a workspace
29 /// directory.
30 ///
31 /// When `workspace` is [`Some`], only considers sessions created in that
32 /// directory. When [`None`], returns the most recent session globally.
33 ///
34 /// # Errors
35 ///
36 /// Returns an error if no sessions exist (or, with `workspace` set,
37 /// none match the requested directory).
38 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
39 Self::last_for_directory_tail(workspace, usize::MAX)
40 .await
41 .map(|t| t.session)
42 }
43
44 /// Like [`Self::last_for_directory`] but keeps only the last `window`
45 /// messages and tool uses in memory, returning a [`TailLoad`] with the
46 /// number of entries that were dropped. Use this when resuming very
47 /// large sessions where the full transcript would exhaust memory.
48 ///
49 /// Implementation: the entire scan runs on a single blocking thread.
50 /// For each candidate (newest-mtime first) we do a cheap
51 /// [`SessionHeader`] parse to compare `metadata.directory`; only the
52 /// matching file pays for a full tail parse.
53 pub async fn last_for_directory_tail(
54 workspace: Option<&std::path::Path>,
55 window: usize,
56 ) -> Result<TailLoad> {
57 let sessions_dir = Self::sessions_dir()?;
58 let canonical_workspace = workspace.map(|w| {
59 w.canonicalize().unwrap_or_else(|e| {
60 tracing::warn!(path = %w.display(), error = %e, "canonicalize failed; using raw path");
61 w.to_path_buf()
62 })
63 });
64 tokio::task::spawn_blocking(move || {
65 scan_with_index(&sessions_dir, canonical_workspace, window)
66 })
67 .await
68 .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
69 }
70
71 /// Load the most recent session globally (unscoped).
72 ///
73 /// Kept for legacy compatibility; prefer
74 /// [`Session::last_for_directory`].
75 pub async fn last() -> Result<Self> {
76 Self::last_for_directory(None).await
77 }
78
79 /// Persist the session to disk as JSON. Creates the sessions directory
80 /// on demand.
81 ///
82 /// Performance:
83 /// - Serialization runs on the blocking thread pool so a large session
84 /// doesn't stall the async reactor during `serde_json` formatting.
85 /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
86 /// serialize, ~30–40% smaller on disk, and correspondingly faster
87 /// to load and mmap-scan on resume. Session files are machine-owned
88 /// — humans never hand-edit them — so indentation is pure overhead.
89 /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
90 /// previous session intact, and the mmap prefilter in
91 /// [`file_contains_finder`] cannot observe a torn buffer.
92 pub async fn save(&self) -> Result<()> {
93 let path = Self::session_path(&self.id)?;
94 if let Some(parent) = path.parent() {
95 fs::create_dir_all(parent).await?;
96 }
97 let tmp = path.with_extension("json.tmp");
98 // Clone-and-serialize off the reactor. The clone is a Vec-copy
99 // (~memcpy speed) which is always cheaper than the JSON
100 // formatting we are about to avoid blocking the reactor on.
101 let mut snapshot = self.clone();
102 snapshot.normalize_sidecars();
103 let sink_config = snapshot.metadata.history_sink.clone().or_else(|| {
104 super::history_sink::HistorySinkConfig::from_env()
105 .ok()
106 .flatten()
107 });
108 let session_id_for_journal = snapshot.id.clone();
109 let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
110 .await
111 .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
112 fs::write(&tmp, content).await?;
113 // Atomic swap. On POSIX `rename` is atomic over existing files;
114 // on Windows we fall back to remove-then-rename.
115 if let Err(primary) = fs::rename(&tmp, &path).await {
116 let _ = fs::remove_file(&path).await;
117 if let Err(retry) = fs::rename(&tmp, &path).await {
118 let _ = fs::remove_file(&tmp).await;
119 return Err(anyhow::anyhow!(
120 "session rename failed: {primary} (retry: {retry})"
121 ));
122 }
123 }
124 let mut journal = super::journal::WritebackJournal::new(&session_id_for_journal);
125 let tx = journal.stage(super::journal::Op::Save);
126 if let Err(reason) = journal.commit(tx) {
127 journal.reject(tx, reason);
128 }
129 if let Err(err) =
130 super::journal::append_entries(&session_id_for_journal, journal.entries()).await
131 {
132 tracing::warn!(
133 %err,
134 session_id = %session_id_for_journal,
135 "save journal append failed (non-fatal)"
136 );
137 }
138 // Update the workspace index so the next resume is O(1). Best
139 // effort — a failed index write must not fail the session save.
140 if let Some(dir) = &self.metadata.directory {
141 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
142 let session_id = self.id.clone();
143 tokio::task::spawn_blocking(move || {
144 if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
145 tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
146 }
147 });
148 }
149 // Phase A history sink: stream pure history to MinIO/S3.
150 // Env-gated, fire-and-forget — never blocks the save, never
151 // fails it. See [`super::history_sink`] for the env variables.
152 //
153 // Coalesce concurrent saves per session: if an upload for this
154 // session is already in flight, skip this spawn — the next
155 // save will pick up the latest history. This prevents bursty
156 // save loops (e.g. during long tool chains) from queueing
157 // unbounded background uploads.
158 if let Some(sink_config) = sink_config {
159 use std::collections::HashSet;
160 use std::sync::{Mutex, OnceLock};
161 static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
162 let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
163
164 let inserted = in_flight
165 .lock()
166 .map(|mut s| s.insert(self.id.clone()))
167 .unwrap_or(false);
168
169 if inserted {
170 let session_id = self.id.clone();
171 let messages = self.messages.clone();
172 tokio::spawn(async move {
173 if let Err(err) = super::history_sink::upload_full_history(
174 &sink_config,
175 &session_id,
176 &messages,
177 )
178 .await
179 {
180 tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
181 }
182 if let Ok(mut s) = in_flight.lock() {
183 s.remove(&session_id);
184 }
185 });
186 } else {
187 tracing::debug!(
188 session_id = %self.id,
189 "history sink upload already in flight; skipping duplicate"
190 );
191 }
192 }
193 Ok(())
194 }
195
196 /// Delete a session file by ID. No-op if the file does not exist.
197 pub async fn delete(id: &str) -> Result<()> {
198 let path = Self::session_path(id)?;
199 match tokio::fs::remove_file(&path).await {
200 Ok(()) => Ok(()),
201 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
202 Err(e) => Err(e.into()),
203 }
204 }
205
206 /// Resolve the sessions directory (`<data_dir>/sessions`).
207 ///
208 /// Resolved from [`crate::config::Config::data_dir`] each time so tests
209 /// that override `CODETETHER_DATA_DIR` remain isolated.
210 pub(crate) fn sessions_dir() -> Result<PathBuf> {
211 crate::config::Config::data_dir()
212 .map(|d| d.join("sessions"))
213 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
214 }
215
216 /// Resolve the on-disk path for a session file.
217 pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
218 if id.is_empty()
219 || id.len() > 128
220 || id.contains(|c: char| !c.is_alphanumeric() && c != '-' && c != '_')
221 {
222 anyhow::bail!("Invalid session ID:rejecting path traversal risk");
223 }
224 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
225 }
226}
227
228/// Index-first scan. Tries the O(1) workspace index; on miss or stale
229/// entry, falls back to the byte-prefiltered directory scan and repairs
230/// the index with the winner so the next launch is O(1).
231fn scan_with_index(
232 sessions_dir: &Path,
233 canonical_workspace: Option<PathBuf>,
234 window: usize,
235) -> Result<TailLoad> {
236 // Fast path: check the sidecar index.
237 if let Some(ws) = canonical_workspace.as_ref() {
238 let index = super::workspace_index::WorkspaceIndex::load_sync();
239 if let Some(id) = index.get(ws) {
240 let candidate = sessions_dir.join(format!("{id}.json"));
241 if candidate.exists() {
242 if let Ok(load) = tail_load_sync(&candidate, window) {
243 // Confirm it still belongs to this workspace — the user
244 // could have edited the session's metadata.directory
245 // manually or moved a file.
246 let dir_ok = load
247 .session
248 .metadata
249 .directory
250 .as_ref()
251 .map(|d| {
252 let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
253 &canonical == ws
254 })
255 .unwrap_or(false);
256 if dir_ok {
257 return Ok(load);
258 }
259 tracing::warn!(
260 session_id = %id,
261 stored_dir = ?load.session.metadata.directory,
262 expected_dir = ?ws,
263 "Index hit but directory mismatch; falling back to scan"
264 );
265 } else {
266 tracing::warn!(
267 session_id = %id,
268 path = %candidate.display(),
269 "Index hit but session file failed to parse; falling back to scan"
270 );
271 }
272 }
273 }
274 }
275
276 // Slow path: scan everything.
277 let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
278
279 // Repair the index with whatever we found, so next time is O(1).
280 if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
281 let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
282 }
283
284 result
285}
286
287/// Tail-load a specific path synchronously (used by the index fast path).
288fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
289 use std::fs;
290 use std::io::BufReader;
291 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
292 let file = fs::File::open(path)?;
293 let reader = BufReader::with_capacity(64 * 1024, file);
294 let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
295 let mut session = parsed?;
296 session.normalize_sidecars();
297 Ok(TailLoad {
298 session,
299 dropped,
300 file_bytes,
301 })
302}
303
304/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
305///
306/// Flow:
307/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
308/// 2. Sort newest-first.
309/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
310/// `Vec<Message>` allocation. Because `metadata` is serialized before
311/// `messages`/`tool_uses` in new files, this is O(header bytes); older
312/// files still work but pay a lex-through cost.
313/// 4. On workspace match, re-open and do one tail-capped full parse.
314fn scan_sync(
315 sessions_dir: &Path,
316 canonical_workspace: Option<PathBuf>,
317 window: usize,
318) -> Result<TailLoad> {
319 use std::fs;
320 use std::io::BufReader;
321 use std::time::SystemTime;
322
323 if !sessions_dir.exists() {
324 anyhow::bail!("No sessions found");
325 }
326
327 let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
328 for entry in fs::read_dir(sessions_dir)? {
329 let entry = match entry {
330 Ok(e) => e,
331 Err(err) => {
332 tracing::warn!(error = %err, "skipping unreadable directory entry");
333 continue;
334 }
335 };
336 let path = entry.path();
337 if path.extension().and_then(|s| s.to_str()) != Some("json") {
338 continue;
339 }
340 let mtime = entry
341 .metadata()
342 .ok()
343 .and_then(|m| m.modified().ok())
344 .unwrap_or(SystemTime::UNIX_EPOCH);
345 candidates.push((path, mtime));
346 }
347 if candidates.is_empty() {
348 anyhow::bail!("No sessions found");
349 }
350 candidates.sort_by(|a, b| b.1.cmp(&a.1));
351
352 // Precompute a cheap byte-level needle for the workspace path. JSON
353 // serializes `metadata.directory` as `"directory":"<path>"`, so if
354 // the raw file bytes don't contain the path as a substring, we can
355 // skip it without invoking serde_json at all.
356 //
357 // This is the single biggest win for large workspaces: a 10 MB
358 // session that is *not* for this cwd gets ruled out in a few ms of
359 // byte scanning instead of a full JSON lex.
360 let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
361 // JSON-escape the path (backslashes on Windows become `\\`,
362 // quotes become `\"`). `serde_json::to_string` handles all the
363 // edge cases for us.
364 let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
365 // Strip the surrounding quotes; we want the inner bytes so we
366 // match whether the JSON has `"directory":"..."` or any other
367 // surrounding context.
368 let inner = quoted
369 .strip_prefix('"')
370 .and_then(|s| s.strip_suffix('"'))
371 .unwrap_or("ed);
372 inner.as_bytes().to_vec()
373 });
374 // Build the SIMD-accelerated substring finder once for the whole
375 // scan loop; reusing it across files is much faster than rebuilding.
376 let finder = needle
377 .as_ref()
378 .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
379
380 // Parallel byte-prefilter: for each candidate, compute whether the
381 // workspace path bytes appear in the file. This is the expensive
382 // O(file_bytes) step. Running it concurrently across CPU cores
383 // turns "sum of all file scan times" into ~"longest single file
384 // scan time" on a machine with multiple cores.
385 //
386 // We preserve mtime ordering by collecting results into a parallel
387 // Vec<bool> indexed the same as `candidates`, then iterating
388 // serially to find the first hit.
389 let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
390 (None, _) => vec![true; candidates.len()],
391 (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
392 (Some(finder), _) => {
393 let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
394 let results: std::sync::Mutex<Vec<Option<bool>>> =
395 std::sync::Mutex::new(vec![None; paths.len()]);
396 std::thread::scope(|scope| {
397 // Chunk candidates across available CPUs. For ~300 files
398 // a fan-out of 4-8 threads saturates I/O and CPU nicely
399 // without oversubscribing.
400 let threads = std::thread::available_parallelism()
401 .map(|n| n.get())
402 .unwrap_or(4)
403 .min(8);
404 let chunk_size = paths.len().div_ceil(threads);
405 for chunk_idx in 0..threads {
406 let start = chunk_idx * chunk_size;
407 if start >= paths.len() {
408 break;
409 }
410 let end = (start + chunk_size).min(paths.len());
411 let chunk_paths = &paths[start..end];
412 let results = &results;
413 scope.spawn(move || {
414 for (offset, p) in chunk_paths.iter().enumerate() {
415 let hit = file_contains_finder(p, finder).unwrap_or(false);
416 // Lock-per-entry is fine: contention is
417 // negligible vs. the ~ms file scan cost.
418 if let Ok(mut guard) = results.lock() {
419 guard[start + offset] = Some(hit);
420 }
421 }
422 });
423 }
424 });
425 results
426 .into_inner()
427 .unwrap_or_default()
428 .into_iter()
429 .map(|o| o.unwrap_or(false))
430 .collect()
431 }
432 };
433
434 for (idx, (path, _)) in candidates.iter().enumerate() {
435 // Fast path: byte-level substring prefilter (precomputed in parallel).
436 if !prefilter_hits.get(idx).copied().unwrap_or(false) {
437 continue;
438 }
439
440 // Slower path: full JSON header parse to confirm the match
441 // (the substring test has false positives: the path could
442 // appear in a chat message or tool output).
443 let header_ok = (|| -> Result<bool> {
444 let file = fs::File::open(path)?;
445 let reader = BufReader::with_capacity(16 * 1024, file);
446 let header: SessionHeader = match serde_json::from_reader(reader) {
447 Ok(h) => h,
448 Err(_) => return Ok(false),
449 };
450 if let Some(ref ws) = canonical_workspace {
451 let Some(dir) = header.metadata.directory.as_ref() else {
452 return Ok(false);
453 };
454 if dir == ws {
455 return Ok(true);
456 }
457 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
458 Ok(&canonical_dir == ws)
459 } else {
460 Ok(true)
461 }
462 })();
463
464 match header_ok {
465 Ok(true) => {}
466 Ok(false) => continue,
467 Err(err) => {
468 tracing::warn!(
469 path = %path.display(),
470 error = %err,
471 "skipping unreadable session file",
472 );
473 continue;
474 }
475 }
476
477 // Match found — do the tail-capped full parse.
478 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
479 let file = fs::File::open(path)?;
480 let reader = BufReader::with_capacity(64 * 1024, file);
481 let (parsed, dropped) =
482 with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
483 return Ok(TailLoad {
484 session: parsed?,
485 dropped,
486 file_bytes,
487 });
488 }
489
490 anyhow::bail!("No sessions found")
491}
492
493/// Memory-map `path` and return `true` if the SIMD finder matches.
494///
495/// mmap avoids the per-chunk `read` syscall overhead and the
496/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
497/// straight over the OS-provided virtual memory window, and the kernel
498/// pages in only what's actually touched. On a ~10 MB session file
499/// this is measurably faster than chunked `BufRead` + SIMD.
500///
501/// Falls back to returning `false` on any I/O error — the caller's
502/// mtime loop will simply try the next candidate.
503fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
504 use std::fs;
505
506 let needle_len = finder.needle().len();
507 if needle_len == 0 {
508 return Ok(true);
509 }
510 let file = fs::File::open(path)?;
511 let meta = file.metadata()?;
512 let len = meta.len();
513 if (len as usize) < needle_len {
514 return Ok(false);
515 }
516 const MAX_MMAP_SIZE: u64 = 64 * 1024 * 1024; // 64 MB
517 if len > MAX_MMAP_SIZE {
518 tracing::warn!(path = %path.display(), size = len, "Skipping oversized session file");
519 return Ok(false);
520 }
521 // SAFETY: We only read the mapping, we do not mutate it. The
522 // kernel COW-protects the mapping and the `Mmap` owns the lifetime
523 // tied to `file`, which we keep alive for the duration of `find`.
524 // Concurrent external modification of the file could theoretically
525 // tear the mapping, but session files are only ever rewritten
526 // wholesale via atomic rename — never in-place truncated.
527 let mmap = unsafe { memmap2::Mmap::map(&file)? };
528 Ok(finder.find(&mmap[..]).is_some())
529}