// codetether_agent/session/persistence.rs
1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14    /// Load an existing session by its UUID.
15    ///
16    /// # Errors
17    ///
18    /// Returns an error if the session file does not exist or the JSON is
19    /// malformed.
20    pub async fn load(id: &str) -> Result<Self> {
21        let path = Self::session_path(id)?;
22        let content = fs::read_to_string(&path).await?;
23        let session: Session = serde_json::from_str(&content)?;
24        Ok(session)
25    }
26
27    /// Load the most recent session, optionally scoped to a workspace
28    /// directory.
29    ///
30    /// When `workspace` is [`Some`], only considers sessions created in that
31    /// directory. When [`None`], returns the most recent session globally.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if no sessions exist (or, with `workspace` set,
36    /// none match the requested directory).
37    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
38        Self::last_for_directory_tail(workspace, usize::MAX)
39            .await
40            .map(|t| t.session)
41    }
42
43    /// Like [`Self::last_for_directory`] but keeps only the last `window`
44    /// messages and tool uses in memory, returning a [`TailLoad`] with the
45    /// number of entries that were dropped. Use this when resuming very
46    /// large sessions where the full transcript would exhaust memory.
47    ///
48    /// Implementation: the entire scan runs on a single blocking thread.
49    /// For each candidate (newest-mtime first) we do a cheap
50    /// [`SessionHeader`] parse to compare `metadata.directory`; only the
51    /// matching file pays for a full tail parse.
52    pub async fn last_for_directory_tail(
53        workspace: Option<&std::path::Path>,
54        window: usize,
55    ) -> Result<TailLoad> {
56        let sessions_dir = Self::sessions_dir()?;
57        let canonical_workspace =
58            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
59        tokio::task::spawn_blocking(move || {
60            scan_with_index(&sessions_dir, canonical_workspace, window)
61        })
62        .await
63        .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
64    }
65
66    /// Load the most recent session globally (unscoped).
67    ///
68    /// Kept for legacy compatibility; prefer
69    /// [`Session::last_for_directory`].
70    pub async fn last() -> Result<Self> {
71        Self::last_for_directory(None).await
72    }
73
74    /// Persist the session to disk as JSON. Creates the sessions directory
75    /// on demand.
76    ///
77    /// Performance:
78    /// - Serialization runs on the blocking thread pool so a large session
79    ///   doesn't stall the async reactor during `serde_json` formatting.
80    /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
81    ///   serialize, ~30–40% smaller on disk, and correspondingly faster
82    ///   to load and mmap-scan on resume. Session files are machine-owned
83    ///   — humans never hand-edit them — so indentation is pure overhead.
84    /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
85    ///   previous session intact, and the mmap prefilter in
86    ///   [`file_contains_finder`] cannot observe a torn buffer.
87    pub async fn save(&self) -> Result<()> {
88        let path = Self::session_path(&self.id)?;
89        if let Some(parent) = path.parent() {
90            fs::create_dir_all(parent).await?;
91        }
92        let tmp = path.with_extension("json.tmp");
93        // Clone-and-serialize off the reactor. The clone is a Vec-copy
94        // (~memcpy speed) which is always cheaper than the JSON
95        // formatting we are about to avoid blocking the reactor on.
96        let snapshot = self.clone();
97        let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
98            .await
99            .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
100        fs::write(&tmp, content).await?;
101        // Atomic swap. On POSIX `rename` is atomic over existing files;
102        // on Windows we fall back to remove-then-rename.
103        if let Err(primary) = fs::rename(&tmp, &path).await {
104            let _ = fs::remove_file(&path).await;
105            if let Err(retry) = fs::rename(&tmp, &path).await {
106                let _ = fs::remove_file(&tmp).await;
107                return Err(anyhow::anyhow!(
108                    "session rename failed: {primary} (retry: {retry})"
109                ));
110            }
111        }
112        // Update the workspace index so the next resume is O(1). Best
113        // effort — a failed index write must not fail the session save.
114        if let Some(dir) = &self.metadata.directory {
115            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
116            let session_id = self.id.clone();
117            tokio::task::spawn_blocking(move || {
118                if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
119                    tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
120                }
121            });
122        }
123        // Phase A history sink: stream pure history to MinIO/S3.
124        // Env-gated, fire-and-forget — never blocks the save, never
125        // fails it. See [`super::history_sink`] for the env variables.
126        //
127        // Coalesce concurrent saves per session: if an upload for this
128        // session is already in flight, skip this spawn — the next
129        // save will pick up the latest history. This prevents bursty
130        // save loops (e.g. during long tool chains) from queueing
131        // unbounded background uploads.
132        if let Ok(Some(sink_config)) = super::history_sink::HistorySinkConfig::from_env() {
133            use std::collections::HashSet;
134            use std::sync::{Mutex, OnceLock};
135            static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
136            let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
137
138            let inserted = in_flight
139                .lock()
140                .map(|mut s| s.insert(self.id.clone()))
141                .unwrap_or(false);
142
143            if inserted {
144                let session_id = self.id.clone();
145                let messages = self.messages.clone();
146                tokio::spawn(async move {
147                    if let Err(err) = super::history_sink::upload_full_history(
148                        &sink_config,
149                        &session_id,
150                        &messages,
151                    )
152                    .await
153                    {
154                        tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
155                    }
156                    if let Ok(mut s) = in_flight.lock() {
157                        s.remove(&session_id);
158                    }
159                });
160            } else {
161                tracing::debug!(
162                    session_id = %self.id,
163                    "history sink upload already in flight; skipping duplicate"
164                );
165            }
166        }
167        Ok(())
168    }
169
170    /// Delete a session file by ID. No-op if the file does not exist.
171    pub async fn delete(id: &str) -> Result<()> {
172        let path = Self::session_path(id)?;
173        if path.exists() {
174            tokio::fs::remove_file(&path).await?;
175        }
176        Ok(())
177    }
178
179    /// Resolve the sessions directory (`<data_dir>/sessions`).
180    ///
181    /// Cached after first resolution: `Config::data_dir()` does env-var
182    /// lookups and filesystem checks which are cheap individually but add
183    /// up across hundreds of save/load calls over a session's lifetime.
184    pub(crate) fn sessions_dir() -> Result<PathBuf> {
185        use std::sync::OnceLock;
186        static CACHED: OnceLock<PathBuf> = OnceLock::new();
187        if let Some(dir) = CACHED.get() {
188            return Ok(dir.clone());
189        }
190        let dir = crate::config::Config::data_dir()
191            .map(|d| d.join("sessions"))
192            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
193        // Best-effort set; if another thread won the race we just use
194        // theirs.
195        let _ = CACHED.set(dir.clone());
196        Ok(dir)
197    }
198
199    /// Resolve the on-disk path for a session file.
200    pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
201        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
202    }
203}
204
205/// Index-first scan. Tries the O(1) workspace index; on miss or stale
206/// entry, falls back to the byte-prefiltered directory scan and repairs
207/// the index with the winner so the next launch is O(1).
208fn scan_with_index(
209    sessions_dir: &Path,
210    canonical_workspace: Option<PathBuf>,
211    window: usize,
212) -> Result<TailLoad> {
213    // Fast path: check the sidecar index.
214    if let Some(ws) = canonical_workspace.as_ref() {
215        let index = super::workspace_index::WorkspaceIndex::load_sync();
216        if let Some(id) = index.get(ws) {
217            let candidate = sessions_dir.join(format!("{id}.json"));
218            if candidate.exists()
219                && let Ok(load) = tail_load_sync(&candidate, window)
220            {
221                // Confirm it still belongs to this workspace — the user
222                // could have edited the session's metadata.directory
223                // manually or moved a file.
224                let dir_ok = load
225                    .session
226                    .metadata
227                    .directory
228                    .as_ref()
229                    .map(|d| {
230                        let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
231                        &canonical == ws
232                    })
233                    .unwrap_or(false);
234                if dir_ok {
235                    return Ok(load);
236                }
237            }
238        }
239    }
240
241    // Slow path: scan everything.
242    let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
243
244    // Repair the index with whatever we found, so next time is O(1).
245    if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
246        let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
247    }
248
249    result
250}
251
252/// Tail-load a specific path synchronously (used by the index fast path).
253fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
254    use std::fs;
255    use std::io::BufReader;
256    let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
257    let file = fs::File::open(path)?;
258    let reader = BufReader::with_capacity(64 * 1024, file);
259    let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
260    Ok(TailLoad {
261        session: parsed?,
262        dropped,
263        file_bytes,
264    })
265}
266
267/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
268///
269/// Flow:
270/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
271/// 2. Sort newest-first.
272/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
273///    `Vec<Message>` allocation. Because `metadata` is serialized before
274///    `messages`/`tool_uses` in new files, this is O(header bytes); older
275///    files still work but pay a lex-through cost.
276/// 4. On workspace match, re-open and do one tail-capped full parse.
/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
///
/// Flow:
/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
/// 2. Sort newest-first.
/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
///    `Vec<Message>` allocation. Because `metadata` is serialized before
///    `messages`/`tool_uses` in new files, this is O(header bytes); older
///    files still work but pay a lex-through cost.
/// 4. On workspace match, re-open and do one tail-capped full parse.
///
/// # Errors
///
/// Returns an error when the sessions directory does not exist, contains
/// no `.json` files, or no candidate matches `canonical_workspace`.
fn scan_sync(
    sessions_dir: &Path,
    canonical_workspace: Option<PathBuf>,
    window: usize,
) -> Result<TailLoad> {
    use std::fs;
    use std::io::BufReader;
    use std::time::SystemTime;

    if !sessions_dir.exists() {
        anyhow::bail!("No sessions found");
    }

    // Unreadable entries are silently skipped; a missing mtime sorts the
    // entry last (UNIX_EPOCH) rather than aborting the scan.
    let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
    for entry in fs::read_dir(sessions_dir)? {
        let entry = match entry {
            Ok(e) => e,
            Err(_) => continue,
        };
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) != Some("json") {
            continue;
        }
        let mtime = entry
            .metadata()
            .ok()
            .and_then(|m| m.modified().ok())
            .unwrap_or(SystemTime::UNIX_EPOCH);
        candidates.push((path, mtime));
    }
    if candidates.is_empty() {
        anyhow::bail!("No sessions found");
    }
    // Newest first: `b.cmp(&a)` gives a descending mtime sort.
    candidates.sort_by(|a, b| b.1.cmp(&a.1));

    // Precompute a cheap byte-level needle for the workspace path. JSON
    // serializes `metadata.directory` as `"directory":"<path>"`, so if
    // the raw file bytes don't contain the path as a substring, we can
    // skip it without invoking serde_json at all.
    //
    // This is the single biggest win for large workspaces: a 10 MB
    // session that is *not* for this cwd gets ruled out in a few ms of
    // byte scanning instead of a full JSON lex.
    let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
        // JSON-escape the path (backslashes on Windows become `\\`,
        // quotes become `\"`). `serde_json::to_string` handles all the
        // edge cases for us.
        let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
        // Strip the surrounding quotes; we want the inner bytes so we
        // match whether the JSON has `"directory":"..."` or any other
        // surrounding context.
        let inner = quoted
            .strip_prefix('"')
            .and_then(|s| s.strip_suffix('"'))
            .unwrap_or(&quoted);
        inner.as_bytes().to_vec()
    });
    // Build the SIMD-accelerated substring finder once for the whole
    // scan loop; reusing it across files is much faster than rebuilding.
    let finder = needle
        .as_ref()
        .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());

    // Parallel byte-prefilter: for each candidate, compute whether the
    // workspace path bytes appear in the file. This is the expensive
    // O(file_bytes) step. Running it concurrently across CPU cores
    // turns "sum of all file scan times" into ~"longest single file
    // scan time" on a machine with multiple cores.
    //
    // We preserve mtime ordering by collecting results into a parallel
    // Vec<bool> indexed the same as `candidates`, then iterating
    // serially to find the first hit.
    let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
        // No workspace filter: every candidate passes straight through.
        (None, _) => vec![true; candidates.len()],
        (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
        (Some(finder), _) => {
            let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
            let results: std::sync::Mutex<Vec<Option<bool>>> =
                std::sync::Mutex::new(vec![None; paths.len()]);
            std::thread::scope(|scope| {
                // Chunk candidates across available CPUs. For ~300 files
                // a fan-out of 4-8 threads saturates I/O and CPU nicely
                // without oversubscribing.
                let threads = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4)
                    .min(8);
                let chunk_size = paths.len().div_ceil(threads);
                for chunk_idx in 0..threads {
                    let start = chunk_idx * chunk_size;
                    if start >= paths.len() {
                        break;
                    }
                    let end = (start + chunk_size).min(paths.len());
                    let chunk_paths = &paths[start..end];
                    let results = &results;
                    scope.spawn(move || {
                        for (offset, p) in chunk_paths.iter().enumerate() {
                            let hit = file_contains_finder(p, finder).unwrap_or(false);
                            // Lock-per-entry is fine: contention is
                            // negligible vs. the ~ms file scan cost.
                            if let Ok(mut guard) = results.lock() {
                                guard[start + offset] = Some(hit);
                            }
                        }
                    });
                }
            });
            // A poisoned mutex (worker panic) yields an empty Vec here,
            // which maps every candidate to `false` — the scan then
            // reports "No sessions found" rather than a partial result.
            results
                .into_inner()
                .unwrap_or_default()
                .into_iter()
                .map(|o| o.unwrap_or(false))
                .collect()
        }
    };

    for (idx, (path, _)) in candidates.iter().enumerate() {
        // Fast path: byte-level substring prefilter (precomputed in parallel).
        if !prefilter_hits.get(idx).copied().unwrap_or(false) {
            continue;
        }

        // Slower path: full JSON header parse to confirm the match
        // (the substring test has false positives: the path could
        // appear in a chat message or tool output).
        let header_ok = (|| -> Result<bool> {
            let file = fs::File::open(path)?;
            let reader = BufReader::with_capacity(16 * 1024, file);
            let header: SessionHeader = match serde_json::from_reader(reader) {
                Ok(h) => h,
                Err(_) => return Ok(false),
            };
            if let Some(ref ws) = canonical_workspace {
                let Some(dir) = header.metadata.directory.as_ref() else {
                    return Ok(false);
                };
                // Cheap exact comparison first; only canonicalize (a
                // filesystem syscall) when the raw paths differ.
                if dir == ws {
                    return Ok(true);
                }
                let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
                Ok(&canonical_dir == ws)
            } else {
                Ok(true)
            }
        })();

        match header_ok {
            Ok(true) => {}
            Ok(false) => continue,
            Err(err) => {
                tracing::warn!(
                    path = %path.display(),
                    error = %err,
                    "skipping unreadable session file",
                );
                continue;
            }
        }

        // Match found — do the tail-capped full parse.
        let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
        let file = fs::File::open(path)?;
        let reader = BufReader::with_capacity(64 * 1024, file);
        let (parsed, dropped) =
            with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
        return Ok(TailLoad {
            session: parsed?,
            dropped,
            file_bytes,
        });
    }

    anyhow::bail!("No sessions found")
}
452
453/// Memory-map `path` and return `true` if the SIMD finder matches.
454///
455/// mmap avoids the per-chunk `read` syscall overhead and the
456/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
457/// straight over the OS-provided virtual memory window, and the kernel
458/// pages in only what's actually touched. On a ~10 MB session file
459/// this is measurably faster than chunked `BufRead` + SIMD.
460///
461/// Falls back to returning `false` on any I/O error — the caller's
462/// mtime loop will simply try the next candidate.
463fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
464    use std::fs;
465
466    let needle_len = finder.needle().len();
467    if needle_len == 0 {
468        return Ok(true);
469    }
470    let file = fs::File::open(path)?;
471    let meta = file.metadata()?;
472    let len = meta.len();
473    if (len as usize) < needle_len {
474        return Ok(false);
475    }
476    // SAFETY: We only read the mapping, we do not mutate it. The
477    // kernel COW-protects the mapping and the `Mmap` owns the lifetime
478    // tied to `file`, which we keep alive for the duration of `find`.
479    // Concurrent external modification of the file could theoretically
480    // tear the mapping, but session files are only ever rewritten
481    // wholesale via atomic rename — never in-place truncated.
482    let mmap = unsafe { memmap2::Mmap::map(&file)? };
483    Ok(finder.find(&mmap[..]).is_some())
484}