1use crate::models::activity::ActivitySummary;
28use crate::models::SessionMetadata;
29use anyhow::{Context, Result};
30use rusqlite::{params, Connection, OptionalExtension};
31use std::path::{Path, PathBuf};
32use std::sync::Mutex;
33use std::time::SystemTime;
34use tracing::{debug, warn};
35
/// Version stamp of the on-disk cache.
///
/// `MetadataCache::new` compares this value with the `version` row stored in
/// the `cache_metadata` table. When they differ, the cached tables are emptied
/// and the stored version is overwritten with this value.
const CACHE_VERSION: i32 = 8;
56
/// SQLite-backed cache for session metadata and activity summaries.
///
/// The single database connection lives behind a [`Mutex`]; each method locks
/// it before running its queries.
pub struct MetadataCache {
    /// Connection to the cache database, guarded by a mutex.
    conn: Mutex<Connection>,
    /// Location of the database file, recorded when the cache is opened.
    #[allow(dead_code)]
    cache_path: PathBuf,
}
63
64impl MetadataCache {
65 pub fn new(cache_dir: &Path) -> Result<Self> {
67 std::fs::create_dir_all(cache_dir).with_context(|| {
68 format!("Failed to create cache directory: {}", cache_dir.display())
69 })?;
70
71 let cache_path = cache_dir.join("session-metadata.db");
72 let conn = Connection::open(&cache_path)
73 .with_context(|| format!("Failed to open cache database: {}", cache_path.display()))?;
74
75 conn.pragma_update(None, "journal_mode", "WAL")
77 .context("Failed to enable WAL mode")?;
78
79 conn.execute_batch(
81 r#"
82 CREATE TABLE IF NOT EXISTS cache_metadata (
83 key TEXT PRIMARY KEY,
84 value INTEGER NOT NULL
85 );
86
87 CREATE TABLE IF NOT EXISTS session_metadata (
88 path TEXT PRIMARY KEY,
89 mtime INTEGER NOT NULL,
90 project TEXT NOT NULL,
91 session_id TEXT NOT NULL,
92 first_timestamp TEXT,
93 last_timestamp TEXT,
94 message_count INTEGER NOT NULL,
95 total_tokens INTEGER NOT NULL,
96 models_used TEXT NOT NULL,
97 has_subagents INTEGER NOT NULL,
98 first_user_message TEXT,
99 data BLOB NOT NULL
100 );
101
102 CREATE INDEX IF NOT EXISTS idx_project ON session_metadata(project);
103 CREATE INDEX IF NOT EXISTS idx_mtime ON session_metadata(mtime);
104 CREATE INDEX IF NOT EXISTS idx_session_id ON session_metadata(session_id);
105
106 CREATE TABLE IF NOT EXISTS activity_cache (
107 session_path TEXT PRIMARY KEY,
108 mtime INTEGER NOT NULL,
109 session_id TEXT NOT NULL,
110 tool_call_count INTEGER NOT NULL DEFAULT 0,
111 alert_count INTEGER NOT NULL DEFAULT 0,
112 data BLOB NOT NULL
113 );
114
115 CREATE INDEX IF NOT EXISTS idx_activity_session_id ON activity_cache(session_id);
116 CREATE INDEX IF NOT EXISTS idx_activity_mtime ON activity_cache(mtime);
117
118 CREATE TABLE IF NOT EXISTS activity_alerts (
119 id INTEGER PRIMARY KEY AUTOINCREMENT,
120 session_path TEXT NOT NULL,
121 severity TEXT NOT NULL,
122 category TEXT NOT NULL,
123 timestamp TEXT NOT NULL,
124 detail TEXT NOT NULL
125 );
126
127 CREATE INDEX IF NOT EXISTS idx_alerts_session ON activity_alerts(session_path);
128 CREATE INDEX IF NOT EXISTS idx_alerts_severity ON activity_alerts(severity);
129
130 CREATE TABLE IF NOT EXISTS aggregate_stats (
131 key TEXT PRIMARY KEY,
132 value INTEGER NOT NULL DEFAULT 0
133 );
134
135 INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES
136 ('total_sessions', 0),
137 ('total_messages', 0);
138
139 CREATE TRIGGER IF NOT EXISTS stats_ai
140 AFTER INSERT ON session_metadata BEGIN
141 UPDATE aggregate_stats SET value = value + 1 WHERE key = 'total_sessions';
142 UPDATE aggregate_stats SET value = value + new.message_count WHERE key = 'total_messages';
143 END;
144
145 CREATE TRIGGER IF NOT EXISTS stats_ad
146 AFTER DELETE ON session_metadata BEGIN
147 UPDATE aggregate_stats SET value = MAX(0, value - 1) WHERE key = 'total_sessions';
148 UPDATE aggregate_stats SET value = MAX(0, value - old.message_count) WHERE key = 'total_messages';
149 END;
150
151 CREATE VIRTUAL TABLE IF NOT EXISTS session_fts USING fts5(
152 session_id UNINDEXED,
153 project UNINDEXED,
154 first_user_message,
155 models_used,
156 content='session_metadata',
157 content_rowid='rowid',
158 tokenize='unicode61'
159 );
160
161 CREATE TRIGGER IF NOT EXISTS session_fts_ai
162 AFTER INSERT ON session_metadata BEGIN
163 INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
164 VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
165 END;
166
167 CREATE TRIGGER IF NOT EXISTS session_fts_ad
168 AFTER DELETE ON session_metadata BEGIN
169 INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
170 VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
171 END;
172
173 CREATE TRIGGER IF NOT EXISTS session_fts_au
174 AFTER UPDATE ON session_metadata BEGIN
175 INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
176 VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
177 INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
178 VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
179 END;
180 "#,
181 )
182 .context("Failed to create schema")?;
183
184 let stored_version: Option<i32> = conn
186 .query_row(
187 "SELECT value FROM cache_metadata WHERE key = 'version'",
188 [],
189 |row| row.get(0),
190 )
191 .optional()
192 .context("Failed to query cache version")?;
193
194 match stored_version {
195 Some(v) if v != CACHE_VERSION => {
196 warn!(
197 stored = v,
198 current = CACHE_VERSION,
199 "Cache version mismatch detected, clearing stale cache"
200 );
201
202 conn.execute("DELETE FROM session_metadata", [])
204 .context("Failed to clear stale session cache")?;
205 conn.execute("DELETE FROM activity_cache", [])
206 .context("Failed to clear stale activity cache")?;
207 conn.execute("DELETE FROM activity_alerts", [])
208 .context("Failed to clear stale activity alerts")?;
209 conn.execute("DELETE FROM aggregate_stats", [])
210 .context("Failed to clear stale aggregate stats")?;
211 conn.execute(
212 "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_sessions', 0)",
213 [],
214 )
215 .context("Failed to reinitialize total_sessions")?;
216 conn.execute(
217 "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_messages', 0)",
218 [],
219 )
220 .context("Failed to reinitialize total_messages")?;
221
222 conn.execute(
224 "INSERT OR REPLACE INTO cache_metadata (key, value) VALUES ('version', ?)",
225 params![CACHE_VERSION],
226 )
227 .context("Failed to update cache version")?;
228
229 debug!("Cache cleared and version updated to {}", CACHE_VERSION);
230 }
231 None => {
232 conn.execute(
234 "INSERT INTO cache_metadata (key, value) VALUES ('version', ?)",
235 params![CACHE_VERSION],
236 )
237 .context("Failed to initialize cache version")?;
238
239 debug!("Cache version initialized to {}", CACHE_VERSION);
240 }
241 Some(_) => {
242 debug!("Cache version {} matches current", CACHE_VERSION);
243 }
244 }
245
246 let cache = Self {
247 conn: Mutex::new(conn),
248 cache_path: cache_path.clone(),
249 };
250
251 debug!(path = %cache_path.display(), "Metadata cache initialized");
252
253 Ok(cache)
254 }
255
256 pub fn get(&self, path: &Path, current_mtime: SystemTime) -> Result<Option<SessionMetadata>> {
258 let path_str = path.to_string_lossy();
259 let mtime_secs = current_mtime
260 .duration_since(SystemTime::UNIX_EPOCH)
261 .context("Invalid mtime")?
262 .as_secs();
263
264 let conn = self
265 .conn
266 .lock()
267 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
268
269 let result: Option<Vec<u8>> = conn
270 .query_row(
271 "SELECT data FROM session_metadata WHERE path = ? AND mtime = ?",
272 params![path_str.as_ref(), mtime_secs as i64],
273 |row| row.get(0),
274 )
275 .optional()
276 .context("Failed to query cache")?;
277
278 match result {
279 Some(bytes) => {
280 let meta: SessionMetadata = bincode::deserialize(&bytes)
281 .context("Failed to deserialize cached metadata")?;
282 debug!(path = %path.display(), "Cache hit");
283 Ok(Some(meta))
284 }
285 None => {
286 debug!(path = %path.display(), "Cache miss");
287 Ok(None)
288 }
289 }
290 }
291
292 pub fn put(&self, path: &Path, meta: &SessionMetadata, mtime: SystemTime) -> Result<()> {
294 let path_str = path.to_string_lossy();
295 let mtime_secs = mtime
296 .duration_since(SystemTime::UNIX_EPOCH)
297 .context("Invalid mtime")?
298 .as_secs();
299
300 let data = bincode::serialize(meta).context("Failed to serialize metadata")?;
301
302 let models_used =
304 serde_json::to_string(&meta.models_used).context("Failed to serialize models")?;
305
306 let conn = self
307 .conn
308 .lock()
309 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
310
311 conn.execute(
313 r#"
314 INSERT OR IGNORE INTO session_metadata
315 (path, mtime, project, session_id, first_timestamp, last_timestamp,
316 message_count, total_tokens, models_used, has_subagents, first_user_message, data)
317 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
318 "#,
319 params![
320 path_str.as_ref(),
321 mtime_secs as i64,
322 meta.project_path.as_str(),
323 meta.id.as_str(),
324 meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
325 meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
326 meta.message_count as i64,
327 meta.total_tokens as i64,
328 models_used.as_str(),
329 if meta.has_subagents { 1 } else { 0 },
330 &meta.first_user_message,
331 &data,
332 ],
333 )
334 .context("Failed to insert metadata")?;
335
336 conn.execute(
338 r#"
339 UPDATE session_metadata
340 SET mtime = ?, project = ?, session_id = ?, first_timestamp = ?, last_timestamp = ?,
341 message_count = ?, total_tokens = ?, models_used = ?, has_subagents = ?,
342 first_user_message = ?, data = ?
343 WHERE path = ? AND mtime != ?
344 "#,
345 params![
346 mtime_secs as i64,
347 meta.project_path.as_str(),
348 meta.id.as_str(),
349 meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
350 meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
351 meta.message_count as i64,
352 meta.total_tokens as i64,
353 models_used.as_str(),
354 if meta.has_subagents { 1 } else { 0 },
355 &meta.first_user_message,
356 &data,
357 path_str.as_ref(),
358 mtime_secs as i64,
359 ],
360 )
361 .context("Failed to update metadata")?;
362
363 debug!(path = %path.display(), "Metadata cached");
364 Ok(())
365 }
366
367 pub fn invalidate(&self, path: &Path) -> Result<()> {
369 let path_str = path.to_string_lossy();
370
371 let conn = self
372 .conn
373 .lock()
374 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
375
376 conn.execute(
377 "DELETE FROM session_metadata WHERE path = ?",
378 params![path_str.as_ref()],
379 )
380 .context("Failed to delete cache entry")?;
381
382 debug!(path = %path.display(), "Cache entry invalidated");
383 Ok(())
384 }
385
386 pub fn get_project_paths(&self, project: &str) -> Result<Vec<PathBuf>> {
388 let conn = self
389 .conn
390 .lock()
391 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
392
393 let mut stmt = conn
394 .prepare("SELECT path FROM session_metadata WHERE project = ?")
395 .context("Failed to prepare query")?;
396
397 let rows = stmt
398 .query_map(params![project], |row| {
399 let path_str: String = row.get(0)?;
400 Ok(PathBuf::from(path_str))
401 })
402 .context("Failed to query project paths")?;
403
404 let mut paths = Vec::new();
405 for row in rows {
406 paths.push(row.context("Failed to read row")?);
407 }
408
409 Ok(paths)
410 }
411
412 pub fn stats(&self) -> Result<CacheStats> {
414 let conn = self
415 .conn
416 .lock()
417 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
418
419 let total_entries: i64 = conn
420 .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
421 row.get(0)
422 })
423 .context("Failed to count entries")?;
424
425 let total_size: i64 = conn
426 .query_row(
427 "SELECT SUM(LENGTH(data)) FROM session_metadata",
428 [],
429 |row| row.get(0),
430 )
431 .unwrap_or(0);
432
433 let project_count: i64 = conn
434 .query_row(
435 "SELECT COUNT(DISTINCT project) FROM session_metadata",
436 [],
437 |row| row.get(0),
438 )
439 .context("Failed to count projects")?;
440
441 Ok(CacheStats {
442 total_entries: total_entries as usize,
443 total_size_bytes: total_size as usize,
444 project_count: project_count as usize,
445 })
446 }
447
448 pub fn clear(&self) -> Result<()> {
450 let conn = self
451 .conn
452 .lock()
453 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
454
455 conn.execute("DELETE FROM session_metadata", [])
456 .context("Failed to clear cache")?;
457
458 debug!("Cache cleared");
459 Ok(())
460 }
461
462 pub fn vacuum(&self) -> Result<()> {
464 let conn = self
465 .conn
466 .lock()
467 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
468
469 conn.execute("VACUUM", []).context("Failed to vacuum")?;
470
471 debug!("Database vacuumed");
472 Ok(())
473 }
474
475 pub fn get_aggregate_stats(&self) -> Result<AggregateStats> {
479 let conn = self
480 .conn
481 .lock()
482 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
483
484 let mut stmt = conn
485 .prepare("SELECT key, value FROM aggregate_stats")
486 .context("Failed to prepare aggregate_stats query")?;
487
488 let mut total_sessions = 0usize;
489 let mut total_messages = 0usize;
490
491 let rows = stmt
492 .query_map([], |row| {
493 let key: String = row.get(0)?;
494 let value: i64 = row.get(1)?;
495 Ok((key, value))
496 })
497 .context("Failed to query aggregate_stats")?;
498
499 for row in rows {
500 let (key, value) = row.context("Failed to read aggregate_stats row")?;
501 match key.as_str() {
502 "total_sessions" => total_sessions = value.max(0) as usize,
503 "total_messages" => total_messages = value.max(0) as usize,
504 _ => {}
505 }
506 }
507
508 Ok(AggregateStats {
509 total_sessions,
510 total_messages,
511 })
512 }
513
514 pub fn search_sessions(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
519 if query.trim().is_empty() {
520 return Ok(Vec::new());
521 }
522
523 let conn = self
524 .conn
525 .lock()
526 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
527
528 let fts_exists: bool = conn
530 .query_row(
531 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_fts'",
532 [],
533 |row| row.get::<_, i64>(0),
534 )
535 .unwrap_or(0)
536 > 0;
537
538 if !fts_exists {
539 return Ok(Vec::new());
540 }
541
542 let mut stmt = conn
543 .prepare(
544 r#"
545 SELECT
546 sm.path,
547 sm.session_id,
548 sm.project,
549 sm.first_user_message,
550 snippet(session_fts, 2, '[', ']', '...', 12) AS snippet,
551 session_fts.rank,
552 sm.first_timestamp,
553 sm.message_count
554 FROM session_fts
555 JOIN session_metadata sm ON session_fts.rowid = sm.rowid
556 WHERE session_fts MATCH ?
557 ORDER BY session_fts.rank
558 LIMIT ?
559 "#,
560 )
561 .context("Failed to prepare FTS5 search query")?;
562
563 let limit_i64 = limit as i64;
564 let rows = stmt
565 .query_map(params![query, limit_i64], |row| {
566 Ok(SearchResult {
567 path: PathBuf::from(row.get::<_, String>(0)?),
568 session_id: row.get(1)?,
569 project: row.get(2)?,
570 first_user_message: row.get(3)?,
571 snippet: row.get(4)?,
572 rank: row.get(5)?,
573 first_timestamp: row.get(6)?,
574 message_count: row.get::<_, Option<i64>>(7)?.unwrap_or(0) as u64,
575 })
576 })
577 .context("Failed to execute FTS5 search")?;
578
579 let mut results = Vec::new();
580 for row in rows {
581 match row {
582 Ok(r) => results.push(r),
583 Err(e) => {
584 warn!("FTS5 search row error: {}", e);
585 }
586 }
587 }
588
589 Ok(results)
590 }
591
592 pub fn rebuild_fts_index(&self) -> Result<usize> {
596 let conn = self
597 .conn
598 .lock()
599 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
600
601 conn.execute("INSERT INTO session_fts(session_fts) VALUES('rebuild')", [])
603 .context("Failed to trigger FTS5 rebuild")?;
604
605 let count: i64 = conn
607 .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
608 row.get(0)
609 })
610 .context("Failed to count sessions")?;
611
612 debug!("FTS5 index rebuilt for {} sessions", count);
613 Ok(count as usize)
614 }
615
616 pub fn get_activity(
622 &self,
623 path: &Path,
624 current_mtime: SystemTime,
625 ) -> Result<Option<ActivitySummary>> {
626 let path_str = path.to_string_lossy();
627 let mtime_secs = current_mtime
628 .duration_since(SystemTime::UNIX_EPOCH)
629 .context("Invalid mtime")?
630 .as_secs();
631
632 let conn = self
633 .conn
634 .lock()
635 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
636
637 let result: Option<Vec<u8>> = conn
638 .query_row(
639 "SELECT data FROM activity_cache WHERE session_path = ? AND mtime = ?",
640 params![path_str.as_ref(), mtime_secs as i64],
641 |row| row.get(0),
642 )
643 .optional()
644 .context("Failed to query activity cache")?;
645
646 match result {
647 Some(bytes) => {
648 let summary: ActivitySummary = bincode::deserialize(&bytes)
649 .context("Failed to deserialize activity summary")?;
650 debug!(path = %path.display(), "Activity cache hit");
651 Ok(Some(summary))
652 }
653 None => {
654 debug!(path = %path.display(), "Activity cache miss");
655 Ok(None)
656 }
657 }
658 }
659
660 pub fn put_activity(
664 &self,
665 path: &Path,
666 session_id: &str,
667 summary: &ActivitySummary,
668 mtime: SystemTime,
669 ) -> Result<()> {
670 let path_str = path.to_string_lossy();
671 let mtime_secs = mtime
672 .duration_since(SystemTime::UNIX_EPOCH)
673 .context("Invalid mtime")?
674 .as_secs();
675
676 let data = bincode::serialize(summary).context("Failed to serialize activity summary")?;
677
678 let conn = self
679 .conn
680 .lock()
681 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
682
683 conn.execute_batch("BEGIN IMMEDIATE")
686 .context("Failed to begin activity cache transaction")?;
687
688 let result = (|| -> anyhow::Result<()> {
689 conn.execute(
691 r#"
692 INSERT OR REPLACE INTO activity_cache
693 (session_path, mtime, session_id, tool_call_count, alert_count, data)
694 VALUES (?, ?, ?, ?, ?, ?)
695 "#,
696 params![
697 path_str.as_ref(),
698 mtime_secs as i64,
699 session_id,
700 (summary.file_accesses.len()
701 + summary.bash_commands.len()
702 + summary.network_calls.len()) as i64,
703 summary.alerts.len() as i64,
704 &data,
705 ],
706 )
707 .context("Failed to insert activity cache entry")?;
708
709 conn.execute(
711 "DELETE FROM activity_alerts WHERE session_path = ?",
712 params![path_str.as_ref()],
713 )
714 .context("Failed to delete old activity alerts")?;
715
716 for alert in &summary.alerts {
717 let severity = format!("{:?}", alert.severity);
718 let category = format!("{:?}", alert.category);
719 conn.execute(
720 r#"
721 INSERT INTO activity_alerts (session_path, severity, category, timestamp, detail)
722 VALUES (?, ?, ?, ?, ?)
723 "#,
724 params![
725 path_str.as_ref(),
726 severity,
727 category,
728 alert.timestamp.to_rfc3339(),
729 &alert.detail,
730 ],
731 )
732 .context("Failed to insert activity alert")?;
733 }
734
735 Ok(())
736 })();
737
738 match result {
739 Ok(()) => conn
740 .execute_batch("COMMIT")
741 .context("Failed to commit activity cache transaction")?,
742 Err(e) => {
743 let _ = conn.execute_batch("ROLLBACK");
744 return Err(e);
745 }
746 }
747
748 debug!(
749 path = %path.display(),
750 alerts = summary.alerts.len(),
751 "Activity summary cached"
752 );
753 Ok(())
754 }
755
756 pub fn invalidate_activity(&self, path: &Path) -> Result<()> {
760 let path_str = path.to_string_lossy();
761
762 let conn = self
763 .conn
764 .lock()
765 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
766
767 conn.execute(
768 "DELETE FROM activity_cache WHERE session_path = ?",
769 params![path_str.as_ref()],
770 )
771 .context("Failed to delete activity cache entry")?;
772
773 conn.execute(
774 "DELETE FROM activity_alerts WHERE session_path = ?",
775 params![path_str.as_ref()],
776 )
777 .context("Failed to delete activity alerts")?;
778
779 debug!(path = %path.display(), "Activity cache invalidated");
780 Ok(())
781 }
782
783 pub fn get_all_alerts(&self, min_severity: Option<&str>) -> Result<Vec<StoredAlert>> {
787 let conn = self
788 .conn
789 .lock()
790 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
791
792 let query = match min_severity {
795 Some("Critical") => "SELECT session_path, severity, category, timestamp, detail \
796 FROM activity_alerts WHERE severity = 'Critical' ORDER BY timestamp DESC",
797 Some("Warning") => "SELECT session_path, severity, category, timestamp, detail \
798 FROM activity_alerts WHERE severity IN ('Warning', 'Critical') ORDER BY timestamp DESC",
799 _ => "SELECT session_path, severity, category, timestamp, detail \
800 FROM activity_alerts ORDER BY timestamp DESC",
801 };
802
803 let mut stmt = conn
804 .prepare(query)
805 .context("Failed to prepare alert query")?;
806
807 let rows = stmt
808 .query_map([], |row| {
809 Ok(StoredAlert {
810 session_path: row.get(0)?,
811 severity: row.get(1)?,
812 category: row.get(2)?,
813 timestamp: row.get(3)?,
814 detail: row.get(4)?,
815 })
816 })
817 .context("Failed to query alerts")?
818 .collect::<Result<Vec<_>, _>>()
819 .context("Failed to collect alerts")?;
820
821 Ok(rows)
822 }
823
824 pub fn activity_stats(&self) -> Result<ActivityCacheStats> {
826 let conn = self
827 .conn
828 .lock()
829 .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
830
831 let analyzed_sessions: i64 = conn
832 .query_row("SELECT COUNT(*) FROM activity_cache", [], |row| row.get(0))
833 .context("Failed to count activity cache entries")?;
834
835 let total_alerts: i64 = conn
836 .query_row("SELECT COUNT(*) FROM activity_alerts", [], |row| row.get(0))
837 .context("Failed to count alerts")?;
838
839 let critical_alerts: i64 = conn
840 .query_row(
841 "SELECT COUNT(*) FROM activity_alerts WHERE severity = 'Critical'",
842 [],
843 |row| row.get(0),
844 )
845 .context("Failed to count critical alerts")?;
846
847 Ok(ActivityCacheStats {
848 analyzed_sessions: analyzed_sessions as usize,
849 total_alerts: total_alerts as usize,
850 critical_alerts: critical_alerts as usize,
851 })
852 }
853}
854
855impl Drop for MetadataCache {
856 fn drop(&mut self) {
857 if let Ok(conn) = self.conn.lock() {
860 if let Err(e) = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE") {
861 warn!("Failed to checkpoint WAL on MetadataCache drop: {}", e);
862 } else {
863 debug!("WAL checkpoint completed on MetadataCache drop");
864 }
865 }
866 }
867}
868
/// Size figures for the session metadata table, as produced by `MetadataCache::stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of rows in `session_metadata`.
    pub total_entries: usize,
    /// Sum of the lengths of the stored `data` blobs, in bytes.
    pub total_size_bytes: usize,
    /// Number of distinct `project` values among the cached rows.
    pub project_count: usize,
}
876
/// Running totals read from the `aggregate_stats` table by
/// `MetadataCache::get_aggregate_stats`.
#[derive(Debug, Clone, Default)]
pub struct AggregateStats {
    /// Value of the `total_sessions` counter (negative values read as zero).
    pub total_sessions: usize,
    /// Value of the `total_messages` counter (negative values read as zero).
    pub total_messages: usize,
}
883
/// One hit returned by `MetadataCache::search_sessions`.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// `path` column of the matching `session_metadata` row.
    pub path: PathBuf,
    /// `session_id` column of the matching row.
    pub session_id: String,
    /// `project` column of the matching row.
    pub project: Option<String>,
    /// `first_user_message` column of the matching row.
    pub first_user_message: Option<String>,
    /// FTS5 `snippet()` output for the `first_user_message` column, with
    /// matches wrapped in `[` `]` and elided text shown as `...`.
    pub snippet: Option<String>,
    /// FTS5 `rank` value of the hit; results are ordered by it.
    pub rank: f64,
    /// `first_timestamp` column of the matching row, as stored text.
    pub first_timestamp: Option<String>,
    /// `message_count` column of the matching row (NULL reads as zero).
    pub message_count: u64,
}
898
/// Row counts for the activity tables, as produced by `MetadataCache::activity_stats`.
#[derive(Debug, Clone)]
pub struct ActivityCacheStats {
    /// Number of rows in `activity_cache`.
    pub analyzed_sessions: usize,
    /// Number of rows in `activity_alerts`.
    pub total_alerts: usize,
    /// Number of `activity_alerts` rows whose severity is `'Critical'`.
    pub critical_alerts: usize,
}
906
/// One row of the `activity_alerts` table, with every column kept as text.
#[derive(Debug, Clone)]
pub struct StoredAlert {
    /// Path of the session the alert was recorded for.
    pub session_path: String,
    /// Severity as stored (the `Debug` text of the alert's severity, e.g. `Critical`).
    pub severity: String,
    /// Category as stored (the `Debug` text of the alert's category).
    pub category: String,
    /// Alert timestamp, written as an RFC 3339 string.
    pub timestamp: String,
    /// Free-form detail text of the alert.
    pub detail: String,
}
916
917impl CacheStats {
918 pub fn hit_rate(&self, scanned: usize) -> f64 {
919 if scanned == 0 {
920 return 0.0;
921 }
922 (self.total_entries as f64) / (scanned as f64)
923 }
924}
925
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::SessionMetadata;
    use chrono::Utc;
    use tempfile::tempdir;

    /// Opens a cache inside a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the cache so the caller can
    /// keep it alive for as long as the cache is in use.
    fn fresh_cache() -> (tempfile::TempDir, MetadataCache) {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();
        (dir, cache)
    }

    /// Builds a summary with no tool calls and two alerts: one critical, one warning.
    fn make_summary_with_alerts() -> ActivitySummary {
        use crate::models::activity::{Alert, AlertCategory, AlertSeverity};

        let critical = Alert {
            session_id: "test-session".to_string(),
            timestamp: Utc::now(),
            severity: AlertSeverity::Critical,
            category: AlertCategory::DestructiveCommand,
            detail: "rm -rf /tmp".to_string(),
        };
        let warning = Alert {
            session_id: "test-session".to_string(),
            timestamp: Utc::now(),
            severity: AlertSeverity::Warning,
            category: AlertCategory::CredentialAccess,
            detail: "Accessed .env".to_string(),
        };

        ActivitySummary {
            file_accesses: vec![],
            bash_commands: vec![],
            network_calls: vec![],
            alerts: vec![critical, warning],
        }
    }

    /// Stores `make_summary_with_alerts()` for `/tmp/session.jsonl` at `mtime`.
    fn store_sample_activity(cache: &MetadataCache, mtime: SystemTime) -> PathBuf {
        let session_path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        cache
            .put_activity(&session_path, "test-session", &summary, mtime)
            .unwrap();
        session_path
    }

    #[test]
    fn test_cache_creation() {
        let (_dir, cache) = fresh_cache();

        assert_eq!(cache.stats().unwrap().total_entries, 0);
    }

    #[test]
    fn test_cache_put_get() {
        let (_dir, cache) = fresh_cache();

        let session_path = PathBuf::from("/tmp/test.jsonl");
        let mut meta = SessionMetadata::from_path(session_path.clone(), "/test".into());
        meta.id = "test-123".into();
        meta.message_count = 42;
        meta.total_tokens = 1000;
        meta.models_used = std::iter::once("sonnet".to_string()).collect();
        meta.first_timestamp = Some(Utc::now());

        let now = SystemTime::now();
        cache.put(&session_path, &meta, now).unwrap();

        // Same mtime: the entry is served from the cache.
        let hit = cache
            .get(&session_path, now)
            .unwrap()
            .expect("entry should be cached");
        assert_eq!(hit.id, "test-123");
        assert_eq!(hit.message_count, 42);

        // Different mtime: the entry is treated as a miss.
        let an_hour_ago = now - std::time::Duration::from_secs(3600);
        assert!(cache.get(&session_path, an_hour_ago).unwrap().is_none());
    }

    #[test]
    fn test_cache_invalidate() {
        let (_dir, cache) = fresh_cache();

        let session_path = PathBuf::from("/tmp/test.jsonl");
        let meta = SessionMetadata::from_path(session_path.clone(), "/test".into());
        let now = SystemTime::now();

        cache.put(&session_path, &meta, now).unwrap();
        cache.invalidate(&session_path).unwrap();

        assert!(cache.get(&session_path, now).unwrap().is_none());
    }

    #[test]
    fn test_cache_project_paths() {
        let (_dir, cache) = fresh_cache();
        let now = SystemTime::now();

        // Three sessions under /project1, two under /project2.
        for (project, dir_name, sessions) in
            [("/project1", "project1", 3), ("/project2", "project2", 2)].iter()
        {
            for i in 0..*sessions {
                let session_path =
                    PathBuf::from(format!("/tmp/{}/session{}.jsonl", dir_name, i));
                let meta =
                    SessionMetadata::from_path(session_path.clone(), (*project).into());
                cache.put(&session_path, &meta, now).unwrap();
            }
        }

        assert_eq!(cache.get_project_paths("/project1").unwrap().len(), 3);
        assert_eq!(cache.get_project_paths("/project2").unwrap().len(), 2);
    }

    #[test]
    fn test_cache_stats() {
        let (_dir, cache) = fresh_cache();
        let now = SystemTime::now();

        for i in 0..10 {
            let session_path = PathBuf::from(format!("/tmp/session{}.jsonl", i));
            let meta = SessionMetadata::from_path(session_path.clone(), "/test".into());
            cache.put(&session_path, &meta, now).unwrap();
        }

        let stats = cache.stats().unwrap();
        assert_eq!(stats.total_entries, 10);
        assert!(stats.total_size_bytes > 0);
        assert_eq!(stats.project_count, 1);
    }

    #[test]
    fn test_cache_clear() {
        let (_dir, cache) = fresh_cache();

        let session_path = PathBuf::from("/tmp/test.jsonl");
        let meta = SessionMetadata::from_path(session_path.clone(), "/test".into());
        cache.put(&session_path, &meta, SystemTime::now()).unwrap();
        assert_eq!(cache.stats().unwrap().total_entries, 1);

        cache.clear().unwrap();
        assert_eq!(cache.stats().unwrap().total_entries, 0);
    }

    #[test]
    fn test_activity_put_get_hit() {
        let (_dir, cache) = fresh_cache();
        let now = SystemTime::now();
        let session_path = store_sample_activity(&cache, now);

        let cached = cache.get_activity(&session_path, now).unwrap();
        assert!(cached.is_some(), "Should be a cache hit");
        assert_eq!(cached.unwrap().alerts.len(), 2);
    }

    #[test]
    fn test_activity_get_miss_on_mtime_change() {
        let (_dir, cache) = fresh_cache();
        let now = SystemTime::now();
        let session_path = store_sample_activity(&cache, now);

        let a_minute_ago = now - std::time::Duration::from_secs(60);
        let cached = cache.get_activity(&session_path, a_minute_ago).unwrap();
        assert!(cached.is_none(), "Should be a cache miss on mtime change");
    }

    #[test]
    fn test_activity_invalidate() {
        let (_dir, cache) = fresh_cache();
        let now = SystemTime::now();
        let session_path = store_sample_activity(&cache, now);

        assert!(cache.get_activity(&session_path, now).unwrap().is_some());

        cache.invalidate_activity(&session_path).unwrap();

        assert!(
            cache.get_activity(&session_path, now).unwrap().is_none(),
            "Should be gone after invalidation"
        );
        assert!(
            cache.get_all_alerts(None).unwrap().is_empty(),
            "Alerts should be cleared with activity cache"
        );
    }

    #[test]
    fn test_get_all_alerts_returns_stored_alerts() {
        let (_dir, cache) = fresh_cache();
        store_sample_activity(&cache, SystemTime::now());

        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 2, "Should return both alerts");

        let critical: Vec<_> = alerts.iter().filter(|a| a.severity == "Critical").collect();
        assert_eq!(critical.len(), 1);
        assert!(critical[0].detail.contains("rm -rf"));
    }

    #[test]
    fn test_get_all_alerts_filter_by_severity() {
        let (_dir, cache) = fresh_cache();
        store_sample_activity(&cache, SystemTime::now());

        let critical_only = cache.get_all_alerts(Some("Critical")).unwrap();
        assert_eq!(critical_only.len(), 1);
        assert_eq!(critical_only[0].severity, "Critical");
    }

    #[test]
    fn test_activity_stats() {
        let (_dir, cache) = fresh_cache();

        let before = cache.activity_stats().unwrap();
        assert_eq!(before.analyzed_sessions, 0);
        assert_eq!(before.total_alerts, 0);

        store_sample_activity(&cache, SystemTime::now());

        let after = cache.activity_stats().unwrap();
        assert_eq!(after.analyzed_sessions, 1);
        assert_eq!(after.total_alerts, 2);
        assert_eq!(after.critical_alerts, 1);
    }

    #[test]
    fn test_activity_put_replaces_stale_alerts() {
        let (_dir, cache) = fresh_cache();
        let now = SystemTime::now();
        let session_path = store_sample_activity(&cache, now);
        assert_eq!(cache.get_all_alerts(None).unwrap().len(), 2);

        // Re-storing the same path with an alert-free summary must drop the
        // alerts written by the first put.
        let empty_summary = ActivitySummary::default();
        let a_second_later = now + std::time::Duration::from_secs(1);
        cache
            .put_activity(&session_path, "test-session", &empty_summary, a_second_later)
            .unwrap();

        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 0, "Stale alerts should be replaced");
    }
}