Skip to main content

ccboard_core/cache/
metadata_cache.rs

1//! SQLite metadata cache for session files
2//!
3//! Caches session metadata with mtime-based invalidation for 90% startup speedup.
4//! Also caches activity analysis (tool calls, alerts) per session on demand.
5//!
6//! Schema:
7//! - session_metadata: parsed metadata + mtime + cache_version
8//! - activity_cache: serialized ActivitySummary + mtime per session file
9//! - activity_alerts: searchable alert records (severity/category) across all sessions
10//! - Indexes: project, mtime, session_id, severity for fast queries
11//!
12//! Invalidation:
13//! - File watcher detects modification → delete session + activity cache entries
14//! - Startup: compare mtime → rescan if stale
15//! - Startup: compare cache_version → auto-clear ALL tables if mismatch
16//!
17//! Cache Version History:
18//! - v1: Initial version (pre-TokenUsage fix)
19//! - v2: Fixed TokenUsage::total() to include cache_read_tokens + cache_write_tokens
20//! - v3: Added token breakdown fields (input_tokens, output_tokens, cache_creation_tokens,
21//!   cache_read_tokens) to SessionMetadata + real pricing calculation
22//! - v4: Added branch field to SessionMetadata
23//! - v5: Added activity_cache + activity_alerts tables for Phase 2 activity module
24//! - v6: Added aggregate_stats table with triggers + FTS5 session_fts table
25
26use crate::models::activity::ActivitySummary;
27use crate::models::SessionMetadata;
28use anyhow::{Context, Result};
29use rusqlite::{params, Connection, OptionalExtension};
30use std::path::{Path, PathBuf};
31use std::sync::Mutex;
32use std::time::SystemTime;
33use tracing::{debug, warn};
34
/// Schema/semantics version stamped into the `cache_metadata` table.
///
/// **IMPORTANT**: bump this number whenever the meaning of cached data changes, e.g.:
/// - fields are added to or removed from `TokenUsage`
/// - the structure of `SessionMetadata` changes
/// - parsing logic changes (such as how tokens are accumulated)
///
/// On startup the stored version is compared against this constant; a mismatch
/// clears the session metadata and activity tables so stale entries are never served.
///
/// Version history:
/// - v1: Initial version
/// - v2: Fixed `TokenUsage::total()` calculation
/// - v3: Added token breakdown fields
/// - v4: Added branch field to `SessionMetadata`
/// - v5: Added `activity_cache` + `activity_alerts` tables
/// - v6: Added `aggregate_stats` table with triggers + FTS5 `session_fts` table
const CACHE_VERSION: i32 = 6;
53
54/// SQLite-based metadata cache (thread-safe)
55pub struct MetadataCache {
56    conn: Mutex<Connection>,
57    #[allow(dead_code)]
58    cache_path: PathBuf,
59}
60
61impl MetadataCache {
62    /// Create or open cache database
63    pub fn new(cache_dir: &Path) -> Result<Self> {
64        std::fs::create_dir_all(cache_dir).with_context(|| {
65            format!("Failed to create cache directory: {}", cache_dir.display())
66        })?;
67
68        let cache_path = cache_dir.join("session-metadata.db");
69        let conn = Connection::open(&cache_path)
70            .with_context(|| format!("Failed to open cache database: {}", cache_path.display()))?;
71
72        // Enable WAL mode for better concurrency
73        conn.pragma_update(None, "journal_mode", "WAL")
74            .context("Failed to enable WAL mode")?;
75
76        // Initialize schema
77        conn.execute_batch(
78            r#"
79            CREATE TABLE IF NOT EXISTS cache_metadata (
80                key TEXT PRIMARY KEY,
81                value INTEGER NOT NULL
82            );
83
84            CREATE TABLE IF NOT EXISTS session_metadata (
85                path TEXT PRIMARY KEY,
86                mtime INTEGER NOT NULL,
87                project TEXT NOT NULL,
88                session_id TEXT NOT NULL,
89                first_timestamp TEXT,
90                last_timestamp TEXT,
91                message_count INTEGER NOT NULL,
92                total_tokens INTEGER NOT NULL,
93                models_used TEXT NOT NULL,
94                has_subagents INTEGER NOT NULL,
95                first_user_message TEXT,
96                data BLOB NOT NULL
97            );
98
99            CREATE INDEX IF NOT EXISTS idx_project ON session_metadata(project);
100            CREATE INDEX IF NOT EXISTS idx_mtime ON session_metadata(mtime);
101            CREATE INDEX IF NOT EXISTS idx_session_id ON session_metadata(session_id);
102
103            CREATE TABLE IF NOT EXISTS activity_cache (
104                session_path TEXT PRIMARY KEY,
105                mtime INTEGER NOT NULL,
106                session_id TEXT NOT NULL,
107                tool_call_count INTEGER NOT NULL DEFAULT 0,
108                alert_count INTEGER NOT NULL DEFAULT 0,
109                data BLOB NOT NULL
110            );
111
112            CREATE INDEX IF NOT EXISTS idx_activity_session_id ON activity_cache(session_id);
113            CREATE INDEX IF NOT EXISTS idx_activity_mtime ON activity_cache(mtime);
114
115            CREATE TABLE IF NOT EXISTS activity_alerts (
116                id INTEGER PRIMARY KEY AUTOINCREMENT,
117                session_path TEXT NOT NULL,
118                severity TEXT NOT NULL,
119                category TEXT NOT NULL,
120                timestamp TEXT NOT NULL,
121                detail TEXT NOT NULL
122            );
123
124            CREATE INDEX IF NOT EXISTS idx_alerts_session ON activity_alerts(session_path);
125            CREATE INDEX IF NOT EXISTS idx_alerts_severity ON activity_alerts(severity);
126
127            CREATE TABLE IF NOT EXISTS aggregate_stats (
128                key   TEXT PRIMARY KEY,
129                value INTEGER NOT NULL DEFAULT 0
130            );
131
132            INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES
133                ('total_sessions', 0),
134                ('total_messages', 0);
135
136            CREATE TRIGGER IF NOT EXISTS stats_ai
137            AFTER INSERT ON session_metadata BEGIN
138                UPDATE aggregate_stats SET value = value + 1 WHERE key = 'total_sessions';
139                UPDATE aggregate_stats SET value = value + new.message_count WHERE key = 'total_messages';
140            END;
141
142            CREATE TRIGGER IF NOT EXISTS stats_ad
143            AFTER DELETE ON session_metadata BEGIN
144                UPDATE aggregate_stats SET value = MAX(0, value - 1) WHERE key = 'total_sessions';
145                UPDATE aggregate_stats SET value = MAX(0, value - old.message_count) WHERE key = 'total_messages';
146            END;
147
148            CREATE VIRTUAL TABLE IF NOT EXISTS session_fts USING fts5(
149                session_id  UNINDEXED,
150                project     UNINDEXED,
151                first_user_message,
152                models_used,
153                content='session_metadata',
154                content_rowid='rowid',
155                tokenize='unicode61'
156            );
157
158            CREATE TRIGGER IF NOT EXISTS session_fts_ai
159            AFTER INSERT ON session_metadata BEGIN
160                INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
161                VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
162            END;
163
164            CREATE TRIGGER IF NOT EXISTS session_fts_ad
165            AFTER DELETE ON session_metadata BEGIN
166                INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
167                VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
168            END;
169
170            CREATE TRIGGER IF NOT EXISTS session_fts_au
171            AFTER UPDATE ON session_metadata BEGIN
172                INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
173                VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
174                INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
175                VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
176            END;
177            "#,
178        )
179        .context("Failed to create schema")?;
180
181        // Check cache version and auto-invalidate if mismatch
182        let stored_version: Option<i32> = conn
183            .query_row(
184                "SELECT value FROM cache_metadata WHERE key = 'version'",
185                [],
186                |row| row.get(0),
187            )
188            .optional()
189            .context("Failed to query cache version")?;
190
191        match stored_version {
192            Some(v) if v != CACHE_VERSION => {
193                warn!(
194                    stored = v,
195                    current = CACHE_VERSION,
196                    "Cache version mismatch detected, clearing stale cache"
197                );
198
199                // Clear all session and activity entries
200                conn.execute("DELETE FROM session_metadata", [])
201                    .context("Failed to clear stale session cache")?;
202                conn.execute("DELETE FROM activity_cache", [])
203                    .context("Failed to clear stale activity cache")?;
204                conn.execute("DELETE FROM activity_alerts", [])
205                    .context("Failed to clear stale activity alerts")?;
206                conn.execute("DELETE FROM aggregate_stats", [])
207                    .context("Failed to clear stale aggregate stats")?;
208                conn.execute(
209                    "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_sessions', 0)",
210                    [],
211                )
212                .context("Failed to reinitialize total_sessions")?;
213                conn.execute(
214                    "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_messages', 0)",
215                    [],
216                )
217                .context("Failed to reinitialize total_messages")?;
218
219                // Update version
220                conn.execute(
221                    "INSERT OR REPLACE INTO cache_metadata (key, value) VALUES ('version', ?)",
222                    params![CACHE_VERSION],
223                )
224                .context("Failed to update cache version")?;
225
226                debug!("Cache cleared and version updated to {}", CACHE_VERSION);
227            }
228            None => {
229                // First run, set version
230                conn.execute(
231                    "INSERT INTO cache_metadata (key, value) VALUES ('version', ?)",
232                    params![CACHE_VERSION],
233                )
234                .context("Failed to initialize cache version")?;
235
236                debug!("Cache version initialized to {}", CACHE_VERSION);
237            }
238            Some(_) => {
239                debug!("Cache version {} matches current", CACHE_VERSION);
240            }
241        }
242
243        let cache = Self {
244            conn: Mutex::new(conn),
245            cache_path: cache_path.clone(),
246        };
247
248        debug!(path = %cache_path.display(), "Metadata cache initialized");
249
250        Ok(cache)
251    }
252
253    /// Get cached metadata if fresh, otherwise None
254    pub fn get(&self, path: &Path, current_mtime: SystemTime) -> Result<Option<SessionMetadata>> {
255        let path_str = path.to_string_lossy();
256        let mtime_secs = current_mtime
257            .duration_since(SystemTime::UNIX_EPOCH)
258            .context("Invalid mtime")?
259            .as_secs();
260
261        let conn = self
262            .conn
263            .lock()
264            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
265
266        let result: Option<Vec<u8>> = conn
267            .query_row(
268                "SELECT data FROM session_metadata WHERE path = ? AND mtime = ?",
269                params![path_str.as_ref(), mtime_secs as i64],
270                |row| row.get(0),
271            )
272            .optional()
273            .context("Failed to query cache")?;
274
275        match result {
276            Some(bytes) => {
277                let meta: SessionMetadata = bincode::deserialize(&bytes)
278                    .context("Failed to deserialize cached metadata")?;
279                debug!(path = %path.display(), "Cache hit");
280                Ok(Some(meta))
281            }
282            None => {
283                debug!(path = %path.display(), "Cache miss");
284                Ok(None)
285            }
286        }
287    }
288
289    /// Store metadata in cache
290    pub fn put(&self, path: &Path, meta: &SessionMetadata, mtime: SystemTime) -> Result<()> {
291        let path_str = path.to_string_lossy();
292        let mtime_secs = mtime
293            .duration_since(SystemTime::UNIX_EPOCH)
294            .context("Invalid mtime")?
295            .as_secs();
296
297        let data = bincode::serialize(meta).context("Failed to serialize metadata")?;
298
299        // Extract searchable fields
300        let models_used =
301            serde_json::to_string(&meta.models_used).context("Failed to serialize models")?;
302
303        let conn = self
304            .conn
305            .lock()
306            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
307
308        // 1. Try to insert new row (only fires stats_ai trigger on NEW rows)
309        conn.execute(
310            r#"
311            INSERT OR IGNORE INTO session_metadata
312            (path, mtime, project, session_id, first_timestamp, last_timestamp,
313             message_count, total_tokens, models_used, has_subagents, first_user_message, data)
314            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
315            "#,
316            params![
317                path_str.as_ref(),
318                mtime_secs as i64,
319                meta.project_path.as_str(),
320                meta.id.as_str(),
321                meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
322                meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
323                meta.message_count as i64,
324                meta.total_tokens as i64,
325                models_used.as_str(),
326                if meta.has_subagents { 1 } else { 0 },
327                &meta.first_user_message,
328                &data,
329            ],
330        )
331        .context("Failed to insert metadata")?;
332
333        // 2. Update if row already existed (no INSERT trigger fires, no stats double-count)
334        conn.execute(
335            r#"
336            UPDATE session_metadata
337            SET mtime = ?, project = ?, session_id = ?, first_timestamp = ?, last_timestamp = ?,
338                message_count = ?, total_tokens = ?, models_used = ?, has_subagents = ?,
339                first_user_message = ?, data = ?
340            WHERE path = ? AND mtime != ?
341            "#,
342            params![
343                mtime_secs as i64,
344                meta.project_path.as_str(),
345                meta.id.as_str(),
346                meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
347                meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
348                meta.message_count as i64,
349                meta.total_tokens as i64,
350                models_used.as_str(),
351                if meta.has_subagents { 1 } else { 0 },
352                &meta.first_user_message,
353                &data,
354                path_str.as_ref(),
355                mtime_secs as i64,
356            ],
357        )
358        .context("Failed to update metadata")?;
359
360        debug!(path = %path.display(), "Metadata cached");
361        Ok(())
362    }
363
364    /// Invalidate cache entry for a path
365    pub fn invalidate(&self, path: &Path) -> Result<()> {
366        let path_str = path.to_string_lossy();
367
368        let conn = self
369            .conn
370            .lock()
371            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
372
373        conn.execute(
374            "DELETE FROM session_metadata WHERE path = ?",
375            params![path_str.as_ref()],
376        )
377        .context("Failed to delete cache entry")?;
378
379        debug!(path = %path.display(), "Cache entry invalidated");
380        Ok(())
381    }
382
383    /// Get all cached paths for a project
384    pub fn get_project_paths(&self, project: &str) -> Result<Vec<PathBuf>> {
385        let conn = self
386            .conn
387            .lock()
388            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
389
390        let mut stmt = conn
391            .prepare("SELECT path FROM session_metadata WHERE project = ?")
392            .context("Failed to prepare query")?;
393
394        let rows = stmt
395            .query_map(params![project], |row| {
396                let path_str: String = row.get(0)?;
397                Ok(PathBuf::from(path_str))
398            })
399            .context("Failed to query project paths")?;
400
401        let mut paths = Vec::new();
402        for row in rows {
403            paths.push(row.context("Failed to read row")?);
404        }
405
406        Ok(paths)
407    }
408
409    /// Get cache statistics
410    pub fn stats(&self) -> Result<CacheStats> {
411        let conn = self
412            .conn
413            .lock()
414            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
415
416        let total_entries: i64 = conn
417            .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
418                row.get(0)
419            })
420            .context("Failed to count entries")?;
421
422        let total_size: i64 = conn
423            .query_row(
424                "SELECT SUM(LENGTH(data)) FROM session_metadata",
425                [],
426                |row| row.get(0),
427            )
428            .unwrap_or(0);
429
430        let project_count: i64 = conn
431            .query_row(
432                "SELECT COUNT(DISTINCT project) FROM session_metadata",
433                [],
434                |row| row.get(0),
435            )
436            .context("Failed to count projects")?;
437
438        Ok(CacheStats {
439            total_entries: total_entries as usize,
440            total_size_bytes: total_size as usize,
441            project_count: project_count as usize,
442        })
443    }
444
445    /// Clear all cache entries (for testing or rebuild)
446    pub fn clear(&self) -> Result<()> {
447        let conn = self
448            .conn
449            .lock()
450            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
451
452        conn.execute("DELETE FROM session_metadata", [])
453            .context("Failed to clear cache")?;
454
455        debug!("Cache cleared");
456        Ok(())
457    }
458
459    /// Vacuum database to reclaim space
460    pub fn vacuum(&self) -> Result<()> {
461        let conn = self
462            .conn
463            .lock()
464            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
465
466        conn.execute("VACUUM", []).context("Failed to vacuum")?;
467
468        debug!("Database vacuumed");
469        Ok(())
470    }
471
472    // ─── Aggregate stats + FTS5 search methods ───────────────────────────────
473
474    /// Get aggregate session stats from O(1) table (total sessions + messages)
475    pub fn get_aggregate_stats(&self) -> Result<AggregateStats> {
476        let conn = self
477            .conn
478            .lock()
479            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
480
481        let mut stmt = conn
482            .prepare("SELECT key, value FROM aggregate_stats")
483            .context("Failed to prepare aggregate_stats query")?;
484
485        let mut total_sessions = 0usize;
486        let mut total_messages = 0usize;
487
488        let rows = stmt
489            .query_map([], |row| {
490                let key: String = row.get(0)?;
491                let value: i64 = row.get(1)?;
492                Ok((key, value))
493            })
494            .context("Failed to query aggregate_stats")?;
495
496        for row in rows {
497            let (key, value) = row.context("Failed to read aggregate_stats row")?;
498            match key.as_str() {
499                "total_sessions" => total_sessions = value.max(0) as usize,
500                "total_messages" => total_messages = value.max(0) as usize,
501                _ => {}
502            }
503        }
504
505        Ok(AggregateStats {
506            total_sessions,
507            total_messages,
508        })
509    }
510
511    /// Search sessions using FTS5 full-text search.
512    ///
513    /// Returns up to `limit` results ranked by relevance (BM25).
514    /// Returns empty vec (not error) if FTS5 index doesn't exist yet (graceful degradation).
515    pub fn search_sessions(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
516        if query.trim().is_empty() {
517            return Ok(Vec::new());
518        }
519
520        let conn = self
521            .conn
522            .lock()
523            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
524
525        // Check if FTS5 table exists (graceful degradation for old cache DBs)
526        let fts_exists: bool = conn
527            .query_row(
528                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_fts'",
529                [],
530                |row| row.get::<_, i64>(0),
531            )
532            .unwrap_or(0)
533            > 0;
534
535        if !fts_exists {
536            return Ok(Vec::new());
537        }
538
539        let mut stmt = conn
540            .prepare(
541                r#"
542                SELECT
543                    sm.path,
544                    sm.session_id,
545                    sm.project,
546                    sm.first_user_message,
547                    snippet(session_fts, 2, '[', ']', '...', 8) AS snippet,
548                    session_fts.rank
549                FROM session_fts
550                JOIN session_metadata sm ON session_fts.rowid = sm.rowid
551                WHERE session_fts MATCH ?
552                ORDER BY session_fts.rank
553                LIMIT ?
554                "#,
555            )
556            .context("Failed to prepare FTS5 search query")?;
557
558        let limit_i64 = limit as i64;
559        let rows = stmt
560            .query_map(params![query, limit_i64], |row| {
561                Ok(SearchResult {
562                    path: PathBuf::from(row.get::<_, String>(0)?),
563                    session_id: row.get(1)?,
564                    project: row.get(2)?,
565                    first_user_message: row.get(3)?,
566                    snippet: row.get(4)?,
567                    rank: row.get(5)?,
568                })
569            })
570            .context("Failed to execute FTS5 search")?;
571
572        let mut results = Vec::new();
573        for row in rows {
574            match row {
575                Ok(r) => results.push(r),
576                Err(e) => {
577                    warn!("FTS5 search row error: {}", e);
578                }
579            }
580        }
581
582        Ok(results)
583    }
584
585    /// Rebuild FTS5 index from existing session_metadata rows.
586    ///
587    /// Called once after cache version bump to populate FTS5 for existing sessions.
588    pub fn rebuild_fts_index(&self) -> Result<usize> {
589        let conn = self
590            .conn
591            .lock()
592            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
593
594        // Trigger FTS5 content table rebuild
595        conn.execute("INSERT INTO session_fts(session_fts) VALUES('rebuild')", [])
596            .context("Failed to trigger FTS5 rebuild")?;
597
598        // Count sessions indexed
599        let count: i64 = conn
600            .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
601                row.get(0)
602            })
603            .context("Failed to count sessions")?;
604
605        debug!("FTS5 index rebuilt for {} sessions", count);
606        Ok(count as usize)
607    }
608
609    // ─── Activity cache methods ───────────────────────────────────────────────
610
611    /// Get cached ActivitySummary if the session file mtime matches.
612    ///
613    /// Returns None on cache miss or mtime mismatch (session was modified).
614    pub fn get_activity(
615        &self,
616        path: &Path,
617        current_mtime: SystemTime,
618    ) -> Result<Option<ActivitySummary>> {
619        let path_str = path.to_string_lossy();
620        let mtime_secs = current_mtime
621            .duration_since(SystemTime::UNIX_EPOCH)
622            .context("Invalid mtime")?
623            .as_secs();
624
625        let conn = self
626            .conn
627            .lock()
628            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
629
630        let result: Option<Vec<u8>> = conn
631            .query_row(
632                "SELECT data FROM activity_cache WHERE session_path = ? AND mtime = ?",
633                params![path_str.as_ref(), mtime_secs as i64],
634                |row| row.get(0),
635            )
636            .optional()
637            .context("Failed to query activity cache")?;
638
639        match result {
640            Some(bytes) => {
641                let summary: ActivitySummary = bincode::deserialize(&bytes)
642                    .context("Failed to deserialize activity summary")?;
643                debug!(path = %path.display(), "Activity cache hit");
644                Ok(Some(summary))
645            }
646            None => {
647                debug!(path = %path.display(), "Activity cache miss");
648                Ok(None)
649            }
650        }
651    }
652
653    /// Store an ActivitySummary in cache, keyed by session file path + mtime.
654    ///
655    /// Also populates the `activity_alerts` table for cross-session alert queries.
656    pub fn put_activity(
657        &self,
658        path: &Path,
659        session_id: &str,
660        summary: &ActivitySummary,
661        mtime: SystemTime,
662    ) -> Result<()> {
663        let path_str = path.to_string_lossy();
664        let mtime_secs = mtime
665            .duration_since(SystemTime::UNIX_EPOCH)
666            .context("Invalid mtime")?
667            .as_secs();
668
669        let data = bincode::serialize(summary).context("Failed to serialize activity summary")?;
670
671        let conn = self
672            .conn
673            .lock()
674            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
675
676        // Wrap all writes in a transaction for atomicity.
677        // Without this, a crash between DELETE and re-insert leaves stale alerts.
678        conn.execute_batch("BEGIN IMMEDIATE")
679            .context("Failed to begin activity cache transaction")?;
680
681        let result = (|| -> anyhow::Result<()> {
682            // Upsert activity_cache
683            conn.execute(
684                r#"
685                INSERT OR REPLACE INTO activity_cache
686                (session_path, mtime, session_id, tool_call_count, alert_count, data)
687                VALUES (?, ?, ?, ?, ?, ?)
688                "#,
689                params![
690                    path_str.as_ref(),
691                    mtime_secs as i64,
692                    session_id,
693                    (summary.file_accesses.len()
694                        + summary.bash_commands.len()
695                        + summary.network_calls.len()) as i64,
696                    summary.alerts.len() as i64,
697                    &data,
698                ],
699            )
700            .context("Failed to insert activity cache entry")?;
701
702            // Refresh activity_alerts for this session (delete + re-insert)
703            conn.execute(
704                "DELETE FROM activity_alerts WHERE session_path = ?",
705                params![path_str.as_ref()],
706            )
707            .context("Failed to delete old activity alerts")?;
708
709            for alert in &summary.alerts {
710                let severity = format!("{:?}", alert.severity);
711                let category = format!("{:?}", alert.category);
712                conn.execute(
713                    r#"
714                    INSERT INTO activity_alerts (session_path, severity, category, timestamp, detail)
715                    VALUES (?, ?, ?, ?, ?)
716                    "#,
717                    params![
718                        path_str.as_ref(),
719                        severity,
720                        category,
721                        alert.timestamp.to_rfc3339(),
722                        &alert.detail,
723                    ],
724                )
725                .context("Failed to insert activity alert")?;
726            }
727
728            Ok(())
729        })();
730
731        match result {
732            Ok(()) => conn
733                .execute_batch("COMMIT")
734                .context("Failed to commit activity cache transaction")?,
735            Err(e) => {
736                let _ = conn.execute_batch("ROLLBACK");
737                return Err(e);
738            }
739        }
740
741        debug!(
742            path = %path.display(),
743            alerts = summary.alerts.len(),
744            "Activity summary cached"
745        );
746        Ok(())
747    }
748
749    /// Invalidate activity cache entry for a session file.
750    ///
751    /// Called by file watcher when a session file is modified.
752    pub fn invalidate_activity(&self, path: &Path) -> Result<()> {
753        let path_str = path.to_string_lossy();
754
755        let conn = self
756            .conn
757            .lock()
758            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
759
760        conn.execute(
761            "DELETE FROM activity_cache WHERE session_path = ?",
762            params![path_str.as_ref()],
763        )
764        .context("Failed to delete activity cache entry")?;
765
766        conn.execute(
767            "DELETE FROM activity_alerts WHERE session_path = ?",
768            params![path_str.as_ref()],
769        )
770        .context("Failed to delete activity alerts")?;
771
772        debug!(path = %path.display(), "Activity cache invalidated");
773        Ok(())
774    }
775
776    /// Get all stored alerts, optionally filtered by minimum severity.
777    ///
778    /// Useful for a global alert view across all analyzed sessions.
779    pub fn get_all_alerts(&self, min_severity: Option<&str>) -> Result<Vec<StoredAlert>> {
780        let conn = self
781            .conn
782            .lock()
783            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
784
785        // Build a query that respects severity hierarchy: Critical > Warning > Info.
786        // "Warning" means Warning + Critical; "Critical" means Critical only; None means all.
787        let query = match min_severity {
788            Some("Critical") => "SELECT session_path, severity, category, timestamp, detail \
789                 FROM activity_alerts WHERE severity = 'Critical' ORDER BY timestamp DESC",
790            Some("Warning") => "SELECT session_path, severity, category, timestamp, detail \
791                 FROM activity_alerts WHERE severity IN ('Warning', 'Critical') ORDER BY timestamp DESC",
792            _ => "SELECT session_path, severity, category, timestamp, detail \
793                 FROM activity_alerts ORDER BY timestamp DESC",
794        };
795
796        let mut stmt = conn
797            .prepare(query)
798            .context("Failed to prepare alert query")?;
799
800        let rows = stmt
801            .query_map([], |row| {
802                Ok(StoredAlert {
803                    session_path: row.get(0)?,
804                    severity: row.get(1)?,
805                    category: row.get(2)?,
806                    timestamp: row.get(3)?,
807                    detail: row.get(4)?,
808                })
809            })
810            .context("Failed to query alerts")?
811            .collect::<Result<Vec<_>, _>>()
812            .context("Failed to collect alerts")?;
813
814        Ok(rows)
815    }
816
817    /// Get activity cache statistics
818    pub fn activity_stats(&self) -> Result<ActivityCacheStats> {
819        let conn = self
820            .conn
821            .lock()
822            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
823
824        let analyzed_sessions: i64 = conn
825            .query_row("SELECT COUNT(*) FROM activity_cache", [], |row| row.get(0))
826            .context("Failed to count activity cache entries")?;
827
828        let total_alerts: i64 = conn
829            .query_row("SELECT COUNT(*) FROM activity_alerts", [], |row| row.get(0))
830            .context("Failed to count alerts")?;
831
832        let critical_alerts: i64 = conn
833            .query_row(
834                "SELECT COUNT(*) FROM activity_alerts WHERE severity = 'Critical'",
835                [],
836                |row| row.get(0),
837            )
838            .context("Failed to count critical alerts")?;
839
840        Ok(ActivityCacheStats {
841            analyzed_sessions: analyzed_sessions as usize,
842            total_alerts: total_alerts as usize,
843            critical_alerts: critical_alerts as usize,
844        })
845    }
846}
847
848impl Drop for MetadataCache {
849    fn drop(&mut self) {
850        // WAL checkpoint on drop to ensure all data is flushed to main database file
851        // and WAL file doesn't grow unbounded across restarts
852        if let Ok(conn) = self.conn.lock() {
853            if let Err(e) = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE") {
854                warn!("Failed to checkpoint WAL on MetadataCache drop: {}", e);
855            } else {
856                debug!("WAL checkpoint completed on MetadataCache drop");
857            }
858        }
859    }
860}
861
/// Session metadata cache statistics
///
/// Plain counters; derives `Default`, `PartialEq` and `Eq` so a snapshot can be
/// zero-initialized and compared as a whole.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of session entries currently held in the cache.
    pub total_entries: usize,
    /// Size attributed to the cache, in bytes.
    /// NOTE(review): how this is measured is decided where the stats are built — confirm there.
    pub total_size_bytes: usize,
    /// Number of projects represented among the cached entries
    /// (presumably distinct project paths — confirm against the stats query).
    pub project_count: usize,
}
869
/// Aggregate statistics from O(1) table
///
/// Derives `PartialEq`/`Eq` so whole snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AggregateStats {
    /// Total number of sessions
    /// (presumably maintained in the `aggregate_stats` table — confirm against the reader).
    pub total_sessions: usize,
    /// Total number of messages across those sessions
    /// (presumably the sum of per-session message counts — confirm against the reader).
    pub total_messages: usize,
}
876
/// A single FTS5 search result
///
/// Derives `PartialEq` so results can be compared in tests; `Eq` is not
/// available because `rank` is a floating-point value.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Path of the session file the hit belongs to.
    pub path: PathBuf,
    /// Identifier of the matching session.
    pub session_id: String,
    /// Project the session belongs to, when known.
    pub project: Option<String>,
    /// First user message of the session, when one was recorded.
    pub first_user_message: Option<String>,
    /// Excerpt around the match
    /// (presumably produced by the FTS5 `snippet()` function — confirm against the search query).
    pub snippet: Option<String>,
    /// Relevance score of the hit.
    /// NOTE(review): presumably the FTS5 rank, where ordering semantics depend on the query — confirm.
    pub rank: f64,
}
887
/// Activity cache statistics
///
/// Plain counters; derives `Default`, `PartialEq` and `Eq` so a snapshot can be
/// zero-initialized and compared as a whole.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ActivityCacheStats {
    /// Number of rows in `activity_cache`, i.e. sessions with a cached activity summary.
    pub analyzed_sessions: usize,
    /// Number of rows in `activity_alerts`, regardless of severity.
    pub total_alerts: usize,
    /// Number of rows in `activity_alerts` whose severity is `Critical`.
    pub critical_alerts: usize,
}
895
/// A single alert record from the activity_alerts table
///
/// Every field is kept as the text stored in the corresponding column.
/// Derives `PartialEq`/`Eq` so records can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredAlert {
    /// Path of the session file the alert was recorded for (`session_path` column).
    pub session_path: String,
    /// Severity name as stored, e.g. `"Critical"` or `"Warning"`.
    pub severity: String,
    /// Alert category name as stored.
    pub category: String,
    /// Alert timestamp as stored text.
    /// NOTE(review): presumably RFC 3339, which would make text ordering chronological — confirm against the writer.
    pub timestamp: String,
    /// Human-readable description of what triggered the alert.
    pub detail: String,
}
905
906impl CacheStats {
907    pub fn hit_rate(&self, scanned: usize) -> f64 {
908        if scanned == 0 {
909            return 0.0;
910        }
911        (self.total_entries as f64) / (scanned as f64)
912    }
913}
914
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::SessionMetadata;
    use chrono::Utc;
    use tempfile::tempdir;

    // ── shared fixtures ──────────────────────────────────────────────────────

    /// Opens an empty cache inside a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the cache so the directory
    /// stays alive for as long as the test uses the cache.
    fn open_cache() -> (tempfile::TempDir, MetadataCache) {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();
        (dir, cache)
    }

    /// Caches freshly built metadata for `path` under `project`, returning the path.
    fn put_session(
        cache: &MetadataCache,
        path: PathBuf,
        project: &'static str,
        mtime: SystemTime,
    ) -> PathBuf {
        let meta = SessionMetadata::from_path(path.clone(), project.into());
        cache.put(&path, &meta, mtime).unwrap();
        path
    }

    /// Builds an activity summary holding one Critical and one Warning alert.
    fn make_summary_with_alerts() -> ActivitySummary {
        use crate::models::activity::{Alert, AlertCategory, AlertSeverity};

        let alert = |severity: AlertSeverity, category: AlertCategory, detail: &str| Alert {
            session_id: "test-session".to_string(),
            timestamp: Utc::now(),
            severity,
            category,
            detail: detail.to_string(),
        };

        ActivitySummary {
            file_accesses: Vec::new(),
            bash_commands: Vec::new(),
            network_calls: Vec::new(),
            alerts: vec![
                alert(
                    AlertSeverity::Critical,
                    AlertCategory::DestructiveCommand,
                    "rm -rf /tmp",
                ),
                alert(
                    AlertSeverity::Warning,
                    AlertCategory::CredentialAccess,
                    "Accessed .env",
                ),
            ],
        }
    }

    /// Stores the two-alert sample summary and returns the path and mtime it was stored under.
    fn seed_activity(cache: &MetadataCache) -> (PathBuf, SystemTime) {
        let path = PathBuf::from("/tmp/session.jsonl");
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();
        cache
            .put_activity(&path, "test-session", &summary, mtime)
            .unwrap();
        (path, mtime)
    }

    // ── session metadata cache tests ─────────────────────────────────────────

    #[test]
    fn test_cache_creation() {
        let (_dir, cache) = open_cache();

        assert_eq!(cache.stats().unwrap().total_entries, 0);
    }

    #[test]
    fn test_cache_put_get() {
        let (_dir, cache) = open_cache();

        let path = PathBuf::from("/tmp/test.jsonl");
        let mut meta = SessionMetadata::from_path(path.clone(), "/test".into());
        meta.id = "test-123".into();
        meta.message_count = 42;
        meta.total_tokens = 1000;
        meta.models_used = std::iter::once("sonnet".to_string()).collect();
        meta.first_timestamp = Some(Utc::now());

        let mtime = SystemTime::now();
        cache.put(&path, &meta, mtime).unwrap();

        // Same mtime: the stored entry is returned.
        let hit = cache.get(&path, mtime).unwrap();
        assert!(hit.is_some());
        let hit = hit.unwrap();
        assert_eq!(hit.id, "test-123");
        assert_eq!(hit.message_count, 42);

        // Different mtime: the entry is treated as stale.
        let an_hour_earlier = mtime - std::time::Duration::from_secs(3600);
        assert!(cache.get(&path, an_hour_earlier).unwrap().is_none());
    }

    #[test]
    fn test_cache_invalidate() {
        let (_dir, cache) = open_cache();
        let mtime = SystemTime::now();
        let path = put_session(&cache, PathBuf::from("/tmp/test.jsonl"), "/test", mtime);

        cache.invalidate(&path).unwrap();

        // The entry must no longer be served, even for the matching mtime.
        assert!(cache.get(&path, mtime).unwrap().is_none());
    }

    #[test]
    fn test_cache_project_paths() {
        let (_dir, cache) = open_cache();
        let mtime = SystemTime::now();

        // (directory of the session files, project key, number of sessions)
        let projects: [(&'static str, &'static str, usize); 2] = [
            ("/tmp/project1", "/project1", 3),
            ("/tmp/project2", "/project2", 2),
        ];

        for &(dir, project, sessions) in projects.iter() {
            for i in 0..sessions {
                let path = PathBuf::from(format!("{}/session{}.jsonl", dir, i));
                put_session(&cache, path, project, mtime);
            }
        }

        // Each project reports exactly the sessions stored under it.
        for &(_, project, sessions) in projects.iter() {
            let paths = cache.get_project_paths(project).unwrap();
            assert_eq!(paths.len(), sessions);
        }
    }

    #[test]
    fn test_cache_stats() {
        let (_dir, cache) = open_cache();
        let mtime = SystemTime::now();

        (0..10).for_each(|i| {
            let path = PathBuf::from(format!("/tmp/session{}.jsonl", i));
            put_session(&cache, path, "/test", mtime);
        });

        let stats = cache.stats().unwrap();
        assert_eq!(stats.total_entries, 10);
        assert!(stats.total_size_bytes > 0);
        assert_eq!(stats.project_count, 1);
    }

    #[test]
    fn test_cache_clear() {
        let (_dir, cache) = open_cache();
        put_session(
            &cache,
            PathBuf::from("/tmp/test.jsonl"),
            "/test",
            SystemTime::now(),
        );
        assert_eq!(cache.stats().unwrap().total_entries, 1);

        cache.clear().unwrap();

        assert_eq!(cache.stats().unwrap().total_entries, 0);
    }

    // ── activity cache tests ─────────────────────────────────────────────────

    #[test]
    fn test_activity_put_get_hit() {
        let (_dir, cache) = open_cache();
        let (path, mtime) = seed_activity(&cache);

        let cached = cache.get_activity(&path, mtime).unwrap();
        assert!(cached.is_some(), "Should be a cache hit");
        assert_eq!(cached.unwrap().alerts.len(), 2);
    }

    #[test]
    fn test_activity_get_miss_on_mtime_change() {
        let (_dir, cache) = open_cache();
        let (path, mtime) = seed_activity(&cache);

        // Looking up with another mtime must not return the stored summary.
        let stale_mtime = mtime - std::time::Duration::from_secs(60);
        let cached = cache.get_activity(&path, stale_mtime).unwrap();
        assert!(cached.is_none(), "Should be a cache miss on mtime change");
    }

    #[test]
    fn test_activity_invalidate() {
        let (_dir, cache) = open_cache();
        let (path, mtime) = seed_activity(&cache);

        // Sanity check: the summary is there before invalidation.
        assert!(cache.get_activity(&path, mtime).unwrap().is_some());

        cache.invalidate_activity(&path).unwrap();

        // The summary is gone...
        assert!(
            cache.get_activity(&path, mtime).unwrap().is_none(),
            "Should be gone after invalidation"
        );

        // ...and so are the alert rows that belonged to it.
        let alerts = cache.get_all_alerts(None).unwrap();
        assert!(
            alerts.is_empty(),
            "Alerts should be cleared with activity cache"
        );
    }

    #[test]
    fn test_get_all_alerts_returns_stored_alerts() {
        let (_dir, cache) = open_cache();
        seed_activity(&cache);

        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 2, "Should return both alerts");

        let critical_details: Vec<&str> = alerts
            .iter()
            .filter(|alert| alert.severity == "Critical")
            .map(|alert| alert.detail.as_str())
            .collect();
        assert_eq!(critical_details.len(), 1);
        assert!(critical_details[0].contains("rm -rf"));
    }

    #[test]
    fn test_get_all_alerts_filter_by_severity() {
        let (_dir, cache) = open_cache();
        seed_activity(&cache);

        let critical_only = cache.get_all_alerts(Some("Critical")).unwrap();
        assert_eq!(critical_only.len(), 1);
        assert_eq!(critical_only[0].severity, "Critical");
    }

    #[test]
    fn test_activity_stats() {
        let (_dir, cache) = open_cache();

        // Nothing analyzed yet.
        let before = cache.activity_stats().unwrap();
        assert_eq!(before.analyzed_sessions, 0);
        assert_eq!(before.total_alerts, 0);

        seed_activity(&cache);

        let after = cache.activity_stats().unwrap();
        assert_eq!(after.analyzed_sessions, 1);
        assert_eq!(after.total_alerts, 2);
        assert_eq!(after.critical_alerts, 1);
    }

    #[test]
    fn test_activity_put_replaces_stale_alerts() {
        let (_dir, cache) = open_cache();
        let (path, mtime) = seed_activity(&cache); // 2 alerts
        assert_eq!(cache.get_all_alerts(None).unwrap().len(), 2);

        // Store an alert-free summary for the same path under a newer mtime.
        let empty_summary = ActivitySummary::default();
        let new_mtime = mtime + std::time::Duration::from_secs(1);
        cache
            .put_activity(&path, "test-session", &empty_summary, new_mtime)
            .unwrap();

        // The alerts from the first summary must not linger.
        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 0, "Stale alerts should be replaced");
    }
}