codetether_agent/session/persistence.rs
1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14 /// Load an existing session by its UUID.
15 ///
16 /// # Errors
17 ///
18 /// Returns an error if the session file does not exist or the JSON is
19 /// malformed.
20 pub async fn load(id: &str) -> Result<Self> {
21 let path = Self::session_path(id)?;
22 let content = fs::read_to_string(&path).await?;
23 let mut session: Session = serde_json::from_str(&content)?;
24 session.normalize_sidecars();
25 Ok(session)
26 }
27
28 /// Load the most recent session, optionally scoped to a workspace
29 /// directory.
30 ///
31 /// When `workspace` is [`Some`], only considers sessions created in that
32 /// directory. When [`None`], returns the most recent session globally.
33 ///
34 /// # Errors
35 ///
36 /// Returns an error if no sessions exist (or, with `workspace` set,
37 /// none match the requested directory).
38 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
39 Self::last_for_directory_tail(workspace, usize::MAX)
40 .await
41 .map(|t| t.session)
42 }
43
44 /// Like [`Self::last_for_directory`] but keeps only the last `window`
45 /// messages and tool uses in memory, returning a [`TailLoad`] with the
46 /// number of entries that were dropped. Use this when resuming very
47 /// large sessions where the full transcript would exhaust memory.
48 ///
49 /// Implementation: the entire scan runs on a single blocking thread.
50 /// For each candidate (newest-mtime first) we do a cheap
51 /// [`SessionHeader`] parse to compare `metadata.directory`; only the
52 /// matching file pays for a full tail parse.
53 pub async fn last_for_directory_tail(
54 workspace: Option<&std::path::Path>,
55 window: usize,
56 ) -> Result<TailLoad> {
57 let sessions_dir = Self::sessions_dir()?;
58 let canonical_workspace =
59 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
60 tokio::task::spawn_blocking(move || {
61 scan_with_index(&sessions_dir, canonical_workspace, window)
62 })
63 .await
64 .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
65 }
66
67 /// Load the most recent session globally (unscoped).
68 ///
69 /// Kept for legacy compatibility; prefer
70 /// [`Session::last_for_directory`].
71 pub async fn last() -> Result<Self> {
72 Self::last_for_directory(None).await
73 }
74
75 /// Persist the session to disk as JSON. Creates the sessions directory
76 /// on demand.
77 ///
78 /// Performance:
79 /// - Serialization runs on the blocking thread pool so a large session
80 /// doesn't stall the async reactor during `serde_json` formatting.
81 /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
82 /// serialize, ~30–40% smaller on disk, and correspondingly faster
83 /// to load and mmap-scan on resume. Session files are machine-owned
84 /// — humans never hand-edit them — so indentation is pure overhead.
85 /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
86 /// previous session intact, and the mmap prefilter in
87 /// [`file_contains_finder`] cannot observe a torn buffer.
88 pub async fn save(&self) -> Result<()> {
89 let path = Self::session_path(&self.id)?;
90 if let Some(parent) = path.parent() {
91 fs::create_dir_all(parent).await?;
92 }
93 let tmp = path.with_extension("json.tmp");
94 // Clone-and-serialize off the reactor. The clone is a Vec-copy
95 // (~memcpy speed) which is always cheaper than the JSON
96 // formatting we are about to avoid blocking the reactor on.
97 let mut snapshot = self.clone();
98 snapshot.normalize_sidecars();
99 let sink_config = snapshot.metadata.history_sink.clone().or_else(|| {
100 super::history_sink::HistorySinkConfig::from_env()
101 .ok()
102 .flatten()
103 });
104 let session_id_for_journal = snapshot.id.clone();
105 let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
106 .await
107 .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
108 fs::write(&tmp, content).await?;
109 // Atomic swap. On POSIX `rename` is atomic over existing files;
110 // on Windows we fall back to remove-then-rename.
111 if let Err(primary) = fs::rename(&tmp, &path).await {
112 let _ = fs::remove_file(&path).await;
113 if let Err(retry) = fs::rename(&tmp, &path).await {
114 let _ = fs::remove_file(&tmp).await;
115 return Err(anyhow::anyhow!(
116 "session rename failed: {primary} (retry: {retry})"
117 ));
118 }
119 }
120 let mut journal = super::journal::WritebackJournal::new(&session_id_for_journal);
121 let tx = journal.stage(super::journal::Op::Save);
122 if let Err(reason) = journal.commit(tx) {
123 journal.reject(tx, reason);
124 }
125 if let Err(err) =
126 super::journal::append_entries(&session_id_for_journal, journal.entries()).await
127 {
128 tracing::warn!(
129 %err,
130 session_id = %session_id_for_journal,
131 "save journal append failed (non-fatal)"
132 );
133 }
134 // Update the workspace index so the next resume is O(1). Best
135 // effort — a failed index write must not fail the session save.
136 if let Some(dir) = &self.metadata.directory {
137 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
138 let session_id = self.id.clone();
139 tokio::task::spawn_blocking(move || {
140 if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
141 tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
142 }
143 });
144 }
145 // Phase A history sink: stream pure history to MinIO/S3.
146 // Env-gated, fire-and-forget — never blocks the save, never
147 // fails it. See [`super::history_sink`] for the env variables.
148 //
149 // Coalesce concurrent saves per session: if an upload for this
150 // session is already in flight, skip this spawn — the next
151 // save will pick up the latest history. This prevents bursty
152 // save loops (e.g. during long tool chains) from queueing
153 // unbounded background uploads.
154 if let Some(sink_config) = sink_config {
155 use std::collections::HashSet;
156 use std::sync::{Mutex, OnceLock};
157 static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
158 let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
159
160 let inserted = in_flight
161 .lock()
162 .map(|mut s| s.insert(self.id.clone()))
163 .unwrap_or(false);
164
165 if inserted {
166 let session_id = self.id.clone();
167 let messages = self.messages.clone();
168 tokio::spawn(async move {
169 if let Err(err) = super::history_sink::upload_full_history(
170 &sink_config,
171 &session_id,
172 &messages,
173 )
174 .await
175 {
176 tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
177 }
178 if let Ok(mut s) = in_flight.lock() {
179 s.remove(&session_id);
180 }
181 });
182 } else {
183 tracing::debug!(
184 session_id = %self.id,
185 "history sink upload already in flight; skipping duplicate"
186 );
187 }
188 }
189 Ok(())
190 }
191
192 /// Delete a session file by ID. No-op if the file does not exist.
193 pub async fn delete(id: &str) -> Result<()> {
194 let path = Self::session_path(id)?;
195 if path.exists() {
196 tokio::fs::remove_file(&path).await?;
197 }
198 Ok(())
199 }
200
201 /// Resolve the sessions directory (`<data_dir>/sessions`).
202 ///
203 /// Cached after first resolution: `Config::data_dir()` does env-var
204 /// lookups and filesystem checks which are cheap individually but add
205 /// up across hundreds of save/load calls over a session's lifetime.
206 pub(crate) fn sessions_dir() -> Result<PathBuf> {
207 use std::sync::OnceLock;
208 static CACHED: OnceLock<PathBuf> = OnceLock::new();
209 if let Some(dir) = CACHED.get() {
210 return Ok(dir.clone());
211 }
212 let dir = crate::config::Config::data_dir()
213 .map(|d| d.join("sessions"))
214 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
215 // Best-effort set; if another thread won the race we just use
216 // theirs.
217 let _ = CACHED.set(dir.clone());
218 Ok(dir)
219 }
220
221 /// Resolve the on-disk path for a session file.
222 pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
223 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
224 }
225}
226
227/// Index-first scan. Tries the O(1) workspace index; on miss or stale
228/// entry, falls back to the byte-prefiltered directory scan and repairs
229/// the index with the winner so the next launch is O(1).
230fn scan_with_index(
231 sessions_dir: &Path,
232 canonical_workspace: Option<PathBuf>,
233 window: usize,
234) -> Result<TailLoad> {
235 // Fast path: check the sidecar index.
236 if let Some(ws) = canonical_workspace.as_ref() {
237 let index = super::workspace_index::WorkspaceIndex::load_sync();
238 if let Some(id) = index.get(ws) {
239 let candidate = sessions_dir.join(format!("{id}.json"));
240 if candidate.exists()
241 && let Ok(load) = tail_load_sync(&candidate, window)
242 {
243 // Confirm it still belongs to this workspace — the user
244 // could have edited the session's metadata.directory
245 // manually or moved a file.
246 let dir_ok = load
247 .session
248 .metadata
249 .directory
250 .as_ref()
251 .map(|d| {
252 let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
253 &canonical == ws
254 })
255 .unwrap_or(false);
256 if dir_ok {
257 return Ok(load);
258 }
259 }
260 }
261 }
262
263 // Slow path: scan everything.
264 let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
265
266 // Repair the index with whatever we found, so next time is O(1).
267 if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
268 let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
269 }
270
271 result
272}
273
274/// Tail-load a specific path synchronously (used by the index fast path).
275fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
276 use std::fs;
277 use std::io::BufReader;
278 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
279 let file = fs::File::open(path)?;
280 let reader = BufReader::with_capacity(64 * 1024, file);
281 let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
282 let mut session = parsed?;
283 session.normalize_sidecars();
284 Ok(TailLoad {
285 session,
286 dropped,
287 file_bytes,
288 })
289}
290
291/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
292///
293/// Flow:
294/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
295/// 2. Sort newest-first.
296/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
297/// `Vec<Message>` allocation. Because `metadata` is serialized before
298/// `messages`/`tool_uses` in new files, this is O(header bytes); older
299/// files still work but pay a lex-through cost.
300/// 4. On workspace match, re-open and do one tail-capped full parse.
301fn scan_sync(
302 sessions_dir: &Path,
303 canonical_workspace: Option<PathBuf>,
304 window: usize,
305) -> Result<TailLoad> {
306 use std::fs;
307 use std::io::BufReader;
308 use std::time::SystemTime;
309
310 if !sessions_dir.exists() {
311 anyhow::bail!("No sessions found");
312 }
313
314 let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
315 for entry in fs::read_dir(sessions_dir)? {
316 let entry = match entry {
317 Ok(e) => e,
318 Err(_) => continue,
319 };
320 let path = entry.path();
321 if path.extension().and_then(|s| s.to_str()) != Some("json") {
322 continue;
323 }
324 let mtime = entry
325 .metadata()
326 .ok()
327 .and_then(|m| m.modified().ok())
328 .unwrap_or(SystemTime::UNIX_EPOCH);
329 candidates.push((path, mtime));
330 }
331 if candidates.is_empty() {
332 anyhow::bail!("No sessions found");
333 }
334 candidates.sort_by(|a, b| b.1.cmp(&a.1));
335
336 // Precompute a cheap byte-level needle for the workspace path. JSON
337 // serializes `metadata.directory` as `"directory":"<path>"`, so if
338 // the raw file bytes don't contain the path as a substring, we can
339 // skip it without invoking serde_json at all.
340 //
341 // This is the single biggest win for large workspaces: a 10 MB
342 // session that is *not* for this cwd gets ruled out in a few ms of
343 // byte scanning instead of a full JSON lex.
344 let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
345 // JSON-escape the path (backslashes on Windows become `\\`,
346 // quotes become `\"`). `serde_json::to_string` handles all the
347 // edge cases for us.
348 let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
349 // Strip the surrounding quotes; we want the inner bytes so we
350 // match whether the JSON has `"directory":"..."` or any other
351 // surrounding context.
352 let inner = quoted
353 .strip_prefix('"')
354 .and_then(|s| s.strip_suffix('"'))
355 .unwrap_or("ed);
356 inner.as_bytes().to_vec()
357 });
358 // Build the SIMD-accelerated substring finder once for the whole
359 // scan loop; reusing it across files is much faster than rebuilding.
360 let finder = needle
361 .as_ref()
362 .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
363
364 // Parallel byte-prefilter: for each candidate, compute whether the
365 // workspace path bytes appear in the file. This is the expensive
366 // O(file_bytes) step. Running it concurrently across CPU cores
367 // turns "sum of all file scan times" into ~"longest single file
368 // scan time" on a machine with multiple cores.
369 //
370 // We preserve mtime ordering by collecting results into a parallel
371 // Vec<bool> indexed the same as `candidates`, then iterating
372 // serially to find the first hit.
373 let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
374 (None, _) => vec![true; candidates.len()],
375 (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
376 (Some(finder), _) => {
377 let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
378 let results: std::sync::Mutex<Vec<Option<bool>>> =
379 std::sync::Mutex::new(vec![None; paths.len()]);
380 std::thread::scope(|scope| {
381 // Chunk candidates across available CPUs. For ~300 files
382 // a fan-out of 4-8 threads saturates I/O and CPU nicely
383 // without oversubscribing.
384 let threads = std::thread::available_parallelism()
385 .map(|n| n.get())
386 .unwrap_or(4)
387 .min(8);
388 let chunk_size = paths.len().div_ceil(threads);
389 for chunk_idx in 0..threads {
390 let start = chunk_idx * chunk_size;
391 if start >= paths.len() {
392 break;
393 }
394 let end = (start + chunk_size).min(paths.len());
395 let chunk_paths = &paths[start..end];
396 let results = &results;
397 scope.spawn(move || {
398 for (offset, p) in chunk_paths.iter().enumerate() {
399 let hit = file_contains_finder(p, finder).unwrap_or(false);
400 // Lock-per-entry is fine: contention is
401 // negligible vs. the ~ms file scan cost.
402 if let Ok(mut guard) = results.lock() {
403 guard[start + offset] = Some(hit);
404 }
405 }
406 });
407 }
408 });
409 results
410 .into_inner()
411 .unwrap_or_default()
412 .into_iter()
413 .map(|o| o.unwrap_or(false))
414 .collect()
415 }
416 };
417
418 for (idx, (path, _)) in candidates.iter().enumerate() {
419 // Fast path: byte-level substring prefilter (precomputed in parallel).
420 if !prefilter_hits.get(idx).copied().unwrap_or(false) {
421 continue;
422 }
423
424 // Slower path: full JSON header parse to confirm the match
425 // (the substring test has false positives: the path could
426 // appear in a chat message or tool output).
427 let header_ok = (|| -> Result<bool> {
428 let file = fs::File::open(path)?;
429 let reader = BufReader::with_capacity(16 * 1024, file);
430 let header: SessionHeader = match serde_json::from_reader(reader) {
431 Ok(h) => h,
432 Err(_) => return Ok(false),
433 };
434 if let Some(ref ws) = canonical_workspace {
435 let Some(dir) = header.metadata.directory.as_ref() else {
436 return Ok(false);
437 };
438 if dir == ws {
439 return Ok(true);
440 }
441 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
442 Ok(&canonical_dir == ws)
443 } else {
444 Ok(true)
445 }
446 })();
447
448 match header_ok {
449 Ok(true) => {}
450 Ok(false) => continue,
451 Err(err) => {
452 tracing::warn!(
453 path = %path.display(),
454 error = %err,
455 "skipping unreadable session file",
456 );
457 continue;
458 }
459 }
460
461 // Match found — do the tail-capped full parse.
462 let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
463 let file = fs::File::open(path)?;
464 let reader = BufReader::with_capacity(64 * 1024, file);
465 let (parsed, dropped) =
466 with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
467 return Ok(TailLoad {
468 session: parsed?,
469 dropped,
470 file_bytes,
471 });
472 }
473
474 anyhow::bail!("No sessions found")
475}
476
477/// Memory-map `path` and return `true` if the SIMD finder matches.
478///
479/// mmap avoids the per-chunk `read` syscall overhead and the
480/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
481/// straight over the OS-provided virtual memory window, and the kernel
482/// pages in only what's actually touched. On a ~10 MB session file
483/// this is measurably faster than chunked `BufRead` + SIMD.
484///
485/// Falls back to returning `false` on any I/O error — the caller's
486/// mtime loop will simply try the next candidate.
487fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
488 use std::fs;
489
490 let needle_len = finder.needle().len();
491 if needle_len == 0 {
492 return Ok(true);
493 }
494 let file = fs::File::open(path)?;
495 let meta = file.metadata()?;
496 let len = meta.len();
497 if (len as usize) < needle_len {
498 return Ok(false);
499 }
500 // SAFETY: We only read the mapping, we do not mutate it. The
501 // kernel COW-protects the mapping and the `Mmap` owns the lifetime
502 // tied to `file`, which we keep alive for the duration of `find`.
503 // Concurrent external modification of the file could theoretically
504 // tear the mapping, but session files are only ever rewritten
505 // wholesale via atomic rename — never in-place truncated.
506 let mmap = unsafe { memmap2::Mmap::map(&file)? };
507 Ok(finder.find(&mmap[..]).is_some())
508}