// codetether_agent/session/persistence.rs
1//! On-disk persistence: save, load, delete, and directory lookup.
2
3use std::path::{Path, PathBuf};
4
5use anyhow::Result;
6use tokio::fs;
7
8use super::header::SessionHeader;
9use super::tail_load::TailLoad;
10use super::tail_seed::with_tail_cap;
11use super::types::Session;
12
13impl Session {
14    /// Load an existing session by its UUID.
15    ///
16    /// # Errors
17    ///
18    /// Returns an error if the session file does not exist or the JSON is
19    /// malformed.
20    pub async fn load(id: &str) -> Result<Self> {
21        let path = Self::session_path(id)?;
22        let content = fs::read_to_string(&path).await?;
23        let mut session: Session = serde_json::from_str(&content)?;
24        session.normalize_sidecars();
25        Ok(session)
26    }
27
28    /// Load the most recent session, optionally scoped to a workspace
29    /// directory.
30    ///
31    /// When `workspace` is [`Some`], only considers sessions created in that
32    /// directory. When [`None`], returns the most recent session globally.
33    ///
34    /// # Errors
35    ///
36    /// Returns an error if no sessions exist (or, with `workspace` set,
37    /// none match the requested directory).
38    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
39        Self::last_for_directory_tail(workspace, usize::MAX)
40            .await
41            .map(|t| t.session)
42    }
43
44    /// Like [`Self::last_for_directory`] but keeps only the last `window`
45    /// messages and tool uses in memory, returning a [`TailLoad`] with the
46    /// number of entries that were dropped. Use this when resuming very
47    /// large sessions where the full transcript would exhaust memory.
48    ///
49    /// Implementation: the entire scan runs on a single blocking thread.
50    /// For each candidate (newest-mtime first) we do a cheap
51    /// [`SessionHeader`] parse to compare `metadata.directory`; only the
52    /// matching file pays for a full tail parse.
53    pub async fn last_for_directory_tail(
54        workspace: Option<&std::path::Path>,
55        window: usize,
56    ) -> Result<TailLoad> {
57        let sessions_dir = Self::sessions_dir()?;
58        let canonical_workspace =
59            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
60        tokio::task::spawn_blocking(move || {
61            scan_with_index(&sessions_dir, canonical_workspace, window)
62        })
63        .await
64        .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
65    }
66
67    /// Load the most recent session globally (unscoped).
68    ///
69    /// Kept for legacy compatibility; prefer
70    /// [`Session::last_for_directory`].
71    pub async fn last() -> Result<Self> {
72        Self::last_for_directory(None).await
73    }
74
75    /// Persist the session to disk as JSON. Creates the sessions directory
76    /// on demand.
77    ///
78    /// Performance:
79    /// - Serialization runs on the blocking thread pool so a large session
80    ///   doesn't stall the async reactor during `serde_json` formatting.
81    /// - Writes **compact** JSON (no pretty-printing): ~30% less CPU to
82    ///   serialize, ~30–40% smaller on disk, and correspondingly faster
83    ///   to load and mmap-scan on resume. Session files are machine-owned
84    ///   — humans never hand-edit them — so indentation is pure overhead.
85    /// - Write is atomic (tmp + rename) so a crash mid-save leaves the
86    ///   previous session intact, and the mmap prefilter in
87    ///   [`file_contains_finder`] cannot observe a torn buffer.
88    pub async fn save(&self) -> Result<()> {
89        let path = Self::session_path(&self.id)?;
90        if let Some(parent) = path.parent() {
91            fs::create_dir_all(parent).await?;
92        }
93        let tmp = path.with_extension("json.tmp");
94        // Clone-and-serialize off the reactor. The clone is a Vec-copy
95        // (~memcpy speed) which is always cheaper than the JSON
96        // formatting we are about to avoid blocking the reactor on.
97        let mut snapshot = self.clone();
98        snapshot.normalize_sidecars();
99        let sink_config = snapshot.metadata.history_sink.clone().or_else(|| {
100            super::history_sink::HistorySinkConfig::from_env()
101                .ok()
102                .flatten()
103        });
104        let session_id_for_journal = snapshot.id.clone();
105        let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
106            .await
107            .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
108        fs::write(&tmp, content).await?;
109        // Atomic swap. On POSIX `rename` is atomic over existing files;
110        // on Windows we fall back to remove-then-rename.
111        if let Err(primary) = fs::rename(&tmp, &path).await {
112            let _ = fs::remove_file(&path).await;
113            if let Err(retry) = fs::rename(&tmp, &path).await {
114                let _ = fs::remove_file(&tmp).await;
115                return Err(anyhow::anyhow!(
116                    "session rename failed: {primary} (retry: {retry})"
117                ));
118            }
119        }
120        let mut journal = super::journal::WritebackJournal::new(&session_id_for_journal);
121        let tx = journal.stage(super::journal::Op::Save);
122        if let Err(reason) = journal.commit(tx) {
123            journal.reject(tx, reason);
124        }
125        if let Err(err) =
126            super::journal::append_entries(&session_id_for_journal, journal.entries()).await
127        {
128            tracing::warn!(
129                %err,
130                session_id = %session_id_for_journal,
131                "save journal append failed (non-fatal)"
132            );
133        }
134        // Update the workspace index so the next resume is O(1). Best
135        // effort — a failed index write must not fail the session save.
136        if let Some(dir) = &self.metadata.directory {
137            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
138            let session_id = self.id.clone();
139            tokio::task::spawn_blocking(move || {
140                if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
141                    tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
142                }
143            });
144        }
145        // Phase A history sink: stream pure history to MinIO/S3.
146        // Env-gated, fire-and-forget — never blocks the save, never
147        // fails it. See [`super::history_sink`] for the env variables.
148        //
149        // Coalesce concurrent saves per session: if an upload for this
150        // session is already in flight, skip this spawn — the next
151        // save will pick up the latest history. This prevents bursty
152        // save loops (e.g. during long tool chains) from queueing
153        // unbounded background uploads.
154        if let Some(sink_config) = sink_config {
155            use std::collections::HashSet;
156            use std::sync::{Mutex, OnceLock};
157            static IN_FLIGHT: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
158            let in_flight = IN_FLIGHT.get_or_init(|| Mutex::new(HashSet::new()));
159
160            let inserted = in_flight
161                .lock()
162                .map(|mut s| s.insert(self.id.clone()))
163                .unwrap_or(false);
164
165            if inserted {
166                let session_id = self.id.clone();
167                let messages = self.messages.clone();
168                tokio::spawn(async move {
169                    if let Err(err) = super::history_sink::upload_full_history(
170                        &sink_config,
171                        &session_id,
172                        &messages,
173                    )
174                    .await
175                    {
176                        tracing::warn!(%err, %session_id, "history sink upload failed (non-fatal)");
177                    }
178                    if let Ok(mut s) = in_flight.lock() {
179                        s.remove(&session_id);
180                    }
181                });
182            } else {
183                tracing::debug!(
184                    session_id = %self.id,
185                    "history sink upload already in flight; skipping duplicate"
186                );
187            }
188        }
189        Ok(())
190    }
191
192    /// Delete a session file by ID. No-op if the file does not exist.
193    pub async fn delete(id: &str) -> Result<()> {
194        let path = Self::session_path(id)?;
195        if path.exists() {
196            tokio::fs::remove_file(&path).await?;
197        }
198        Ok(())
199    }
200
201    /// Resolve the sessions directory (`<data_dir>/sessions`).
202    ///
203    /// Cached after first resolution: `Config::data_dir()` does env-var
204    /// lookups and filesystem checks which are cheap individually but add
205    /// up across hundreds of save/load calls over a session's lifetime.
206    pub(crate) fn sessions_dir() -> Result<PathBuf> {
207        use std::sync::OnceLock;
208        static CACHED: OnceLock<PathBuf> = OnceLock::new();
209        if let Some(dir) = CACHED.get() {
210            return Ok(dir.clone());
211        }
212        let dir = crate::config::Config::data_dir()
213            .map(|d| d.join("sessions"))
214            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
215        // Best-effort set; if another thread won the race we just use
216        // theirs.
217        let _ = CACHED.set(dir.clone());
218        Ok(dir)
219    }
220
221    /// Resolve the on-disk path for a session file.
222    pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
223        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
224    }
225}
226
227/// Index-first scan. Tries the O(1) workspace index; on miss or stale
228/// entry, falls back to the byte-prefiltered directory scan and repairs
229/// the index with the winner so the next launch is O(1).
230fn scan_with_index(
231    sessions_dir: &Path,
232    canonical_workspace: Option<PathBuf>,
233    window: usize,
234) -> Result<TailLoad> {
235    // Fast path: check the sidecar index.
236    if let Some(ws) = canonical_workspace.as_ref() {
237        let index = super::workspace_index::WorkspaceIndex::load_sync();
238        if let Some(id) = index.get(ws) {
239            let candidate = sessions_dir.join(format!("{id}.json"));
240            if candidate.exists()
241                && let Ok(load) = tail_load_sync(&candidate, window)
242            {
243                // Confirm it still belongs to this workspace — the user
244                // could have edited the session's metadata.directory
245                // manually or moved a file.
246                let dir_ok = load
247                    .session
248                    .metadata
249                    .directory
250                    .as_ref()
251                    .map(|d| {
252                        let canonical = d.canonicalize().unwrap_or_else(|_| d.clone());
253                        &canonical == ws
254                    })
255                    .unwrap_or(false);
256                if dir_ok {
257                    return Ok(load);
258                }
259            }
260        }
261    }
262
263    // Slow path: scan everything.
264    let result = scan_sync(sessions_dir, canonical_workspace.clone(), window);
265
266    // Repair the index with whatever we found, so next time is O(1).
267    if let (Ok(load), Some(ws)) = (&result, canonical_workspace.as_ref()) {
268        let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
269    }
270
271    result
272}
273
274/// Tail-load a specific path synchronously (used by the index fast path).
275fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
276    use std::fs;
277    use std::io::BufReader;
278    let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
279    let file = fs::File::open(path)?;
280    let reader = BufReader::with_capacity(64 * 1024, file);
281    let (parsed, dropped) = with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
282    let mut session = parsed?;
283    session.normalize_sidecars();
284    Ok(TailLoad {
285        session,
286        dropped,
287        file_bytes,
288    })
289}
290
/// Fully synchronous directory scan. Lives inside `spawn_blocking`.
///
/// Flow:
/// 1. `read_dir` + stat every entry once to build `(path, mtime)` pairs.
/// 2. Sort newest-first.
/// 3. For each candidate, do a header-only parse (`SessionHeader`) — no
///    `Vec<Message>` allocation. Because `metadata` is serialized before
///    `messages`/`tool_uses` in new files, this is O(header bytes); older
///    files still work but pay a lex-through cost.
/// 4. On workspace match, re-open and do one tail-capped full parse.
///
/// # Errors
///
/// Bails with "No sessions found" when the directory is missing, empty of
/// `.json` files, or no candidate survives the workspace filter; I/O
/// errors from `read_dir` or from opening a confirmed match propagate.
fn scan_sync(
    sessions_dir: &Path,
    canonical_workspace: Option<PathBuf>,
    window: usize,
) -> Result<TailLoad> {
    use std::fs;
    use std::io::BufReader;
    use std::time::SystemTime;

    if !sessions_dir.exists() {
        anyhow::bail!("No sessions found");
    }

    // Pass 1: enumerate `.json` candidates with their mtimes. Unreadable
    // dir entries are skipped; a missing mtime sorts oldest (UNIX_EPOCH).
    let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
    for entry in fs::read_dir(sessions_dir)? {
        let entry = match entry {
            Ok(e) => e,
            Err(_) => continue,
        };
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) != Some("json") {
            continue;
        }
        let mtime = entry
            .metadata()
            .ok()
            .and_then(|m| m.modified().ok())
            .unwrap_or(SystemTime::UNIX_EPOCH);
        candidates.push((path, mtime));
    }
    if candidates.is_empty() {
        anyhow::bail!("No sessions found");
    }
    // Newest mtime first — the serial confirmation loop below returns the
    // first (i.e. most recent) confirmed match.
    candidates.sort_by(|a, b| b.1.cmp(&a.1));

    // Precompute a cheap byte-level needle for the workspace path. JSON
    // serializes `metadata.directory` as `"directory":"<path>"`, so if
    // the raw file bytes don't contain the path as a substring, we can
    // skip it without invoking serde_json at all.
    //
    // This is the single biggest win for large workspaces: a 10 MB
    // session that is *not* for this cwd gets ruled out in a few ms of
    // byte scanning instead of a full JSON lex.
    let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
        // JSON-escape the path (backslashes on Windows become `\\`,
        // quotes become `\"`). `serde_json::to_string` handles all the
        // edge cases for us.
        let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
        // Strip the surrounding quotes; we want the inner bytes so we
        // match whether the JSON has `"directory":"..."` or any other
        // surrounding context.
        let inner = quoted
            .strip_prefix('"')
            .and_then(|s| s.strip_suffix('"'))
            .unwrap_or(&quoted);
        inner.as_bytes().to_vec()
    });
    // Build the SIMD-accelerated substring finder once for the whole
    // scan loop; reusing it across files is much faster than rebuilding.
    let finder = needle
        .as_ref()
        .map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());

    // Parallel byte-prefilter: for each candidate, compute whether the
    // workspace path bytes appear in the file. This is the expensive
    // O(file_bytes) step. Running it concurrently across CPU cores
    // turns "sum of all file scan times" into ~"longest single file
    // scan time" on a machine with multiple cores.
    //
    // We preserve mtime ordering by collecting results into a parallel
    // Vec<bool> indexed the same as `candidates`, then iterating
    // serially to find the first hit.
    let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
        // No workspace filter: every candidate passes straight through.
        (None, _) => vec![true; candidates.len()],
        (Some(_), 0..=1) => vec![true; candidates.len()], // not worth spawning
        (Some(finder), _) => {
            let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
            // One slot per candidate; `None` means "never scanned"
            // (e.g. a worker's lock attempt failed) and is treated as a
            // miss when the results are unwrapped below.
            let results: std::sync::Mutex<Vec<Option<bool>>> =
                std::sync::Mutex::new(vec![None; paths.len()]);
            std::thread::scope(|scope| {
                // Chunk candidates across available CPUs. For ~300 files
                // a fan-out of 4-8 threads saturates I/O and CPU nicely
                // without oversubscribing.
                let threads = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4)
                    .min(8);
                let chunk_size = paths.len().div_ceil(threads);
                for chunk_idx in 0..threads {
                    let start = chunk_idx * chunk_size;
                    if start >= paths.len() {
                        break;
                    }
                    let end = (start + chunk_size).min(paths.len());
                    let chunk_paths = &paths[start..end];
                    let results = &results;
                    scope.spawn(move || {
                        for (offset, p) in chunk_paths.iter().enumerate() {
                            let hit = file_contains_finder(p, finder).unwrap_or(false);
                            // Lock-per-entry is fine: contention is
                            // negligible vs. the ~ms file scan cost.
                            if let Ok(mut guard) = results.lock() {
                                guard[start + offset] = Some(hit);
                            }
                        }
                    });
                }
            });
            results
                .into_inner()
                .unwrap_or_default()
                .into_iter()
                .map(|o| o.unwrap_or(false))
                .collect()
        }
    };

    for (idx, (path, _)) in candidates.iter().enumerate() {
        // Fast path: byte-level substring prefilter (precomputed in parallel).
        if !prefilter_hits.get(idx).copied().unwrap_or(false) {
            continue;
        }

        // Slower path: full JSON header parse to confirm the match
        // (the substring test has false positives: the path could
        // appear in a chat message or tool output).
        let header_ok = (|| -> Result<bool> {
            let file = fs::File::open(path)?;
            let reader = BufReader::with_capacity(16 * 1024, file);
            let header: SessionHeader = match serde_json::from_reader(reader) {
                Ok(h) => h,
                // Malformed JSON is "not a match", not a hard error —
                // keep scanning the remaining candidates.
                Err(_) => return Ok(false),
            };
            if let Some(ref ws) = canonical_workspace {
                let Some(dir) = header.metadata.directory.as_ref() else {
                    return Ok(false);
                };
                // Cheap exact comparison first; only canonicalize (a
                // syscall) when the raw paths differ.
                if dir == ws {
                    return Ok(true);
                }
                let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
                Ok(&canonical_dir == ws)
            } else {
                Ok(true)
            }
        })();

        match header_ok {
            Ok(true) => {}
            Ok(false) => continue,
            Err(err) => {
                tracing::warn!(
                    path = %path.display(),
                    error = %err,
                    "skipping unreadable session file",
                );
                continue;
            }
        }

        // Match found — do the tail-capped full parse.
        let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
        let file = fs::File::open(path)?;
        let reader = BufReader::with_capacity(64 * 1024, file);
        let (parsed, dropped) =
            with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
        return Ok(TailLoad {
            session: parsed?,
            dropped,
            file_bytes,
        });
    }

    anyhow::bail!("No sessions found")
}
476
477/// Memory-map `path` and return `true` if the SIMD finder matches.
478///
479/// mmap avoids the per-chunk `read` syscall overhead and the
480/// carry-over bookkeeping of a streaming scan: `memmem::find` runs
481/// straight over the OS-provided virtual memory window, and the kernel
482/// pages in only what's actually touched. On a ~10 MB session file
483/// this is measurably faster than chunked `BufRead` + SIMD.
484///
485/// Falls back to returning `false` on any I/O error — the caller's
486/// mtime loop will simply try the next candidate.
487fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
488    use std::fs;
489
490    let needle_len = finder.needle().len();
491    if needle_len == 0 {
492        return Ok(true);
493    }
494    let file = fs::File::open(path)?;
495    let meta = file.metadata()?;
496    let len = meta.len();
497    if (len as usize) < needle_len {
498        return Ok(false);
499    }
500    // SAFETY: We only read the mapping, we do not mutate it. The
501    // kernel COW-protects the mapping and the `Mmap` owns the lifetime
502    // tied to `file`, which we keep alive for the duration of `find`.
503    // Concurrent external modification of the file could theoretically
504    // tear the mapping, but session files are only ever rewritten
505    // wholesale via atomic rename — never in-place truncated.
506    let mmap = unsafe { memmap2::Mmap::map(&file)? };
507    Ok(finder.find(&mmap[..]).is_some())
508}