Skip to main content

ccboard_core/cache/
metadata_cache.rs

1//! SQLite metadata cache for session files
2//!
3//! Caches session metadata with mtime-based invalidation for 90% startup speedup.
4//! Also caches activity analysis (tool calls, alerts) per session on demand.
5//!
6//! Schema:
7//! - session_metadata: parsed metadata + mtime + cache_version
8//! - activity_cache: serialized ActivitySummary + mtime per session file
9//! - activity_alerts: searchable alert records (severity/category) across all sessions
10//! - Indexes: project, mtime, session_id, severity for fast queries
11//!
12//! Invalidation:
13//! - File watcher detects modification → delete session + activity cache entries
14//! - Startup: compare mtime → rescan if stale
15//! - Startup: compare cache_version → auto-clear ALL tables if mismatch
16//!
17//! Cache Version History:
18//! - v1: Initial version (pre-TokenUsage fix)
19//! - v2: Fixed TokenUsage::total() to include cache_read_tokens + cache_write_tokens
20//! - v3: Added token breakdown fields (input_tokens, output_tokens, cache_creation_tokens,
21//!   cache_read_tokens) to SessionMetadata + real pricing calculation
22//! - v4: Added branch field to SessionMetadata
23//! - v5: Added activity_cache + activity_alerts tables for Phase 2 activity module
24//! - v6: Added aggregate_stats table with triggers + FTS5 session_fts table
25//! - v7: Added tool_token_usage field to SessionMetadata (Phase K analytics)
26
27use crate::models::activity::ActivitySummary;
28use crate::models::SessionMetadata;
29use anyhow::{Context, Result};
30use rusqlite::{params, Connection, OptionalExtension};
31use std::path::{Path, PathBuf};
32use std::sync::Mutex;
33use std::time::SystemTime;
34use tracing::{debug, warn};
35
36/// Current cache version
37///
38/// **IMPORTANT**: Increment this version when changing how metadata is calculated:
39/// - TokenUsage fields added/removed
40/// - SessionMetadata structure changed
41/// - Parsing logic modified (e.g., token accumulation)
42///
43/// This triggers automatic cache invalidation on startup, preventing stale data bugs.
44/// All tables (session_metadata + activity) are cleared on version mismatch.
45///
46/// Version History:
47/// - v1: Initial version
48/// - v2: Fixed TokenUsage::total() calculation
49/// - v3: Added token breakdown fields
50/// - v4: Added branch field to SessionMetadata
51/// - v5: Added activity_cache + activity_alerts tables
52/// - v6: Added aggregate_stats table with triggers + FTS5 session_fts table
53/// - v7: Added tool_token_usage field to SessionMetadata (Phase K analytics)
54/// - v8: Added lines_added/lines_removed fields to SessionMetadata (code metrics)
55const CACHE_VERSION: i32 = 8;
56
57/// SQLite-based metadata cache (thread-safe)
58pub struct MetadataCache {
59    conn: Mutex<Connection>,
60    #[allow(dead_code)]
61    cache_path: PathBuf,
62}
63
64impl MetadataCache {
65    /// Create or open cache database
66    pub fn new(cache_dir: &Path) -> Result<Self> {
67        std::fs::create_dir_all(cache_dir).with_context(|| {
68            format!("Failed to create cache directory: {}", cache_dir.display())
69        })?;
70
71        let cache_path = cache_dir.join("session-metadata.db");
72        let conn = Connection::open(&cache_path)
73            .with_context(|| format!("Failed to open cache database: {}", cache_path.display()))?;
74
75        // Enable WAL mode for better concurrency
76        conn.pragma_update(None, "journal_mode", "WAL")
77            .context("Failed to enable WAL mode")?;
78
79        // Initialize schema
80        conn.execute_batch(
81            r#"
82            CREATE TABLE IF NOT EXISTS cache_metadata (
83                key TEXT PRIMARY KEY,
84                value INTEGER NOT NULL
85            );
86
87            CREATE TABLE IF NOT EXISTS session_metadata (
88                path TEXT PRIMARY KEY,
89                mtime INTEGER NOT NULL,
90                project TEXT NOT NULL,
91                session_id TEXT NOT NULL,
92                first_timestamp TEXT,
93                last_timestamp TEXT,
94                message_count INTEGER NOT NULL,
95                total_tokens INTEGER NOT NULL,
96                models_used TEXT NOT NULL,
97                has_subagents INTEGER NOT NULL,
98                first_user_message TEXT,
99                data BLOB NOT NULL
100            );
101
102            CREATE INDEX IF NOT EXISTS idx_project ON session_metadata(project);
103            CREATE INDEX IF NOT EXISTS idx_mtime ON session_metadata(mtime);
104            CREATE INDEX IF NOT EXISTS idx_session_id ON session_metadata(session_id);
105
106            CREATE TABLE IF NOT EXISTS activity_cache (
107                session_path TEXT PRIMARY KEY,
108                mtime INTEGER NOT NULL,
109                session_id TEXT NOT NULL,
110                tool_call_count INTEGER NOT NULL DEFAULT 0,
111                alert_count INTEGER NOT NULL DEFAULT 0,
112                data BLOB NOT NULL
113            );
114
115            CREATE INDEX IF NOT EXISTS idx_activity_session_id ON activity_cache(session_id);
116            CREATE INDEX IF NOT EXISTS idx_activity_mtime ON activity_cache(mtime);
117
118            CREATE TABLE IF NOT EXISTS activity_alerts (
119                id INTEGER PRIMARY KEY AUTOINCREMENT,
120                session_path TEXT NOT NULL,
121                severity TEXT NOT NULL,
122                category TEXT NOT NULL,
123                timestamp TEXT NOT NULL,
124                detail TEXT NOT NULL
125            );
126
127            CREATE INDEX IF NOT EXISTS idx_alerts_session ON activity_alerts(session_path);
128            CREATE INDEX IF NOT EXISTS idx_alerts_severity ON activity_alerts(severity);
129
130            CREATE TABLE IF NOT EXISTS aggregate_stats (
131                key   TEXT PRIMARY KEY,
132                value INTEGER NOT NULL DEFAULT 0
133            );
134
135            INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES
136                ('total_sessions', 0),
137                ('total_messages', 0);
138
139            CREATE TRIGGER IF NOT EXISTS stats_ai
140            AFTER INSERT ON session_metadata BEGIN
141                UPDATE aggregate_stats SET value = value + 1 WHERE key = 'total_sessions';
142                UPDATE aggregate_stats SET value = value + new.message_count WHERE key = 'total_messages';
143            END;
144
145            CREATE TRIGGER IF NOT EXISTS stats_ad
146            AFTER DELETE ON session_metadata BEGIN
147                UPDATE aggregate_stats SET value = MAX(0, value - 1) WHERE key = 'total_sessions';
148                UPDATE aggregate_stats SET value = MAX(0, value - old.message_count) WHERE key = 'total_messages';
149            END;
150
151            CREATE VIRTUAL TABLE IF NOT EXISTS session_fts USING fts5(
152                session_id  UNINDEXED,
153                project     UNINDEXED,
154                first_user_message,
155                models_used,
156                content='session_metadata',
157                content_rowid='rowid',
158                tokenize='unicode61'
159            );
160
161            CREATE TRIGGER IF NOT EXISTS session_fts_ai
162            AFTER INSERT ON session_metadata BEGIN
163                INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
164                VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
165            END;
166
167            CREATE TRIGGER IF NOT EXISTS session_fts_ad
168            AFTER DELETE ON session_metadata BEGIN
169                INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
170                VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
171            END;
172
173            CREATE TRIGGER IF NOT EXISTS session_fts_au
174            AFTER UPDATE ON session_metadata BEGIN
175                INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
176                VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
177                INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
178                VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
179            END;
180            "#,
181        )
182        .context("Failed to create schema")?;
183
184        // Check cache version and auto-invalidate if mismatch
185        let stored_version: Option<i32> = conn
186            .query_row(
187                "SELECT value FROM cache_metadata WHERE key = 'version'",
188                [],
189                |row| row.get(0),
190            )
191            .optional()
192            .context("Failed to query cache version")?;
193
194        match stored_version {
195            Some(v) if v != CACHE_VERSION => {
196                warn!(
197                    stored = v,
198                    current = CACHE_VERSION,
199                    "Cache version mismatch detected, clearing stale cache"
200                );
201
202                // Clear all session and activity entries
203                conn.execute("DELETE FROM session_metadata", [])
204                    .context("Failed to clear stale session cache")?;
205                conn.execute("DELETE FROM activity_cache", [])
206                    .context("Failed to clear stale activity cache")?;
207                conn.execute("DELETE FROM activity_alerts", [])
208                    .context("Failed to clear stale activity alerts")?;
209                conn.execute("DELETE FROM aggregate_stats", [])
210                    .context("Failed to clear stale aggregate stats")?;
211                conn.execute(
212                    "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_sessions', 0)",
213                    [],
214                )
215                .context("Failed to reinitialize total_sessions")?;
216                conn.execute(
217                    "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_messages', 0)",
218                    [],
219                )
220                .context("Failed to reinitialize total_messages")?;
221
222                // Update version
223                conn.execute(
224                    "INSERT OR REPLACE INTO cache_metadata (key, value) VALUES ('version', ?)",
225                    params![CACHE_VERSION],
226                )
227                .context("Failed to update cache version")?;
228
229                debug!("Cache cleared and version updated to {}", CACHE_VERSION);
230            }
231            None => {
232                // First run, set version
233                conn.execute(
234                    "INSERT INTO cache_metadata (key, value) VALUES ('version', ?)",
235                    params![CACHE_VERSION],
236                )
237                .context("Failed to initialize cache version")?;
238
239                debug!("Cache version initialized to {}", CACHE_VERSION);
240            }
241            Some(_) => {
242                debug!("Cache version {} matches current", CACHE_VERSION);
243            }
244        }
245
246        let cache = Self {
247            conn: Mutex::new(conn),
248            cache_path: cache_path.clone(),
249        };
250
251        debug!(path = %cache_path.display(), "Metadata cache initialized");
252
253        Ok(cache)
254    }
255
256    /// Get cached metadata if fresh, otherwise None
257    pub fn get(&self, path: &Path, current_mtime: SystemTime) -> Result<Option<SessionMetadata>> {
258        let path_str = path.to_string_lossy();
259        let mtime_secs = current_mtime
260            .duration_since(SystemTime::UNIX_EPOCH)
261            .context("Invalid mtime")?
262            .as_secs();
263
264        let conn = self
265            .conn
266            .lock()
267            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
268
269        let result: Option<Vec<u8>> = conn
270            .query_row(
271                "SELECT data FROM session_metadata WHERE path = ? AND mtime = ?",
272                params![path_str.as_ref(), mtime_secs as i64],
273                |row| row.get(0),
274            )
275            .optional()
276            .context("Failed to query cache")?;
277
278        match result {
279            Some(bytes) => {
280                let meta: SessionMetadata = bincode::deserialize(&bytes)
281                    .context("Failed to deserialize cached metadata")?;
282                debug!(path = %path.display(), "Cache hit");
283                Ok(Some(meta))
284            }
285            None => {
286                debug!(path = %path.display(), "Cache miss");
287                Ok(None)
288            }
289        }
290    }
291
292    /// Store metadata in cache
293    pub fn put(&self, path: &Path, meta: &SessionMetadata, mtime: SystemTime) -> Result<()> {
294        let path_str = path.to_string_lossy();
295        let mtime_secs = mtime
296            .duration_since(SystemTime::UNIX_EPOCH)
297            .context("Invalid mtime")?
298            .as_secs();
299
300        let data = bincode::serialize(meta).context("Failed to serialize metadata")?;
301
302        // Extract searchable fields
303        let models_used =
304            serde_json::to_string(&meta.models_used).context("Failed to serialize models")?;
305
306        let conn = self
307            .conn
308            .lock()
309            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
310
311        // 1. Try to insert new row (only fires stats_ai trigger on NEW rows)
312        conn.execute(
313            r#"
314            INSERT OR IGNORE INTO session_metadata
315            (path, mtime, project, session_id, first_timestamp, last_timestamp,
316             message_count, total_tokens, models_used, has_subagents, first_user_message, data)
317            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
318            "#,
319            params![
320                path_str.as_ref(),
321                mtime_secs as i64,
322                meta.project_path.as_str(),
323                meta.id.as_str(),
324                meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
325                meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
326                meta.message_count as i64,
327                meta.total_tokens as i64,
328                models_used.as_str(),
329                if meta.has_subagents { 1 } else { 0 },
330                &meta.first_user_message,
331                &data,
332            ],
333        )
334        .context("Failed to insert metadata")?;
335
336        // 2. Update if row already existed (no INSERT trigger fires, no stats double-count)
337        conn.execute(
338            r#"
339            UPDATE session_metadata
340            SET mtime = ?, project = ?, session_id = ?, first_timestamp = ?, last_timestamp = ?,
341                message_count = ?, total_tokens = ?, models_used = ?, has_subagents = ?,
342                first_user_message = ?, data = ?
343            WHERE path = ? AND mtime != ?
344            "#,
345            params![
346                mtime_secs as i64,
347                meta.project_path.as_str(),
348                meta.id.as_str(),
349                meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
350                meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
351                meta.message_count as i64,
352                meta.total_tokens as i64,
353                models_used.as_str(),
354                if meta.has_subagents { 1 } else { 0 },
355                &meta.first_user_message,
356                &data,
357                path_str.as_ref(),
358                mtime_secs as i64,
359            ],
360        )
361        .context("Failed to update metadata")?;
362
363        debug!(path = %path.display(), "Metadata cached");
364        Ok(())
365    }
366
367    /// Invalidate cache entry for a path
368    pub fn invalidate(&self, path: &Path) -> Result<()> {
369        let path_str = path.to_string_lossy();
370
371        let conn = self
372            .conn
373            .lock()
374            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
375
376        conn.execute(
377            "DELETE FROM session_metadata WHERE path = ?",
378            params![path_str.as_ref()],
379        )
380        .context("Failed to delete cache entry")?;
381
382        debug!(path = %path.display(), "Cache entry invalidated");
383        Ok(())
384    }
385
386    /// Get all cached paths for a project
387    pub fn get_project_paths(&self, project: &str) -> Result<Vec<PathBuf>> {
388        let conn = self
389            .conn
390            .lock()
391            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
392
393        let mut stmt = conn
394            .prepare("SELECT path FROM session_metadata WHERE project = ?")
395            .context("Failed to prepare query")?;
396
397        let rows = stmt
398            .query_map(params![project], |row| {
399                let path_str: String = row.get(0)?;
400                Ok(PathBuf::from(path_str))
401            })
402            .context("Failed to query project paths")?;
403
404        let mut paths = Vec::new();
405        for row in rows {
406            paths.push(row.context("Failed to read row")?);
407        }
408
409        Ok(paths)
410    }
411
412    /// Get cache statistics
413    pub fn stats(&self) -> Result<CacheStats> {
414        let conn = self
415            .conn
416            .lock()
417            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
418
419        let total_entries: i64 = conn
420            .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
421                row.get(0)
422            })
423            .context("Failed to count entries")?;
424
425        let total_size: i64 = conn
426            .query_row(
427                "SELECT SUM(LENGTH(data)) FROM session_metadata",
428                [],
429                |row| row.get(0),
430            )
431            .unwrap_or(0);
432
433        let project_count: i64 = conn
434            .query_row(
435                "SELECT COUNT(DISTINCT project) FROM session_metadata",
436                [],
437                |row| row.get(0),
438            )
439            .context("Failed to count projects")?;
440
441        Ok(CacheStats {
442            total_entries: total_entries as usize,
443            total_size_bytes: total_size as usize,
444            project_count: project_count as usize,
445        })
446    }
447
448    /// Clear all cache entries (for testing or rebuild)
449    pub fn clear(&self) -> Result<()> {
450        let conn = self
451            .conn
452            .lock()
453            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
454
455        conn.execute("DELETE FROM session_metadata", [])
456            .context("Failed to clear cache")?;
457
458        debug!("Cache cleared");
459        Ok(())
460    }
461
462    /// Vacuum database to reclaim space
463    pub fn vacuum(&self) -> Result<()> {
464        let conn = self
465            .conn
466            .lock()
467            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
468
469        conn.execute("VACUUM", []).context("Failed to vacuum")?;
470
471        debug!("Database vacuumed");
472        Ok(())
473    }
474
475    // ─── Aggregate stats + FTS5 search methods ───────────────────────────────
476
477    /// Get aggregate session stats from O(1) table (total sessions + messages)
478    pub fn get_aggregate_stats(&self) -> Result<AggregateStats> {
479        let conn = self
480            .conn
481            .lock()
482            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
483
484        let mut stmt = conn
485            .prepare("SELECT key, value FROM aggregate_stats")
486            .context("Failed to prepare aggregate_stats query")?;
487
488        let mut total_sessions = 0usize;
489        let mut total_messages = 0usize;
490
491        let rows = stmt
492            .query_map([], |row| {
493                let key: String = row.get(0)?;
494                let value: i64 = row.get(1)?;
495                Ok((key, value))
496            })
497            .context("Failed to query aggregate_stats")?;
498
499        for row in rows {
500            let (key, value) = row.context("Failed to read aggregate_stats row")?;
501            match key.as_str() {
502                "total_sessions" => total_sessions = value.max(0) as usize,
503                "total_messages" => total_messages = value.max(0) as usize,
504                _ => {}
505            }
506        }
507
508        Ok(AggregateStats {
509            total_sessions,
510            total_messages,
511        })
512    }
513
514    /// Search sessions using FTS5 full-text search.
515    ///
516    /// Returns up to `limit` results ranked by relevance (BM25).
517    /// Returns empty vec (not error) if FTS5 index doesn't exist yet (graceful degradation).
518    pub fn search_sessions(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
519        if query.trim().is_empty() {
520            return Ok(Vec::new());
521        }
522
523        let conn = self
524            .conn
525            .lock()
526            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
527
528        // Check if FTS5 table exists (graceful degradation for old cache DBs)
529        let fts_exists: bool = conn
530            .query_row(
531                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_fts'",
532                [],
533                |row| row.get::<_, i64>(0),
534            )
535            .unwrap_or(0)
536            > 0;
537
538        if !fts_exists {
539            return Ok(Vec::new());
540        }
541
542        let mut stmt = conn
543            .prepare(
544                r#"
545                SELECT
546                    sm.path,
547                    sm.session_id,
548                    sm.project,
549                    sm.first_user_message,
550                    snippet(session_fts, 2, '[', ']', '...', 12) AS snippet,
551                    session_fts.rank,
552                    sm.first_timestamp,
553                    sm.message_count
554                FROM session_fts
555                JOIN session_metadata sm ON session_fts.rowid = sm.rowid
556                WHERE session_fts MATCH ?
557                ORDER BY session_fts.rank
558                LIMIT ?
559                "#,
560            )
561            .context("Failed to prepare FTS5 search query")?;
562
563        let limit_i64 = limit as i64;
564        let rows = stmt
565            .query_map(params![query, limit_i64], |row| {
566                Ok(SearchResult {
567                    path: PathBuf::from(row.get::<_, String>(0)?),
568                    session_id: row.get(1)?,
569                    project: row.get(2)?,
570                    first_user_message: row.get(3)?,
571                    snippet: row.get(4)?,
572                    rank: row.get(5)?,
573                    first_timestamp: row.get(6)?,
574                    message_count: row.get::<_, Option<i64>>(7)?.unwrap_or(0) as u64,
575                })
576            })
577            .context("Failed to execute FTS5 search")?;
578
579        let mut results = Vec::new();
580        for row in rows {
581            match row {
582                Ok(r) => results.push(r),
583                Err(e) => {
584                    warn!("FTS5 search row error: {}", e);
585                }
586            }
587        }
588
589        Ok(results)
590    }
591
592    /// Rebuild FTS5 index from existing session_metadata rows.
593    ///
594    /// Called once after cache version bump to populate FTS5 for existing sessions.
595    pub fn rebuild_fts_index(&self) -> Result<usize> {
596        let conn = self
597            .conn
598            .lock()
599            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
600
601        // Trigger FTS5 content table rebuild
602        conn.execute("INSERT INTO session_fts(session_fts) VALUES('rebuild')", [])
603            .context("Failed to trigger FTS5 rebuild")?;
604
605        // Count sessions indexed
606        let count: i64 = conn
607            .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
608                row.get(0)
609            })
610            .context("Failed to count sessions")?;
611
612        debug!("FTS5 index rebuilt for {} sessions", count);
613        Ok(count as usize)
614    }
615
616    // ─── Activity cache methods ───────────────────────────────────────────────
617
618    /// Get cached ActivitySummary if the session file mtime matches.
619    ///
620    /// Returns None on cache miss or mtime mismatch (session was modified).
621    pub fn get_activity(
622        &self,
623        path: &Path,
624        current_mtime: SystemTime,
625    ) -> Result<Option<ActivitySummary>> {
626        let path_str = path.to_string_lossy();
627        let mtime_secs = current_mtime
628            .duration_since(SystemTime::UNIX_EPOCH)
629            .context("Invalid mtime")?
630            .as_secs();
631
632        let conn = self
633            .conn
634            .lock()
635            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
636
637        let result: Option<Vec<u8>> = conn
638            .query_row(
639                "SELECT data FROM activity_cache WHERE session_path = ? AND mtime = ?",
640                params![path_str.as_ref(), mtime_secs as i64],
641                |row| row.get(0),
642            )
643            .optional()
644            .context("Failed to query activity cache")?;
645
646        match result {
647            Some(bytes) => {
648                let summary: ActivitySummary = bincode::deserialize(&bytes)
649                    .context("Failed to deserialize activity summary")?;
650                debug!(path = %path.display(), "Activity cache hit");
651                Ok(Some(summary))
652            }
653            None => {
654                debug!(path = %path.display(), "Activity cache miss");
655                Ok(None)
656            }
657        }
658    }
659
660    /// Store an ActivitySummary in cache, keyed by session file path + mtime.
661    ///
662    /// Also populates the `activity_alerts` table for cross-session alert queries.
663    pub fn put_activity(
664        &self,
665        path: &Path,
666        session_id: &str,
667        summary: &ActivitySummary,
668        mtime: SystemTime,
669    ) -> Result<()> {
670        let path_str = path.to_string_lossy();
671        let mtime_secs = mtime
672            .duration_since(SystemTime::UNIX_EPOCH)
673            .context("Invalid mtime")?
674            .as_secs();
675
676        let data = bincode::serialize(summary).context("Failed to serialize activity summary")?;
677
678        let conn = self
679            .conn
680            .lock()
681            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
682
683        // Wrap all writes in a transaction for atomicity.
684        // Without this, a crash between DELETE and re-insert leaves stale alerts.
685        conn.execute_batch("BEGIN IMMEDIATE")
686            .context("Failed to begin activity cache transaction")?;
687
688        let result = (|| -> anyhow::Result<()> {
689            // Upsert activity_cache
690            conn.execute(
691                r#"
692                INSERT OR REPLACE INTO activity_cache
693                (session_path, mtime, session_id, tool_call_count, alert_count, data)
694                VALUES (?, ?, ?, ?, ?, ?)
695                "#,
696                params![
697                    path_str.as_ref(),
698                    mtime_secs as i64,
699                    session_id,
700                    (summary.file_accesses.len()
701                        + summary.bash_commands.len()
702                        + summary.network_calls.len()) as i64,
703                    summary.alerts.len() as i64,
704                    &data,
705                ],
706            )
707            .context("Failed to insert activity cache entry")?;
708
709            // Refresh activity_alerts for this session (delete + re-insert)
710            conn.execute(
711                "DELETE FROM activity_alerts WHERE session_path = ?",
712                params![path_str.as_ref()],
713            )
714            .context("Failed to delete old activity alerts")?;
715
716            for alert in &summary.alerts {
717                let severity = format!("{:?}", alert.severity);
718                let category = format!("{:?}", alert.category);
719                conn.execute(
720                    r#"
721                    INSERT INTO activity_alerts (session_path, severity, category, timestamp, detail)
722                    VALUES (?, ?, ?, ?, ?)
723                    "#,
724                    params![
725                        path_str.as_ref(),
726                        severity,
727                        category,
728                        alert.timestamp.to_rfc3339(),
729                        &alert.detail,
730                    ],
731                )
732                .context("Failed to insert activity alert")?;
733            }
734
735            Ok(())
736        })();
737
738        match result {
739            Ok(()) => conn
740                .execute_batch("COMMIT")
741                .context("Failed to commit activity cache transaction")?,
742            Err(e) => {
743                let _ = conn.execute_batch("ROLLBACK");
744                return Err(e);
745            }
746        }
747
748        debug!(
749            path = %path.display(),
750            alerts = summary.alerts.len(),
751            "Activity summary cached"
752        );
753        Ok(())
754    }
755
756    /// Invalidate activity cache entry for a session file.
757    ///
758    /// Called by file watcher when a session file is modified.
759    pub fn invalidate_activity(&self, path: &Path) -> Result<()> {
760        let path_str = path.to_string_lossy();
761
762        let conn = self
763            .conn
764            .lock()
765            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
766
767        conn.execute(
768            "DELETE FROM activity_cache WHERE session_path = ?",
769            params![path_str.as_ref()],
770        )
771        .context("Failed to delete activity cache entry")?;
772
773        conn.execute(
774            "DELETE FROM activity_alerts WHERE session_path = ?",
775            params![path_str.as_ref()],
776        )
777        .context("Failed to delete activity alerts")?;
778
779        debug!(path = %path.display(), "Activity cache invalidated");
780        Ok(())
781    }
782
783    /// Get all stored alerts, optionally filtered by minimum severity.
784    ///
785    /// Useful for a global alert view across all analyzed sessions.
786    pub fn get_all_alerts(&self, min_severity: Option<&str>) -> Result<Vec<StoredAlert>> {
787        let conn = self
788            .conn
789            .lock()
790            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
791
792        // Build a query that respects severity hierarchy: Critical > Warning > Info.
793        // "Warning" means Warning + Critical; "Critical" means Critical only; None means all.
794        let query = match min_severity {
795            Some("Critical") => "SELECT session_path, severity, category, timestamp, detail \
796                 FROM activity_alerts WHERE severity = 'Critical' ORDER BY timestamp DESC",
797            Some("Warning") => "SELECT session_path, severity, category, timestamp, detail \
798                 FROM activity_alerts WHERE severity IN ('Warning', 'Critical') ORDER BY timestamp DESC",
799            _ => "SELECT session_path, severity, category, timestamp, detail \
800                 FROM activity_alerts ORDER BY timestamp DESC",
801        };
802
803        let mut stmt = conn
804            .prepare(query)
805            .context("Failed to prepare alert query")?;
806
807        let rows = stmt
808            .query_map([], |row| {
809                Ok(StoredAlert {
810                    session_path: row.get(0)?,
811                    severity: row.get(1)?,
812                    category: row.get(2)?,
813                    timestamp: row.get(3)?,
814                    detail: row.get(4)?,
815                })
816            })
817            .context("Failed to query alerts")?
818            .collect::<Result<Vec<_>, _>>()
819            .context("Failed to collect alerts")?;
820
821        Ok(rows)
822    }
823
824    /// Get activity cache statistics
825    pub fn activity_stats(&self) -> Result<ActivityCacheStats> {
826        let conn = self
827            .conn
828            .lock()
829            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
830
831        let analyzed_sessions: i64 = conn
832            .query_row("SELECT COUNT(*) FROM activity_cache", [], |row| row.get(0))
833            .context("Failed to count activity cache entries")?;
834
835        let total_alerts: i64 = conn
836            .query_row("SELECT COUNT(*) FROM activity_alerts", [], |row| row.get(0))
837            .context("Failed to count alerts")?;
838
839        let critical_alerts: i64 = conn
840            .query_row(
841                "SELECT COUNT(*) FROM activity_alerts WHERE severity = 'Critical'",
842                [],
843                |row| row.get(0),
844            )
845            .context("Failed to count critical alerts")?;
846
847        Ok(ActivityCacheStats {
848            analyzed_sessions: analyzed_sessions as usize,
849            total_alerts: total_alerts as usize,
850            critical_alerts: critical_alerts as usize,
851        })
852    }
853}
854
855impl Drop for MetadataCache {
856    fn drop(&mut self) {
857        // WAL checkpoint on drop to ensure all data is flushed to main database file
858        // and WAL file doesn't grow unbounded across restarts
859        if let Ok(conn) = self.conn.lock() {
860            if let Err(e) = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE") {
861                warn!("Failed to checkpoint WAL on MetadataCache drop: {}", e);
862            } else {
863                debug!("WAL checkpoint completed on MetadataCache drop");
864            }
865        }
866    }
867}
868
869/// Session metadata cache statistics
870#[derive(Debug, Clone)]
871pub struct CacheStats {
872    pub total_entries: usize,
873    pub total_size_bytes: usize,
874    pub project_count: usize,
875}
876
877/// Aggregate statistics from O(1) table
878#[derive(Debug, Clone, Default)]
879pub struct AggregateStats {
880    pub total_sessions: usize,
881    pub total_messages: usize,
882}
883
884/// A single FTS5 search result
885#[derive(Debug, Clone)]
886pub struct SearchResult {
887    pub path: PathBuf,
888    pub session_id: String,
889    pub project: Option<String>,
890    pub first_user_message: Option<String>,
891    pub snippet: Option<String>,
892    pub rank: f64,
893    /// ISO 8601 timestamp string from session_metadata (e.g. "2026-03-20T14:30:00Z")
894    pub first_timestamp: Option<String>,
895    /// Total message count for this session
896    pub message_count: u64,
897}
898
899/// Activity cache statistics
900#[derive(Debug, Clone)]
901pub struct ActivityCacheStats {
902    pub analyzed_sessions: usize,
903    pub total_alerts: usize,
904    pub critical_alerts: usize,
905}
906
907/// A single alert record from the activity_alerts table
908#[derive(Debug, Clone)]
909pub struct StoredAlert {
910    pub session_path: String,
911    pub severity: String,
912    pub category: String,
913    pub timestamp: String,
914    pub detail: String,
915}
916
917impl CacheStats {
918    pub fn hit_rate(&self, scanned: usize) -> f64 {
919        if scanned == 0 {
920            return 0.0;
921        }
922        (self.total_entries as f64) / (scanned as f64)
923    }
924}
925
926#[cfg(test)]
927mod tests {
928    use super::*;
929    use crate::models::SessionMetadata;
930    use chrono::Utc;
931    use tempfile::tempdir;
932
933    #[test]
934    fn test_cache_creation() {
935        let dir = tempdir().unwrap();
936        let cache = MetadataCache::new(dir.path()).unwrap();
937
938        let stats = cache.stats().unwrap();
939        assert_eq!(stats.total_entries, 0);
940    }
941
942    #[test]
943    fn test_cache_put_get() {
944        let dir = tempdir().unwrap();
945        let cache = MetadataCache::new(dir.path()).unwrap();
946
947        let path = PathBuf::from("/tmp/test.jsonl");
948        let mut meta = SessionMetadata::from_path(path.clone(), "/test".into());
949        meta.id = "test-123".into();
950        meta.message_count = 42;
951        meta.total_tokens = 1000;
952        meta.models_used = vec!["sonnet".to_string()].into_iter().collect();
953        meta.first_timestamp = Some(Utc::now());
954
955        let mtime = SystemTime::now();
956
957        // Put
958        cache.put(&path, &meta, mtime).unwrap();
959
960        // Get with same mtime (hit)
961        let cached = cache.get(&path, mtime).unwrap();
962        assert!(cached.is_some());
963        let cached = cached.unwrap();
964        assert_eq!(cached.id, "test-123");
965        assert_eq!(cached.message_count, 42);
966
967        // Get with different mtime (miss)
968        let old_mtime = mtime - std::time::Duration::from_secs(3600);
969        let cached = cache.get(&path, old_mtime).unwrap();
970        assert!(cached.is_none());
971    }
972
973    #[test]
974    fn test_cache_invalidate() {
975        let dir = tempdir().unwrap();
976        let cache = MetadataCache::new(dir.path()).unwrap();
977
978        let path = PathBuf::from("/tmp/test.jsonl");
979        let meta = SessionMetadata::from_path(path.clone(), "/test".into());
980        let mtime = SystemTime::now();
981
982        cache.put(&path, &meta, mtime).unwrap();
983
984        // Invalidate
985        cache.invalidate(&path).unwrap();
986
987        // Should be gone
988        let cached = cache.get(&path, mtime).unwrap();
989        assert!(cached.is_none());
990    }
991
992    #[test]
993    fn test_cache_project_paths() {
994        let dir = tempdir().unwrap();
995        let cache = MetadataCache::new(dir.path()).unwrap();
996
997        let mtime = SystemTime::now();
998
999        // Add sessions for two projects
1000        for i in 0..3 {
1001            let path = PathBuf::from(format!("/tmp/project1/session{}.jsonl", i));
1002            let meta = SessionMetadata::from_path(path.clone(), "/project1".into());
1003            cache.put(&path, &meta, mtime).unwrap();
1004        }
1005
1006        for i in 0..2 {
1007            let path = PathBuf::from(format!("/tmp/project2/session{}.jsonl", i));
1008            let meta = SessionMetadata::from_path(path.clone(), "/project2".into());
1009            cache.put(&path, &meta, mtime).unwrap();
1010        }
1011
1012        // Get project1 paths
1013        let paths = cache.get_project_paths("/project1").unwrap();
1014        assert_eq!(paths.len(), 3);
1015
1016        // Get project2 paths
1017        let paths = cache.get_project_paths("/project2").unwrap();
1018        assert_eq!(paths.len(), 2);
1019    }
1020
1021    #[test]
1022    fn test_cache_stats() {
1023        let dir = tempdir().unwrap();
1024        let cache = MetadataCache::new(dir.path()).unwrap();
1025
1026        let mtime = SystemTime::now();
1027
1028        // Add some entries
1029        for i in 0..10 {
1030            let path = PathBuf::from(format!("/tmp/session{}.jsonl", i));
1031            let meta = SessionMetadata::from_path(path.clone(), "/test".into());
1032            cache.put(&path, &meta, mtime).unwrap();
1033        }
1034
1035        let stats = cache.stats().unwrap();
1036        assert_eq!(stats.total_entries, 10);
1037        assert!(stats.total_size_bytes > 0);
1038        assert_eq!(stats.project_count, 1);
1039    }
1040
1041    #[test]
1042    fn test_cache_clear() {
1043        let dir = tempdir().unwrap();
1044        let cache = MetadataCache::new(dir.path()).unwrap();
1045
1046        let path = PathBuf::from("/tmp/test.jsonl");
1047        let meta = SessionMetadata::from_path(path.clone(), "/test".into());
1048        cache.put(&path, &meta, SystemTime::now()).unwrap();
1049
1050        assert_eq!(cache.stats().unwrap().total_entries, 1);
1051
1052        cache.clear().unwrap();
1053
1054        assert_eq!(cache.stats().unwrap().total_entries, 0);
1055    }
1056
1057    // ── activity cache tests ─────────────────────────────────────────────────
1058
1059    fn make_summary_with_alerts() -> ActivitySummary {
1060        use crate::models::activity::{Alert, AlertCategory, AlertSeverity};
1061        use chrono::Utc;
1062
1063        ActivitySummary {
1064            file_accesses: vec![],
1065            bash_commands: vec![],
1066            network_calls: vec![],
1067            alerts: vec![
1068                Alert {
1069                    session_id: "test-session".to_string(),
1070                    timestamp: Utc::now(),
1071                    severity: AlertSeverity::Critical,
1072                    category: AlertCategory::DestructiveCommand,
1073                    detail: "rm -rf /tmp".to_string(),
1074                },
1075                Alert {
1076                    session_id: "test-session".to_string(),
1077                    timestamp: Utc::now(),
1078                    severity: AlertSeverity::Warning,
1079                    category: AlertCategory::CredentialAccess,
1080                    detail: "Accessed .env".to_string(),
1081                },
1082            ],
1083        }
1084    }
1085
1086    #[test]
1087    fn test_activity_put_get_hit() {
1088        let dir = tempdir().unwrap();
1089        let cache = MetadataCache::new(dir.path()).unwrap();
1090
1091        let path = PathBuf::from("/tmp/session.jsonl");
1092        let summary = make_summary_with_alerts();
1093        let mtime = SystemTime::now();
1094
1095        cache
1096            .put_activity(&path, "test-session", &summary, mtime)
1097            .unwrap();
1098
1099        let cached = cache.get_activity(&path, mtime).unwrap();
1100        assert!(cached.is_some(), "Should be a cache hit");
1101        let cached = cached.unwrap();
1102        assert_eq!(cached.alerts.len(), 2);
1103    }
1104
1105    #[test]
1106    fn test_activity_get_miss_on_mtime_change() {
1107        let dir = tempdir().unwrap();
1108        let cache = MetadataCache::new(dir.path()).unwrap();
1109
1110        let path = PathBuf::from("/tmp/session.jsonl");
1111        let summary = make_summary_with_alerts();
1112        let mtime = SystemTime::now();
1113
1114        cache
1115            .put_activity(&path, "test-session", &summary, mtime)
1116            .unwrap();
1117
1118        // Different mtime → miss
1119        let stale_mtime = mtime - std::time::Duration::from_secs(60);
1120        let cached = cache.get_activity(&path, stale_mtime).unwrap();
1121        assert!(cached.is_none(), "Should be a cache miss on mtime change");
1122    }
1123
1124    #[test]
1125    fn test_activity_invalidate() {
1126        let dir = tempdir().unwrap();
1127        let cache = MetadataCache::new(dir.path()).unwrap();
1128
1129        let path = PathBuf::from("/tmp/session.jsonl");
1130        let summary = make_summary_with_alerts();
1131        let mtime = SystemTime::now();
1132
1133        cache
1134            .put_activity(&path, "test-session", &summary, mtime)
1135            .unwrap();
1136
1137        // Verify stored
1138        assert!(cache.get_activity(&path, mtime).unwrap().is_some());
1139
1140        // Invalidate
1141        cache.invalidate_activity(&path).unwrap();
1142
1143        // Should be gone
1144        assert!(
1145            cache.get_activity(&path, mtime).unwrap().is_none(),
1146            "Should be gone after invalidation"
1147        );
1148
1149        // Alerts should also be cleared
1150        let alerts = cache.get_all_alerts(None).unwrap();
1151        assert!(
1152            alerts.is_empty(),
1153            "Alerts should be cleared with activity cache"
1154        );
1155    }
1156
1157    #[test]
1158    fn test_get_all_alerts_returns_stored_alerts() {
1159        let dir = tempdir().unwrap();
1160        let cache = MetadataCache::new(dir.path()).unwrap();
1161
1162        let path = PathBuf::from("/tmp/session.jsonl");
1163        let summary = make_summary_with_alerts();
1164        let mtime = SystemTime::now();
1165
1166        cache
1167            .put_activity(&path, "test-session", &summary, mtime)
1168            .unwrap();
1169
1170        let alerts = cache.get_all_alerts(None).unwrap();
1171        assert_eq!(alerts.len(), 2, "Should return both alerts");
1172
1173        let critical: Vec<_> = alerts.iter().filter(|a| a.severity == "Critical").collect();
1174        assert_eq!(critical.len(), 1);
1175        assert!(critical[0].detail.contains("rm -rf"));
1176    }
1177
1178    #[test]
1179    fn test_get_all_alerts_filter_by_severity() {
1180        let dir = tempdir().unwrap();
1181        let cache = MetadataCache::new(dir.path()).unwrap();
1182
1183        let path = PathBuf::from("/tmp/session.jsonl");
1184        let summary = make_summary_with_alerts();
1185        let mtime = SystemTime::now();
1186
1187        cache
1188            .put_activity(&path, "test-session", &summary, mtime)
1189            .unwrap();
1190
1191        let critical_only = cache.get_all_alerts(Some("Critical")).unwrap();
1192        assert_eq!(critical_only.len(), 1);
1193        assert_eq!(critical_only[0].severity, "Critical");
1194    }
1195
1196    #[test]
1197    fn test_activity_stats() {
1198        let dir = tempdir().unwrap();
1199        let cache = MetadataCache::new(dir.path()).unwrap();
1200
1201        let stats = cache.activity_stats().unwrap();
1202        assert_eq!(stats.analyzed_sessions, 0);
1203        assert_eq!(stats.total_alerts, 0);
1204
1205        let path = PathBuf::from("/tmp/session.jsonl");
1206        let summary = make_summary_with_alerts();
1207        cache
1208            .put_activity(&path, "test-session", &summary, SystemTime::now())
1209            .unwrap();
1210
1211        let stats = cache.activity_stats().unwrap();
1212        assert_eq!(stats.analyzed_sessions, 1);
1213        assert_eq!(stats.total_alerts, 2);
1214        assert_eq!(stats.critical_alerts, 1);
1215    }
1216
1217    #[test]
1218    fn test_activity_put_replaces_stale_alerts() {
1219        let dir = tempdir().unwrap();
1220        let cache = MetadataCache::new(dir.path()).unwrap();
1221
1222        let path = PathBuf::from("/tmp/session.jsonl");
1223        let summary = make_summary_with_alerts(); // 2 alerts
1224        let mtime = SystemTime::now();
1225
1226        cache
1227            .put_activity(&path, "test-session", &summary, mtime)
1228            .unwrap();
1229        assert_eq!(cache.get_all_alerts(None).unwrap().len(), 2);
1230
1231        // Re-put with a clean summary (0 alerts)
1232        let empty_summary = ActivitySummary::default();
1233        let new_mtime = mtime + std::time::Duration::from_secs(1);
1234        cache
1235            .put_activity(&path, "test-session", &empty_summary, new_mtime)
1236            .unwrap();
1237
1238        // Old alerts should be gone
1239        let alerts = cache.get_all_alerts(None).unwrap();
1240        assert_eq!(alerts.len(), 0, "Stale alerts should be replaced");
1241    }
1242}