Skip to main content

codetether_agent/session/
persistence.rs

1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14    /// Load an existing session by its UUID.
15    ///
16    /// # Errors
17    ///
18    /// Returns an error if the session file does not exist, exceeds the
19    /// sanity size cap, or the JSON is malformed.
20    pub async fn load(id: &str) -> Result<Self> {
21        let path = Self::session_path(id)?;
22        super::helper::persistence_cap::ensure_session_file_size(&path).await?;
23        let mut session: Session = serde_json::from_str(&fs::read_to_string(&path).await?)?;
24        session.normalize_sidecars();
25        Ok(session)
26    }
27
28    /// Load the most recent session, optionally scoped to a workspace
29    /// directory.
30    ///
31    /// When `workspace` is [`Some`], only considers sessions created in that
32    /// directory. When [`None`], returns the most recent session globally.
33    ///
34    /// # Errors
35    ///
36    /// Returns an error if no sessions exist (or, with `workspace` set,
37    /// none match the requested directory).
38    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
39        Self::last_for_directory_tail(workspace, usize::MAX)
40            .await
41            .map(|t| t.session)
42    }
43
44    /// Like [`Self::last_for_directory`] but keeps only the last `window`
45    /// messages and tool uses in memory, returning a [`TailLoad`] with the
46    /// number of entries that were dropped. Use this when resuming very
47    /// large sessions where the full transcript would exhaust memory.
48    ///
49    /// Implementation: the entire scan runs on a single blocking thread.
50    /// For each candidate (newest-mtime first) we do a cheap
51    /// [`SessionHeader`] parse to compare `metadata.directory`; only the
52    /// matching file pays for a full tail parse.
53    pub async fn last_for_directory_tail(
54        workspace: Option<&std::path::Path>,
55        window: usize,
56    ) -> Result<TailLoad> {
57        let sessions_dir = Self::sessions_dir()?;
58        let canonical_workspace = workspace.map(|w| {
59            w.canonicalize().unwrap_or_else(|e| {
60                tracing::warn!(path = %w.display(), error = %e, "canonicalize failed; using raw path");
61                w.to_path_buf()
62            })
63        });
64        tokio::task::spawn_blocking(move || {
65            scan_with_index(&sessions_dir, canonical_workspace, window)
66        })
67        .await
68        .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
69    }
70
71    /// Load the most recent session globally (unscoped).
72    ///
73    /// Kept for legacy compatibility; prefer
74    /// [`Session::last_for_directory`].
75    pub async fn last() -> Result<Self> {
76        Self::last_for_directory(None).await
77    }
78
79    /// Persist the session to disk as JSON. Creates the sessions directory
80    /// on demand.
81    ///
82    /// Performance:
83    /// - Serialization runs on the blocking thread pool so a large session
84    ///   doesn't stall the async reactor during `serde_json` formatting.
85    /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
86    ///   serialize, ~30–40% smaller on disk, and correspondingly faster
87    ///   to load and mmap-scan on resume. Session files are machine-owned
88    ///   — humans never hand-edit them — so indentation is pure overhead.
89    /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
90    ///   previous session intact, and the mmap prefilter in
91    ///   [`file_contains_finder`] cannot observe a torn buffer.
92    pub async fn save(&self) -> Result<()> {
93        let path = Self::session_path(&self.id)?;
94        if let Some(parent) = path.parent() {
95            fs::create_dir_all(parent).await?;
96        }
97        let tmp = path.with_extension("json.tmp");
98        // Clone-and-serialize off the reactor. The clone is a Vec-copy
99        // (~memcpy speed) which is always cheaper than the JSON
100        // formatting we are about to avoid blocking the reactor on.
101        let mut snapshot = self.clone();
102        snapshot.normalize_sidecars();
103        let sink_config = snapshot.metadata.history_sink.clone().or_else(|| {
104            super::history_sink::HistorySinkConfig::from_env()
105                .ok()
106                .flatten()
107        });
108        let session_id_for_journal = snapshot.id.clone();
109        let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
110            .await
111            .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
112        fs::write(&tmp, content).await?;
113        // Atomic swap. On POSIX `rename` is atomic over existing files;
114        // on Windows we fall back to remove-then-rename.
115        if let Err(primary) = fs::rename(&tmp, &path).await {
116            let _ = fs::remove_file(&path).await;
117            if let Err(retry) = fs::rename(&tmp, &path).await {
118                let _ = fs::remove_file(&tmp).await;
119                return Err(anyhow::anyhow!(
120                    "session rename failed: {primary} (retry: {retry})"
121                ));
122            }
123        }
124        let mut journal = super::journal::WritebackJournal::new(&session_id_for_journal);
125        let tx = journal.stage(super::journal::Op::Save);
126        if let Err(reason) = journal.commit(tx) {
127            journal.reject(tx, reason);
128        }
129        if let Err(err) =
130            super::journal::append_entries(&session_id_for_journal, journal.entries()).await
131        {
132            tracing::warn!(
133                %err,
134                session_id = %session_id_for_journal,
135                "save journal append failed (non-fatal)"
136            );
137        }
138        // Update the workspace index so the next resume is O(1). Best
139        // effort — a failed index write must not fail the session save.
140        if let Some(dir) = &self.metadata.directory {
141            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
142            let session_id = self.id.clone();
143            tokio::task::spawn_blocking(move || {
144                if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
145                    tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
146                }
147            });
148        }
149        // Phase A history sink: stream pure history to MinIO/S3.
150        // Env-gated, fire-and-forget — never blocks the save, never
151        // fails it. See [`super::history_sink`] for the env variables.
152        //
153        // Coalesce concurrent saves per session: if an upload for this
154        // session is already in flight, skip this spawn — the next
155        // save will pick up the latest history. This prevents bursty
156        // save loops (e.g. during long tool chains) from queueing
157        // unbounded background uploads.
158        if let Some(sink_config) = sink_config {
159            use std::collections::HashSet;
160            use std::sync::{Mutex, OnceLock};
161            static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
162            let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
163
164            let inserted = in_flight
165                .lock()
166                .map(|mut s| s.insert(self.id.clone()))
167                .unwrap_or(false);
168
169            if inserted {
170                let session_id = self.id.clone();
171                let messages = self.messages.clone();
172                tokio::spawn(async move {
173                    if let Err(err) = super::history_sink::upload_full_history(
174                        &sink_config,
175                        &session_id,
176                        &messages,
177                    )
178                    .await
179                    {
180                        tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
181                    }
182                    if let Ok(mut s) = in_flight.lock() {
183                        s.remove(&session_id);
184                    }
185                });
186            } else {
187                tracing::debug!(
188                    session_id = %self.id,
189                    "history sink upload already in flight; skipping duplicate"
190                );
191            }
192        }
193        Ok(())
194    }
195
196    /// Delete a session file by ID. No-op if the file does not exist.
197    pub async fn delete(id: &str) -> Result<()> {
198        let path = Self::session_path(id)?;
199        match tokio::fs::remove_file(&path).await {
200            Ok(()) => Ok(()),
201            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
202            Err(e) => Err(e.into()),
203        }
204    }
205
206    /// Resolve the sessions directory (`<data_dir>/sessions`).
207    ///
208    /// Cached after first resolution: `Config::data_dir()` does env-var
209    /// lookups and filesystem checks which are cheap individually but add
210    /// up across hundreds of save/load calls over a session's lifetime.
211    pub(crate) fn sessions_dir() -> Result<PathBuf> {
212        use std::sync::OnceLock;
213        static CACHED: OnceLock<PathBuf> = OnceLock::new();
214        if let Some(dir) = CACHED.get() {
215            return Ok(dir.clone());
216        }
217        let dir = crate::config::Config::data_dir()
218            .map(|d| d.join("sessions"))
219            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
220        // Best-effort set; if another thread won the race we just use
221        // theirs.
222        let _ = CACHED.set(dir.clone());
223        Ok(dir)
224    }
225
226    /// Resolve the on-disk path for a session file.
227    pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
228        if id.is_empty()
229            || id.len() > 128
230            || id.contains(|c: char| !c.is_alphanumeric() && c != '-' && c != '_')
231        {
232            anyhow::bail!("Invalid session ID:rejecting path traversal risk");
233        }
234        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
235    }
236}
237
238/// Index-first scan. Tries the O(1) workspace index; on miss or stale
239/// entry, falls back to the byte-prefiltered directory scan and repairs
240/// the index with the winner so the next launch is O(1).
241fn scan_with_index(
242    sessions_dir: &Path,
243    canonical_workspace: Option<PathBuf>,
244    window: usize,
245) -> Result<TailLoad> {
246    // Fast path: check the sidecar index.
247    if let Some(ws) = canonical_workspace.as_ref() {
248        let index = super::workspace_index::WorkspaceIndex::load_sync();
249        if let Some(id) = index.get(ws) {
250            let candidate = sessions_dir.join(format!("{id}.json"));
251            if candidate.exists() {
252                if let Ok(load) = tail_load_sync(&candidate, window) {
253                    // Confirm it still belongs to this workspace — the user
254                    // could have edited the session's metadata.directory
255                    // manually or moved a file.
256                    let dir_ok = load
257                        .session
258                        .metadata
259                        .directory
260                        .as_ref()
261                        .map(|d| {
262                            let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
263                            &canonical == ws
264                        })
265                        .unwrap_or(false);
266                    if dir_ok {
267                        return Ok(load);
268                    }
269                    tracing::warn!(
270                        session_id = %id,
271                        stored_dir = ?load.session.metadata.directory,
272                        expected_dir = ?ws,
273                        "Index hit but directory mismatch; falling back to scan"
274                    );
275                } else {
276                    tracing::warn!(
277                        session_id = %id,
278                        path = %candidate.display(),
279                        "Index hit but session file failed to parse; falling back to scan"
280                    );
281                }
282            }
283        }
284    }
285
286    // Slow path: scan everything.
287    let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
288
289    // Repair the index with whatever we found, so next time is O(1).
290    if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
291        let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
292    }
293
294    result
295}
296
297/// Tail-load a specific path synchronously (used by the index fast path).
298fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
299    use std::fs;
300    use std::io::BufReader;
301    let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
302    let file = fs::File::open(path)?;
303    let reader = BufReader::with_capacity(64 * 1024, file);
304    let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
305    let mut session = parsed?;
306    session.normalize_sidecars();
307    Ok(TailLoad {
308        session,
309        dropped,
310        file_bytes,
311    })
312}
313
314/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
315///
316/// Flow:
317/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
318/// 2. Sort newest-first.
319/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
320///    `Vec<Message>` allocation. Because `metadata` is serialized before
321///    `messages`/`tool_uses` in new files, this is O(header bytes); older
322///    files still work but pay a lex-through cost.
323/// 4. On workspace match, re-open and do one tail-capped full parse.
324fn scan_sync(
325    sessions_dir: &Path,
326    canonical_workspace: Option<PathBuf>,
327    window: usize,
328) -> Result<TailLoad> {
329    use std::fs;
330    use std::io::BufReader;
331    use std::time::SystemTime;
332
333    if !sessions_dir.exists() {
334        anyhow::bail!("No sessions found");
335    }
336
337    let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
338    for entry in fs::read_dir(sessions_dir)? {
339        let entry = match entry {
340            Ok(e) => e,
341            Err(err) => {
342                tracing::warn!(error = %err, "skipping unreadable directory entry");
343                continue;
344            }
345        };
346        let path = entry.path();
347        if path.extension().and_then(|s| s.to_str()) != Some("json") {
348            continue;
349        }
350        let mtime = entry
351            .metadata()
352            .ok()
353            .and_then(|m| m.modified().ok())
354            .unwrap_or(SystemTime::UNIX_EPOCH);
355        candidates.push((path, mtime));
356    }
357    if candidates.is_empty() {
358        anyhow::bail!("No sessions found");
359    }
360    candidates.sort_by(|a, b| b.1.cmp(&a.1));
361
362    // Precompute a cheap byte-level needle for the workspace path. JSON
363    // serializes `metadata.directory` as `"directory":"<path>"`, so if
364    // the raw file bytes don't contain the path as a substring, we can
365    // skip it without invoking serde_json at all.
366    //
367    // This is the single biggest win for large workspaces: a 10 MB
368    // session that is *not* for this cwd gets ruled out in a few ms of
369    // byte scanning instead of a full JSON lex.
370    let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
371        // JSON-escape the path (backslashes on Windows become `\\`,
372        // quotes become `\"`). `serde_json::to_string` handles all the
373        // edge cases for us.
374        let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
375        // Strip the surrounding quotes; we want the inner bytes so we
376        // match whether the JSON has `"directory":"..."` or any other
377        // surrounding context.
378        let inner = quoted
379            .strip_prefix('"')
380            .and_then(|s| s.strip_suffix('"'))
381            .unwrap_or(&quoted);
382        inner.as_bytes().to_vec()
383    });
384    // Build the SIMD-accelerated substring finder once for the whole
385    // scan loop; reusing it across files is much faster than rebuilding.
386    let finder = needle
387        .as_ref()
388        .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
389
390    // Parallel byte-prefilter: for each candidate, compute whether the
391    // workspace path bytes appear in the file. This is the expensive
392    // O(file_bytes) step. Running it concurrently across CPU cores
393    // turns "sum of all file scan times" into ~"longest single file
394    // scan time" on a machine with multiple cores.
395    //
396    // We preserve mtime ordering by collecting results into a parallel
397    // Vec<bool> indexed the same as `candidates`, then iterating
398    // serially to find the first hit.
399    let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
400        (None, _) => vec![true; candidates.len()],
401        (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
402        (Some(finder), _) => {
403            let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
404            let results: std::sync::Mutex<Vec<Option<bool>>> =
405                std::sync::Mutex::new(vec![None; paths.len()]);
406            std::thread::scope(|scope| {
407                // Chunk candidates across available CPUs. For ~300 files
408                // a fan-out of 4-8 threads saturates I/O and CPU nicely
409                // without oversubscribing.
410                let threads = std::thread::available_parallelism()
411                    .map(|n| n.get())
412                    .unwrap_or(4)
413                    .min(8);
414                let chunk_size = paths.len().div_ceil(threads);
415                for chunk_idx in 0..threads {
416                    let start = chunk_idx * chunk_size;
417                    if start >= paths.len() {
418                        break;
419                    }
420                    let end = (start + chunk_size).min(paths.len());
421                    let chunk_paths = &paths[start..end];
422                    let results = &results;
423                    scope.spawn(move || {
424                        for (offset, p) in chunk_paths.iter().enumerate() {
425                            let hit = file_contains_finder(p, finder).unwrap_or(false);
426                            // Lock-per-entry is fine: contention is
427                            // negligible vs. the ~ms file scan cost.
428                            if let Ok(mut guard) = results.lock() {
429                                guard[start + offset] = Some(hit);
430                            }
431                        }
432                    });
433                }
434            });
435            results
436                .into_inner()
437                .unwrap_or_default()
438                .into_iter()
439                .map(|o| o.unwrap_or(false))
440                .collect()
441        }
442    };
443
444    for (idx, (path, _)) in candidates.iter().enumerate() {
445        // Fast path: byte-level substring prefilter (precomputed in parallel).
446        if !prefilter_hits.get(idx).copied().unwrap_or(false) {
447            continue;
448        }
449
450        // Slower path: full JSON header parse to confirm the match
451        // (the substring test has false positives: the path could
452        // appear in a chat message or tool output).
453        let header_ok = (|| -> Result<bool> {
454            let file = fs::File::open(path)?;
455            let reader = BufReader::with_capacity(16 * 1024, file);
456            let header: SessionHeader = match serde_json::from_reader(reader) {
457                Ok(h) => h,
458                Err(_) => return Ok(false),
459            };
460            if let Some(ref ws) = canonical_workspace {
461                let Some(dir) = header.metadata.directory.as_ref() else {
462                    return Ok(false);
463                };
464                if dir == ws {
465                    return Ok(true);
466                }
467                let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
468                Ok(&canonical_dir == ws)
469            } else {
470                Ok(true)
471            }
472        })();
473
474        match header_ok {
475            Ok(true) => {}
476            Ok(false) => continue,
477            Err(err) => {
478                tracing::warn!(
479                    path = %path.display(),
480                    error = %err,
481                    "skipping unreadable session file",
482                );
483                continue;
484            }
485        }
486
487        // Match found — do the tail-capped full parse.
488        let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
489        let file = fs::File::open(path)?;
490        let reader = BufReader::with_capacity(64 * 1024, file);
491        let (parsed, dropped) =
492            with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
493        return Ok(TailLoad {
494            session: parsed?,
495            dropped,
496            file_bytes,
497        });
498    }
499
500    anyhow::bail!("No sessions found")
501}
502
503/// Memory-map `path` and return `true` if the SIMD finder matches.
504///
505/// mmap avoids the per-chunk `read` syscall overhead and the
506/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
507/// straight over the OS-provided virtual memory window, and the kernel
508/// pages in only what's actually touched. On a ~10 MB session file
509/// this is measurably faster than chunked `BufRead` + SIMD.
510///
511/// Falls back to returning `false` on any I/O error — the caller's
512/// mtime loop will simply try the next candidate.
513fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
514    use std::fs;
515
516    let needle_len = finder.needle().len();
517    if needle_len == 0 {
518        return Ok(true);
519    }
520    let file = fs::File::open(path)?;
521    let meta = file.metadata()?;
522    let len = meta.len();
523    if (len as usize) < needle_len {
524        return Ok(false);
525    }
526    const MAX_MMAP_SIZE: u64 = 64 * 1024 * 1024; // 64 MB
527    if len > MAX_MMAP_SIZE {
528        tracing::warn!(path = %path.display(), size = len, "Skipping oversized session file");
529        return Ok(false);
530    }
531    // SAFETY: We only read the mapping, we do not mutate it. The
532    // kernel COW-protects the mapping and the `Mmap` owns the lifetime
533    // tied to `file`, which we keep alive for the duration of `find`.
534    // Concurrent external modification of the file could theoretically
535    // tear the mapping, but session files are only ever rewritten
536    // wholesale via atomic rename — never in-place truncated.
537    let mmap = unsafe { memmap2::Mmap::map(&file)? };
538    Ok(finder.find(&mmap[..]).is_some())
539}