1use crate::models::activity::ActivitySummary;
28use crate::models::SessionMetadata;
29use anyhow::{Context, Result};
30use rusqlite::{params, Connection, OptionalExtension};
31use std::path::{Path, PathBuf};
32use std::sync::Mutex;
33use std::time::SystemTime;
34use tracing::{debug, warn};
35
/// Version stamp of the on-disk cache contents.
///
/// `MetadataCache::new` compares this value with the `version` row stored in the
/// `cache_metadata` table and clears the cached tables when the two differ.
const CACHE_VERSION: i32 = 7;
55
/// SQLite-backed cache for session metadata and activity summaries.
pub struct MetadataCache {
    /// The single SQLite connection; the mutex lets every method take `&self`.
    conn: Mutex<Connection>,
    // Location of the database file (`session-metadata.db` inside the cache directory).
    // It is stored at construction but not read by any method visible here, hence the
    // lint allowance.
    #[allow(dead_code)]
    cache_path: PathBuf,
}
62
63impl MetadataCache {
64 pub fn new(cache_dir: &Path) -> Result<Self> {
66 std::fs::create_dir_all(cache_dir).with_context(|| {
67 format!("Failed to create cache directory: {}", cache_dir.display())
68 })?;
69
70 let cache_path = cache_dir.join("session-metadata.db");
71 let conn = Connection::open(&cache_path)
72 .with_context(|| format!("Failed to open cache database: {}", cache_path.display()))?;
73
74 conn.pragma_update(None, "journal_mode", "WAL")
76 .context("Failed to enable WAL mode")?;
77
78 conn.execute_batch(
80 r#"
81 CREATE TABLE IF NOT EXISTS cache_metadata (
82 key TEXT PRIMARY KEY,
83 value INTEGER NOT NULL
84 );
85
86 CREATE TABLE IF NOT EXISTS session_metadata (
87 path TEXT PRIMARY KEY,
88 mtime INTEGER NOT NULL,
89 project TEXT NOT NULL,
90 session_id TEXT NOT NULL,
91 first_timestamp TEXT,
92 last_timestamp TEXT,
93 message_count INTEGER NOT NULL,
94 total_tokens INTEGER NOT NULL,
95 models_used TEXT NOT NULL,
96 has_subagents INTEGER NOT NULL,
97 first_user_message TEXT,
98 data BLOB NOT NULL
99 );
100
101 CREATE INDEX IF NOT EXISTS idx_project ON session_metadata(project);
102 CREATE INDEX IF NOT EXISTS idx_mtime ON session_metadata(mtime);
103 CREATE INDEX IF NOT EXISTS idx_session_id ON session_metadata(session_id);
104
105 CREATE TABLE IF NOT EXISTS activity_cache (
106 session_path TEXT PRIMARY KEY,
107 mtime INTEGER NOT NULL,
108 session_id TEXT NOT NULL,
109 tool_call_count INTEGER NOT NULL DEFAULT 0,
110 alert_count INTEGER NOT NULL DEFAULT 0,
111 data BLOB NOT NULL
112 );
113
114 CREATE INDEX IF NOT EXISTS idx_activity_session_id ON activity_cache(session_id);
115 CREATE INDEX IF NOT EXISTS idx_activity_mtime ON activity_cache(mtime);
116
117 CREATE TABLE IF NOT EXISTS activity_alerts (
118 id INTEGER PRIMARY KEY AUTOINCREMENT,
119 session_path TEXT NOT NULL,
120 severity TEXT NOT NULL,
121 category TEXT NOT NULL,
122 timestamp TEXT NOT NULL,
123 detail TEXT NOT NULL
124 );
125
126 CREATE INDEX IF NOT EXISTS idx_alerts_session ON activity_alerts(session_path);
127 CREATE INDEX IF NOT EXISTS idx_alerts_severity ON activity_alerts(severity);
128
129 CREATE TABLE IF NOT EXISTS aggregate_stats (
130 key TEXT PRIMARY KEY,
131 value INTEGER NOT NULL DEFAULT 0
132 );
133
134 INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES
135 ('total_sessions', 0),
136 ('total_messages', 0);
137
138 CREATE TRIGGER IF NOT EXISTS stats_ai
139 AFTER INSERT ON session_metadata BEGIN
140 UPDATE aggregate_stats SET value = value + 1 WHERE key = 'total_sessions';
141 UPDATE aggregate_stats SET value = value + new.message_count WHERE key = 'total_messages';
142 END;
143
144 CREATE TRIGGER IF NOT EXISTS stats_ad
145 AFTER DELETE ON session_metadata BEGIN
146 UPDATE aggregate_stats SET value = MAX(0, value - 1) WHERE key = 'total_sessions';
147 UPDATE aggregate_stats SET value = MAX(0, value - old.message_count) WHERE key = 'total_messages';
148 END;
149
150 CREATE VIRTUAL TABLE IF NOT EXISTS session_fts USING fts5(
151 session_id UNINDEXED,
152 project UNINDEXED,
153 first_user_message,
154 models_used,
155 content='session_metadata',
156 content_rowid='rowid',
157 tokenize='unicode61'
158 );
159
160 CREATE TRIGGER IF NOT EXISTS session_fts_ai
161 AFTER INSERT ON session_metadata BEGIN
162 INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
163 VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
164 END;
165
166 CREATE TRIGGER IF NOT EXISTS session_fts_ad
167 AFTER DELETE ON session_metadata BEGIN
168 INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
169 VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
170 END;
171
172 CREATE TRIGGER IF NOT EXISTS session_fts_au
173 AFTER UPDATE ON session_metadata BEGIN
174 INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
175 VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
176 INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
177 VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
178 END;
179 "#,
180 )
181 .context("Failed to create schema")?;
182
183 let stored_version: Option<i32> = conn
185 .query_row(
186 "SELECT value FROM cache_metadata WHERE key = 'version'",
187 [],
188 |row| row.get(0),
189 )
190 .optional()
191 .context("Failed to query cache version")?;
192
193 match stored_version {
194 Some(v) if v != CACHE_VERSION => {
195 warn!(
196 stored = v,
197 current = CACHE_VERSION,
198 "Cache version mismatch detected, clearing stale cache"
199 );
200
201 conn.execute("DELETE FROM session_metadata", [])
203 .context("Failed to clear stale session cache")?;
204 conn.execute("DELETE FROM activity_cache", [])
205 .context("Failed to clear stale activity cache")?;
206 conn.execute("DELETE FROM activity_alerts", [])
207 .context("Failed to clear stale activity alerts")?;
208 conn.execute("DELETE FROM aggregate_stats", [])
209 .context("Failed to clear stale aggregate stats")?;
210 conn.execute(
211 "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_sessions', 0)",
212 [],
213 )
214 .context("Failed to reinitialize total_sessions")?;
215 conn.execute(
216 "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_messages', 0)",
217 [],
218 )
219 .context("Failed to reinitialize total_messages")?;
220
221 conn.execute(
223 "INSERT OR REPLACE INTO cache_metadata (key, value) VALUES ('version', ?)",
224 params![CACHE_VERSION],
225 )
226 .context("Failed to update cache version")?;
227
228 debug!("Cache cleared and version updated to {}", CACHE_VERSION);
229 }
230 None => {
231 conn.execute(
233 "INSERT INTO cache_metadata (key, value) VALUES ('version', ?)",
234 params![CACHE_VERSION],
235 )
236 .context("Failed to initialize cache version")?;
237
238 debug!("Cache version initialized to {}", CACHE_VERSION);
239 }
240 Some(_) => {
241 debug!("Cache version {} matches current", CACHE_VERSION);
242 }
243 }
244
245 let cache = Self {
246 conn: Mutex::new(conn),
247 cache_path: cache_path.clone(),
248 };
249
250 debug!(path = %cache_path.display(), "Metadata cache initialized");
251
252 Ok(cache)
253 }
254
255 pub fn get(&self, path: &Path, current_mtime: SystemTime) -> Result<Option<SessionMetadata>> {
257 let path_str = path.to_string_lossy();
258 let mtime_secs = current_mtime
259 .duration_since(SystemTime::UNIX_EPOCH)
260 .context("Invalid mtime")?
261 .as_secs();
262
263 let conn = self
264 .conn
265 .lock()
266 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
267
268 let result: Option<Vec<u8>> = conn
269 .query_row(
270 "SELECT data FROM session_metadata WHERE path = ? AND mtime = ?",
271 params![path_str.as_ref(), mtime_secs as i64],
272 |row| row.get(0),
273 )
274 .optional()
275 .context("Failed to query cache")?;
276
277 match result {
278 Some(bytes) => {
279 let meta: SessionMetadata = bincode::deserialize(&bytes)
280 .context("Failed to deserialize cached metadata")?;
281 debug!(path = %path.display(), "Cache hit");
282 Ok(Some(meta))
283 }
284 None => {
285 debug!(path = %path.display(), "Cache miss");
286 Ok(None)
287 }
288 }
289 }
290
291 pub fn put(&self, path: &Path, meta: &SessionMetadata, mtime: SystemTime) -> Result<()> {
293 let path_str = path.to_string_lossy();
294 let mtime_secs = mtime
295 .duration_since(SystemTime::UNIX_EPOCH)
296 .context("Invalid mtime")?
297 .as_secs();
298
299 let data = bincode::serialize(meta).context("Failed to serialize metadata")?;
300
301 let models_used =
303 serde_json::to_string(&meta.models_used).context("Failed to serialize models")?;
304
305 let conn = self
306 .conn
307 .lock()
308 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
309
310 conn.execute(
312 r#"
313 INSERT OR IGNORE INTO session_metadata
314 (path, mtime, project, session_id, first_timestamp, last_timestamp,
315 message_count, total_tokens, models_used, has_subagents, first_user_message, data)
316 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
317 "#,
318 params![
319 path_str.as_ref(),
320 mtime_secs as i64,
321 meta.project_path.as_str(),
322 meta.id.as_str(),
323 meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
324 meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
325 meta.message_count as i64,
326 meta.total_tokens as i64,
327 models_used.as_str(),
328 if meta.has_subagents { 1 } else { 0 },
329 &meta.first_user_message,
330 &data,
331 ],
332 )
333 .context("Failed to insert metadata")?;
334
335 conn.execute(
337 r#"
338 UPDATE session_metadata
339 SET mtime = ?, project = ?, session_id = ?, first_timestamp = ?, last_timestamp = ?,
340 message_count = ?, total_tokens = ?, models_used = ?, has_subagents = ?,
341 first_user_message = ?, data = ?
342 WHERE path = ? AND mtime != ?
343 "#,
344 params![
345 mtime_secs as i64,
346 meta.project_path.as_str(),
347 meta.id.as_str(),
348 meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
349 meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
350 meta.message_count as i64,
351 meta.total_tokens as i64,
352 models_used.as_str(),
353 if meta.has_subagents { 1 } else { 0 },
354 &meta.first_user_message,
355 &data,
356 path_str.as_ref(),
357 mtime_secs as i64,
358 ],
359 )
360 .context("Failed to update metadata")?;
361
362 debug!(path = %path.display(), "Metadata cached");
363 Ok(())
364 }
365
366 pub fn invalidate(&self, path: &Path) -> Result<()> {
368 let path_str = path.to_string_lossy();
369
370 let conn = self
371 .conn
372 .lock()
373 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
374
375 conn.execute(
376 "DELETE FROM session_metadata WHERE path = ?",
377 params![path_str.as_ref()],
378 )
379 .context("Failed to delete cache entry")?;
380
381 debug!(path = %path.display(), "Cache entry invalidated");
382 Ok(())
383 }
384
385 pub fn get_project_paths(&self, project: &str) -> Result<Vec<PathBuf>> {
387 let conn = self
388 .conn
389 .lock()
390 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
391
392 let mut stmt = conn
393 .prepare("SELECT path FROM session_metadata WHERE project = ?")
394 .context("Failed to prepare query")?;
395
396 let rows = stmt
397 .query_map(params![project], |row| {
398 let path_str: String = row.get(0)?;
399 Ok(PathBuf::from(path_str))
400 })
401 .context("Failed to query project paths")?;
402
403 let mut paths = Vec::new();
404 for row in rows {
405 paths.push(row.context("Failed to read row")?);
406 }
407
408 Ok(paths)
409 }
410
411 pub fn stats(&self) -> Result<CacheStats> {
413 let conn = self
414 .conn
415 .lock()
416 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
417
418 let total_entries: i64 = conn
419 .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
420 row.get(0)
421 })
422 .context("Failed to count entries")?;
423
424 let total_size: i64 = conn
425 .query_row(
426 "SELECT SUM(LENGTH(data)) FROM session_metadata",
427 [],
428 |row| row.get(0),
429 )
430 .unwrap_or(0);
431
432 let project_count: i64 = conn
433 .query_row(
434 "SELECT COUNT(DISTINCT project) FROM session_metadata",
435 [],
436 |row| row.get(0),
437 )
438 .context("Failed to count projects")?;
439
440 Ok(CacheStats {
441 total_entries: total_entries as usize,
442 total_size_bytes: total_size as usize,
443 project_count: project_count as usize,
444 })
445 }
446
447 pub fn clear(&self) -> Result<()> {
449 let conn = self
450 .conn
451 .lock()
452 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
453
454 conn.execute("DELETE FROM session_metadata", [])
455 .context("Failed to clear cache")?;
456
457 debug!("Cache cleared");
458 Ok(())
459 }
460
461 pub fn vacuum(&self) -> Result<()> {
463 let conn = self
464 .conn
465 .lock()
466 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
467
468 conn.execute("VACUUM", []).context("Failed to vacuum")?;
469
470 debug!("Database vacuumed");
471 Ok(())
472 }
473
474 pub fn get_aggregate_stats(&self) -> Result<AggregateStats> {
478 let conn = self
479 .conn
480 .lock()
481 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
482
483 let mut stmt = conn
484 .prepare("SELECT key, value FROM aggregate_stats")
485 .context("Failed to prepare aggregate_stats query")?;
486
487 let mut total_sessions = 0usize;
488 let mut total_messages = 0usize;
489
490 let rows = stmt
491 .query_map([], |row| {
492 let key: String = row.get(0)?;
493 let value: i64 = row.get(1)?;
494 Ok((key, value))
495 })
496 .context("Failed to query aggregate_stats")?;
497
498 for row in rows {
499 let (key, value) = row.context("Failed to read aggregate_stats row")?;
500 match key.as_str() {
501 "total_sessions" => total_sessions = value.max(0) as usize,
502 "total_messages" => total_messages = value.max(0) as usize,
503 _ => {}
504 }
505 }
506
507 Ok(AggregateStats {
508 total_sessions,
509 total_messages,
510 })
511 }
512
513 pub fn search_sessions(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
518 if query.trim().is_empty() {
519 return Ok(Vec::new());
520 }
521
522 let conn = self
523 .conn
524 .lock()
525 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
526
527 let fts_exists: bool = conn
529 .query_row(
530 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_fts'",
531 [],
532 |row| row.get::<_, i64>(0),
533 )
534 .unwrap_or(0)
535 > 0;
536
537 if !fts_exists {
538 return Ok(Vec::new());
539 }
540
541 let mut stmt = conn
542 .prepare(
543 r#"
544 SELECT
545 sm.path,
546 sm.session_id,
547 sm.project,
548 sm.first_user_message,
549 snippet(session_fts, 2, '[', ']', '...', 12) AS snippet,
550 session_fts.rank,
551 sm.first_timestamp,
552 sm.message_count
553 FROM session_fts
554 JOIN session_metadata sm ON session_fts.rowid = sm.rowid
555 WHERE session_fts MATCH ?
556 ORDER BY session_fts.rank
557 LIMIT ?
558 "#,
559 )
560 .context("Failed to prepare FTS5 search query")?;
561
562 let limit_i64 = limit as i64;
563 let rows = stmt
564 .query_map(params![query, limit_i64], |row| {
565 Ok(SearchResult {
566 path: PathBuf::from(row.get::<_, String>(0)?),
567 session_id: row.get(1)?,
568 project: row.get(2)?,
569 first_user_message: row.get(3)?,
570 snippet: row.get(4)?,
571 rank: row.get(5)?,
572 first_timestamp: row.get(6)?,
573 message_count: row.get::<_, Option<i64>>(7)?.unwrap_or(0) as u64,
574 })
575 })
576 .context("Failed to execute FTS5 search")?;
577
578 let mut results = Vec::new();
579 for row in rows {
580 match row {
581 Ok(r) => results.push(r),
582 Err(e) => {
583 warn!("FTS5 search row error: {}", e);
584 }
585 }
586 }
587
588 Ok(results)
589 }
590
591 pub fn rebuild_fts_index(&self) -> Result<usize> {
595 let conn = self
596 .conn
597 .lock()
598 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
599
600 conn.execute("INSERT INTO session_fts(session_fts) VALUES('rebuild')", [])
602 .context("Failed to trigger FTS5 rebuild")?;
603
604 let count: i64 = conn
606 .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
607 row.get(0)
608 })
609 .context("Failed to count sessions")?;
610
611 debug!("FTS5 index rebuilt for {} sessions", count);
612 Ok(count as usize)
613 }
614
615 pub fn get_activity(
621 &self,
622 path: &Path,
623 current_mtime: SystemTime,
624 ) -> Result<Option<ActivitySummary>> {
625 let path_str = path.to_string_lossy();
626 let mtime_secs = current_mtime
627 .duration_since(SystemTime::UNIX_EPOCH)
628 .context("Invalid mtime")?
629 .as_secs();
630
631 let conn = self
632 .conn
633 .lock()
634 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
635
636 let result: Option<Vec<u8>> = conn
637 .query_row(
638 "SELECT data FROM activity_cache WHERE session_path = ? AND mtime = ?",
639 params![path_str.as_ref(), mtime_secs as i64],
640 |row| row.get(0),
641 )
642 .optional()
643 .context("Failed to query activity cache")?;
644
645 match result {
646 Some(bytes) => {
647 let summary: ActivitySummary = bincode::deserialize(&bytes)
648 .context("Failed to deserialize activity summary")?;
649 debug!(path = %path.display(), "Activity cache hit");
650 Ok(Some(summary))
651 }
652 None => {
653 debug!(path = %path.display(), "Activity cache miss");
654 Ok(None)
655 }
656 }
657 }
658
659 pub fn put_activity(
663 &self,
664 path: &Path,
665 session_id: &str,
666 summary: &ActivitySummary,
667 mtime: SystemTime,
668 ) -> Result<()> {
669 let path_str = path.to_string_lossy();
670 let mtime_secs = mtime
671 .duration_since(SystemTime::UNIX_EPOCH)
672 .context("Invalid mtime")?
673 .as_secs();
674
675 let data = bincode::serialize(summary).context("Failed to serialize activity summary")?;
676
677 let conn = self
678 .conn
679 .lock()
680 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
681
682 conn.execute_batch("BEGIN IMMEDIATE")
685 .context("Failed to begin activity cache transaction")?;
686
687 let result = (|| -> anyhow::Result<()> {
688 conn.execute(
690 r#"
691 INSERT OR REPLACE INTO activity_cache
692 (session_path, mtime, session_id, tool_call_count, alert_count, data)
693 VALUES (?, ?, ?, ?, ?, ?)
694 "#,
695 params![
696 path_str.as_ref(),
697 mtime_secs as i64,
698 session_id,
699 (summary.file_accesses.len()
700 + summary.bash_commands.len()
701 + summary.network_calls.len()) as i64,
702 summary.alerts.len() as i64,
703 &data,
704 ],
705 )
706 .context("Failed to insert activity cache entry")?;
707
708 conn.execute(
710 "DELETE FROM activity_alerts WHERE session_path = ?",
711 params![path_str.as_ref()],
712 )
713 .context("Failed to delete old activity alerts")?;
714
715 for alert in &summary.alerts {
716 let severity = format!("{:?}", alert.severity);
717 let category = format!("{:?}", alert.category);
718 conn.execute(
719 r#"
720 INSERT INTO activity_alerts (session_path, severity, category, timestamp, detail)
721 VALUES (?, ?, ?, ?, ?)
722 "#,
723 params![
724 path_str.as_ref(),
725 severity,
726 category,
727 alert.timestamp.to_rfc3339(),
728 &alert.detail,
729 ],
730 )
731 .context("Failed to insert activity alert")?;
732 }
733
734 Ok(())
735 })();
736
737 match result {
738 Ok(()) => conn
739 .execute_batch("COMMIT")
740 .context("Failed to commit activity cache transaction")?,
741 Err(e) => {
742 let _ = conn.execute_batch("ROLLBACK");
743 return Err(e);
744 }
745 }
746
747 debug!(
748 path = %path.display(),
749 alerts = summary.alerts.len(),
750 "Activity summary cached"
751 );
752 Ok(())
753 }
754
755 pub fn invalidate_activity(&self, path: &Path) -> Result<()> {
759 let path_str = path.to_string_lossy();
760
761 let conn = self
762 .conn
763 .lock()
764 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
765
766 conn.execute(
767 "DELETE FROM activity_cache WHERE session_path = ?",
768 params![path_str.as_ref()],
769 )
770 .context("Failed to delete activity cache entry")?;
771
772 conn.execute(
773 "DELETE FROM activity_alerts WHERE session_path = ?",
774 params![path_str.as_ref()],
775 )
776 .context("Failed to delete activity alerts")?;
777
778 debug!(path = %path.display(), "Activity cache invalidated");
779 Ok(())
780 }
781
782 pub fn get_all_alerts(&self, min_severity: Option<&str>) -> Result<Vec<StoredAlert>> {
786 let conn = self
787 .conn
788 .lock()
789 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
790
791 let query = match min_severity {
794 Some("Critical") => "SELECT session_path, severity, category, timestamp, detail \
795 FROM activity_alerts WHERE severity = 'Critical' ORDER BY timestamp DESC",
796 Some("Warning") => "SELECT session_path, severity, category, timestamp, detail \
797 FROM activity_alerts WHERE severity IN ('Warning', 'Critical') ORDER BY timestamp DESC",
798 _ => "SELECT session_path, severity, category, timestamp, detail \
799 FROM activity_alerts ORDER BY timestamp DESC",
800 };
801
802 let mut stmt = conn
803 .prepare(query)
804 .context("Failed to prepare alert query")?;
805
806 let rows = stmt
807 .query_map([], |row| {
808 Ok(StoredAlert {
809 session_path: row.get(0)?,
810 severity: row.get(1)?,
811 category: row.get(2)?,
812 timestamp: row.get(3)?,
813 detail: row.get(4)?,
814 })
815 })
816 .context("Failed to query alerts")?
817 .collect::<Result<Vec<_>, _>>()
818 .context("Failed to collect alerts")?;
819
820 Ok(rows)
821 }
822
823 pub fn activity_stats(&self) -> Result<ActivityCacheStats> {
825 let conn = self
826 .conn
827 .lock()
828 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
829
830 let analyzed_sessions: i64 = conn
831 .query_row("SELECT COUNT(*) FROM activity_cache", [], |row| row.get(0))
832 .context("Failed to count activity cache entries")?;
833
834 let total_alerts: i64 = conn
835 .query_row("SELECT COUNT(*) FROM activity_alerts", [], |row| row.get(0))
836 .context("Failed to count alerts")?;
837
838 let critical_alerts: i64 = conn
839 .query_row(
840 "SELECT COUNT(*) FROM activity_alerts WHERE severity = 'Critical'",
841 [],
842 |row| row.get(0),
843 )
844 .context("Failed to count critical alerts")?;
845
846 Ok(ActivityCacheStats {
847 analyzed_sessions: analyzed_sessions as usize,
848 total_alerts: total_alerts as usize,
849 critical_alerts: critical_alerts as usize,
850 })
851 }
852}
853
854impl Drop for MetadataCache {
855 fn drop(&mut self) {
856 if let Ok(conn) = self.conn.lock() {
859 if let Err(e) = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE") {
860 warn!("Failed to checkpoint WAL on MetadataCache drop: {}", e);
861 } else {
862 debug!("WAL checkpoint completed on MetadataCache drop");
863 }
864 }
865 }
866}
867
/// Size summary of the session metadata table, as produced by `MetadataCache::stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of rows in `session_metadata`.
    pub total_entries: usize,
    /// Combined length of the serialized `data` blobs of all rows.
    pub total_size_bytes: usize,
    /// Number of distinct `project` values among the rows.
    pub project_count: usize,
}
875
/// Counters read from the `aggregate_stats` table by `MetadataCache::get_aggregate_stats`.
#[derive(Debug, Clone, Default)]
pub struct AggregateStats {
    /// Value of the `total_sessions` counter (negative stored values are reported as 0).
    pub total_sessions: usize,
    /// Value of the `total_messages` counter (negative stored values are reported as 0).
    pub total_messages: usize,
}
882
/// One hit returned by `MetadataCache::search_sessions`.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// `path` column of the matching `session_metadata` row.
    pub path: PathBuf,
    /// `session_id` column of the matching row.
    pub session_id: String,
    /// `project` column of the matching row.
    pub project: Option<String>,
    /// `first_user_message` column of the matching row, if any.
    pub first_user_message: Option<String>,
    /// FTS5 `snippet()` output for the `first_user_message` column, with matches wrapped
    /// in `[` and `]`.
    pub snippet: Option<String>,
    /// FTS5 `rank` of the hit; results are returned in ascending rank order.
    pub rank: f64,
    /// `first_timestamp` column of the matching row, as stored text.
    pub first_timestamp: Option<String>,
    /// `message_count` column of the matching row (0 when the column is NULL).
    pub message_count: u64,
}
897
/// Row counts of the activity tables, as produced by `MetadataCache::activity_stats`.
#[derive(Debug, Clone)]
pub struct ActivityCacheStats {
    /// Number of rows in `activity_cache`.
    pub analyzed_sessions: usize,
    /// Number of rows in `activity_alerts`.
    pub total_alerts: usize,
    /// Number of `activity_alerts` rows whose severity is `Critical`.
    pub critical_alerts: usize,
}
905
/// One row of the `activity_alerts` table, with every column kept as text.
#[derive(Debug, Clone)]
pub struct StoredAlert {
    /// Session path the alert was stored under.
    pub session_path: String,
    /// Severity text; `put_activity` writes the `Debug` formatting of the alert severity.
    pub severity: String,
    /// Category text; `put_activity` writes the `Debug` formatting of the alert category.
    pub category: String,
    /// Alert timestamp; `put_activity` writes it in RFC 3339 form.
    pub timestamp: String,
    /// Free-form detail text of the alert.
    pub detail: String,
}
915
916impl CacheStats {
917 pub fn hit_rate(&self, scanned: usize) -> f64 {
918 if scanned == 0 {
919 return 0.0;
920 }
921 (self.total_entries as f64) / (scanned as f64)
922 }
923}
924
// Unit tests for `MetadataCache`; every test works on a fresh database in a temporary
// directory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::SessionMetadata;
    use chrono::Utc;
    use tempfile::tempdir;

    // A freshly created cache contains no session entries.
    #[test]
    fn test_cache_creation() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let stats = cache.stats().unwrap();
        assert_eq!(stats.total_entries, 0);
    }

    // Metadata round-trips through put/get, and a different mtime is a miss.
    #[test]
    fn test_cache_put_get() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/test.jsonl");
        let mut meta = SessionMetadata::from_path(path.clone(), "/test".into());
        meta.id = "test-123".into();
        meta.message_count = 42;
        meta.total_tokens = 1000;
        meta.models_used = vec!["sonnet".to_string()].into_iter().collect();
        meta.first_timestamp = Some(Utc::now());

        let mtime = SystemTime::now();

        cache.put(&path, &meta, mtime).unwrap();

        // Same mtime: expect a hit with the stored values.
        let cached = cache.get(&path, mtime).unwrap();
        assert!(cached.is_some());
        let cached = cached.unwrap();
        assert_eq!(cached.id, "test-123");
        assert_eq!(cached.message_count, 42);

        // An mtime one hour earlier must not match the stored row.
        let old_mtime = mtime - std::time::Duration::from_secs(3600);
        let cached = cache.get(&path, old_mtime).unwrap();
        assert!(cached.is_none());
    }

    // After invalidation the entry can no longer be fetched.
    #[test]
    fn test_cache_invalidate() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/test.jsonl");
        let meta = SessionMetadata::from_path(path.clone(), "/test".into());
        let mtime = SystemTime::now();

        cache.put(&path, &meta, mtime).unwrap();

        cache.invalidate(&path).unwrap();

        let cached = cache.get(&path, mtime).unwrap();
        assert!(cached.is_none());
    }

    // Paths are returned per project: three sessions for one, two for the other.
    #[test]
    fn test_cache_project_paths() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let mtime = SystemTime::now();

        for i in 0..3 {
            let path = PathBuf::from(format!("/tmp/project1/session{}.jsonl", i));
            let meta = SessionMetadata::from_path(path.clone(), "/project1".into());
            cache.put(&path, &meta, mtime).unwrap();
        }

        for i in 0..2 {
            let path = PathBuf::from(format!("/tmp/project2/session{}.jsonl", i));
            let meta = SessionMetadata::from_path(path.clone(), "/project2".into());
            cache.put(&path, &meta, mtime).unwrap();
        }

        let paths = cache.get_project_paths("/project1").unwrap();
        assert_eq!(paths.len(), 3);

        let paths = cache.get_project_paths("/project2").unwrap();
        assert_eq!(paths.len(), 2);
    }

    // Stats reflect ten entries of non-zero size that all belong to one project.
    #[test]
    fn test_cache_stats() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let mtime = SystemTime::now();

        for i in 0..10 {
            let path = PathBuf::from(format!("/tmp/session{}.jsonl", i));
            let meta = SessionMetadata::from_path(path.clone(), "/test".into());
            cache.put(&path, &meta, mtime).unwrap();
        }

        let stats = cache.stats().unwrap();
        assert_eq!(stats.total_entries, 10);
        assert!(stats.total_size_bytes > 0);
        assert_eq!(stats.project_count, 1);
    }

    // `clear` removes all session entries.
    #[test]
    fn test_cache_clear() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/test.jsonl");
        let meta = SessionMetadata::from_path(path.clone(), "/test".into());
        cache.put(&path, &meta, SystemTime::now()).unwrap();

        assert_eq!(cache.stats().unwrap().total_entries, 1);

        cache.clear().unwrap();

        assert_eq!(cache.stats().unwrap().total_entries, 0);
    }

    // Fixture: a summary with no recorded accesses, commands or calls, and two alerts
    // (one Critical, one Warning).
    fn make_summary_with_alerts() -> ActivitySummary {
        use crate::models::activity::{Alert, AlertCategory, AlertSeverity};
        use chrono::Utc;

        ActivitySummary {
            file_accesses: vec![],
            bash_commands: vec![],
            network_calls: vec![],
            alerts: vec![
                Alert {
                    session_id: "test-session".to_string(),
                    timestamp: Utc::now(),
                    severity: AlertSeverity::Critical,
                    category: AlertCategory::DestructiveCommand,
                    detail: "rm -rf /tmp".to_string(),
                },
                Alert {
                    session_id: "test-session".to_string(),
                    timestamp: Utc::now(),
                    severity: AlertSeverity::Warning,
                    category: AlertCategory::CredentialAccess,
                    detail: "Accessed .env".to_string(),
                },
            ],
        }
    }

    // An activity summary round-trips through put_activity/get_activity.
    #[test]
    fn test_activity_put_get_hit() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();

        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();

        let cached = cache.get_activity(&path, mtime).unwrap();
        assert!(cached.is_some(), "Should be a cache hit");
        let cached = cached.unwrap();
        assert_eq!(cached.alerts.len(), 2);
    }

    // A lookup with a different mtime (60 seconds earlier) is a miss.
    #[test]
    fn test_activity_get_miss_on_mtime_change() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();

        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();

        let stale_mtime = mtime - std::time::Duration::from_secs(60);
        let cached = cache.get_activity(&path, stale_mtime).unwrap();
        assert!(cached.is_none(), "Should be a cache miss on mtime change");
    }

    // Invalidating activity removes both the summary and its stored alerts.
    #[test]
    fn test_activity_invalidate() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();

        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();

        assert!(cache.get_activity(&path, mtime).unwrap().is_some());

        cache.invalidate_activity(&path).unwrap();

        assert!(
            cache.get_activity(&path, mtime).unwrap().is_none(),
            "Should be gone after invalidation"
        );

        let alerts = cache.get_all_alerts(None).unwrap();
        assert!(
            alerts.is_empty(),
            "Alerts should be cleared with activity cache"
        );
    }

    // Without a severity filter both stored alerts come back.
    #[test]
    fn test_get_all_alerts_returns_stored_alerts() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();

        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();

        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 2, "Should return both alerts");

        let critical: Vec<_> = alerts.iter().filter(|a| a.severity == "Critical").collect();
        assert_eq!(critical.len(), 1);
        assert!(critical[0].detail.contains("rm -rf"));
    }

    // The "Critical" filter returns only the critical alert.
    #[test]
    fn test_get_all_alerts_filter_by_severity() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();

        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();

        let critical_only = cache.get_all_alerts(Some("Critical")).unwrap();
        assert_eq!(critical_only.len(), 1);
        assert_eq!(critical_only[0].severity, "Critical");
    }

    // Activity stats start at zero and reflect one analyzed session with two alerts.
    #[test]
    fn test_activity_stats() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let stats = cache.activity_stats().unwrap();
        assert_eq!(stats.analyzed_sessions, 0);
        assert_eq!(stats.total_alerts, 0);

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        cache
            .put_activity(&path, "test-session", &summary, SystemTime::now())
            .unwrap();

        let stats = cache.activity_stats().unwrap();
        assert_eq!(stats.analyzed_sessions, 1);
        assert_eq!(stats.total_alerts, 2);
        assert_eq!(stats.critical_alerts, 1);
    }

    // Re-caching a session with an empty summary removes its previously stored alerts.
    #[test]
    fn test_activity_put_replaces_stale_alerts() {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();

        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts(); let mtime = SystemTime::now();

        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();
        assert_eq!(cache.get_all_alerts(None).unwrap().len(), 2);

        let empty_summary = ActivitySummary::default();
        let new_mtime = mtime + std::time::Duration::from_secs(1);
        cache
            .put_activity(&path, "test-session", &empty_summary, new_mtime)
            .unwrap();

        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 0, "Stale alerts should be replaced");
    }
}