// sqz_engine/session_store.rs
use std::path::Path;
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
use serde::{Deserialize, Serialize};

use crate::error::{Result, SqzError};
use crate::types::{CompressedContent, SessionId, SessionState};
10
/// A lightweight summary of a session for search results. Doesn't include
/// the full conversation — just enough to identify and filter sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    // Primary key of the row in the `sessions` table.
    pub id: SessionId,
    // Project directory the session was recorded against (stored as lossy text).
    pub project_dir: PathBuf,
    // Short compressed summary; also the main FTS-searchable text.
    pub compressed_summary: String,
    // Creation timestamp (persisted as RFC 3339 text in SQLite).
    pub created_at: DateTime<Utc>,
    // Last-update timestamp; drives recency ordering in queries.
    pub updated_at: DateTime<Utc>,
}
21
/// SQLite-backed persistent session and cache store with FTS5 full-text search.
///
/// Stores sessions, cache entries, compression logs, and known files in a
/// single SQLite database. Uses WAL mode for concurrent read access.
///
/// ```rust,no_run
/// use sqz_engine::SessionStore;
/// use std::path::Path;
///
/// let store = SessionStore::open_or_create(Path::new("~/.sqz/sessions.db")).unwrap();
/// let results = store.search("authentication refactor").unwrap();
/// for session in &results {
///     println!("{}: {}", session.id, session.compressed_summary);
/// }
/// ```
pub struct SessionStore {
    // Owned connection; the schema has always been applied before a
    // SessionStore is constructed (see open / open_or_create).
    db: Connection,
}
40
41// ── Schema ────────────────────────────────────────────────────────────────────
42
/// Full database schema, applied idempotently on every open (every statement
/// uses IF NOT EXISTS). `sessions_fts` is an FTS5 external-content index over
/// `sessions`, kept in sync by the three AFTER INSERT/DELETE/UPDATE triggers
/// below — external-content tables require the 'delete' command form on
/// removal, which is why the delete/update triggers look asymmetric.
const SCHEMA: &str = r#"
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS sessions (
    id               TEXT PRIMARY KEY,
    project_dir      TEXT NOT NULL,
    compressed_summary TEXT NOT NULL,
    created_at       TEXT NOT NULL,
    updated_at       TEXT NOT NULL,
    data             BLOB NOT NULL
);

CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
    id,
    project_dir,
    compressed_summary,
    content='sessions',
    content_rowid='rowid',
    tokenize='porter ascii'
);

CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
    INSERT INTO sessions_fts(rowid, id, project_dir, compressed_summary)
    VALUES (new.rowid, new.id, new.project_dir, new.compressed_summary);
END;

CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
    INSERT INTO sessions_fts(sessions_fts, rowid, id, project_dir, compressed_summary)
    VALUES ('delete', old.rowid, old.id, old.project_dir, old.compressed_summary);
END;

CREATE TRIGGER IF NOT EXISTS sessions_au AFTER UPDATE ON sessions BEGIN
    INSERT INTO sessions_fts(sessions_fts, rowid, id, project_dir, compressed_summary)
    VALUES ('delete', old.rowid, old.id, old.project_dir, old.compressed_summary);
    INSERT INTO sessions_fts(rowid, id, project_dir, compressed_summary)
    VALUES (new.rowid, new.id, new.project_dir, new.compressed_summary);
END;

CREATE TABLE IF NOT EXISTS cache_entries (
    hash        TEXT PRIMARY KEY,
    data        TEXT NOT NULL,
    accessed_at TEXT NOT NULL,
    -- Raw pre-compression bytes so `sqz expand <prefix>` can serve
    -- truly uncompressed content to agents that cannot parse
    -- `§ref:…§` dedup tokens. Nullable because the column was added
    -- in an additive migration; rows written before that migration
    -- (or via callers that don't have the original bytes) have NULL.
    original    BLOB
);

CREATE TABLE IF NOT EXISTS compression_log (
    id               INTEGER PRIMARY KEY AUTOINCREMENT,
    tokens_original  INTEGER NOT NULL,
    tokens_compressed INTEGER NOT NULL,
    stages_applied   TEXT NOT NULL,
    mode             TEXT NOT NULL DEFAULT 'auto',
    created_at       TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS known_files (
    path        TEXT PRIMARY KEY,
    added_at    TEXT NOT NULL
);

-- Small key/value store for engine-wide state that needs to persist across
-- short-lived sqz processes (each shell-hook invocation is a new process).
-- Initially used only for the last_compaction_at marker: cache entries with
-- `accessed_at < last_compaction_at` are treated as stale even if still
-- within the normal TTL. See cache_manager.rs for the freshness model.
CREATE TABLE IF NOT EXISTS metadata (
    key         TEXT PRIMARY KEY,
    value       TEXT NOT NULL
);
"#;
117
118// ── Helpers ───────────────────────────────────────────────────────────────────
119
120pub(crate) fn apply_schema(conn: &Connection) -> rusqlite::Result<()> {
121    conn.execute_batch(SCHEMA)?;
122    // Additive migration: add `original` BLOB column to cache_entries if
123    // it does not yet exist. Stores the raw pre-compression bytes so
124    // `sqz expand <prefix>` can return truly uncompressed content when
125    // an agent cannot parse `§ref:…§` tokens (reported on-list by
126    // SquireNed for GLM 5.1). `NULL` for rows written by older sqz
127    // versions — `expand` treats those as "original unavailable, fall
128    // back to the compressed blob" so users don't get spurious errors
129    // on pre-migration data.
130    //
131    // Using pragma_table_info rather than a version table because the
132    // rest of sqz does the same — this is the first additive migration.
133    let has_original: bool = conn
134        .prepare("SELECT 1 FROM pragma_table_info('cache_entries') WHERE name = 'original'")?
135        .query_row([], |_| Ok(()))
136        .is_ok();
137    if !has_original {
138        conn.execute("ALTER TABLE cache_entries ADD COLUMN original BLOB", [])?;
139    }
140    Ok(())
141}
142
143fn open_connection(path: &Path) -> rusqlite::Result<Connection> {
144    let conn = Connection::open(path)?;
145    apply_schema(&conn)?;
146    Ok(conn)
147}
148
149fn row_to_summary(
150    id: String,
151    project_dir: String,
152    compressed_summary: String,
153    created_at: String,
154    updated_at: String,
155) -> Result<SessionSummary> {
156    let created_at = created_at
157        .parse::<DateTime<Utc>>()
158        .map_err(|e| SqzError::Other(format!("invalid created_at timestamp: {e}")))?;
159    let updated_at = updated_at
160        .parse::<DateTime<Utc>>()
161        .map_err(|e| SqzError::Other(format!("invalid updated_at timestamp: {e}")))?;
162    Ok(SessionSummary {
163        id,
164        project_dir: PathBuf::from(project_dir),
165        compressed_summary,
166        created_at,
167        updated_at,
168    })
169}
170
171// ── SessionStore ──────────────────────────────────────────────────────────────
172
173impl SessionStore {
174    /// Construct a `SessionStore` from an already-open `Connection`.
175    /// Intended for testing (e.g., in-memory databases).
176    #[cfg(test)]
177    pub(crate) fn from_connection(conn: Connection) -> Self {
178        Self { db: conn }
179    }
180
181    /// Open an existing database at `path`. Returns an error if the file does
182    /// not exist or cannot be opened.
183    pub fn open(path: &Path) -> Result<Self> {
184        let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE)?;
185        apply_schema(&conn)?;
186        Ok(Self { db: conn })
187    }
188
189    /// Open the database at `path`, creating it if it does not exist.
190    /// If the database is corrupted, a fresh database is created at the same
191    /// path and a warning is logged to stderr.
192    pub fn open_or_create(path: &Path) -> Result<Self> {
193        match open_connection(path) {
194            Ok(conn) => Ok(Self { db: conn }),
195            Err(e) => {
196                eprintln!(
197                    "sqz warning: session store at '{}' is corrupted or inaccessible ({e}). \
198                     Creating a new database. Prior session data has been lost.",
199                    path.display()
200                );
201                // Remove the corrupted file so we can start fresh.
202                if path.exists() {
203                    let _ = std::fs::remove_file(path);
204                }
205                let conn = open_connection(path)
206                    .map_err(|e2| SqzError::Other(format!("failed to create new session store: {e2}")))?;
207                Ok(Self { db: conn })
208            }
209        }
210    }
211
212    // ── Session CRUD ──────────────────────────────────────────────────────────
213
214    /// Persist a session. Returns the session id.
215    pub fn save_session(&self, session: &SessionState) -> Result<SessionId> {
216        let data = serde_json::to_vec(session)?;
217        let project_dir = session.project_dir.to_string_lossy().to_string();
218        let created_at = session.created_at.to_rfc3339();
219        let updated_at = session.updated_at.to_rfc3339();
220
221        self.db.execute(
222            r#"INSERT INTO sessions (id, project_dir, compressed_summary, created_at, updated_at, data)
223               VALUES (?1, ?2, ?3, ?4, ?5, ?6)
224               ON CONFLICT(id) DO UPDATE SET
225                   project_dir        = excluded.project_dir,
226                   compressed_summary = excluded.compressed_summary,
227                   created_at         = excluded.created_at,
228                   updated_at         = excluded.updated_at,
229                   data               = excluded.data"#,
230            params![
231                session.id,
232                project_dir,
233                session.compressed_summary,
234                created_at,
235                updated_at,
236                data,
237            ],
238        )?;
239
240        Ok(session.id.clone())
241    }
242
243    /// Load a session by id.
244    pub fn load_session(&self, id: SessionId) -> Result<SessionState> {
245        let data: Vec<u8> = self.db.query_row(
246            "SELECT data FROM sessions WHERE id = ?1",
247            params![id],
248            |row| row.get(0),
249        )?;
250        let session: SessionState = serde_json::from_slice(&data)?;
251        Ok(session)
252    }
253
254    // ── Search ────────────────────────────────────────────────────────────────
255
256    /// Full-text search using FTS5 (porter stemmer, ASCII tokenizer).
257    pub fn search(&self, query: &str) -> Result<Vec<SessionSummary>> {
258        let mut stmt = self.db.prepare(
259            r#"SELECT s.id, s.project_dir, s.compressed_summary, s.created_at, s.updated_at
260               FROM sessions s
261               JOIN sessions_fts f ON s.rowid = f.rowid
262               WHERE sessions_fts MATCH ?1
263               ORDER BY rank"#,
264        )?;
265
266        let rows = stmt.query_map(params![query], |row| {
267            Ok((
268                row.get::<_, String>(0)?,
269                row.get::<_, String>(1)?,
270                row.get::<_, String>(2)?,
271                row.get::<_, String>(3)?,
272                row.get::<_, String>(4)?,
273            ))
274        })?;
275
276        let mut results = Vec::new();
277        for row in rows {
278            let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
279            results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
280        }
281        Ok(results)
282    }
283
284    /// Query sessions whose `updated_at` falls within `[from, to]`.
285    pub fn search_by_date(
286        &self,
287        from: DateTime<Utc>,
288        to: DateTime<Utc>,
289    ) -> Result<Vec<SessionSummary>> {
290        let mut stmt = self.db.prepare(
291            r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
292               FROM sessions
293               WHERE updated_at >= ?1 AND updated_at <= ?2
294               ORDER BY updated_at DESC"#,
295        )?;
296
297        let rows = stmt.query_map(params![from.to_rfc3339(), to.to_rfc3339()], |row| {
298            Ok((
299                row.get::<_, String>(0)?,
300                row.get::<_, String>(1)?,
301                row.get::<_, String>(2)?,
302                row.get::<_, String>(3)?,
303                row.get::<_, String>(4)?,
304            ))
305        })?;
306
307        let mut results = Vec::new();
308        for row in rows {
309            let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
310            results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
311        }
312        Ok(results)
313    }
314
315    /// Return the most recently updated session, or `None` if no sessions exist.
316    pub fn latest_session(&self) -> Result<Option<SessionSummary>> {
317        let mut stmt = self.db.prepare(
318            r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
319               FROM sessions
320               ORDER BY updated_at DESC
321               LIMIT 1"#,
322        ).map_err(SqzError::SessionStore)?;
323
324        let rows = stmt.query_map([], |row| {
325            Ok((
326                row.get::<_, String>(0)?,
327                row.get::<_, String>(1)?,
328                row.get::<_, String>(2)?,
329                row.get::<_, String>(3)?,
330                row.get::<_, String>(4)?,
331            ))
332        }).map_err(SqzError::SessionStore)?;
333
334        for row in rows {
335            let (id, project_dir, compressed_summary, created_at, updated_at) =
336                row.map_err(SqzError::SessionStore)?;
337            return Ok(Some(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?));
338        }
339        Ok(None)
340    }
341
342    /// Query sessions whose `project_dir` matches `dir` exactly.
343    pub fn search_by_project(&self, dir: &Path) -> Result<Vec<SessionSummary>> {
344        let dir_str = dir.to_string_lossy().to_string();
345        let mut stmt = self.db.prepare(
346            r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
347               FROM sessions
348               WHERE project_dir = ?1
349               ORDER BY updated_at DESC"#,
350        )?;
351
352        let rows = stmt.query_map(params![dir_str], |row| {
353            Ok((
354                row.get::<_, String>(0)?,
355                row.get::<_, String>(1)?,
356                row.get::<_, String>(2)?,
357                row.get::<_, String>(3)?,
358                row.get::<_, String>(4)?,
359            ))
360        })?;
361
362        let mut results = Vec::new();
363        for row in rows {
364            let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
365            results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
366        }
367        Ok(results)
368    }
369
370    // ── Cache entries ─────────────────────────────────────────────────────────
371
    /// Persist a cache entry keyed by content hash.
    ///
    /// Convenience wrapper over [`save_cache_entry_with_original`] for
    /// callers that do not (yet) have the raw pre-compression bytes handy.
    /// Rows written through this path leave `original` as `NULL`, so
    /// `sqz expand` will degrade to returning the compressed blob with a
    /// note instead of the truly uncompressed content.
    pub fn save_cache_entry(&self, hash: &str, compressed: &CompressedContent) -> Result<()> {
        self.save_cache_entry_with_original(hash, compressed, None)
    }
385
386    /// Persist a cache entry with both compressed and original content.
387    ///
388    /// `original` must be the exact bytes that produced `compressed`, so
389    /// that `expand` is a true inverse of dedup. We store the raw bytes
390    /// (not the UTF-8 string) because command output may include
391    /// non-UTF-8 sequences — storing the text would lose them.
392    pub fn save_cache_entry_with_original(
393        &self,
394        hash: &str,
395        compressed: &CompressedContent,
396        original: Option<&[u8]>,
397    ) -> Result<()> {
398        let data = serde_json::to_string(compressed)?;
399        let now = Utc::now().to_rfc3339();
400        self.db.execute(
401            r#"INSERT INTO cache_entries (hash, data, accessed_at, original)
402               VALUES (?1, ?2, ?3, ?4)
403               ON CONFLICT(hash) DO UPDATE
404                   SET data = excluded.data,
405                       accessed_at = excluded.accessed_at,
406                       -- Don't overwrite a previously-stored `original`
407                       -- with NULL. Older callers (that go through
408                       -- save_cache_entry rather than the _with_original
409                       -- variant) shouldn't erase the expand-able bytes.
410                       original = COALESCE(excluded.original, original)"#,
411            params![hash, data, now, original],
412        )?;
413        Ok(())
414    }
415
416    /// Retrieve the stored original bytes for a cached hash, if the
417    /// caller populated them via `save_cache_entry_with_original`.
418    ///
419    /// Returns `Ok(None)` for missing entries AND for entries that were
420    /// saved by an older call site that did not pass `original`. The
421    /// caller should fall back to the compressed blob in the latter case
422    /// and surface a note to the user so they know this specific entry
423    /// wasn't round-trippable.
424    pub fn get_cache_entry_original(&self, hash: &str) -> Result<Option<Vec<u8>>> {
425        let result: rusqlite::Result<Option<Vec<u8>>> = self.db.query_row(
426            "SELECT original FROM cache_entries WHERE hash = ?1",
427            params![hash],
428            |row| row.get(0),
429        );
430        match result {
431            Ok(v) => Ok(v),
432            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
433            Err(e) => Err(SqzError::SessionStore(e)),
434        }
435    }
436
437    /// Delete a cache entry by content hash.
438    pub fn delete_cache_entry(&self, hash: &str) -> Result<()> {
439        self.db.execute(
440            "DELETE FROM cache_entries WHERE hash = ?1",
441            params![hash],
442        )?;
443        Ok(())
444    }
445
446    /// Return all cache entries ordered by `accessed_at` ASC (oldest first),
447    /// as `(hash, size_bytes)` pairs where `size_bytes` is the byte length of
448    /// the stored JSON data.
449    pub fn list_cache_entries_lru(&self) -> Result<Vec<(String, u64)>> {
450        let mut stmt = self.db.prepare(
451            "SELECT hash, length(data) FROM cache_entries ORDER BY accessed_at ASC",
452        )?;
453        let rows = stmt.query_map([], |row| {
454            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
455        })?;
456        let mut entries = Vec::new();
457        for row in rows {
458            let (hash, size) = row?;
459            entries.push((hash, size as u64));
460        }
461        Ok(entries)
462    }
463
464    /// Retrieve a cache entry by content hash, updating `accessed_at`.
465    pub fn get_cache_entry(&self, hash: &str) -> Result<Option<CompressedContent>> {
466        let result: rusqlite::Result<String> = self.db.query_row(
467            "SELECT data FROM cache_entries WHERE hash = ?1",
468            params![hash],
469            |row| row.get(0),
470        );
471
472        match result {
473            Ok(data) => {
474                // Touch accessed_at for LRU tracking.
475                let now = Utc::now().to_rfc3339();
476                let _ = self.db.execute(
477                    "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
478                    params![now, hash],
479                );
480                let entry: CompressedContent = serde_json::from_str(&data)?;
481                Ok(Some(entry))
482            }
483            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
484            Err(e) => Err(SqzError::SessionStore(e)),
485        }
486    }
487
    /// Look up a cache entry by a **prefix** of the content hash.
    ///
    /// The inline dedup refs we hand to the LLM carry only the first 16
    /// hex chars of the SHA-256 (`§ref:<16-hex>§`), so when an agent runs
    /// `sqz expand <prefix>` we need to resolve those 16 chars back to the
    /// full 64-char key. Uses `LIKE 'prefix%'` with an index-friendly
    /// anchored pattern so the query stays O(log n) on the primary key.
    ///
    /// Returns `Ok(Some((full_hash, entry)))` on unique match.
    /// Returns `Ok(None)` if no entries match.
    /// Returns `Err(_)` if the prefix is ambiguous (2+ matches) — the
    /// caller should tell the user to use a longer prefix. 16-hex
    /// collisions are astronomically unlikely (one in 2^64) but we
    /// refuse to guess rather than quietly serve a surprise file.
    ///
    /// The prefix is validated as lowercase hex. Non-hex input returns
    /// `None` without touching the database — this is how we handle the
    /// common user-error case of pasting the ref with the `§` markers
    /// still attached (they get rejected before we query SQLite).
    pub fn get_cache_entry_by_prefix(
        &self,
        prefix: &str,
    ) -> Result<Option<(String, CompressedContent)>> {
        // Reject anything that isn't pure lowercase hex. The inline refs
        // we emit are always lowercase so there's no reason to case-fold
        // here; accidentally matching uppercase input would also match
        // unrelated entries if someone hand-crafted a collision.
        if prefix.is_empty() || !prefix.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()) {
            return Ok(None);
        }
        // LIMIT 2: one row to return, one row to detect ambiguity — no
        // need to enumerate every match.
        let pattern = format!("{prefix}%");
        let mut stmt = self
            .db
            .prepare("SELECT hash, data FROM cache_entries WHERE hash LIKE ?1 LIMIT 2")?;
        let mut rows = stmt.query(params![pattern])?;

        let first = match rows.next()? {
            Some(r) => {
                let hash: String = r.get(0)?;
                let data: String = r.get(1)?;
                (hash, data)
            }
            None => return Ok(None),
        };

        // Two or more hits — refuse. Prefix ambiguity is user-recoverable:
        // rerun with a longer prefix. Matching one arbitrarily would be
        // a silent data surprise.
        if rows.next()?.is_some() {
            return Err(SqzError::Other(format!(
                "cache: prefix '{prefix}' matches multiple entries — use a longer prefix"
            )));
        }
        // Explicitly finish the cursor and finalize the SELECT statement
        // before issuing the UPDATE below, so the read is fully complete
        // before the write starts.
        drop(rows);
        drop(stmt);

        // Touch accessed_at for LRU tracking — symmetric with get_cache_entry.
        let now = Utc::now().to_rfc3339();
        let _ = self.db.execute(
            "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
            params![now, first.0],
        );
        let entry: CompressedContent = serde_json::from_str(&first.1)?;
        Ok(Some((first.0, entry)))
    }
553
554    /// Read the `accessed_at` timestamp for a cached hash without updating
555    /// it. Returns `None` if the hash is not cached.
556    ///
557    /// Used by the dedup freshness check: if `accessed_at` is recent, the
558    /// LLM likely still has the original content in its context window, so
559    /// returning a ref is safe. If it's old, re-send the full content.
560    pub fn get_cache_entry_accessed_at(&self, hash: &str) -> Result<Option<DateTime<Utc>>> {
561        let result: rusqlite::Result<String> = self.db.query_row(
562            "SELECT accessed_at FROM cache_entries WHERE hash = ?1",
563            params![hash],
564            |row| row.get(0),
565        );
566        match result {
567            Ok(s) => {
568                let ts = s
569                    .parse::<DateTime<Utc>>()
570                    .map_err(|e| SqzError::Other(format!("invalid accessed_at: {e}")))?;
571                Ok(Some(ts))
572            }
573            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
574            Err(e) => Err(SqzError::SessionStore(e)),
575        }
576    }
577
578    /// Check if a cache entry exists without updating `accessed_at`.
579    pub fn cache_entry_exists(&self, hash: &str) -> Result<bool> {
580        let result: rusqlite::Result<i64> = self.db.query_row(
581            "SELECT 1 FROM cache_entries WHERE hash = ?1",
582            params![hash],
583            |row| row.get(0),
584        );
585        match result {
586            Ok(_) => Ok(true),
587            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
588            Err(e) => Err(SqzError::SessionStore(e)),
589        }
590    }
591
592    /// Update `accessed_at` for a cached hash to the current time. Called by
593    /// the cache manager when a ref is served so the next staleness check
594    /// sees the recent send.
595    pub fn touch_cache_entry(&self, hash: &str) -> Result<()> {
596        let now = Utc::now().to_rfc3339();
597        self.db.execute(
598            "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
599            params![now, hash],
600        )?;
601        Ok(())
602    }
603
604    /// Set a metadata key/value. Persists across sqz process boundaries
605    /// (each shell-hook invocation is a short-lived process).
606    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
607        self.db.execute(
608            "INSERT INTO metadata (key, value) VALUES (?1, ?2)
609             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
610            params![key, value],
611        )?;
612        Ok(())
613    }
614
615    /// Get a metadata value. Returns `None` if the key has never been set.
616    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
617        let result: rusqlite::Result<String> = self.db.query_row(
618            "SELECT value FROM metadata WHERE key = ?1",
619            params![key],
620            |row| row.get(0),
621        );
622        match result {
623            Ok(v) => Ok(Some(v)),
624            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
625            Err(e) => Err(SqzError::SessionStore(e)),
626        }
627    }
628
629    /// Log a compression event for cumulative stats tracking.
630    pub fn log_compression(
631        &self,
632        tokens_original: u32,
633        tokens_compressed: u32,
634        stages: &[String],
635        mode: &str,
636    ) -> Result<()> {
637        let now = Utc::now().to_rfc3339();
638        let stages_str = stages.join(",");
639        self.db.execute(
640            "INSERT INTO compression_log (tokens_original, tokens_compressed, stages_applied, mode, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
641            params![tokens_original, tokens_compressed, stages_str, mode, now],
642        ).map_err(SqzError::SessionStore)?;
643        Ok(())
644    }
645
646    /// Get cumulative compression stats from the log.
647    pub fn compression_stats(&self) -> Result<CompressionStats> {
648        let mut stmt = self.db.prepare(
649            "SELECT COUNT(*), COALESCE(SUM(tokens_original), 0), COALESCE(SUM(tokens_compressed), 0) FROM compression_log",
650        ).map_err(SqzError::SessionStore)?;
651
652        let stats = stmt.query_row([], |row| {
653            Ok(CompressionStats {
654                total_compressions: row.get::<_, u32>(0)?,
655                total_tokens_in: row.get::<_, u64>(1)?,
656                total_tokens_out: row.get::<_, u64>(2)?,
657            })
658        }).map_err(SqzError::SessionStore)?;
659
660        Ok(stats)
661    }
662
    /// Get daily compression gains for the last N days.
    ///
    /// Rows are grouped by the calendar day of `created_at` (as stored)
    /// and returned oldest-first; days with no compressions are absent
    /// from the result rather than zero-filled.
    pub fn daily_gains(&self, days: u32) -> Result<Vec<DailyGain>> {
        let mut stmt = self.db.prepare(
            "SELECT date(created_at) as d, COUNT(*), SUM(tokens_original), SUM(tokens_compressed) \
             FROM compression_log \
             WHERE created_at >= date('now', ?1) \
             GROUP BY d ORDER BY d",
        ).map_err(SqzError::SessionStore)?;

        // SQLite date modifier, e.g. "-7 days" for days = 7.
        let offset = format!("-{days} days");
        let rows = stmt.query_map(params![offset], |row| {
            let tokens_in: u64 = row.get(2)?;
            let tokens_out: u64 = row.get(3)?;
            Ok(DailyGain {
                date: row.get(0)?,
                compressions: row.get(1)?,
                tokens_in,
                // saturating_sub guards against a pathological log row where
                // "compressed" output exceeded the input.
                tokens_saved: tokens_in.saturating_sub(tokens_out),
            })
        }).map_err(SqzError::SessionStore)?;

        let mut gains = Vec::new();
        for row in rows {
            gains.push(row.map_err(SqzError::SessionStore)?);
        }
        Ok(gains)
    }
690
691    // ── Known files (persistent cross-command context tracking) ───────────
692
693    /// Record a file path as "known" (its content is in the dedup cache).
694    /// Used by cross-command context refs to annotate error messages.
695    pub fn add_known_file(&self, path: &str) -> Result<()> {
696        let now = Utc::now().to_rfc3339();
697        self.db.execute(
698            "INSERT OR REPLACE INTO known_files (path, added_at) VALUES (?1, ?2)",
699            params![path, now],
700        ).map_err(SqzError::SessionStore)?;
701        Ok(())
702    }
703
704    /// Load all known file paths from the persistent store.
705    pub fn known_files(&self) -> Result<Vec<String>> {
706        let mut stmt = self.db.prepare(
707            "SELECT path FROM known_files ORDER BY added_at DESC",
708        ).map_err(SqzError::SessionStore)?;
709
710        let rows = stmt.query_map([], |row| {
711            row.get::<_, String>(0)
712        }).map_err(SqzError::SessionStore)?;
713
714        let mut files = Vec::new();
715        for row in rows {
716            files.push(row.map_err(SqzError::SessionStore)?);
717        }
718        Ok(files)
719    }
720
721    /// Clear all known files (e.g. on session reset).
722    pub fn clear_known_files(&self) -> Result<()> {
723        self.db.execute("DELETE FROM known_files", [])
724            .map_err(SqzError::SessionStore)?;
725        Ok(())
726    }
727}
728
/// Cumulative compression statistics aggregated from `compression_log`.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    // Number of compression events logged.
    pub total_compressions: u32,
    // Total tokens fed in across all events.
    pub total_tokens_in: u64,
    // Total tokens produced across all events.
    pub total_tokens_out: u64,
}
736
737impl CompressionStats {
738    pub fn tokens_saved(&self) -> u64 {
739        self.total_tokens_in.saturating_sub(self.total_tokens_out)
740    }
741
742    pub fn reduction_pct(&self) -> f64 {
743        if self.total_tokens_in == 0 {
744            0.0
745        } else {
746            (1.0 - self.total_tokens_out as f64 / self.total_tokens_in as f64) * 100.0
747        }
748    }
749}
750
/// A single day's compression gain, as returned by `daily_gains`.
#[derive(Debug, Clone)]
pub struct DailyGain {
    // Calendar day as produced by SQLite's date() (YYYY-MM-DD).
    pub date: String,
    // Number of compression events on this day.
    pub compressions: u32,
    // Tokens saved that day (input minus output, saturating at zero).
    pub tokens_saved: u64,
    // Tokens fed into the compressor that day.
    pub tokens_in: u64,
}
759
760// ── Tests ─────────────────────────────────────────────────────────────────────
761
762#[cfg(test)]
763mod tests {
764    use super::*;
765    use crate::types::{BudgetState, CorrectionLog, ModelFamily, SessionState};
766    use chrono::Utc;
767    use proptest::prelude::*;
768    use std::path::PathBuf;
769
    /// Build a minimal `SessionState` fixture: empty conversation,
    /// corrections, pins, learnings, and tool usage; both timestamps set
    /// to the same "now" instant.
    fn make_session(id: &str, project_dir: &str, summary: &str) -> SessionState {
        let now = Utc::now();
        SessionState {
            id: id.to_string(),
            project_dir: PathBuf::from(project_dir),
            conversation: vec![],
            corrections: CorrectionLog::default(),
            pins: vec![],
            learnings: vec![],
            compressed_summary: summary.to_string(),
            budget: BudgetState {
                window_size: 200_000,
                consumed: 0,
                pinned: 0,
                model_family: ModelFamily::AnthropicClaude,
            },
            tool_usage: vec![],
            created_at: now,
            updated_at: now,
        }
    }
791
792    fn in_memory_store() -> SessionStore {
793        let conn = Connection::open_in_memory().unwrap();
794        apply_schema(&conn).unwrap();
795        SessionStore { db: conn }
796    }
797
798    #[test]
799    fn test_save_and_load_session() {
800        let store = in_memory_store();
801        let session = make_session("sess-1", "/home/user/project", "REST API refactor");
802
803        let id = store.save_session(&session).unwrap();
804        assert_eq!(id, "sess-1");
805
806        let loaded = store.load_session("sess-1".to_string()).unwrap();
807        assert_eq!(loaded.id, session.id);
808        assert_eq!(loaded.compressed_summary, session.compressed_summary);
809        assert_eq!(loaded.project_dir, session.project_dir);
810    }
811
812    #[test]
813    fn test_save_session_upsert() {
814        let store = in_memory_store();
815        let mut session = make_session("sess-2", "/proj", "initial summary");
816        store.save_session(&session).unwrap();
817
818        session.compressed_summary = "updated summary".to_string();
819        store.save_session(&session).unwrap();
820
821        let loaded = store.load_session("sess-2".to_string()).unwrap();
822        assert_eq!(loaded.compressed_summary, "updated summary");
823    }
824
825    #[test]
826    fn test_load_nonexistent_session_errors() {
827        let store = in_memory_store();
828        let result = store.load_session("does-not-exist".to_string());
829        assert!(result.is_err());
830    }
831
832    #[test]
833    fn test_search_fts() {
834        let store = in_memory_store();
835        store.save_session(&make_session("s1", "/proj", "REST API refactor with authentication")).unwrap();
836        store.save_session(&make_session("s2", "/proj", "database migration postgres")).unwrap();
837
838        let results = store.search("authentication").unwrap();
839        assert_eq!(results.len(), 1);
840        assert_eq!(results[0].id, "s1");
841    }
842
843    #[test]
844    fn test_search_by_date() {
845        let store = in_memory_store();
846        let now = Utc::now();
847        let past = now - chrono::Duration::hours(2);
848        let future = now + chrono::Duration::hours(2);
849
850        store.save_session(&make_session("s1", "/proj", "recent session")).unwrap();
851
852        let results = store.search_by_date(past, future).unwrap();
853        assert!(!results.is_empty());
854        assert!(results.iter().any(|r| r.id == "s1"));
855    }
856
857    #[test]
858    fn test_search_by_project() {
859        let store = in_memory_store();
860        store.save_session(&make_session("s1", "/home/user/alpha", "alpha project")).unwrap();
861        store.save_session(&make_session("s2", "/home/user/beta", "beta project")).unwrap();
862
863        let results = store.search_by_project(Path::new("/home/user/alpha")).unwrap();
864        assert_eq!(results.len(), 1);
865        assert_eq!(results[0].id, "s1");
866    }
867
868    #[test]
869    fn test_cache_entry_round_trip() {
870        let store = in_memory_store();
871        let entry = CompressedContent {
872            data: "compressed data".to_string(),
873            tokens_compressed: 10,
874            tokens_original: 50,
875            stages_applied: vec!["strip_nulls".to_string()],
876            compression_ratio: 0.2,
877            provenance: crate::types::Provenance::default(),
878            verify: None,
879        };
880
881        store.save_cache_entry("abc123", &entry).unwrap();
882
883        let loaded = store.get_cache_entry("abc123").unwrap().unwrap();
884        assert_eq!(loaded.data, entry.data);
885        assert_eq!(loaded.tokens_compressed, entry.tokens_compressed);
886        assert_eq!(loaded.tokens_original, entry.tokens_original);
887    }
888
889    #[test]
890    fn test_get_cache_entry_missing_returns_none() {
891        let store = in_memory_store();
892        let result = store.get_cache_entry("nonexistent").unwrap();
893        assert!(result.is_none());
894    }
895
896    #[test]
897    fn test_open_or_create_corrupted_db() {
898        let dir = tempfile::tempdir().unwrap();
899        let path = dir.path().join("store.db");
900
901        // Write garbage bytes to simulate a corrupted database.
902        std::fs::write(&path, b"this is not a valid sqlite database").unwrap();
903
904        // Should succeed by creating a fresh database.
905        let store = SessionStore::open_or_create(&path).unwrap();
906        let session = make_session("s1", "/proj", "after corruption");
907        store.save_session(&session).unwrap();
908        let loaded = store.load_session("s1".to_string()).unwrap();
909        assert_eq!(loaded.id, "s1");
910    }
911
912    // ── Property-based tests ──────────────────────────────────────────────────
913
914    /// Build a `SessionState` with a specific `updated_at` timestamp.
915    fn make_session_at(id: &str, summary: &str, updated_at: DateTime<Utc>) -> SessionState {
916        let now = Utc::now();
917        SessionState {
918            id: id.to_string(),
919            project_dir: PathBuf::from("/proj"),
920            conversation: vec![],
921            corrections: CorrectionLog::default(),
922            pins: vec![],
923            learnings: vec![],
924            compressed_summary: summary.to_string(),
925            budget: BudgetState {
926                window_size: 200_000,
927                consumed: 0,
928                pinned: 0,
929                model_family: ModelFamily::AnthropicClaude,
930            },
931            tool_usage: vec![],
932            created_at: now,
933            updated_at,
934        }
935    }
936
937    // ── Property 26: Session store search correctness ─────────────────────────
938    // **Validates: Requirements 20.2, 20.3, 20.4**
939    //
940    // For any set of sessions saved to the store, a keyword search SHALL return
941    // all sessions whose compressed_summary contains the keyword, and no
942    // sessions that don't contain it.
943
944    proptest! {
945        /// **Validates: Requirements 20.2, 20.3, 20.4**
946        ///
947        /// For any set of sessions saved to the store, a keyword search SHALL
948        /// return all sessions whose `compressed_summary` contains the keyword,
949        /// and no sessions that don't contain it.
950        #[test]
951        fn prop_search_correctness(
952            // A simple ASCII keyword: 5-8 lowercase letters, no common English
953            // words that the porter stemmer might conflate with other terms.
954            keyword in "[b-df-hj-np-tv-z]{5,8}",
955            // 1-6 summaries that embed the keyword
956            matching_suffixes in proptest::collection::vec("[a-z ]{4,20}", 1..=6usize),
957            // 1-6 summaries that do NOT contain the keyword
958            non_matching in proptest::collection::vec("[a-z ]{8,30}", 1..=6usize),
959        ) {
960            // Ensure the keyword doesn't accidentally appear in non-matching summaries.
961            for s in &non_matching {
962                prop_assume!(!s.contains(keyword.as_str()));
963            }
964
965            let store = in_memory_store();
966
967            // Save matching sessions (summary = "<suffix> <keyword> <suffix>")
968            let mut matching_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
969            for (i, suffix) in matching_suffixes.iter().enumerate() {
970                let id = format!("match-{i}");
971                let summary = format!("{} {} end", suffix, keyword);
972                store.save_session(&make_session(&id, "/proj", &summary)).unwrap();
973                matching_ids.insert(id);
974            }
975
976            // Save non-matching sessions
977            let mut non_matching_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
978            for (i, summary) in non_matching.iter().enumerate() {
979                let id = format!("nomatch-{i}");
980                store.save_session(&make_session(&id, "/proj", summary)).unwrap();
981                non_matching_ids.insert(id);
982            }
983
984            let results = store.search(&keyword).unwrap();
985            let result_ids: std::collections::HashSet<String> =
986                results.iter().map(|r| r.id.clone()).collect();
987
988            // Every matching session must appear in results.
989            for id in &matching_ids {
990                prop_assert!(
991                    result_ids.contains(id),
992                    "matching session '{}' not found in search results for keyword '{}'",
993                    id, keyword
994                );
995            }
996
997            // No non-matching session may appear in results.
998            for id in &non_matching_ids {
999                prop_assert!(
1000                    !result_ids.contains(id),
1001                    "non-matching session '{}' incorrectly appeared in search results for keyword '{}'",
1002                    id, keyword
1003                );
1004            }
1005        }
1006    }
1007
1008    // ── Property: search_by_date correctness ─────────────────────────────────
1009    // **Validates: Requirements 20.4**
1010    //
1011    // For any set of sessions with different timestamps, searching by a date
1012    // range SHALL return exactly the sessions whose `updated_at` falls within
1013    // [from, to], and no sessions outside that range.
1014
1015    proptest! {
1016        /// **Validates: Requirements 20.4**
1017        ///
1018        /// For any set of sessions with distinct timestamps, `search_by_date`
1019        /// SHALL return exactly the sessions whose `updated_at` is within
1020        /// `[from, to]`, and no sessions outside that range.
1021        #[test]
1022        fn prop_search_by_date_correctness(
1023            // Generate 2-8 offsets in seconds from epoch (spread over a wide range)
1024            offsets in proptest::collection::vec(0i64..=86400i64 * 365, 2..=8usize),
1025            // The search window: start and end offsets (relative to the minimum offset)
1026            window_start_delta in 0i64..=3600i64,
1027            window_end_delta   in 3600i64..=7200i64,
1028        ) {
1029            use chrono::TimeZone;
1030
1031            // Deduplicate offsets so each session has a unique timestamp.
1032            let mut unique_offsets: Vec<i64> = offsets.clone();
1033            unique_offsets.sort_unstable();
1034            unique_offsets.dedup();
1035            prop_assume!(unique_offsets.len() >= 2);
1036
1037            let base_offset = unique_offsets[0];
1038            let from_offset = base_offset + window_start_delta;
1039            let to_offset   = base_offset + window_end_delta;
1040
1041            let from = Utc.timestamp_opt(from_offset, 0).unwrap();
1042            let to   = Utc.timestamp_opt(to_offset,   0).unwrap();
1043
1044            let store = in_memory_store();
1045
1046            let mut in_range_ids:  std::collections::HashSet<String> = std::collections::HashSet::new();
1047            let mut out_range_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
1048
1049            for (i, &offset) in unique_offsets.iter().enumerate() {
1050                let ts = Utc.timestamp_opt(offset, 0).unwrap();
1051                let id = format!("sess-{i}");
1052                let session = make_session_at(&id, "some summary", ts);
1053                store.save_session(&session).unwrap();
1054
1055                if ts >= from && ts <= to {
1056                    in_range_ids.insert(id);
1057                } else {
1058                    out_range_ids.insert(id);
1059                }
1060            }
1061
1062            let results = store.search_by_date(from, to).unwrap();
1063            let result_ids: std::collections::HashSet<String> =
1064                results.iter().map(|r| r.id.clone()).collect();
1065
1066            // Every in-range session must appear.
1067            for id in &in_range_ids {
1068                prop_assert!(
1069                    result_ids.contains(id),
1070                    "in-range session '{}' missing from search_by_date results",
1071                    id
1072                );
1073            }
1074
1075            // No out-of-range session may appear.
1076            for id in &out_range_ids {
1077                prop_assert!(
1078                    !result_ids.contains(id),
1079                    "out-of-range session '{}' incorrectly appeared in search_by_date results",
1080                    id
1081                );
1082            }
1083        }
1084    }
1085}