
sqz_engine/session_store.rs

use std::path::Path;
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, OpenFlags};
use serde::{Deserialize, Serialize};

use crate::error::{Result, SqzError};
use crate::types::{CompressedContent, SessionId, SessionState};

/// A lightweight summary of a session for search results. Doesn't include
/// the full conversation — just enough to identify and filter sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    pub id: SessionId,
    pub project_dir: PathBuf,
    pub compressed_summary: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

/// SQLite-backed persistent session and cache store with FTS5 full-text search.
///
/// Stores sessions, cache entries, compression logs, and known files in a
/// single SQLite database. Uses WAL mode for concurrent read access.
///
/// ```rust,no_run
/// use sqz_engine::SessionStore;
/// use std::path::Path;
///
/// let store = SessionStore::open_or_create(Path::new("~/.sqz/sessions.db")).unwrap();
/// let results = store.search("authentication refactor").unwrap();
/// for session in &results {
///     println!("{}: {}", session.id, session.compressed_summary);
/// }
/// ```
pub struct SessionStore {
    db: Connection,
}

// ── Schema ────────────────────────────────────────────────────────────────────

const SCHEMA: &str = r#"
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS sessions (
    id               TEXT PRIMARY KEY,
    project_dir      TEXT NOT NULL,
    compressed_summary TEXT NOT NULL,
    created_at       TEXT NOT NULL,
    updated_at       TEXT NOT NULL,
    data             BLOB NOT NULL
);

CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
    id,
    project_dir,
    compressed_summary,
    content='sessions',
    content_rowid='rowid',
    tokenize='porter ascii'
);

CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
    INSERT INTO sessions_fts(rowid, id, project_dir, compressed_summary)
    VALUES (new.rowid, new.id, new.project_dir, new.compressed_summary);
END;

CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
    INSERT INTO sessions_fts(sessions_fts, rowid, id, project_dir, compressed_summary)
    VALUES ('delete', old.rowid, old.id, old.project_dir, old.compressed_summary);
END;

CREATE TRIGGER IF NOT EXISTS sessions_au AFTER UPDATE ON sessions BEGIN
    INSERT INTO sessions_fts(sessions_fts, rowid, id, project_dir, compressed_summary)
    VALUES ('delete', old.rowid, old.id, old.project_dir, old.compressed_summary);
    INSERT INTO sessions_fts(rowid, id, project_dir, compressed_summary)
    VALUES (new.rowid, new.id, new.project_dir, new.compressed_summary);
END;

CREATE TABLE IF NOT EXISTS cache_entries (
    hash        TEXT PRIMARY KEY,
    data        TEXT NOT NULL,
    accessed_at TEXT NOT NULL,
    -- Raw pre-compression bytes so `sqz expand <prefix>` can serve
    -- truly uncompressed content to agents that cannot parse
    -- `§ref:…§` dedup tokens. Nullable because the column was added
    -- in an additive migration; rows written before that migration
    -- (or via callers that don't have the original bytes) have NULL.
    original    BLOB
);

CREATE TABLE IF NOT EXISTS compression_log (
    id               INTEGER PRIMARY KEY AUTOINCREMENT,
    tokens_original  INTEGER NOT NULL,
    tokens_compressed INTEGER NOT NULL,
    stages_applied   TEXT NOT NULL,
    mode             TEXT NOT NULL DEFAULT 'auto',
    created_at       TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS known_files (
    path        TEXT PRIMARY KEY,
    added_at    TEXT NOT NULL
);

-- Small key/value store for engine-wide state that needs to persist across
-- short-lived sqz processes (each shell-hook invocation is a new process).
-- Initially used only for the last_compaction_at marker: cache entries with
-- `accessed_at < last_compaction_at` are treated as stale even if still
-- within the normal TTL. See cache_manager.rs for the freshness model.
CREATE TABLE IF NOT EXISTS metadata (
    key         TEXT PRIMARY KEY,
    value       TEXT NOT NULL
);
"#;

// ── Helpers ───────────────────────────────────────────────────────────────────

pub(crate) fn apply_schema(conn: &Connection) -> rusqlite::Result<()> {
    conn.execute_batch(SCHEMA)?;
    // Additive migration: add `original` BLOB column to cache_entries if
    // it does not yet exist. Stores the raw pre-compression bytes so
    // `sqz expand <prefix>` can return truly uncompressed content when
    // an agent cannot parse `§ref:…§` tokens (reported on-list by
    // SquireNed for GLM 5.1). `NULL` for rows written by older sqz
    // versions — `expand` treats those as "original unavailable, fall
    // back to the compressed blob" so users don't get spurious errors
    // on pre-migration data.
    //
    // Using pragma_table_info rather than a version table because the
    // rest of sqz does the same — this is the first additive migration.
    let has_original: bool = conn
        .prepare("SELECT 1 FROM pragma_table_info('cache_entries') WHERE name = 'original'")?
        .query_row([], |_| Ok(()))
        .is_ok();
    if !has_original {
        conn.execute("ALTER TABLE cache_entries ADD COLUMN original BLOB", [])?;
    }

    // Additive migration: add `project_dir` TEXT column to compression_log.
    // Enables per-project stats filtering (issue #13). NULL for rows
    // written by older sqz versions — aggregate queries treat those as
    // "unknown project".
    let has_project_dir: bool = conn
        .prepare("SELECT 1 FROM pragma_table_info('compression_log') WHERE name = 'project_dir'")?
        .query_row([], |_| Ok(()))
        .is_ok();
    if !has_project_dir {
        conn.execute("ALTER TABLE compression_log ADD COLUMN project_dir TEXT", [])?;
    }

    Ok(())
}

fn open_connection(path: &Path) -> rusqlite::Result<Connection> {
    let conn = Connection::open(path)?;
    apply_schema(&conn)?;
    Ok(conn)
}

fn row_to_summary(
    id: String,
    project_dir: String,
    compressed_summary: String,
    created_at: String,
    updated_at: String,
) -> Result<SessionSummary> {
    let created_at = created_at
        .parse::<DateTime<Utc>>()
        .map_err(|e| SqzError::Other(format!("invalid created_at timestamp: {e}")))?;
    let updated_at = updated_at
        .parse::<DateTime<Utc>>()
        .map_err(|e| SqzError::Other(format!("invalid updated_at timestamp: {e}")))?;
    Ok(SessionSummary {
        id,
        project_dir: PathBuf::from(project_dir),
        compressed_summary,
        created_at,
        updated_at,
    })
}

// ── SessionStore ──────────────────────────────────────────────────────────────

impl SessionStore {
    /// Construct a `SessionStore` from an already-open `Connection`.
    /// Intended for testing (e.g., in-memory databases).
    #[cfg(test)]
    pub(crate) fn from_connection(conn: Connection) -> Self {
        Self { db: conn }
    }

    /// Open an existing database at `path`. Returns an error if the file does
    /// not exist or cannot be opened.
    pub fn open(path: &Path) -> Result<Self> {
        let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE)?;
        apply_schema(&conn)?;
        Ok(Self { db: conn })
    }

    /// Open the database at `path`, creating it if it does not exist.
    /// If the database is corrupted, a fresh database is created at the same
    /// path and a warning is logged to stderr.
    pub fn open_or_create(path: &Path) -> Result<Self> {
        match open_connection(path) {
            Ok(conn) => Ok(Self { db: conn }),
            Err(e) => {
                eprintln!(
                    "sqz warning: session store at '{}' is corrupted or inaccessible ({e}). \
                     Creating a new database. Prior session data has been lost.",
                    path.display()
                );
                // Remove the corrupted file so we can start fresh.
                if path.exists() {
                    let _ = std::fs::remove_file(path);
                }
                let conn = open_connection(path)
                    .map_err(|e2| SqzError::Other(format!("failed to create new session store: {e2}")))?;
                Ok(Self { db: conn })
            }
        }
    }

    // ── Session CRUD ──────────────────────────────────────────────────────────

    /// Persist a session. Returns the session id.
    pub fn save_session(&self, session: &SessionState) -> Result<SessionId> {
        let data = serde_json::to_vec(session)?;
        let project_dir = session.project_dir.to_string_lossy().to_string();
        let created_at = session.created_at.to_rfc3339();
        let updated_at = session.updated_at.to_rfc3339();

        self.db.execute(
            r#"INSERT INTO sessions (id, project_dir, compressed_summary, created_at, updated_at, data)
               VALUES (?1, ?2, ?3, ?4, ?5, ?6)
               ON CONFLICT(id) DO UPDATE SET
                   project_dir        = excluded.project_dir,
                   compressed_summary = excluded.compressed_summary,
                   created_at         = excluded.created_at,
                   updated_at         = excluded.updated_at,
                   data               = excluded.data"#,
            params![
                session.id,
                project_dir,
                session.compressed_summary,
                created_at,
                updated_at,
                data,
            ],
        )?;

        Ok(session.id.clone())
    }

    /// Load a session by id.
    pub fn load_session(&self, id: SessionId) -> Result<SessionState> {
        let data: Vec<u8> = self.db.query_row(
            "SELECT data FROM sessions WHERE id = ?1",
            params![id],
            |row| row.get(0),
        )?;
        let session: SessionState = serde_json::from_slice(&data)?;
        Ok(session)
    }

    // ── Search ────────────────────────────────────────────────────────────────

    /// Full-text search using FTS5 (porter stemmer, ASCII tokenizer).
    pub fn search(&self, query: &str) -> Result<Vec<SessionSummary>> {
        let mut stmt = self.db.prepare(
            r#"SELECT s.id, s.project_dir, s.compressed_summary, s.created_at, s.updated_at
               FROM sessions s
               JOIN sessions_fts f ON s.rowid = f.rowid
               WHERE sessions_fts MATCH ?1
               ORDER BY rank"#,
        )?;

        let rows = stmt.query_map(params![query], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
                row.get::<_, String>(3)?,
                row.get::<_, String>(4)?,
            ))
        })?;

        let mut results = Vec::new();
        for row in rows {
            let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
            results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
        }
        Ok(results)
    }

    /// Query sessions whose `updated_at` falls within `[from, to]`.
    pub fn search_by_date(
        &self,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
    ) -> Result<Vec<SessionSummary>> {
        let mut stmt = self.db.prepare(
            r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
               FROM sessions
               WHERE updated_at >= ?1 AND updated_at <= ?2
               ORDER BY updated_at DESC"#,
        )?;

        let rows = stmt.query_map(params![from.to_rfc3339(), to.to_rfc3339()], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
                row.get::<_, String>(3)?,
                row.get::<_, String>(4)?,
            ))
        })?;

        let mut results = Vec::new();
        for row in rows {
            let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
            results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
        }
        Ok(results)
    }

    /// Return the most recently updated session, or `None` if no sessions exist.
    pub fn latest_session(&self) -> Result<Option<SessionSummary>> {
        let mut stmt = self.db.prepare(
            r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
               FROM sessions
               ORDER BY updated_at DESC
               LIMIT 1"#,
        ).map_err(SqzError::SessionStore)?;

        let rows = stmt.query_map([], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
                row.get::<_, String>(3)?,
                row.get::<_, String>(4)?,
            ))
        }).map_err(SqzError::SessionStore)?;

        for row in rows {
            let (id, project_dir, compressed_summary, created_at, updated_at) =
                row.map_err(SqzError::SessionStore)?;
            return Ok(Some(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?));
        }
        Ok(None)
    }

    /// Query sessions whose `project_dir` matches `dir` exactly.
    pub fn search_by_project(&self, dir: &Path) -> Result<Vec<SessionSummary>> {
        let dir_str = dir.to_string_lossy().to_string();
        let mut stmt = self.db.prepare(
            r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
               FROM sessions
               WHERE project_dir = ?1
               ORDER BY updated_at DESC"#,
        )?;

        let rows = stmt.query_map(params![dir_str], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
                row.get::<_, String>(3)?,
                row.get::<_, String>(4)?,
            ))
        })?;

        let mut results = Vec::new();
        for row in rows {
            let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
            results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
        }
        Ok(results)
    }

    // ── Cache entries ─────────────────────────────────────────────────────────

    /// Persist a cache entry keyed by content hash.
    ///
    /// Convenience wrapper for callers that do not (yet) have the raw
    /// pre-compression bytes handy: it stores only the compressed JSON
    /// (`data`) used for dedup-hit responses and leaves `original` as
    /// `NULL`, so `sqz expand <prefix>` degrades to returning the
    /// compressed blob with a note. Use
    /// [`save_cache_entry_with_original`] when the original bytes are
    /// available, so that `expand` can serve truly uncompressed content.
    pub fn save_cache_entry(&self, hash: &str, compressed: &CompressedContent) -> Result<()> {
        self.save_cache_entry_with_original(hash, compressed, None)
    }

    /// Persist a cache entry with both compressed and original content.
    ///
    /// `original` must be the exact bytes that produced `compressed`, so
    /// that `expand` is a true inverse of dedup. We store the raw bytes
    /// (not the UTF-8 string) because command output may include
    /// non-UTF-8 sequences — storing the text would lose them.
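    ///
    /// A minimal sketch of the intended call pattern (marked `ignore` so it
    /// is not compiled as a doc-test; `store`, `hash`, and `compressed` are
    /// assumed to come from the surrounding compression pipeline):
    ///
    /// ```rust,ignore
    /// // `raw` is the exact byte slice that was fed to the compressor.
    /// let raw: &[u8] = b"...full command output...";
    /// store.save_cache_entry_with_original(&hash, &compressed, Some(raw))?;
    ///
    /// // Later, expand can recover the exact original bytes.
    /// assert_eq!(store.get_cache_entry_original(&hash)?.as_deref(), Some(raw));
    /// ```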
    pub fn save_cache_entry_with_original(
        &self,
        hash: &str,
        compressed: &CompressedContent,
        original: Option<&[u8]>,
    ) -> Result<()> {
        let data = serde_json::to_string(compressed)?;
        let now = Utc::now().to_rfc3339();
        self.db.execute(
            r#"INSERT INTO cache_entries (hash, data, accessed_at, original)
               VALUES (?1, ?2, ?3, ?4)
               ON CONFLICT(hash) DO UPDATE
                   SET data = excluded.data,
                       accessed_at = excluded.accessed_at,
                       -- Don't overwrite a previously-stored `original`
                       -- with NULL. Older callers (that go through
                       -- save_cache_entry rather than the _with_original
                       -- variant) shouldn't erase the expand-able bytes.
                       original = COALESCE(excluded.original, original)"#,
            params![hash, data, now, original],
        )?;
        Ok(())
    }

    /// Retrieve the stored original bytes for a cached hash, if the
    /// caller populated them via `save_cache_entry_with_original`.
    ///
    /// Returns `Ok(None)` for missing entries AND for entries that were
    /// saved by an older call site that did not pass `original`. The
    /// caller should fall back to the compressed blob in the latter case
    /// and surface a note to the user so they know this specific entry
    /// wasn't round-trippable.
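    ///
    /// Sketch of the fallback the `expand` path is expected to follow
    /// (`ignore`d; `emit_raw` and `emit_compressed_with_note` are
    /// hypothetical helpers, not functions in this crate):
    ///
    /// ```rust,ignore
    /// match store.get_cache_entry_original(&hash)? {
    ///     Some(bytes) => emit_raw(bytes),
    ///     // No original stored: fall back to the compressed form.
    ///     None => match store.get_cache_entry(&hash)? {
    ///         Some(entry) => emit_compressed_with_note(entry),
    ///         None => return Err(SqzError::Other("unknown hash".into())),
    ///     },
    /// }
    /// ```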
    pub fn get_cache_entry_original(&self, hash: &str) -> Result<Option<Vec<u8>>> {
        let result: rusqlite::Result<Option<Vec<u8>>> = self.db.query_row(
            "SELECT original FROM cache_entries WHERE hash = ?1",
            params![hash],
            |row| row.get(0),
        );
        match result {
            Ok(v) => Ok(v),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(SqzError::SessionStore(e)),
        }
    }

    /// Delete a cache entry by content hash.
    pub fn delete_cache_entry(&self, hash: &str) -> Result<()> {
        self.db.execute(
            "DELETE FROM cache_entries WHERE hash = ?1",
            params![hash],
        )?;
        Ok(())
    }

    /// Return all cache entries ordered by `accessed_at` ASC (oldest first),
    /// as `(hash, size_bytes)` pairs where `size_bytes` is the byte length of
    /// the stored JSON data.
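    ///
    /// Sketch of the size-budget eviction loop this is meant to feed
    /// (`ignore`d; `max_cache_bytes` is a hypothetical budget, not a real
    /// config field):
    ///
    /// ```rust,ignore
    /// let entries = store.list_cache_entries_lru()?;
    /// let mut total: u64 = entries.iter().map(|(_, n)| *n).sum();
    /// for (hash, size) in entries {
    ///     if total <= max_cache_bytes {
    ///         break;
    ///     }
    ///     store.delete_cache_entry(&hash)?; // oldest (least recently used) first
    ///     total -= size;
    /// }
    /// ```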
    pub fn list_cache_entries_lru(&self) -> Result<Vec<(String, u64)>> {
        let mut stmt = self.db.prepare(
            "SELECT hash, length(data) FROM cache_entries ORDER BY accessed_at ASC",
        )?;
        let rows = stmt.query_map([], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
        })?;
        let mut entries = Vec::new();
        for row in rows {
            let (hash, size) = row?;
            entries.push((hash, size as u64));
        }
        Ok(entries)
    }

    /// Retrieve a cache entry by content hash, updating `accessed_at`.
    pub fn get_cache_entry(&self, hash: &str) -> Result<Option<CompressedContent>> {
        let result: rusqlite::Result<String> = self.db.query_row(
            "SELECT data FROM cache_entries WHERE hash = ?1",
            params![hash],
            |row| row.get(0),
        );

        match result {
            Ok(data) => {
                // Touch accessed_at for LRU tracking.
                let now = Utc::now().to_rfc3339();
                let _ = self.db.execute(
                    "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
                    params![now, hash],
                );
                let entry: CompressedContent = serde_json::from_str(&data)?;
                Ok(Some(entry))
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(SqzError::SessionStore(e)),
        }
    }

    /// Look up a cache entry by a **prefix** of the content hash.
    ///
    /// The inline dedup refs we hand to the LLM carry only the first 16
    /// hex chars of the SHA-256 (`§ref:<16-hex>§`), so when an agent runs
    /// `sqz expand <prefix>` we need to resolve those 16 chars back to the
    /// full 64-char key. Uses an anchored `LIKE 'prefix%'` pattern against
    /// the hash column; with SQLite's default case-insensitive `LIKE` this
    /// is a scan of the cache table, which stays cheap at cache scale.
    ///
    /// Returns `Ok(Some((full_hash, entry)))` on unique match.
    /// Returns `Ok(None)` if no entries match.
    /// Returns `Err(_)` if the prefix is ambiguous (2+ matches) — the
    /// caller should tell the user to use a longer prefix. 16-hex
    /// collisions are astronomically unlikely (one in 2^64) but we
    /// refuse to guess rather than quietly serve a surprise file.
    ///
    /// The prefix is validated as lowercase hex. Non-hex input returns
    /// `None` without touching the database — this is how we handle the
    /// common user-error case of pasting the ref with the `§` markers
    /// still attached (they get rejected before we query SQLite).
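    ///
    /// Sketch of the expected resolution flow (`ignore`d; the 16-char
    /// prefix is illustrative):
    ///
    /// ```rust,ignore
    /// match store.get_cache_entry_by_prefix("3b7a1c9f02d4e8aa") {
    ///     Ok(Some((full_hash, entry))) => println!("{full_hash}: {}", entry.data),
    ///     Ok(None) => eprintln!("no cached entry matches that prefix"),
    ///     Err(e) => eprintln!("{e}"), // ambiguous prefix: retry with more characters
    /// }
    /// ```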
    pub fn get_cache_entry_by_prefix(
        &self,
        prefix: &str,
    ) -> Result<Option<(String, CompressedContent)>> {
        // Reject anything that isn't pure lowercase hex. The inline refs
        // we emit are always lowercase so there's no reason to case-fold
        // here; accidentally matching uppercase input would also match
        // unrelated entries if someone hand-crafted a collision.
        if prefix.is_empty() || !prefix.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()) {
            return Ok(None);
        }
        let pattern = format!("{prefix}%");
        let mut stmt = self
            .db
            .prepare("SELECT hash, data FROM cache_entries WHERE hash LIKE ?1 LIMIT 2")?;
        let mut rows = stmt.query(params![pattern])?;

        let first = match rows.next()? {
            Some(r) => {
                let hash: String = r.get(0)?;
                let data: String = r.get(1)?;
                (hash, data)
            }
            None => return Ok(None),
        };

        // Two or more hits — refuse. Prefix ambiguity is user-recoverable:
        // rerun with a longer prefix. Matching one arbitrarily would be
        // a silent data surprise.
        if rows.next()?.is_some() {
            return Err(SqzError::Other(format!(
                "cache: prefix '{prefix}' matches multiple entries — use a longer prefix"
            )));
        }
        drop(rows);
        drop(stmt);

        // Touch accessed_at for LRU tracking — symmetric with get_cache_entry.
        let now = Utc::now().to_rfc3339();
        let _ = self.db.execute(
            "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
            params![now, first.0],
        );
        let entry: CompressedContent = serde_json::from_str(&first.1)?;
        Ok(Some((first.0, entry)))
    }

    /// Read the `accessed_at` timestamp for a cached hash without updating
    /// it. Returns `None` if the hash is not cached.
    ///
    /// Used by the dedup freshness check: if `accessed_at` is recent, the
    /// LLM likely still has the original content in its context window, so
    /// returning a ref is safe. If it's old, re-send the full content.
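    ///
    /// Sketch of that freshness check (`ignore`d; the 30-minute threshold
    /// is an illustrative value, not the real TTL from cache_manager.rs):
    ///
    /// ```rust,ignore
    /// let fresh = match store.get_cache_entry_accessed_at(&hash)? {
    ///     Some(ts) => Utc::now() - ts < chrono::Duration::minutes(30),
    ///     None => false, // never cached: must send the full content
    /// };
    /// ```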
    pub fn get_cache_entry_accessed_at(&self, hash: &str) -> Result<Option<DateTime<Utc>>> {
        let result: rusqlite::Result<String> = self.db.query_row(
            "SELECT accessed_at FROM cache_entries WHERE hash = ?1",
            params![hash],
            |row| row.get(0),
        );
        match result {
            Ok(s) => {
                let ts = s
                    .parse::<DateTime<Utc>>()
                    .map_err(|e| SqzError::Other(format!("invalid accessed_at: {e}")))?;
                Ok(Some(ts))
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(SqzError::SessionStore(e)),
        }
    }

    /// Check if a cache entry exists without updating `accessed_at`.
    pub fn cache_entry_exists(&self, hash: &str) -> Result<bool> {
        let result: rusqlite::Result<i64> = self.db.query_row(
            "SELECT 1 FROM cache_entries WHERE hash = ?1",
            params![hash],
            |row| row.get(0),
        );
        match result {
            Ok(_) => Ok(true),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
            Err(e) => Err(SqzError::SessionStore(e)),
        }
    }

    /// Update `accessed_at` for a cached hash to the current time. Called by
    /// the cache manager when a ref is served so the next staleness check
    /// sees the recent send.
    pub fn touch_cache_entry(&self, hash: &str) -> Result<()> {
        let now = Utc::now().to_rfc3339();
        self.db.execute(
            "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
            params![now, hash],
        )?;
        Ok(())
    }

    /// Set a metadata key/value. Persists across sqz process boundaries
    /// (each shell-hook invocation is a short-lived process).
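    ///
    /// Sketch of the `last_compaction_at` marker mentioned in the schema
    /// comment (`ignore`d; the key name comes from that comment, the rest
    /// is illustrative):
    ///
    /// ```rust,ignore
    /// store.set_metadata("last_compaction_at", &Utc::now().to_rfc3339())?;
    /// // In a later, separate sqz process:
    /// if let Some(marker) = store.get_metadata("last_compaction_at")? {
    ///     let compacted_at: DateTime<Utc> = marker.parse()?;
    ///     // entries with accessed_at < compacted_at are treated as stale
    /// }
    /// ```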
    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
        self.db.execute(
            "INSERT INTO metadata (key, value) VALUES (?1, ?2)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
            params![key, value],
        )?;
        Ok(())
    }

    /// Get a metadata value. Returns `None` if the key has never been set.
    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
        let result: rusqlite::Result<String> = self.db.query_row(
            "SELECT value FROM metadata WHERE key = ?1",
            params![key],
            |row| row.get(0),
        );
        match result {
            Ok(v) => Ok(Some(v)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(SqzError::SessionStore(e)),
        }
    }

    /// Log a compression event for cumulative stats tracking.
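    ///
    /// Sketch of how a compression run is expected to be recorded and read
    /// back (`ignore`d; the token counts and stage names are illustrative):
    ///
    /// ```rust,ignore
    /// store.log_compression(4_200, 900, &["dedup".into(), "strip_nulls".into()], "auto")?;
    /// let stats = store.compression_stats()?;
    /// println!("saved {} tokens ({:.1}%)", stats.tokens_saved(), stats.reduction_pct());
    /// ```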
    pub fn log_compression(
        &self,
        tokens_original: u32,
        tokens_compressed: u32,
        stages: &[String],
        mode: &str,
    ) -> Result<()> {
        self.log_compression_with_project(tokens_original, tokens_compressed, stages, mode, None)
    }

    /// Log a compression event tagged with a project directory.
    pub fn log_compression_with_project(
        &self,
        tokens_original: u32,
        tokens_compressed: u32,
        stages: &[String],
        mode: &str,
        project_dir: Option<&str>,
    ) -> Result<()> {
        let now = Utc::now().to_rfc3339();
        let stages_str = stages.join(",");
        self.db.execute(
            "INSERT INTO compression_log (tokens_original, tokens_compressed, stages_applied, mode, created_at, project_dir) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![tokens_original, tokens_compressed, stages_str, mode, now, project_dir],
        ).map_err(SqzError::SessionStore)?;
        Ok(())
    }

    /// Get cumulative compression stats from the log.
    pub fn compression_stats(&self) -> Result<CompressionStats> {
        let mut stmt = self.db.prepare(
            "SELECT COUNT(*), COALESCE(SUM(tokens_original), 0), COALESCE(SUM(tokens_compressed), 0) FROM compression_log",
        ).map_err(SqzError::SessionStore)?;

        let stats = stmt.query_row([], |row| {
            Ok(CompressionStats {
                total_compressions: row.get::<_, u32>(0)?,
                total_tokens_in: row.get::<_, u64>(1)?,
                total_tokens_out: row.get::<_, u64>(2)?,
            })
        }).map_err(SqzError::SessionStore)?;

        Ok(stats)
    }

    /// Get daily compression gains for the last N days.
    pub fn daily_gains(&self, days: u32) -> Result<Vec<DailyGain>> {
        let mut stmt = self.db.prepare(
            "SELECT date(created_at) as d, COUNT(*), SUM(tokens_original), SUM(tokens_compressed) \
             FROM compression_log \
             WHERE created_at >= date('now', ?1) \
             GROUP BY d ORDER BY d",
        ).map_err(SqzError::SessionStore)?;

        let offset = format!("-{days} days");
        let rows = stmt.query_map(params![offset], |row| {
            let tokens_in: u64 = row.get(2)?;
            let tokens_out: u64 = row.get(3)?;
            Ok(DailyGain {
                date: row.get(0)?,
                compressions: row.get(1)?,
                tokens_in,
                tokens_saved: tokens_in.saturating_sub(tokens_out),
            })
        }).map_err(SqzError::SessionStore)?;

        let mut gains = Vec::new();
        for row in rows {
            gains.push(row.map_err(SqzError::SessionStore)?);
        }
        Ok(gains)
    }

    /// Get cumulative compression stats for a specific project directory.
    pub fn compression_stats_for_project(&self, project_dir: &str) -> Result<CompressionStats> {
        let mut stmt = self.db.prepare(
            "SELECT COUNT(*), COALESCE(SUM(tokens_original), 0), COALESCE(SUM(tokens_compressed), 0) \
             FROM compression_log WHERE project_dir = ?1",
        ).map_err(SqzError::SessionStore)?;

        let stats = stmt.query_row(params![project_dir], |row| {
            Ok(CompressionStats {
                total_compressions: row.get::<_, u32>(0)?,
                total_tokens_in: row.get::<_, u64>(1)?,
                total_tokens_out: row.get::<_, u64>(2)?,
            })
        }).map_err(SqzError::SessionStore)?;

        Ok(stats)
    }

    /// Get daily compression gains for a specific project directory.
    pub fn daily_gains_for_project(&self, days: u32, project_dir: &str) -> Result<Vec<DailyGain>> {
        let mut stmt = self.db.prepare(
            "SELECT date(created_at) as d, COUNT(*), SUM(tokens_original), SUM(tokens_compressed) \
             FROM compression_log \
             WHERE created_at >= date('now', ?1) AND project_dir = ?2 \
             GROUP BY d ORDER BY d",
        ).map_err(SqzError::SessionStore)?;

        let offset = format!("-{days} days");
        let rows = stmt.query_map(params![offset, project_dir], |row| {
            let tokens_in: u64 = row.get(2)?;
            let tokens_out: u64 = row.get(3)?;
            Ok(DailyGain {
                date: row.get(0)?,
                compressions: row.get(1)?,
                tokens_in,
                tokens_saved: tokens_in.saturating_sub(tokens_out),
            })
        }).map_err(SqzError::SessionStore)?;

        let mut gains = Vec::new();
        for row in rows {
            gains.push(row.map_err(SqzError::SessionStore)?);
        }
        Ok(gains)
    }

    /// List distinct project directories that have compression data.
    pub fn list_projects(&self) -> Result<Vec<(String, u32, u64)>> {
        let mut stmt = self.db.prepare(
            "SELECT project_dir, COUNT(*), COALESCE(SUM(tokens_original) - SUM(tokens_compressed), 0) \
             FROM compression_log \
             WHERE project_dir IS NOT NULL \
             GROUP BY project_dir \
             ORDER BY COUNT(*) DESC",
        ).map_err(SqzError::SessionStore)?;

        let rows = stmt.query_map([], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, u32>(1)?,
                row.get::<_, u64>(2)?,
            ))
        }).map_err(SqzError::SessionStore)?;

        let mut projects = Vec::new();
        for row in rows {
            projects.push(row.map_err(SqzError::SessionStore)?);
        }
        Ok(projects)
    }

    // ── Known files (persistent cross-command context tracking) ───────────

    /// Record a file path as "known" (its content is in the dedup cache).
    /// Used by cross-command context refs to annotate error messages.
    pub fn add_known_file(&self, path: &str) -> Result<()> {
        let now = Utc::now().to_rfc3339();
        self.db.execute(
            "INSERT OR REPLACE INTO known_files (path, added_at) VALUES (?1, ?2)",
            params![path, now],
        ).map_err(SqzError::SessionStore)?;
        Ok(())
    }

    /// Load all known file paths from the persistent store.
    pub fn known_files(&self) -> Result<Vec<String>> {
        let mut stmt = self.db.prepare(
            "SELECT path FROM known_files ORDER BY added_at DESC",
        ).map_err(SqzError::SessionStore)?;

        let rows = stmt.query_map([], |row| {
            row.get::<_, String>(0)
        }).map_err(SqzError::SessionStore)?;

        let mut files = Vec::new();
        for row in rows {
            files.push(row.map_err(SqzError::SessionStore)?);
        }
        Ok(files)
    }

    /// Clear all known files (e.g. on session reset).
    pub fn clear_known_files(&self) -> Result<()> {
        self.db.execute("DELETE FROM known_files", [])
            .map_err(SqzError::SessionStore)?;
        Ok(())
    }
}

/// Cumulative compression statistics.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    pub total_compressions: u32,
    pub total_tokens_in: u64,
    pub total_tokens_out: u64,
}

impl CompressionStats {
    pub fn tokens_saved(&self) -> u64 {
        self.total_tokens_in.saturating_sub(self.total_tokens_out)
    }

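    /// Percentage reduction, `(1 - tokens_out / tokens_in) * 100`; for
    /// example, 1,000 tokens in and 250 tokens out is a 75% reduction.
    /// Returns 0.0 when nothing has been logged yet.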
    pub fn reduction_pct(&self) -> f64 {
        if self.total_tokens_in == 0 {
            0.0
        } else {
            (1.0 - self.total_tokens_out as f64 / self.total_tokens_in as f64) * 100.0
        }
    }
}

/// A single day's compression gain.
#[derive(Debug, Clone)]
pub struct DailyGain {
    pub date: String,
    pub compressions: u32,
    pub tokens_saved: u64,
    pub tokens_in: u64,
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{BudgetState, CorrectionLog, ModelFamily, SessionState};
    use chrono::Utc;
    use proptest::prelude::*;
    use std::path::PathBuf;

    fn make_session(id: &str, project_dir: &str, summary: &str) -> SessionState {
        let now = Utc::now();
        SessionState {
            id: id.to_string(),
            project_dir: PathBuf::from(project_dir),
            conversation: vec![],
            corrections: CorrectionLog::default(),
            pins: vec![],
            learnings: vec![],
            compressed_summary: summary.to_string(),
            budget: BudgetState {
                window_size: 200_000,
                consumed: 0,
                pinned: 0,
                model_family: ModelFamily::AnthropicClaude,
            },
            tool_usage: vec![],
            created_at: now,
            updated_at: now,
        }
    }

    fn in_memory_store() -> SessionStore {
        let conn = Connection::open_in_memory().unwrap();
        apply_schema(&conn).unwrap();
        SessionStore { db: conn }
    }

    #[test]
    fn test_save_and_load_session() {
        let store = in_memory_store();
        let session = make_session("sess-1", "/home/user/project", "REST API refactor");

        let id = store.save_session(&session).unwrap();
        assert_eq!(id, "sess-1");

        let loaded = store.load_session("sess-1".to_string()).unwrap();
        assert_eq!(loaded.id, session.id);
        assert_eq!(loaded.compressed_summary, session.compressed_summary);
        assert_eq!(loaded.project_dir, session.project_dir);
    }

    #[test]
    fn test_save_session_upsert() {
        let store = in_memory_store();
        let mut session = make_session("sess-2", "/proj", "initial summary");
        store.save_session(&session).unwrap();

        session.compressed_summary = "updated summary".to_string();
        store.save_session(&session).unwrap();

        let loaded = store.load_session("sess-2".to_string()).unwrap();
        assert_eq!(loaded.compressed_summary, "updated summary");
    }

    #[test]
    fn test_load_nonexistent_session_errors() {
        let store = in_memory_store();
        let result = store.load_session("does-not-exist".to_string());
        assert!(result.is_err());
    }

    #[test]
    fn test_search_fts() {
        let store = in_memory_store();
        store.save_session(&make_session("s1", "/proj", "REST API refactor with authentication")).unwrap();
        store.save_session(&make_session("s2", "/proj", "database migration postgres")).unwrap();

        let results = store.search("authentication").unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "s1");
    }

    #[test]
    fn test_search_by_date() {
        let store = in_memory_store();
        let now = Utc::now();
        let past = now - chrono::Duration::hours(2);
        let future = now + chrono::Duration::hours(2);

        store.save_session(&make_session("s1", "/proj", "recent session")).unwrap();

        let results = store.search_by_date(past, future).unwrap();
        assert!(!results.is_empty());
        assert!(results.iter().any(|r| r.id == "s1"));
    }

    #[test]
    fn test_search_by_project() {
        let store = in_memory_store();
        store.save_session(&make_session("s1", "/home/user/alpha", "alpha project")).unwrap();
        store.save_session(&make_session("s2", "/home/user/beta", "beta project")).unwrap();

        let results = store.search_by_project(Path::new("/home/user/alpha")).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "s1");
    }

    #[test]
    fn test_cache_entry_round_trip() {
        let store = in_memory_store();
        let entry = CompressedContent {
            data: "compressed data".to_string(),
            tokens_compressed: 10,
            tokens_original: 50,
            stages_applied: vec!["strip_nulls".to_string()],
            compression_ratio: 0.2,
            provenance: crate::types::Provenance::default(),
            verify: None,
        };

        store.save_cache_entry("abc123", &entry).unwrap();

        let loaded = store.get_cache_entry("abc123").unwrap().unwrap();
        assert_eq!(loaded.data, entry.data);
        assert_eq!(loaded.tokens_compressed, entry.tokens_compressed);
        assert_eq!(loaded.tokens_original, entry.tokens_original);
    }

    #[test]
    fn test_get_cache_entry_missing_returns_none() {
        let store = in_memory_store();
        let result = store.get_cache_entry("nonexistent").unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_open_or_create_corrupted_db() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("store.db");

        // Write garbage bytes to simulate a corrupted database.
        std::fs::write(&path, b"this is not a valid sqlite database").unwrap();

        // Should succeed by creating a fresh database.
        let store = SessionStore::open_or_create(&path).unwrap();
        let session = make_session("s1", "/proj", "after corruption");
        store.save_session(&session).unwrap();
        let loaded = store.load_session("s1".to_string()).unwrap();
        assert_eq!(loaded.id, "s1");
    }

    // ── Property-based tests ──────────────────────────────────────────────────

    /// Build a `SessionState` with a specific `updated_at` timestamp.
    fn make_session_at(id: &str, summary: &str, updated_at: DateTime<Utc>) -> SessionState {
        let now = Utc::now();
        SessionState {
            id: id.to_string(),
            project_dir: PathBuf::from("/proj"),
            conversation: vec![],
            corrections: CorrectionLog::default(),
            pins: vec![],
            learnings: vec![],
            compressed_summary: summary.to_string(),
            budget: BudgetState {
                window_size: 200_000,
                consumed: 0,
                pinned: 0,
                model_family: ModelFamily::AnthropicClaude,
            },
            tool_usage: vec![],
            created_at: now,
            updated_at,
        }
    }

    // ── Property 26: Session store search correctness ─────────────────────────
    // **Validates: Requirements 20.2, 20.3, 20.4**
    //
    // For any set of sessions saved to the store, a keyword search SHALL return
    // all sessions whose compressed_summary contains the keyword, and no
    // sessions that don't contain it.

    proptest! {
        /// **Validates: Requirements 20.2, 20.3, 20.4**
        ///
        /// For any set of sessions saved to the store, a keyword search SHALL
        /// return all sessions whose `compressed_summary` contains the keyword,
        /// and no sessions that don't contain it.
        #[test]
        fn prop_search_correctness(
            // A simple ASCII keyword: 5-8 lowercase letters, no common English
            // words that the porter stemmer might conflate with other terms.
            keyword in "[b-df-hj-np-tv-z]{5,8}",
            // 1-6 summaries that embed the keyword
            matching_suffixes in proptest::collection::vec("[a-z ]{4,20}", 1..=6usize),
            // 1-6 summaries that do NOT contain the keyword
            non_matching in proptest::collection::vec("[a-z ]{8,30}", 1..=6usize),
        ) {
            // Ensure the keyword doesn't accidentally appear in non-matching summaries.
            for s in &non_matching {
                prop_assume!(!s.contains(keyword.as_str()));
            }

            let store = in_memory_store();

            // Save matching sessions (summary = "<suffix> <keyword> end")
            let mut matching_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
            for (i, suffix) in matching_suffixes.iter().enumerate() {
                let id = format!("match-{i}");
                let summary = format!("{} {} end", suffix, keyword);
                store.save_session(&make_session(&id, "/proj", &summary)).unwrap();
                matching_ids.insert(id);
            }

            // Save non-matching sessions
            let mut non_matching_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
            for (i, summary) in non_matching.iter().enumerate() {
                let id = format!("nomatch-{i}");
                store.save_session(&make_session(&id, "/proj", summary)).unwrap();
                non_matching_ids.insert(id);
            }

            let results = store.search(&keyword).unwrap();
            let result_ids: std::collections::HashSet<String> =
                results.iter().map(|r| r.id.clone()).collect();

            // Every matching session must appear in results.
            for id in &matching_ids {
                prop_assert!(
                    result_ids.contains(id),
                    "matching session '{}' not found in search results for keyword '{}'",
                    id, keyword
                );
            }

            // No non-matching session may appear in results.
            for id in &non_matching_ids {
                prop_assert!(
                    !result_ids.contains(id),
                    "non-matching session '{}' incorrectly appeared in search results for keyword '{}'",
                    id, keyword
                );
            }
        }
    }

    // ── Property: search_by_date correctness ─────────────────────────────────
    // **Validates: Requirements 20.4**
    //
    // For any set of sessions with different timestamps, searching by a date
    // range SHALL return exactly the sessions whose `updated_at` falls within
    // [from, to], and no sessions outside that range.

    proptest! {
        /// **Validates: Requirements 20.4**
        ///
        /// For any set of sessions with distinct timestamps, `search_by_date`
        /// SHALL return exactly the sessions whose `updated_at` is within
        /// `[from, to]`, and no sessions outside that range.
        #[test]
        fn prop_search_by_date_correctness(
            // Generate 2-8 offsets in seconds from epoch (spread over a wide range)
            offsets in proptest::collection::vec(0i64..=86400i64 * 365, 2..=8usize),
            // The search window: start and end offsets (relative to the minimum offset)
            window_start_delta in 0i64..=3600i64,
            window_end_delta   in 3600i64..=7200i64,
        ) {
            use chrono::TimeZone;

            // Deduplicate offsets so each session has a unique timestamp.
            let mut unique_offsets: Vec<i64> = offsets.clone();
            unique_offsets.sort_unstable();
            unique_offsets.dedup();
            prop_assume!(unique_offsets.len() >= 2);

            let base_offset = unique_offsets[0];
            let from_offset = base_offset + window_start_delta;
            let to_offset   = base_offset + window_end_delta;

            let from = Utc.timestamp_opt(from_offset, 0).unwrap();
            let to   = Utc.timestamp_opt(to_offset,   0).unwrap();

            let store = in_memory_store();

            let mut in_range_ids:  std::collections::HashSet<String> = std::collections::HashSet::new();
            let mut out_range_ids: std::collections::HashSet<String> = std::collections::HashSet::new();

            for (i, &offset) in unique_offsets.iter().enumerate() {
                let ts = Utc.timestamp_opt(offset, 0).unwrap();
                let id = format!("sess-{i}");
                let session = make_session_at(&id, "some summary", ts);
                store.save_session(&session).unwrap();

                if ts >= from && ts <= to {
                    in_range_ids.insert(id);
                } else {
                    out_range_ids.insert(id);
                }
            }

            let results = store.search_by_date(from, to).unwrap();
            let result_ids: std::collections::HashSet<String> =
                results.iter().map(|r| r.id.clone()).collect();

            // Every in-range session must appear.
            for id in &in_range_ids {
                prop_assert!(
                    result_ids.contains(id),
                    "in-range session '{}' missing from search_by_date results",
                    id
                );
            }

            // No out-of-range session may appear.
            for id in &out_range_ids {
                prop_assert!(
                    !result_ids.contains(id),
                    "out-of-range session '{}' incorrectly appeared in search_by_date results",
                    id
                );
            }
        }
    }
}