Skip to main content

ccboard_core/cache/
metadata_cache.rs

1//! SQLite metadata cache for session files
2//!
3//! Caches session metadata with mtime-based invalidation for 90% startup speedup.
4//! Also caches activity analysis (tool calls, alerts) per session on demand.
5//!
6//! Schema:
7//! - session_metadata: parsed metadata + mtime + cache_version
8//! - activity_cache: serialized ActivitySummary + mtime per session file
9//! - activity_alerts: searchable alert records (severity/category) across all sessions
10//! - Indexes: project, mtime, session_id, severity for fast queries
11//!
12//! Invalidation:
13//! - File watcher detects modification → delete session + activity cache entries
14//! - Startup: compare mtime → rescan if stale
15//! - Startup: compare cache_version → auto-clear ALL tables if mismatch
16//!
17//! Cache Version History:
18//! - v1: Initial version (pre-TokenUsage fix)
19//! - v2: Fixed TokenUsage::total() to include cache_read_tokens + cache_write_tokens
20//! - v3: Added token breakdown fields (input_tokens, output_tokens, cache_creation_tokens,
21//!   cache_read_tokens) to SessionMetadata + real pricing calculation
22//! - v4: Added branch field to SessionMetadata
23//! - v5: Added activity_cache + activity_alerts tables for Phase 2 activity module
24//! - v6: Added aggregate_stats table with triggers + FTS5 session_fts table
25//! - v7: Added tool_token_usage field to SessionMetadata (Phase K analytics)
26
27use crate::models::activity::ActivitySummary;
28use crate::models::SessionMetadata;
29use anyhow::{Context, Result};
30use rusqlite::{params, Connection, OptionalExtension};
31use std::path::{Path, PathBuf};
32use std::sync::Mutex;
33use std::time::SystemTime;
34use tracing::{debug, warn};
35
/// Current cache version
///
/// Stored in the `cache_metadata` table under the `version` key and compared against that
/// row every time the cache database is opened.
///
/// **IMPORTANT**: Increment this version when changing how metadata is calculated:
/// - TokenUsage fields added/removed
/// - SessionMetadata structure changed
/// - Parsing logic modified (e.g., token accumulation)
///
/// This triggers automatic cache invalidation on startup, preventing stale data bugs.
/// All tables (session_metadata + activity) are cleared on version mismatch.
///
/// Version History:
/// - v1: Initial version
/// - v2: Fixed TokenUsage::total() calculation
/// - v3: Added token breakdown fields
/// - v4: Added branch field to SessionMetadata
/// - v5: Added activity_cache + activity_alerts tables
/// - v6: Added aggregate_stats table with triggers + FTS5 session_fts table
/// - v7: Added tool_token_usage field to SessionMetadata (Phase K analytics)
const CACHE_VERSION: i32 = 7;
55
/// SQLite-based metadata cache (thread-safe)
///
/// Every operation locks the single connection held in `conn` before touching the database.
pub struct MetadataCache {
    /// The SQLite connection, guarded by a mutex so the cache can be shared across threads.
    conn: Mutex<Connection>,
    /// Path of the database file (`session-metadata.db` inside the cache directory).
    // Kept on the struct although no method visible here reads it, hence the lint allowance.
    #[allow(dead_code)]
    cache_path: PathBuf,
}
62
63impl MetadataCache {
64    /// Create or open cache database
65    pub fn new(cache_dir: &Path) -> Result<Self> {
66        std::fs::create_dir_all(cache_dir).with_context(|| {
67            format!("Failed to create cache directory: {}", cache_dir.display())
68        })?;
69
70        let cache_path = cache_dir.join("session-metadata.db");
71        let conn = Connection::open(&cache_path)
72            .with_context(|| format!("Failed to open cache database: {}", cache_path.display()))?;
73
74        // Enable WAL mode for better concurrency
75        conn.pragma_update(None, "journal_mode", "WAL")
76            .context("Failed to enable WAL mode")?;
77
78        // Initialize schema
79        conn.execute_batch(
80            r#"
81            CREATE TABLE IF NOT EXISTS cache_metadata (
82                key TEXT PRIMARY KEY,
83                value INTEGER NOT NULL
84            );
85
86            CREATE TABLE IF NOT EXISTS session_metadata (
87                path TEXT PRIMARY KEY,
88                mtime INTEGER NOT NULL,
89                project TEXT NOT NULL,
90                session_id TEXT NOT NULL,
91                first_timestamp TEXT,
92                last_timestamp TEXT,
93                message_count INTEGER NOT NULL,
94                total_tokens INTEGER NOT NULL,
95                models_used TEXT NOT NULL,
96                has_subagents INTEGER NOT NULL,
97                first_user_message TEXT,
98                data BLOB NOT NULL
99            );
100
101            CREATE INDEX IF NOT EXISTS idx_project ON session_metadata(project);
102            CREATE INDEX IF NOT EXISTS idx_mtime ON session_metadata(mtime);
103            CREATE INDEX IF NOT EXISTS idx_session_id ON session_metadata(session_id);
104
105            CREATE TABLE IF NOT EXISTS activity_cache (
106                session_path TEXT PRIMARY KEY,
107                mtime INTEGER NOT NULL,
108                session_id TEXT NOT NULL,
109                tool_call_count INTEGER NOT NULL DEFAULT 0,
110                alert_count INTEGER NOT NULL DEFAULT 0,
111                data BLOB NOT NULL
112            );
113
114            CREATE INDEX IF NOT EXISTS idx_activity_session_id ON activity_cache(session_id);
115            CREATE INDEX IF NOT EXISTS idx_activity_mtime ON activity_cache(mtime);
116
117            CREATE TABLE IF NOT EXISTS activity_alerts (
118                id INTEGER PRIMARY KEY AUTOINCREMENT,
119                session_path TEXT NOT NULL,
120                severity TEXT NOT NULL,
121                category TEXT NOT NULL,
122                timestamp TEXT NOT NULL,
123                detail TEXT NOT NULL
124            );
125
126            CREATE INDEX IF NOT EXISTS idx_alerts_session ON activity_alerts(session_path);
127            CREATE INDEX IF NOT EXISTS idx_alerts_severity ON activity_alerts(severity);
128
129            CREATE TABLE IF NOT EXISTS aggregate_stats (
130                key   TEXT PRIMARY KEY,
131                value INTEGER NOT NULL DEFAULT 0
132            );
133
134            INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES
135                ('total_sessions', 0),
136                ('total_messages', 0);
137
138            CREATE TRIGGER IF NOT EXISTS stats_ai
139            AFTER INSERT ON session_metadata BEGIN
140                UPDATE aggregate_stats SET value = value + 1 WHERE key = 'total_sessions';
141                UPDATE aggregate_stats SET value = value + new.message_count WHERE key = 'total_messages';
142            END;
143
144            CREATE TRIGGER IF NOT EXISTS stats_ad
145            AFTER DELETE ON session_metadata BEGIN
146                UPDATE aggregate_stats SET value = MAX(0, value - 1) WHERE key = 'total_sessions';
147                UPDATE aggregate_stats SET value = MAX(0, value - old.message_count) WHERE key = 'total_messages';
148            END;
149
150            CREATE VIRTUAL TABLE IF NOT EXISTS session_fts USING fts5(
151                session_id  UNINDEXED,
152                project     UNINDEXED,
153                first_user_message,
154                models_used,
155                content='session_metadata',
156                content_rowid='rowid',
157                tokenize='unicode61'
158            );
159
160            CREATE TRIGGER IF NOT EXISTS session_fts_ai
161            AFTER INSERT ON session_metadata BEGIN
162                INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
163                VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
164            END;
165
166            CREATE TRIGGER IF NOT EXISTS session_fts_ad
167            AFTER DELETE ON session_metadata BEGIN
168                INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
169                VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
170            END;
171
172            CREATE TRIGGER IF NOT EXISTS session_fts_au
173            AFTER UPDATE ON session_metadata BEGIN
174                INSERT INTO session_fts(session_fts, rowid, session_id, project, first_user_message, models_used)
175                VALUES ('delete', old.rowid, old.session_id, old.project, old.first_user_message, old.models_used);
176                INSERT INTO session_fts(rowid, session_id, project, first_user_message, models_used)
177                VALUES (new.rowid, new.session_id, new.project, new.first_user_message, new.models_used);
178            END;
179            "#,
180        )
181        .context("Failed to create schema")?;
182
183        // Check cache version and auto-invalidate if mismatch
184        let stored_version: Option<i32> = conn
185            .query_row(
186                "SELECT value FROM cache_metadata WHERE key = 'version'",
187                [],
188                |row| row.get(0),
189            )
190            .optional()
191            .context("Failed to query cache version")?;
192
193        match stored_version {
194            Some(v) if v != CACHE_VERSION => {
195                warn!(
196                    stored = v,
197                    current = CACHE_VERSION,
198                    "Cache version mismatch detected, clearing stale cache"
199                );
200
201                // Clear all session and activity entries
202                conn.execute("DELETE FROM session_metadata", [])
203                    .context("Failed to clear stale session cache")?;
204                conn.execute("DELETE FROM activity_cache", [])
205                    .context("Failed to clear stale activity cache")?;
206                conn.execute("DELETE FROM activity_alerts", [])
207                    .context("Failed to clear stale activity alerts")?;
208                conn.execute("DELETE FROM aggregate_stats", [])
209                    .context("Failed to clear stale aggregate stats")?;
210                conn.execute(
211                    "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_sessions', 0)",
212                    [],
213                )
214                .context("Failed to reinitialize total_sessions")?;
215                conn.execute(
216                    "INSERT OR IGNORE INTO aggregate_stats (key, value) VALUES ('total_messages', 0)",
217                    [],
218                )
219                .context("Failed to reinitialize total_messages")?;
220
221                // Update version
222                conn.execute(
223                    "INSERT OR REPLACE INTO cache_metadata (key, value) VALUES ('version', ?)",
224                    params![CACHE_VERSION],
225                )
226                .context("Failed to update cache version")?;
227
228                debug!("Cache cleared and version updated to {}", CACHE_VERSION);
229            }
230            None => {
231                // First run, set version
232                conn.execute(
233                    "INSERT INTO cache_metadata (key, value) VALUES ('version', ?)",
234                    params![CACHE_VERSION],
235                )
236                .context("Failed to initialize cache version")?;
237
238                debug!("Cache version initialized to {}", CACHE_VERSION);
239            }
240            Some(_) => {
241                debug!("Cache version {} matches current", CACHE_VERSION);
242            }
243        }
244
245        let cache = Self {
246            conn: Mutex::new(conn),
247            cache_path: cache_path.clone(),
248        };
249
250        debug!(path = %cache_path.display(), "Metadata cache initialized");
251
252        Ok(cache)
253    }
254
255    /// Get cached metadata if fresh, otherwise None
256    pub fn get(&self, path: &Path, current_mtime: SystemTime) -> Result<Option<SessionMetadata>> {
257        let path_str = path.to_string_lossy();
258        let mtime_secs = current_mtime
259            .duration_since(SystemTime::UNIX_EPOCH)
260            .context("Invalid mtime")?
261            .as_secs();
262
263        let conn = self
264            .conn
265            .lock()
266            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
267
268        let result: Option<Vec<u8>> = conn
269            .query_row(
270                "SELECT data FROM session_metadata WHERE path = ? AND mtime = ?",
271                params![path_str.as_ref(), mtime_secs as i64],
272                |row| row.get(0),
273            )
274            .optional()
275            .context("Failed to query cache")?;
276
277        match result {
278            Some(bytes) => {
279                let meta: SessionMetadata = bincode::deserialize(&bytes)
280                    .context("Failed to deserialize cached metadata")?;
281                debug!(path = %path.display(), "Cache hit");
282                Ok(Some(meta))
283            }
284            None => {
285                debug!(path = %path.display(), "Cache miss");
286                Ok(None)
287            }
288        }
289    }
290
291    /// Store metadata in cache
292    pub fn put(&self, path: &Path, meta: &SessionMetadata, mtime: SystemTime) -> Result<()> {
293        let path_str = path.to_string_lossy();
294        let mtime_secs = mtime
295            .duration_since(SystemTime::UNIX_EPOCH)
296            .context("Invalid mtime")?
297            .as_secs();
298
299        let data = bincode::serialize(meta).context("Failed to serialize metadata")?;
300
301        // Extract searchable fields
302        let models_used =
303            serde_json::to_string(&meta.models_used).context("Failed to serialize models")?;
304
305        let conn = self
306            .conn
307            .lock()
308            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
309
310        // 1. Try to insert new row (only fires stats_ai trigger on NEW rows)
311        conn.execute(
312            r#"
313            INSERT OR IGNORE INTO session_metadata
314            (path, mtime, project, session_id, first_timestamp, last_timestamp,
315             message_count, total_tokens, models_used, has_subagents, first_user_message, data)
316            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
317            "#,
318            params![
319                path_str.as_ref(),
320                mtime_secs as i64,
321                meta.project_path.as_str(),
322                meta.id.as_str(),
323                meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
324                meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
325                meta.message_count as i64,
326                meta.total_tokens as i64,
327                models_used.as_str(),
328                if meta.has_subagents { 1 } else { 0 },
329                &meta.first_user_message,
330                &data,
331            ],
332        )
333        .context("Failed to insert metadata")?;
334
335        // 2. Update if row already existed (no INSERT trigger fires, no stats double-count)
336        conn.execute(
337            r#"
338            UPDATE session_metadata
339            SET mtime = ?, project = ?, session_id = ?, first_timestamp = ?, last_timestamp = ?,
340                message_count = ?, total_tokens = ?, models_used = ?, has_subagents = ?,
341                first_user_message = ?, data = ?
342            WHERE path = ? AND mtime != ?
343            "#,
344            params![
345                mtime_secs as i64,
346                meta.project_path.as_str(),
347                meta.id.as_str(),
348                meta.first_timestamp.as_ref().map(|t| t.to_rfc3339()),
349                meta.last_timestamp.as_ref().map(|t| t.to_rfc3339()),
350                meta.message_count as i64,
351                meta.total_tokens as i64,
352                models_used.as_str(),
353                if meta.has_subagents { 1 } else { 0 },
354                &meta.first_user_message,
355                &data,
356                path_str.as_ref(),
357                mtime_secs as i64,
358            ],
359        )
360        .context("Failed to update metadata")?;
361
362        debug!(path = %path.display(), "Metadata cached");
363        Ok(())
364    }
365
366    /// Invalidate cache entry for a path
367    pub fn invalidate(&self, path: &Path) -> Result<()> {
368        let path_str = path.to_string_lossy();
369
370        let conn = self
371            .conn
372            .lock()
373            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
374
375        conn.execute(
376            "DELETE FROM session_metadata WHERE path = ?",
377            params![path_str.as_ref()],
378        )
379        .context("Failed to delete cache entry")?;
380
381        debug!(path = %path.display(), "Cache entry invalidated");
382        Ok(())
383    }
384
385    /// Get all cached paths for a project
386    pub fn get_project_paths(&self, project: &str) -> Result<Vec<PathBuf>> {
387        let conn = self
388            .conn
389            .lock()
390            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
391
392        let mut stmt = conn
393            .prepare("SELECT path FROM session_metadata WHERE project = ?")
394            .context("Failed to prepare query")?;
395
396        let rows = stmt
397            .query_map(params![project], |row| {
398                let path_str: String = row.get(0)?;
399                Ok(PathBuf::from(path_str))
400            })
401            .context("Failed to query project paths")?;
402
403        let mut paths = Vec::new();
404        for row in rows {
405            paths.push(row.context("Failed to read row")?);
406        }
407
408        Ok(paths)
409    }
410
411    /// Get cache statistics
412    pub fn stats(&self) -> Result<CacheStats> {
413        let conn = self
414            .conn
415            .lock()
416            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
417
418        let total_entries: i64 = conn
419            .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
420                row.get(0)
421            })
422            .context("Failed to count entries")?;
423
424        let total_size: i64 = conn
425            .query_row(
426                "SELECT SUM(LENGTH(data)) FROM session_metadata",
427                [],
428                |row| row.get(0),
429            )
430            .unwrap_or(0);
431
432        let project_count: i64 = conn
433            .query_row(
434                "SELECT COUNT(DISTINCT project) FROM session_metadata",
435                [],
436                |row| row.get(0),
437            )
438            .context("Failed to count projects")?;
439
440        Ok(CacheStats {
441            total_entries: total_entries as usize,
442            total_size_bytes: total_size as usize,
443            project_count: project_count as usize,
444        })
445    }
446
447    /// Clear all cache entries (for testing or rebuild)
448    pub fn clear(&self) -> Result<()> {
449        let conn = self
450            .conn
451            .lock()
452            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
453
454        conn.execute("DELETE FROM session_metadata", [])
455            .context("Failed to clear cache")?;
456
457        debug!("Cache cleared");
458        Ok(())
459    }
460
461    /// Vacuum database to reclaim space
462    pub fn vacuum(&self) -> Result<()> {
463        let conn = self
464            .conn
465            .lock()
466            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
467
468        conn.execute("VACUUM", []).context("Failed to vacuum")?;
469
470        debug!("Database vacuumed");
471        Ok(())
472    }
473
474    // ─── Aggregate stats + FTS5 search methods ───────────────────────────────
475
476    /// Get aggregate session stats from O(1) table (total sessions + messages)
477    pub fn get_aggregate_stats(&self) -> Result<AggregateStats> {
478        let conn = self
479            .conn
480            .lock()
481            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
482
483        let mut stmt = conn
484            .prepare("SELECT key, value FROM aggregate_stats")
485            .context("Failed to prepare aggregate_stats query")?;
486
487        let mut total_sessions = 0usize;
488        let mut total_messages = 0usize;
489
490        let rows = stmt
491            .query_map([], |row| {
492                let key: String = row.get(0)?;
493                let value: i64 = row.get(1)?;
494                Ok((key, value))
495            })
496            .context("Failed to query aggregate_stats")?;
497
498        for row in rows {
499            let (key, value) = row.context("Failed to read aggregate_stats row")?;
500            match key.as_str() {
501                "total_sessions" => total_sessions = value.max(0) as usize,
502                "total_messages" => total_messages = value.max(0) as usize,
503                _ => {}
504            }
505        }
506
507        Ok(AggregateStats {
508            total_sessions,
509            total_messages,
510        })
511    }
512
513    /// Search sessions using FTS5 full-text search.
514    ///
515    /// Returns up to `limit` results ranked by relevance (BM25).
516    /// Returns empty vec (not error) if FTS5 index doesn't exist yet (graceful degradation).
517    pub fn search_sessions(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>> {
518        if query.trim().is_empty() {
519            return Ok(Vec::new());
520        }
521
522        let conn = self
523            .conn
524            .lock()
525            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
526
527        // Check if FTS5 table exists (graceful degradation for old cache DBs)
528        let fts_exists: bool = conn
529            .query_row(
530                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_fts'",
531                [],
532                |row| row.get::<_, i64>(0),
533            )
534            .unwrap_or(0)
535            > 0;
536
537        if !fts_exists {
538            return Ok(Vec::new());
539        }
540
541        let mut stmt = conn
542            .prepare(
543                r#"
544                SELECT
545                    sm.path,
546                    sm.session_id,
547                    sm.project,
548                    sm.first_user_message,
549                    snippet(session_fts, 2, '[', ']', '...', 12) AS snippet,
550                    session_fts.rank,
551                    sm.first_timestamp,
552                    sm.message_count
553                FROM session_fts
554                JOIN session_metadata sm ON session_fts.rowid = sm.rowid
555                WHERE session_fts MATCH ?
556                ORDER BY session_fts.rank
557                LIMIT ?
558                "#,
559            )
560            .context("Failed to prepare FTS5 search query")?;
561
562        let limit_i64 = limit as i64;
563        let rows = stmt
564            .query_map(params![query, limit_i64], |row| {
565                Ok(SearchResult {
566                    path: PathBuf::from(row.get::<_, String>(0)?),
567                    session_id: row.get(1)?,
568                    project: row.get(2)?,
569                    first_user_message: row.get(3)?,
570                    snippet: row.get(4)?,
571                    rank: row.get(5)?,
572                    first_timestamp: row.get(6)?,
573                    message_count: row.get::<_, Option<i64>>(7)?.unwrap_or(0) as u64,
574                })
575            })
576            .context("Failed to execute FTS5 search")?;
577
578        let mut results = Vec::new();
579        for row in rows {
580            match row {
581                Ok(r) => results.push(r),
582                Err(e) => {
583                    warn!("FTS5 search row error: {}", e);
584                }
585            }
586        }
587
588        Ok(results)
589    }
590
591    /// Rebuild FTS5 index from existing session_metadata rows.
592    ///
593    /// Called once after cache version bump to populate FTS5 for existing sessions.
594    pub fn rebuild_fts_index(&self) -> Result<usize> {
595        let conn = self
596            .conn
597            .lock()
598            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
599
600        // Trigger FTS5 content table rebuild
601        conn.execute("INSERT INTO session_fts(session_fts) VALUES('rebuild')", [])
602            .context("Failed to trigger FTS5 rebuild")?;
603
604        // Count sessions indexed
605        let count: i64 = conn
606            .query_row("SELECT COUNT(*) FROM session_metadata", [], |row| {
607                row.get(0)
608            })
609            .context("Failed to count sessions")?;
610
611        debug!("FTS5 index rebuilt for {} sessions", count);
612        Ok(count as usize)
613    }
614
615    // ─── Activity cache methods ───────────────────────────────────────────────
616
617    /// Get cached ActivitySummary if the session file mtime matches.
618    ///
619    /// Returns None on cache miss or mtime mismatch (session was modified).
620    pub fn get_activity(
621        &self,
622        path: &Path,
623        current_mtime: SystemTime,
624    ) -> Result<Option<ActivitySummary>> {
625        let path_str = path.to_string_lossy();
626        let mtime_secs = current_mtime
627            .duration_since(SystemTime::UNIX_EPOCH)
628            .context("Invalid mtime")?
629            .as_secs();
630
631        let conn = self
632            .conn
633            .lock()
634            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
635
636        let result: Option<Vec<u8>> = conn
637            .query_row(
638                "SELECT data FROM activity_cache WHERE session_path = ? AND mtime = ?",
639                params![path_str.as_ref(), mtime_secs as i64],
640                |row| row.get(0),
641            )
642            .optional()
643            .context("Failed to query activity cache")?;
644
645        match result {
646            Some(bytes) => {
647                let summary: ActivitySummary = bincode::deserialize(&bytes)
648                    .context("Failed to deserialize activity summary")?;
649                debug!(path = %path.display(), "Activity cache hit");
650                Ok(Some(summary))
651            }
652            None => {
653                debug!(path = %path.display(), "Activity cache miss");
654                Ok(None)
655            }
656        }
657    }
658
659    /// Store an ActivitySummary in cache, keyed by session file path + mtime.
660    ///
661    /// Also populates the `activity_alerts` table for cross-session alert queries.
662    pub fn put_activity(
663        &self,
664        path: &Path,
665        session_id: &str,
666        summary: &ActivitySummary,
667        mtime: SystemTime,
668    ) -> Result<()> {
669        let path_str = path.to_string_lossy();
670        let mtime_secs = mtime
671            .duration_since(SystemTime::UNIX_EPOCH)
672            .context("Invalid mtime")?
673            .as_secs();
674
675        let data = bincode::serialize(summary).context("Failed to serialize activity summary")?;
676
677        let conn = self
678            .conn
679            .lock()
680            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
681
682        // Wrap all writes in a transaction for atomicity.
683        // Without this, a crash between DELETE and re-insert leaves stale alerts.
684        conn.execute_batch("BEGIN IMMEDIATE")
685            .context("Failed to begin activity cache transaction")?;
686
687        let result = (|| -> anyhow::Result<()> {
688            // Upsert activity_cache
689            conn.execute(
690                r#"
691                INSERT OR REPLACE INTO activity_cache
692                (session_path, mtime, session_id, tool_call_count, alert_count, data)
693                VALUES (?, ?, ?, ?, ?, ?)
694                "#,
695                params![
696                    path_str.as_ref(),
697                    mtime_secs as i64,
698                    session_id,
699                    (summary.file_accesses.len()
700                        + summary.bash_commands.len()
701                        + summary.network_calls.len()) as i64,
702                    summary.alerts.len() as i64,
703                    &data,
704                ],
705            )
706            .context("Failed to insert activity cache entry")?;
707
708            // Refresh activity_alerts for this session (delete + re-insert)
709            conn.execute(
710                "DELETE FROM activity_alerts WHERE session_path = ?",
711                params![path_str.as_ref()],
712            )
713            .context("Failed to delete old activity alerts")?;
714
715            for alert in &summary.alerts {
716                let severity = format!("{:?}", alert.severity);
717                let category = format!("{:?}", alert.category);
718                conn.execute(
719                    r#"
720                    INSERT INTO activity_alerts (session_path, severity, category, timestamp, detail)
721                    VALUES (?, ?, ?, ?, ?)
722                    "#,
723                    params![
724                        path_str.as_ref(),
725                        severity,
726                        category,
727                        alert.timestamp.to_rfc3339(),
728                        &alert.detail,
729                    ],
730                )
731                .context("Failed to insert activity alert")?;
732            }
733
734            Ok(())
735        })();
736
737        match result {
738            Ok(()) => conn
739                .execute_batch("COMMIT")
740                .context("Failed to commit activity cache transaction")?,
741            Err(e) => {
742                let _ = conn.execute_batch("ROLLBACK");
743                return Err(e);
744            }
745        }
746
747        debug!(
748            path = %path.display(),
749            alerts = summary.alerts.len(),
750            "Activity summary cached"
751        );
752        Ok(())
753    }
754
755    /// Invalidate activity cache entry for a session file.
756    ///
757    /// Called by file watcher when a session file is modified.
758    pub fn invalidate_activity(&self, path: &Path) -> Result<()> {
759        let path_str = path.to_string_lossy();
760
761        let conn = self
762            .conn
763            .lock()
764            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
765
766        conn.execute(
767            "DELETE FROM activity_cache WHERE session_path = ?",
768            params![path_str.as_ref()],
769        )
770        .context("Failed to delete activity cache entry")?;
771
772        conn.execute(
773            "DELETE FROM activity_alerts WHERE session_path = ?",
774            params![path_str.as_ref()],
775        )
776        .context("Failed to delete activity alerts")?;
777
778        debug!(path = %path.display(), "Activity cache invalidated");
779        Ok(())
780    }
781
782    /// Get all stored alerts, optionally filtered by minimum severity.
783    ///
784    /// Useful for a global alert view across all analyzed sessions.
785    pub fn get_all_alerts(&self, min_severity: Option<&str>) -> Result<Vec<StoredAlert>> {
786        let conn = self
787            .conn
788            .lock()
789            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
790
791        // Build a query that respects severity hierarchy: Critical > Warning > Info.
792        // "Warning" means Warning + Critical; "Critical" means Critical only; None means all.
793        let query = match min_severity {
794            Some("Critical") => "SELECT session_path, severity, category, timestamp, detail \
795                 FROM activity_alerts WHERE severity = 'Critical' ORDER BY timestamp DESC",
796            Some("Warning") => "SELECT session_path, severity, category, timestamp, detail \
797                 FROM activity_alerts WHERE severity IN ('Warning', 'Critical') ORDER BY timestamp DESC",
798            _ => "SELECT session_path, severity, category, timestamp, detail \
799                 FROM activity_alerts ORDER BY timestamp DESC",
800        };
801
802        let mut stmt = conn
803            .prepare(query)
804            .context("Failed to prepare alert query")?;
805
806        let rows = stmt
807            .query_map([], |row| {
808                Ok(StoredAlert {
809                    session_path: row.get(0)?,
810                    severity: row.get(1)?,
811                    category: row.get(2)?,
812                    timestamp: row.get(3)?,
813                    detail: row.get(4)?,
814                })
815            })
816            .context("Failed to query alerts")?
817            .collect::<Result<Vec<_>, _>>()
818            .context("Failed to collect alerts")?;
819
820        Ok(rows)
821    }
822
823    /// Get activity cache statistics
824    pub fn activity_stats(&self) -> Result<ActivityCacheStats> {
825        let conn = self
826            .conn
827            .lock()
828            .map_err(|e| anyhow::anyhow!("Metadata cache lock poisoned: {}", e))?;
829
830        let analyzed_sessions: i64 = conn
831            .query_row("SELECT COUNT(*) FROM activity_cache", [], |row| row.get(0))
832            .context("Failed to count activity cache entries")?;
833
834        let total_alerts: i64 = conn
835            .query_row("SELECT COUNT(*) FROM activity_alerts", [], |row| row.get(0))
836            .context("Failed to count alerts")?;
837
838        let critical_alerts: i64 = conn
839            .query_row(
840                "SELECT COUNT(*) FROM activity_alerts WHERE severity = 'Critical'",
841                [],
842                |row| row.get(0),
843            )
844            .context("Failed to count critical alerts")?;
845
846        Ok(ActivityCacheStats {
847            analyzed_sessions: analyzed_sessions as usize,
848            total_alerts: total_alerts as usize,
849            critical_alerts: critical_alerts as usize,
850        })
851    }
852}
853
854impl Drop for MetadataCache {
855    fn drop(&mut self) {
856        // WAL checkpoint on drop to ensure all data is flushed to main database file
857        // and WAL file doesn't grow unbounded across restarts
858        if let Ok(conn) = self.conn.lock() {
859            if let Err(e) = conn.pragma_update(None, "wal_checkpoint", "TRUNCATE") {
860                warn!("Failed to checkpoint WAL on MetadataCache drop: {}", e);
861            } else {
862                debug!("WAL checkpoint completed on MetadataCache drop");
863            }
864        }
865    }
866}
867
/// Session metadata cache statistics.
///
/// Derives `Default`, `PartialEq` and `Eq` so an all-zero value can be created
/// directly and two snapshots can be compared as a whole.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of cached session entries.
    pub total_entries: usize,
    /// Size of the cached data in bytes (NOTE(review): what exactly is
    /// measured is decided where the stats are built; confirm there).
    pub total_size_bytes: usize,
    /// Number of distinct projects among the cached entries.
    pub project_count: usize,
}
875
/// Aggregate statistics from O(1) table.
///
/// Derives `PartialEq` and `Eq` so two snapshots can be compared as a whole.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AggregateStats {
    /// Total number of sessions.
    pub total_sessions: usize,
    /// Total number of messages.
    pub total_messages: usize,
}
882
/// A single FTS5 search result.
///
/// Derives `PartialEq` so results can be compared as a whole; `Eq` is not
/// derived because `rank` is an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Path of the matching session file.
    pub path: PathBuf,
    /// Identifier of the matching session.
    pub session_id: String,
    /// Project the session belongs to, when known.
    pub project: Option<String>,
    /// First user message of the session, when available.
    pub first_user_message: Option<String>,
    /// Excerpt of the matching text, when available.
    pub snippet: Option<String>,
    /// Relevance score of the match (NOTE(review): presumably the FTS5 rank,
    /// where the ordering direction is defined by the query; confirm there).
    pub rank: f64,
    /// ISO 8601 timestamp string from session_metadata (e.g. "2026-03-20T14:30:00Z")
    pub first_timestamp: Option<String>,
    /// Total message count for this session
    pub message_count: u64,
}
897
/// Activity cache statistics.
///
/// Derives `Default`, `PartialEq` and `Eq` so an all-zero value can be created
/// directly and two snapshots can be compared as a whole.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ActivityCacheStats {
    /// Number of sessions with a cached activity summary.
    pub analyzed_sessions: usize,
    /// Number of stored alerts across all sessions.
    pub total_alerts: usize,
    /// Number of stored alerts whose severity is `Critical`.
    pub critical_alerts: usize,
}
905
/// A single alert record from the activity_alerts table.
///
/// Every field holds the text of the column with the same name. Derives
/// `PartialEq` and `Eq` so records can be compared as a whole.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredAlert {
    /// Path of the session file the alert belongs to.
    pub session_path: String,
    /// Severity label as stored, e.g. `"Critical"` or `"Warning"`.
    pub severity: String,
    /// Category label as stored.
    pub category: String,
    /// Timestamp of the alert as stored (text).
    pub timestamp: String,
    /// Free-form description of what triggered the alert.
    pub detail: String,
}
915
916impl CacheStats {
917    pub fn hit_rate(&self, scanned: usize) -> f64 {
918        if scanned == 0 {
919            return 0.0;
920        }
921        (self.total_entries as f64) / (scanned as f64)
922    }
923}
924
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::SessionMetadata;
    use chrono::Utc;
    use std::time::Duration;
    use tempfile::{tempdir, TempDir};

    /// Session id passed to every `put_activity` call in these tests.
    const SESSION_ID: &str = "test-session";

    /// Session file used by the activity-cache tests.
    const ACTIVITY_SESSION: &str = "/tmp/session.jsonl";

    /// Session file used by the single-entry metadata tests.
    const METADATA_SESSION: &str = "/tmp/test.jsonl";

    /// Opens a cache inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the cache so the directory stays
    /// alive until the cache itself has been dropped.
    fn fresh_cache() -> (TempDir, MetadataCache) {
        let dir = tempdir().unwrap();
        let cache = MetadataCache::new(dir.path()).unwrap();
        (dir, cache)
    }

    /// Opens a fresh cache and stores the two-alert summary for
    /// `ACTIVITY_SESSION`, returning the path and mtime that were used.
    fn cache_with_activity() -> (TempDir, MetadataCache, PathBuf, SystemTime) {
        let (dir, cache) = fresh_cache();
        let path = PathBuf::from(ACTIVITY_SESSION);
        let summary = make_summary_with_alerts();
        let mtime = SystemTime::now();

        cache
            .put_activity(&path, SESSION_ID, &summary, mtime)
            .unwrap();

        (dir, cache, path, mtime)
    }

    #[test]
    fn test_cache_creation() {
        let (_dir, cache) = fresh_cache();

        // A brand-new cache starts out empty.
        let stats = cache.stats().unwrap();
        assert_eq!(stats.total_entries, 0);
    }

    #[test]
    fn test_cache_put_get() {
        let (_dir, cache) = fresh_cache();
        let session_file = PathBuf::from(METADATA_SESSION);

        let mut meta = SessionMetadata::from_path(session_file.clone(), "/test".into());
        meta.id = "test-123".into();
        meta.message_count = 42;
        meta.total_tokens = 1000;
        meta.models_used = std::iter::once("sonnet".to_string()).collect();
        meta.first_timestamp = Some(Utc::now());

        let stored_at = SystemTime::now();
        cache.put(&session_file, &meta, stored_at).unwrap();

        // Same mtime: the entry is served from the cache.
        let cached = cache.get(&session_file, stored_at).unwrap();
        assert!(cached.is_some());
        let cached = cached.unwrap();
        assert_eq!(cached.id, "test-123");
        assert_eq!(cached.message_count, 42);

        // Different mtime: the entry is treated as stale.
        let an_hour_earlier = stored_at - Duration::from_secs(3600);
        let cached = cache.get(&session_file, an_hour_earlier).unwrap();
        assert!(cached.is_none());
    }

    #[test]
    fn test_cache_invalidate() {
        let (_dir, cache) = fresh_cache();
        let session_file = PathBuf::from(METADATA_SESSION);
        let meta = SessionMetadata::from_path(session_file.clone(), "/test".into());
        let stored_at = SystemTime::now();

        cache.put(&session_file, &meta, stored_at).unwrap();
        cache.invalidate(&session_file).unwrap();

        // The entry must no longer be retrievable, even with a matching mtime.
        let cached = cache.get(&session_file, stored_at).unwrap();
        assert!(cached.is_none());
    }

    #[test]
    fn test_cache_project_paths() {
        let (_dir, cache) = fresh_cache();
        let mtime = SystemTime::now();

        // (project, number of sessions stored for it)
        let layout = [("/project1", 3usize), ("/project2", 2usize)];

        for &(project, session_count) in layout.iter() {
            for i in 0..session_count {
                let path = PathBuf::from(format!("/tmp{}/session{}.jsonl", project, i));
                let meta = SessionMetadata::from_path(path.clone(), project.into());
                cache.put(&path, &meta, mtime).unwrap();
            }
        }

        // Each project reports exactly the sessions stored for it.
        for &(project, session_count) in layout.iter() {
            let paths = cache.get_project_paths(project).unwrap();
            assert_eq!(paths.len(), session_count);
        }
    }

    #[test]
    fn test_cache_stats() {
        let (_dir, cache) = fresh_cache();
        let mtime = SystemTime::now();

        // Ten sessions, all belonging to the same project.
        (0..10).for_each(|i| {
            let path = PathBuf::from(format!("/tmp/session{}.jsonl", i));
            let meta = SessionMetadata::from_path(path.clone(), "/test".into());
            cache.put(&path, &meta, mtime).unwrap();
        });

        let stats = cache.stats().unwrap();
        assert_eq!(stats.total_entries, 10);
        assert!(stats.total_size_bytes > 0);
        assert_eq!(stats.project_count, 1);
    }

    #[test]
    fn test_cache_clear() {
        let (_dir, cache) = fresh_cache();
        let session_file = PathBuf::from(METADATA_SESSION);
        let meta = SessionMetadata::from_path(session_file.clone(), "/test".into());

        cache.put(&session_file, &meta, SystemTime::now()).unwrap();
        assert_eq!(cache.stats().unwrap().total_entries, 1);

        cache.clear().unwrap();
        assert_eq!(cache.stats().unwrap().total_entries, 0);
    }

    // ── activity cache tests ─────────────────────────────────────────────────

    /// Builds a summary with one Critical and one Warning alert and no other
    /// recorded activity.
    fn make_summary_with_alerts() -> ActivitySummary {
        use crate::models::activity::{Alert, AlertCategory, AlertSeverity};

        let alert = |severity: AlertSeverity, category: AlertCategory, detail: &str| Alert {
            session_id: SESSION_ID.to_string(),
            timestamp: Utc::now(),
            severity,
            category,
            detail: detail.to_string(),
        };

        ActivitySummary {
            file_accesses: Vec::new(),
            bash_commands: Vec::new(),
            network_calls: Vec::new(),
            alerts: vec![
                alert(
                    AlertSeverity::Critical,
                    AlertCategory::DestructiveCommand,
                    "rm -rf /tmp",
                ),
                alert(
                    AlertSeverity::Warning,
                    AlertCategory::CredentialAccess,
                    "Accessed .env",
                ),
            ],
        }
    }

    #[test]
    fn test_activity_put_get_hit() {
        let (_dir, cache, path, mtime) = cache_with_activity();

        let cached = cache.get_activity(&path, mtime).unwrap();
        assert!(cached.is_some(), "Should be a cache hit");
        let cached = cached.unwrap();
        assert_eq!(cached.alerts.len(), 2);
    }

    #[test]
    fn test_activity_get_miss_on_mtime_change() {
        let (_dir, cache, path, mtime) = cache_with_activity();

        // A lookup with any other mtime must not return the stored summary.
        let stale_mtime = mtime - Duration::from_secs(60);
        let cached = cache.get_activity(&path, stale_mtime).unwrap();
        assert!(cached.is_none(), "Should be a cache miss on mtime change");
    }

    #[test]
    fn test_activity_invalidate() {
        let (_dir, cache, path, mtime) = cache_with_activity();

        // Sanity check: the summary is there before invalidation.
        assert!(cache.get_activity(&path, mtime).unwrap().is_some());

        cache.invalidate_activity(&path).unwrap();

        // The cached summary is gone...
        assert!(
            cache.get_activity(&path, mtime).unwrap().is_none(),
            "Should be gone after invalidation"
        );

        // ...and so are the alerts recorded for it.
        let alerts = cache.get_all_alerts(None).unwrap();
        assert!(
            alerts.is_empty(),
            "Alerts should be cleared with activity cache"
        );
    }

    #[test]
    fn test_get_all_alerts_returns_stored_alerts() {
        let (_dir, cache, _path, _mtime) = cache_with_activity();

        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 2, "Should return both alerts");

        let critical: Vec<_> = alerts
            .iter()
            .filter(|alert| alert.severity == "Critical")
            .collect();
        assert_eq!(critical.len(), 1);
        assert!(critical[0].detail.contains("rm -rf"));
    }

    #[test]
    fn test_get_all_alerts_filter_by_severity() {
        let (_dir, cache, _path, _mtime) = cache_with_activity();

        let critical_only = cache.get_all_alerts(Some("Critical")).unwrap();
        assert_eq!(critical_only.len(), 1);
        assert_eq!(critical_only[0].severity, "Critical");
    }

    #[test]
    fn test_activity_stats() {
        let (_dir, cache) = fresh_cache();

        // Nothing has been analyzed yet.
        let before = cache.activity_stats().unwrap();
        assert_eq!(before.analyzed_sessions, 0);
        assert_eq!(before.total_alerts, 0);

        let path = PathBuf::from(ACTIVITY_SESSION);
        let summary = make_summary_with_alerts();
        cache
            .put_activity(&path, SESSION_ID, &summary, SystemTime::now())
            .unwrap();

        let after = cache.activity_stats().unwrap();
        assert_eq!(after.analyzed_sessions, 1);
        assert_eq!(after.total_alerts, 2);
        assert_eq!(after.critical_alerts, 1);
    }

    #[test]
    fn test_activity_put_replaces_stale_alerts() {
        // Start from a cache that already holds the two-alert summary.
        let (_dir, cache, path, mtime) = cache_with_activity();
        assert_eq!(cache.get_all_alerts(None).unwrap().len(), 2);

        // Store a clean summary (0 alerts) for the same path with a newer mtime.
        let empty_summary = ActivitySummary::default();
        let new_mtime = mtime + Duration::from_secs(1);
        cache
            .put_activity(&path, SESSION_ID, &empty_summary, new_mtime)
            .unwrap();

        // The alerts from the first summary must not survive the re-put.
        let alerts = cache.get_all_alerts(None).unwrap();
        assert_eq!(alerts.len(), 0, "Stale alerts should be replaced");
    }
}