Skip to main content

codetether_agent/session/
persistence.rs

1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14    /// Load an existing session by its UUID.
15    ///
16    /// # Errors
17    ///
18    /// Returns an error if the session file does not exist, exceeds the
19    /// sanity size cap, or the JSON is malformed.
20    pub async fn load(id: &str) -> Result<Self> {
21        let path = Self::session_path(id)?;
22        super::helper::persistence_cap::ensure_session_file_size(&path).await?;
23        let mut session: Session = serde_json::from_str(&fs::read_to_string(&path).await?)?;
24        session.normalize_sidecars();
25        Ok(session)
26    }
27
28    /// Load the most recent session, optionally scoped to a workspace
29    /// directory.
30    ///
31    /// When `workspace` is [`Some`], only considers sessions created in that
32    /// directory. When [`None`], returns the most recent session globally.
33    ///
34    /// # Errors
35    ///
36    /// Returns an error if no sessions exist (or, with `workspace` set,
37    /// none match the requested directory).
38    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
39        Self::last_for_directory_tail(workspace, usize::MAX)
40            .await
41            .map(|t| t.session)
42    }
43
44    /// Like [`Self::last_for_directory`] but keeps only the last `window`
45    /// messages and tool uses in memory, returning a [`TailLoad`] with the
46    /// number of entries that were dropped. Use this when resuming very
47    /// large sessions where the full transcript would exhaust memory.
48    ///
49    /// Implementation: the entire scan runs on a single blocking thread.
50    /// For each candidate (newest-mtime first) we do a cheap
51    /// [`SessionHeader`] parse to compare `metadata.directory`; only the
52    /// matching file pays for a full tail parse.
53    pub async fn last_for_directory_tail(
54        workspace: Option<&std::path::Path>,
55        window: usize,
56    ) -> Result<TailLoad> {
57        let sessions_dir = Self::sessions_dir()?;
58        let canonical_workspace = workspace.map(|w| {
59            w.canonicalize().unwrap_or_else(|e| {
60                tracing::warn!(path = %w.display(), error = %e, "canonicalize failed; using raw path");
61                w.to_path_buf()
62            })
63        });
64        tokio::task::spawn_blocking(move || {
65            scan_with_index(&sessions_dir, canonical_workspace, window)
66        })
67        .await
68        .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
69    }
70
71    /// Load the most recent session globally (unscoped).
72    ///
73    /// Kept for legacy compatibility; prefer
74    /// [`Session::last_for_directory`].
75    pub async fn last() -> Result<Self> {
76        Self::last_for_directory(None).await
77    }
78
79    /// Persist the session to disk as JSON. Creates the sessions directory
80    /// on demand.
81    ///
82    /// Performance:
83    /// - Serialization runs on the blocking thread pool so a large session
84    ///   doesn't stall the async reactor during `serde_json` formatting.
85    /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
86    ///   serialize, ~30–40% smaller on disk, and correspondingly faster
87    ///   to load and mmap-scan on resume. Session files are machine-owned
88    ///   — humans never hand-edit them — so indentation is pure overhead.
89    /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
90    ///   previous session intact, and the mmap prefilter in
91    ///   [`file_contains_finder`] cannot observe a torn buffer.
92    pub async fn save(&self) -> Result<()> {
93        let path = Self::session_path(&self.id)?;
94        if let Some(parent) = path.parent() {
95            fs::create_dir_all(parent).await?;
96        }
97        let tmp = path.with_extension("json.tmp");
98        // Clone-and-serialize off the reactor. The clone is a Vec-copy
99        // (~memcpy speed) which is always cheaper than the JSON
100        // formatting we are about to avoid blocking the reactor on.
101        let mut snapshot = self.clone();
102        snapshot.normalize_sidecars();
103        let sink_config = snapshot.metadata.history_sink.clone().or_else(|| {
104            super::history_sink::HistorySinkConfig::from_env()
105                .ok()
106                .flatten()
107        });
108        let session_id_for_journal = snapshot.id.clone();
109        let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
110            .await
111            .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
112        fs::write(&tmp, content).await?;
113        // Atomic swap. On POSIX `rename` is atomic over existing files;
114        // on Windows we fall back to remove-then-rename.
115        if let Err(primary) = fs::rename(&tmp, &path).await {
116            let _ = fs::remove_file(&path).await;
117            if let Err(retry) = fs::rename(&tmp, &path).await {
118                let _ = fs::remove_file(&tmp).await;
119                return Err(anyhow::anyhow!(
120                    "session rename failed: {primary} (retry: {retry})"
121                ));
122            }
123        }
124        let mut journal = super::journal::WritebackJournal::new(&session_id_for_journal);
125        let tx = journal.stage(super::journal::Op::Save);
126        if let Err(reason) = journal.commit(tx) {
127            journal.reject(tx, reason);
128        }
129        if let Err(err) =
130            super::journal::append_entries(&session_id_for_journal, journal.entries()).await
131        {
132            tracing::warn!(
133                %err,
134                session_id = %session_id_for_journal,
135                "save journal append failed (non-fatal)"
136            );
137        }
138        // Update the workspace index so the next resume is O(1). Best
139        // effort — a failed index write must not fail the session save.
140        if let Some(dir) = &self.metadata.directory {
141            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
142            let session_id = self.id.clone();
143            tokio::task::spawn_blocking(move || {
144                if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
145                    tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
146                }
147            });
148        }
149        // Phase A history sink: stream pure history to MinIO/S3.
150        // Env-gated, fire-and-forget — never blocks the save, never
151        // fails it. See [`super::history_sink`] for the env variables.
152        //
153        // Coalesce concurrent saves per session: if an upload for this
154        // session is already in flight, skip this spawn — the next
155        // save will pick up the latest history. This prevents bursty
156        // save loops (e.g. during long tool chains) from queueing
157        // unbounded background uploads.
158        if let Some(sink_config) = sink_config {
159            use std::collections::HashSet;
160            use std::sync::{Mutex, OnceLock};
161            static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
162            let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
163
164            let inserted = in_flight
165                .lock()
166                .map(|mut s| s.insert(self.id.clone()))
167                .unwrap_or(false);
168
169            if inserted {
170                let session_id = self.id.clone();
171                let messages = self.messages.clone();
172                tokio::spawn(async move {
173                    if let Err(err) = super::history_sink::upload_full_history(
174                        &sink_config,
175                        &session_id,
176                        &messages,
177                    )
178                    .await
179                    {
180                        tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
181                    }
182                    if let Ok(mut s) = in_flight.lock() {
183                        s.remove(&session_id);
184                    }
185                });
186            } else {
187                tracing::debug!(
188                    session_id = %self.id,
189                    "history sink upload already in flight; skipping duplicate"
190                );
191            }
192        }
193        Ok(())
194    }
195
196    /// Delete a session file by ID. No-op if the file does not exist.
197    pub async fn delete(id: &str) -> Result<()> {
198        let path = Self::session_path(id)?;
199        match tokio::fs::remove_file(&path).await {
200            Ok(()) => Ok(()),
201            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
202            Err(e) => Err(e.into()),
203        }
204    }
205
206    /// Resolve the sessions directory (`<data_dir>/sessions`).
207    ///
208    /// Resolved from [`crate::config::Config::data_dir`] each time so tests
209    /// that override `CODETETHER_DATA_DIR` remain isolated.
210    pub(crate) fn sessions_dir() -> Result<PathBuf> {
211        crate::config::Config::data_dir()
212            .map(|d| d.join("sessions"))
213            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
214    }
215
216    /// Resolve the on-disk path for a session file.
217    pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
218        if id.is_empty()
219            || id.len() > 128
220            || id.contains(|c: char| !c.is_alphanumeric() && c != '-' && c != '_')
221        {
222            anyhow::bail!("Invalid session ID:rejecting path traversal risk");
223        }
224        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
225    }
226}
227
228/// Index-first scan. Tries the O(1) workspace index; on miss or stale
229/// entry, falls back to the byte-prefiltered directory scan and repairs
230/// the index with the winner so the next launch is O(1).
231fn scan_with_index(
232    sessions_dir: &Path,
233    canonical_workspace: Option<PathBuf>,
234    window: usize,
235) -> Result<TailLoad> {
236    // Fast path: check the sidecar index.
237    if let Some(ws) = canonical_workspace.as_ref() {
238        let index = super::workspace_index::WorkspaceIndex::load_sync();
239        if let Some(id) = index.get(ws) {
240            let candidate = sessions_dir.join(format!("{id}.json"));
241            if candidate.exists() {
242                if let Ok(load) = tail_load_sync(&candidate, window) {
243                    // Confirm it still belongs to this workspace — the user
244                    // could have edited the session's metadata.directory
245                    // manually or moved a file.
246                    let dir_ok = load
247                        .session
248                        .metadata
249                        .directory
250                        .as_ref()
251                        .map(|d| {
252                            let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
253                            &canonical == ws
254                        })
255                        .unwrap_or(false);
256                    if dir_ok {
257                        return Ok(load);
258                    }
259                    tracing::warn!(
260                        session_id = %id,
261                        stored_dir = ?load.session.metadata.directory,
262                        expected_dir = ?ws,
263                        "Index hit but directory mismatch; falling back to scan"
264                    );
265                } else {
266                    tracing::warn!(
267                        session_id = %id,
268                        path = %candidate.display(),
269                        "Index hit but session file failed to parse; falling back to scan"
270                    );
271                }
272            }
273        }
274    }
275
276    // Slow path: scan everything.
277    let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
278
279    // Repair the index with whatever we found, so next time is O(1).
280    if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
281        let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
282    }
283
284    result
285}
286
287/// Tail-load a specific path synchronously (used by the index fast path).
288fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
289    use std::fs;
290    use std::io::BufReader;
291    let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
292    let file = fs::File::open(path)?;
293    let reader = BufReader::with_capacity(64 * 1024, file);
294    let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
295    let mut session = parsed?;
296    session.normalize_sidecars();
297    Ok(TailLoad {
298        session,
299        dropped,
300        file_bytes,
301    })
302}
303
304/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
305///
306/// Flow:
307/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
308/// 2. Sort newest-first.
309/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
310///    `Vec<Message>` allocation. Because `metadata` is serialized before
311///    `messages`/`tool_uses` in new files, this is O(header bytes); older
312///    files still work but pay a lex-through cost.
313/// 4. On workspace match, re-open and do one tail-capped full parse.
314fn scan_sync(
315    sessions_dir: &Path,
316    canonical_workspace: Option<PathBuf>,
317    window: usize,
318) -> Result<TailLoad> {
319    use std::fs;
320    use std::io::BufReader;
321    use std::time::SystemTime;
322
323    if !sessions_dir.exists() {
324        anyhow::bail!("No sessions found");
325    }
326
327    let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
328    for entry in fs::read_dir(sessions_dir)? {
329        let entry = match entry {
330            Ok(e) => e,
331            Err(err) => {
332                tracing::warn!(error = %err, "skipping unreadable directory entry");
333                continue;
334            }
335        };
336        let path = entry.path();
337        if path.extension().and_then(|s| s.to_str()) != Some("json") {
338            continue;
339        }
340        let mtime = entry
341            .metadata()
342            .ok()
343            .and_then(|m| m.modified().ok())
344            .unwrap_or(SystemTime::UNIX_EPOCH);
345        candidates.push((path, mtime));
346    }
347    if candidates.is_empty() {
348        anyhow::bail!("No sessions found");
349    }
350    candidates.sort_by(|a, b| b.1.cmp(&a.1));
351
352    // Precompute a cheap byte-level needle for the workspace path. JSON
353    // serializes `metadata.directory` as `"directory":"<path>"`, so if
354    // the raw file bytes don't contain the path as a substring, we can
355    // skip it without invoking serde_json at all.
356    //
357    // This is the single biggest win for large workspaces: a 10 MB
358    // session that is *not* for this cwd gets ruled out in a few ms of
359    // byte scanning instead of a full JSON lex.
360    let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
361        // JSON-escape the path (backslashes on Windows become `\\`,
362        // quotes become `\"`). `serde_json::to_string` handles all the
363        // edge cases for us.
364        let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
365        // Strip the surrounding quotes; we want the inner bytes so we
366        // match whether the JSON has `"directory":"..."` or any other
367        // surrounding context.
368        let inner = quoted
369            .strip_prefix('"')
370            .and_then(|s| s.strip_suffix('"'))
371            .unwrap_or(&quoted);
372        inner.as_bytes().to_vec()
373    });
374    // Build the SIMD-accelerated substring finder once for the whole
375    // scan loop; reusing it across files is much faster than rebuilding.
376    let finder = needle
377        .as_ref()
378        .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
379
380    // Parallel byte-prefilter: for each candidate, compute whether the
381    // workspace path bytes appear in the file. This is the expensive
382    // O(file_bytes) step. Running it concurrently across CPU cores
383    // turns "sum of all file scan times" into ~"longest single file
384    // scan time" on a machine with multiple cores.
385    //
386    // We preserve mtime ordering by collecting results into a parallel
387    // Vec<bool> indexed the same as `candidates`, then iterating
388    // serially to find the first hit.
389    let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
390        (None, _) => vec![true; candidates.len()],
391        (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
392        (Some(finder), _) => {
393            let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
394            let results: std::sync::Mutex<Vec<Option<bool>>> =
395                std::sync::Mutex::new(vec![None; paths.len()]);
396            std::thread::scope(|scope| {
397                // Chunk candidates across available CPUs. For ~300 files
398                // a fan-out of 4-8 threads saturates I/O and CPU nicely
399                // without oversubscribing.
400                let threads = std::thread::available_parallelism()
401                    .map(|n| n.get())
402                    .unwrap_or(4)
403                    .min(8);
404                let chunk_size = paths.len().div_ceil(threads);
405                for chunk_idx in 0..threads {
406                    let start = chunk_idx * chunk_size;
407                    if start >= paths.len() {
408                        break;
409                    }
410                    let end = (start + chunk_size).min(paths.len());
411                    let chunk_paths = &paths[start..end];
412                    let results = &results;
413                    scope.spawn(move || {
414                        for (offset, p) in chunk_paths.iter().enumerate() {
415                            let hit = file_contains_finder(p, finder).unwrap_or(false);
416                            // Lock-per-entry is fine: contention is
417                            // negligible vs. the ~ms file scan cost.
418                            if let Ok(mut guard) = results.lock() {
419                                guard[start + offset] = Some(hit);
420                            }
421                        }
422                    });
423                }
424            });
425            results
426                .into_inner()
427                .unwrap_or_default()
428                .into_iter()
429                .map(|o| o.unwrap_or(false))
430                .collect()
431        }
432    };
433
434    for (idx, (path, _)) in candidates.iter().enumerate() {
435        // Fast path: byte-level substring prefilter (precomputed in parallel).
436        if !prefilter_hits.get(idx).copied().unwrap_or(false) {
437            continue;
438        }
439
440        // Slower path: full JSON header parse to confirm the match
441        // (the substring test has false positives: the path could
442        // appear in a chat message or tool output).
443        let header_ok = (|| -> Result<bool> {
444            let file = fs::File::open(path)?;
445            let reader = BufReader::with_capacity(16 * 1024, file);
446            let header: SessionHeader = match serde_json::from_reader(reader) {
447                Ok(h) => h,
448                Err(_) => return Ok(false),
449            };
450            if let Some(ref ws) = canonical_workspace {
451                let Some(dir) = header.metadata.directory.as_ref() else {
452                    return Ok(false);
453                };
454                if dir == ws {
455                    return Ok(true);
456                }
457                let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
458                Ok(&canonical_dir == ws)
459            } else {
460                Ok(true)
461            }
462        })();
463
464        match header_ok {
465            Ok(true) => {}
466            Ok(false) => continue,
467            Err(err) => {
468                tracing::warn!(
469                    path = %path.display(),
470                    error = %err,
471                    "skipping unreadable session file",
472                );
473                continue;
474            }
475        }
476
477        // Match found — do the tail-capped full parse.
478        let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
479        let file = fs::File::open(path)?;
480        let reader = BufReader::with_capacity(64 * 1024, file);
481        let (parsed, dropped) =
482            with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
483        return Ok(TailLoad {
484            session: parsed?,
485            dropped,
486            file_bytes,
487        });
488    }
489
490    anyhow::bail!("No sessions found")
491}
492
493/// Memory-map `path` and return `true` if the SIMD finder matches.
494///
495/// mmap avoids the per-chunk `read` syscall overhead and the
496/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
497/// straight over the OS-provided virtual memory window, and the kernel
498/// pages in only what's actually touched. On a ~10 MB session file
499/// this is measurably faster than chunked `BufRead` + SIMD.
500///
501/// Falls back to returning `false` on any I/O error — the caller's
502/// mtime loop will simply try the next candidate.
503fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
504    use std::fs;
505
506    let needle_len = finder.needle().len();
507    if needle_len == 0 {
508        return Ok(true);
509    }
510    let file = fs::File::open(path)?;
511    let meta = file.metadata()?;
512    let len = meta.len();
513    if (len as usize) < needle_len {
514        return Ok(false);
515    }
516    const MAX_MMAP_SIZE: u64 = 64 * 1024 * 1024; // 64 MB
517    if len > MAX_MMAP_SIZE {
518        tracing::warn!(path = %path.display(), size = len, "Skipping oversized session file");
519        return Ok(false);
520    }
521    // SAFETY: We only read the mapping, we do not mutate it. The
522    // kernel COW-protects the mapping and the `Mmap` owns the lifetime
523    // tied to `file`, which we keep alive for the duration of `find`.
524    // Concurrent external modification of the file could theoretically
525    // tear the mapping, but session files are only ever rewritten
526    // wholesale via atomic rename — never in-place truncated.
527    let mmap = unsafe { memmap2::Mmap::map(&file)? };
528    Ok(finder.find(&mmap[..]).is_some())
529}