1use std::path::Path;
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection, OpenFlags};
6use serde::{Deserialize, Serialize};
7
8use crate::error::{Result, SqzError};
9use crate::types::{CompressedContent, SessionId, SessionState};
10
/// Lightweight view of a stored session, as returned by the `SessionStore` search methods.
///
/// Holds the indexed columns of the `sessions` table only; the full serialized session
/// state is fetched separately with `SessionStore::load_session`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Session identifier (primary key of the `sessions` table).
    pub id: SessionId,
    /// Project directory the session belongs to.
    pub project_dir: PathBuf,
    /// Compressed summary text; this is one of the columns indexed for full-text search.
    pub compressed_summary: String,
    /// Creation time, parsed back from the stored RFC 3339 text.
    pub created_at: DateTime<Utc>,
    /// Last-update time, parsed back from the stored RFC 3339 text.
    pub updated_at: DateTime<Utc>,
}
20
/// SQLite-backed store for sessions (searchable through an FTS5 index) and for
/// compressed-content cache entries.
pub struct SessionStore {
    /// The single connection every query in this store goes through.
    db: Connection,
}
25
/// DDL executed on every connection by `apply_schema`.
///
/// Each `CREATE` uses `IF NOT EXISTS`, so running it against an existing database is a
/// no-op for objects that are already present. Besides the `sessions` and `cache_entries`
/// tables it:
/// - requests WAL journaling (in-memory databases keep their own journal mode);
/// - creates `sessions_fts`, an FTS5 *external-content* index over `sessions`
///   (`id`, `project_dir`, `compressed_summary`) keyed by `sessions.rowid`, using the
///   `porter ascii` tokenizer;
/// - installs AFTER INSERT / DELETE / UPDATE triggers that mirror every change to
///   `sessions` into `sessions_fts`.
///
/// NOTE(review): `sessions` has no `INTEGER PRIMARY KEY`, so its rowid is implicit. SQLite's
/// `VACUUM` may renumber implicit rowids, which would desynchronize this external-content
/// index. Nothing here runs `VACUUM`; if it is ever run, rebuild the index afterwards with
/// `INSERT INTO sessions_fts(sessions_fts) VALUES('rebuild')`.
const SCHEMA: &str = r#"
PRAGMA journal_mode = WAL;

CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    project_dir TEXT NOT NULL,
    compressed_summary TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    data BLOB NOT NULL
);

CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
    id,
    project_dir,
    compressed_summary,
    content='sessions',
    content_rowid='rowid',
    tokenize='porter ascii'
);

CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
    INSERT INTO sessions_fts(rowid, id, project_dir, compressed_summary)
    VALUES (new.rowid, new.id, new.project_dir, new.compressed_summary);
END;

CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
    INSERT INTO sessions_fts(sessions_fts, rowid, id, project_dir, compressed_summary)
    VALUES ('delete', old.rowid, old.id, old.project_dir, old.compressed_summary);
END;

CREATE TRIGGER IF NOT EXISTS sessions_au AFTER UPDATE ON sessions BEGIN
    INSERT INTO sessions_fts(sessions_fts, rowid, id, project_dir, compressed_summary)
    VALUES ('delete', old.rowid, old.id, old.project_dir, old.compressed_summary);
    INSERT INTO sessions_fts(rowid, id, project_dir, compressed_summary)
    VALUES (new.rowid, new.id, new.project_dir, new.compressed_summary);
END;

CREATE TABLE IF NOT EXISTS cache_entries (
    hash TEXT PRIMARY KEY,
    data TEXT NOT NULL,
    accessed_at TEXT NOT NULL
);
"#;
72
73pub(crate) fn apply_schema(conn: &Connection) -> rusqlite::Result<()> {
76 conn.execute_batch(SCHEMA)
77}
78
79fn open_connection(path: &Path) -> rusqlite::Result<Connection> {
80 let conn = Connection::open(path)?;
81 apply_schema(&conn)?;
82 Ok(conn)
83}
84
85fn row_to_summary(
86 id: String,
87 project_dir: String,
88 compressed_summary: String,
89 created_at: String,
90 updated_at: String,
91) -> Result<SessionSummary> {
92 let created_at = created_at
93 .parse::<DateTime<Utc>>()
94 .map_err(|e| SqzError::Other(format!("invalid created_at timestamp: {e}")))?;
95 let updated_at = updated_at
96 .parse::<DateTime<Utc>>()
97 .map_err(|e| SqzError::Other(format!("invalid updated_at timestamp: {e}")))?;
98 Ok(SessionSummary {
99 id,
100 project_dir: PathBuf::from(project_dir),
101 compressed_summary,
102 created_at,
103 updated_at,
104 })
105}
106
107impl SessionStore {
110 #[cfg(test)]
113 pub(crate) fn from_connection(conn: Connection) -> Self {
114 Self { db: conn }
115 }
116
117 pub fn open(path: &Path) -> Result<Self> {
120 let conn = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE)?;
121 apply_schema(&conn)?;
122 Ok(Self { db: conn })
123 }
124
125 pub fn open_or_create(path: &Path) -> Result<Self> {
129 match open_connection(path) {
130 Ok(conn) => Ok(Self { db: conn }),
131 Err(e) => {
132 eprintln!(
133 "sqz warning: session store at '{}' is corrupted or inaccessible ({e}). \
134 Creating a new database. Prior session data has been lost.",
135 path.display()
136 );
137 if path.exists() {
139 let _ = std::fs::remove_file(path);
140 }
141 let conn = open_connection(path)
142 .map_err(|e2| SqzError::Other(format!("failed to create new session store: {e2}")))?;
143 Ok(Self { db: conn })
144 }
145 }
146 }
147
148 pub fn save_session(&self, session: &SessionState) -> Result<SessionId> {
152 let data = serde_json::to_vec(session)?;
153 let project_dir = session.project_dir.to_string_lossy().to_string();
154 let created_at = session.created_at.to_rfc3339();
155 let updated_at = session.updated_at.to_rfc3339();
156
157 self.db.execute(
158 r#"INSERT INTO sessions (id, project_dir, compressed_summary, created_at, updated_at, data)
159 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
160 ON CONFLICT(id) DO UPDATE SET
161 project_dir = excluded.project_dir,
162 compressed_summary = excluded.compressed_summary,
163 created_at = excluded.created_at,
164 updated_at = excluded.updated_at,
165 data = excluded.data"#,
166 params![
167 session.id,
168 project_dir,
169 session.compressed_summary,
170 created_at,
171 updated_at,
172 data,
173 ],
174 )?;
175
176 Ok(session.id.clone())
177 }
178
179 pub fn load_session(&self, id: SessionId) -> Result<SessionState> {
181 let data: Vec<u8> = self.db.query_row(
182 "SELECT data FROM sessions WHERE id = ?1",
183 params![id],
184 |row| row.get(0),
185 )?;
186 let session: SessionState = serde_json::from_slice(&data)?;
187 Ok(session)
188 }
189
190 pub fn search(&self, query: &str) -> Result<Vec<SessionSummary>> {
194 let mut stmt = self.db.prepare(
195 r#"SELECT s.id, s.project_dir, s.compressed_summary, s.created_at, s.updated_at
196 FROM sessions s
197 JOIN sessions_fts f ON s.rowid = f.rowid
198 WHERE sessions_fts MATCH ?1
199 ORDER BY rank"#,
200 )?;
201
202 let rows = stmt.query_map(params![query], |row| {
203 Ok((
204 row.get::<_, String>(0)?,
205 row.get::<_, String>(1)?,
206 row.get::<_, String>(2)?,
207 row.get::<_, String>(3)?,
208 row.get::<_, String>(4)?,
209 ))
210 })?;
211
212 let mut results = Vec::new();
213 for row in rows {
214 let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
215 results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
216 }
217 Ok(results)
218 }
219
220 pub fn search_by_date(
222 &self,
223 from: DateTime<Utc>,
224 to: DateTime<Utc>,
225 ) -> Result<Vec<SessionSummary>> {
226 let mut stmt = self.db.prepare(
227 r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
228 FROM sessions
229 WHERE updated_at >= ?1 AND updated_at <= ?2
230 ORDER BY updated_at DESC"#,
231 )?;
232
233 let rows = stmt.query_map(params![from.to_rfc3339(), to.to_rfc3339()], |row| {
234 Ok((
235 row.get::<_, String>(0)?,
236 row.get::<_, String>(1)?,
237 row.get::<_, String>(2)?,
238 row.get::<_, String>(3)?,
239 row.get::<_, String>(4)?,
240 ))
241 })?;
242
243 let mut results = Vec::new();
244 for row in rows {
245 let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
246 results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
247 }
248 Ok(results)
249 }
250
251 pub fn search_by_project(&self, dir: &Path) -> Result<Vec<SessionSummary>> {
253 let dir_str = dir.to_string_lossy().to_string();
254 let mut stmt = self.db.prepare(
255 r#"SELECT id, project_dir, compressed_summary, created_at, updated_at
256 FROM sessions
257 WHERE project_dir = ?1
258 ORDER BY updated_at DESC"#,
259 )?;
260
261 let rows = stmt.query_map(params![dir_str], |row| {
262 Ok((
263 row.get::<_, String>(0)?,
264 row.get::<_, String>(1)?,
265 row.get::<_, String>(2)?,
266 row.get::<_, String>(3)?,
267 row.get::<_, String>(4)?,
268 ))
269 })?;
270
271 let mut results = Vec::new();
272 for row in rows {
273 let (id, project_dir, compressed_summary, created_at, updated_at) = row?;
274 results.push(row_to_summary(id, project_dir, compressed_summary, created_at, updated_at)?);
275 }
276 Ok(results)
277 }
278
279 pub fn save_cache_entry(&self, hash: &str, compressed: &CompressedContent) -> Result<()> {
283 let data = serde_json::to_string(compressed)?;
284 let now = Utc::now().to_rfc3339();
285 self.db.execute(
286 r#"INSERT INTO cache_entries (hash, data, accessed_at)
287 VALUES (?1, ?2, ?3)
288 ON CONFLICT(hash) DO UPDATE SET data = excluded.data, accessed_at = excluded.accessed_at"#,
289 params![hash, data, now],
290 )?;
291 Ok(())
292 }
293
294 pub fn delete_cache_entry(&self, hash: &str) -> Result<()> {
296 self.db.execute(
297 "DELETE FROM cache_entries WHERE hash = ?1",
298 params![hash],
299 )?;
300 Ok(())
301 }
302
303 pub fn list_cache_entries_lru(&self) -> Result<Vec<(String, u64)>> {
307 let mut stmt = self.db.prepare(
308 "SELECT hash, length(data) FROM cache_entries ORDER BY accessed_at ASC",
309 )?;
310 let rows = stmt.query_map([], |row| {
311 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
312 })?;
313 let mut entries = Vec::new();
314 for row in rows {
315 let (hash, size) = row?;
316 entries.push((hash, size as u64));
317 }
318 Ok(entries)
319 }
320
321 pub fn get_cache_entry(&self, hash: &str) -> Result<Option<CompressedContent>> {
323 let result: rusqlite::Result<String> = self.db.query_row(
324 "SELECT data FROM cache_entries WHERE hash = ?1",
325 params![hash],
326 |row| row.get(0),
327 );
328
329 match result {
330 Ok(data) => {
331 let now = Utc::now().to_rfc3339();
333 let _ = self.db.execute(
334 "UPDATE cache_entries SET accessed_at = ?1 WHERE hash = ?2",
335 params![now, hash],
336 );
337 let entry: CompressedContent = serde_json::from_str(&data)?;
338 Ok(Some(entry))
339 }
340 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
341 Err(e) => Err(SqzError::SessionStore(e)),
342 }
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{BudgetState, CorrectionLog, ModelFamily, SessionState};
    use chrono::Utc;
    use proptest::prelude::*;
    use std::collections::{BTreeSet, HashSet};
    use std::path::PathBuf;

    /// Common constructor: an otherwise-empty session with the given identity and timestamps.
    fn build_session(
        id: &str,
        project_dir: &str,
        summary: &str,
        created_at: DateTime<Utc>,
        updated_at: DateTime<Utc>,
    ) -> SessionState {
        SessionState {
            id: id.to_string(),
            project_dir: PathBuf::from(project_dir),
            conversation: vec![],
            corrections: CorrectionLog::default(),
            pins: vec![],
            learnings: vec![],
            compressed_summary: summary.to_string(),
            budget: BudgetState {
                window_size: 200_000,
                consumed: 0,
                pinned: 0,
                model_family: ModelFamily::AnthropicClaude,
            },
            tool_usage: vec![],
            created_at,
            updated_at,
        }
    }

    /// A session in `project_dir` whose created and updated timestamps are both "now".
    fn make_session(id: &str, project_dir: &str, summary: &str) -> SessionState {
        let now = Utc::now();
        build_session(id, project_dir, summary, now, now)
    }

    /// A session in `/proj` created "now" but carrying an explicit `updated_at`.
    fn make_session_at(id: &str, summary: &str, updated_at: DateTime<Utc>) -> SessionState {
        build_session(id, "/proj", summary, Utc::now(), updated_at)
    }

    /// A fresh in-memory store with the production schema applied.
    fn in_memory_store() -> SessionStore {
        let conn = Connection::open_in_memory().unwrap();
        apply_schema(&conn).unwrap();
        SessionStore::from_connection(conn)
    }

    #[test]
    fn test_save_and_load_session() {
        let store = in_memory_store();
        let original = make_session("sess-1", "/home/user/project", "REST API refactor");

        assert_eq!(store.save_session(&original).unwrap(), "sess-1");

        let restored = store.load_session("sess-1".to_string()).unwrap();
        assert_eq!(restored.id, original.id);
        assert_eq!(restored.compressed_summary, original.compressed_summary);
        assert_eq!(restored.project_dir, original.project_dir);
    }

    #[test]
    fn test_save_session_upsert() {
        let store = in_memory_store();
        let mut session = make_session("sess-2", "/proj", "initial summary");
        store.save_session(&session).unwrap();

        // Saving again under the same id must overwrite the row rather than fail.
        session.compressed_summary = "updated summary".to_string();
        store.save_session(&session).unwrap();

        let reloaded = store.load_session("sess-2".to_string()).unwrap();
        assert_eq!(reloaded.compressed_summary, "updated summary");
    }

    #[test]
    fn test_load_nonexistent_session_errors() {
        let store = in_memory_store();
        assert!(store.load_session("does-not-exist".to_string()).is_err());
    }

    #[test]
    fn test_search_fts() {
        let store = in_memory_store();
        for (id, summary) in [
            ("s1", "REST API refactor with authentication"),
            ("s2", "database migration postgres"),
        ] {
            store.save_session(&make_session(id, "/proj", summary)).unwrap();
        }

        let hits = store.search("authentication").unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].id, "s1");
    }

    #[test]
    fn test_search_by_date() {
        let store = in_memory_store();
        let now = Utc::now();
        let slack = chrono::Duration::hours(2);
        let (past, future) = (now - slack, now + slack);

        store.save_session(&make_session("s1", "/proj", "recent session")).unwrap();

        let hits = store.search_by_date(past, future).unwrap();
        assert!(!hits.is_empty());
        assert!(hits.iter().any(|hit| hit.id == "s1"));
    }

    #[test]
    fn test_search_by_project() {
        let store = in_memory_store();
        store.save_session(&make_session("s1", "/home/user/alpha", "alpha project")).unwrap();
        store.save_session(&make_session("s2", "/home/user/beta", "beta project")).unwrap();

        let hits = store.search_by_project(Path::new("/home/user/alpha")).unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].id, "s1");
    }

    #[test]
    fn test_cache_entry_round_trip() {
        let store = in_memory_store();
        let entry = CompressedContent {
            data: "compressed data".to_string(),
            tokens_compressed: 10,
            tokens_original: 50,
            stages_applied: vec!["strip_nulls".to_string()],
            compression_ratio: 0.2,
        };
        store.save_cache_entry("abc123", &entry).unwrap();

        let cached = store.get_cache_entry("abc123").unwrap().unwrap();
        assert_eq!(cached.data, entry.data);
        assert_eq!(cached.tokens_compressed, entry.tokens_compressed);
        assert_eq!(cached.tokens_original, entry.tokens_original);
    }

    #[test]
    fn test_get_cache_entry_missing_returns_none() {
        let store = in_memory_store();
        assert!(store.get_cache_entry("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_open_or_create_corrupted_db() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("store.db");
        // Not a SQLite file, so the first open must fail and trigger recreation.
        std::fs::write(&path, b"this is not a valid sqlite database").unwrap();

        let store = SessionStore::open_or_create(&path).unwrap();
        store.save_session(&make_session("s1", "/proj", "after corruption")).unwrap();
        assert_eq!(store.load_session("s1".to_string()).unwrap().id, "s1");
    }

    proptest! {
        // Every session whose summary contains the keyword as a standalone token is found,
        // and no session whose summary lacks the keyword is returned.
        #[test]
        fn prop_search_correctness(
            keyword in "[b-df-hj-np-tv-z]{5,8}",
            matching_suffixes in proptest::collection::vec("[a-z ]{4,20}", 1..=6usize),
            non_matching in proptest::collection::vec("[a-z ]{8,30}", 1..=6usize),
        ) {
            for s in &non_matching {
                prop_assume!(!s.contains(keyword.as_str()));
            }

            let store = in_memory_store();

            let matching_ids: HashSet<String> = matching_suffixes
                .iter()
                .enumerate()
                .map(|(i, suffix)| {
                    let id = format!("match-{i}");
                    let summary = format!("{} {} end", suffix, keyword);
                    store.save_session(&make_session(&id, "/proj", &summary)).unwrap();
                    id
                })
                .collect();

            let non_matching_ids: HashSet<String> = non_matching
                .iter()
                .enumerate()
                .map(|(i, summary)| {
                    let id = format!("nomatch-{i}");
                    store.save_session(&make_session(&id, "/proj", summary)).unwrap();
                    id
                })
                .collect();

            let result_ids: HashSet<String> = store
                .search(&keyword)
                .unwrap()
                .into_iter()
                .map(|hit| hit.id)
                .collect();

            for id in &matching_ids {
                prop_assert!(
                    result_ids.contains(id),
                    "matching session '{}' not found in search results for keyword '{}'",
                    id, keyword
                );
            }

            for id in &non_matching_ids {
                prop_assert!(
                    !result_ids.contains(id),
                    "non-matching session '{}' incorrectly appeared in search results for keyword '{}'",
                    id, keyword
                );
            }
        }
    }

    proptest! {
        // search_by_date returns exactly the sessions whose updated_at falls in the
        // inclusive window anchored at the earliest generated timestamp.
        #[test]
        fn prop_search_by_date_correctness(
            offsets in proptest::collection::vec(0i64..=86400i64 * 365, 2..=8usize),
            window_start_delta in 0i64..=3600i64,
            window_end_delta in 3600i64..=7200i64,
        ) {
            use chrono::TimeZone;

            // Sorted ascending with duplicates removed.
            let unique_offsets: Vec<i64> = offsets
                .iter()
                .copied()
                .collect::<BTreeSet<i64>>()
                .into_iter()
                .collect();
            prop_assume!(unique_offsets.len() >= 2);

            let at = |secs: i64| Utc.timestamp_opt(secs, 0).unwrap();
            let base_offset = unique_offsets[0];
            let from = at(base_offset + window_start_delta);
            let to = at(base_offset + window_end_delta);

            let store = in_memory_store();
            let mut in_range_ids: HashSet<String> = HashSet::new();
            let mut out_range_ids: HashSet<String> = HashSet::new();

            for (i, &offset) in unique_offsets.iter().enumerate() {
                let ts = at(offset);
                let id = format!("sess-{i}");
                store.save_session(&make_session_at(&id, "some summary", ts)).unwrap();

                let bucket = if (from..=to).contains(&ts) {
                    &mut in_range_ids
                } else {
                    &mut out_range_ids
                };
                bucket.insert(id);
            }

            let result_ids: HashSet<String> = store
                .search_by_date(from, to)
                .unwrap()
                .into_iter()
                .map(|hit| hit.id)
                .collect();

            for id in &in_range_ids {
                prop_assert!(
                    result_ids.contains(id),
                    "in-range session '{}' missing from search_by_date results",
                    id
                );
            }

            for id in &out_range_ids {
                prop_assert!(
                    !result_ids.contains(id),
                    "out-of-range session '{}' incorrectly appeared in search_by_date results",
                    id
                );
            }
        }
    }
}