Skip to main content

cqs/store/
mod.rs

1//! SQLite storage for chunks, embeddings, and call graph data.
2//!
//! Provides synchronous methods that internally use a Tokio runtime to execute async sqlx operations.
//! This allows callers to use the `Store` synchronously while still benefiting from sqlx's async features.
5//!
6//! ## Module Structure
7//!
8//! - `helpers` - Types and embedding conversion functions
9//! - `chunks` - Chunk CRUD operations
10//! - `notes` - Note CRUD and search
11//! - `calls` - Call graph storage and queries
12//! - `types` - Type dependency storage and queries
13//! - `migrations` - Database schema migrations
14
15mod calls;
16mod chunks;
17mod migrations;
18mod notes;
19mod types;
20
21/// Helper types and embedding conversion functions.
22///
23/// This module is `pub(crate)` - external consumers should use the re-exported
24/// types from `cqs::store` instead of accessing `cqs::store::helpers` directly.
25pub(crate) mod helpers;
26
27use std::collections::HashMap;
28use std::path::Path;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::RwLock;
31
32use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
33use sqlx::{ConnectOptions, SqlitePool};
34use tokio::runtime::Runtime;
35
36// Re-export public types with documentation
37
38/// In-memory call graph (forward + reverse adjacency lists).
39pub use helpers::CallGraph;
40
41/// Information about a function caller (from call graph).
42pub use helpers::CallerInfo;
43
44/// Caller with call-site context for impact analysis.
45pub use helpers::CallerWithContext;
46
47/// Chunk identity for diff comparison (name, file, line, window info).
48pub use helpers::ChunkIdentity;
49
50/// Summary of an indexed code chunk (function, class, etc.).
51pub use helpers::ChunkSummary;
52
53/// Parent context for expanded search results (small-to-big retrieval).
54pub use helpers::ParentContext;
55
56/// Statistics about the index (chunk counts, languages, etc.).
57pub use helpers::IndexStats;
58
59/// Embedding model metadata.
60pub use helpers::ModelInfo;
61
62/// A note search result with similarity score.
63pub use helpers::NoteSearchResult;
64
65/// Statistics about indexed notes.
66pub use helpers::NoteStats;
67
68/// Summary of a note (text, sentiment, mentions).
69pub use helpers::NoteSummary;
70
71/// Filter and scoring options for search.
72pub use helpers::SearchFilter;
73
74/// A code chunk search result with similarity score.
75pub use helpers::SearchResult;
76
77/// A file in the index whose content has changed on disk.
78pub use helpers::StaleFile;
79
80/// Report of index freshness (stale + missing files).
81pub use helpers::StaleReport;
82
83/// Store operation errors.
84pub use helpers::StoreError;
85
86/// Unified search result (code chunk or note).
87pub use helpers::UnifiedResult;
88
89/// Current database schema version.
90pub use helpers::CURRENT_SCHEMA_VERSION;
91
92/// Expected embedding dimensions (768 model + 1 sentiment).
93pub use helpers::EXPECTED_DIMENSIONS;
94
95/// Name of the embedding model used.
96pub use helpers::MODEL_NAME;
97
98/// Default name_boost weight for CLI search commands.
99pub use helpers::DEFAULT_NAME_BOOST;
100
101/// Score a chunk name against a query for definition search.
102pub use helpers::score_name_match;
103
104/// Score a pre-lowercased chunk name against a pre-lowercased query (loop-optimized variant).
105pub use helpers::score_name_match_pre_lower;
106
107/// Statistics about call graph entries (chunk-level calls table).
108pub use calls::CallStats;
109
110/// A dead function with confidence scoring.
111pub use calls::DeadFunction;
112
113/// Confidence level for dead code detection.
114pub use calls::DeadConfidence;
115
116/// Detailed function call statistics (function_calls table).
117pub use calls::FunctionCallStats;
118
119/// Statistics about type dependency edges (type_edges table).
120pub use types::TypeEdgeStats;
121
122/// In-memory type graph (forward + reverse adjacency lists).
123pub use types::TypeGraph;
124
125/// A type usage relationship from a chunk.
126pub use types::TypeUsage;
127
128// Internal use
129use helpers::{clamp_line_number, ChunkRow};
130
131use crate::nl::normalize_for_fts;
132
/// Defense-in-depth sanitization for FTS5 query strings.
///
/// Applied after `normalize_for_fts()` as an extra safety layer — if `normalize_for_fts`
/// ever changes to allow characters through, this prevents FTS5 injection.
///
/// Each whitespace-separated word is processed in two steps:
///
/// 1. FTS5 special characters are stripped: `"`, `*`, `(`, `)`, `+`, `-`, `^`, `:`.
/// 2. The *stripped* word is dropped if it is empty or is one of the FTS5 keywords
///    `OR`, `AND`, `NOT`, `NEAR`. The comparison is case-sensitive, so lowercase
///    `or` / `and` / `not` / `near` are kept as ordinary search terms.
///
/// The keyword check deliberately runs *after* stripping: inputs such as `"OR"`,
/// `(NOT)` or `O-R` only become bare keywords once their special characters are
/// removed, and must not survive into the query. Surviving words are joined with a
/// single space, so the result never has leading, trailing, or doubled separators.
///
/// # Safety (injection)
///
/// This function independently strips all FTS5-significant characters including
/// double quotes. Safe for use in `format!`-constructed FTS5 queries even without
/// `normalize_for_fts()`. The double-pass pattern (`normalize_for_fts` then
/// `sanitize_fts_query`) is defense-in-depth — either layer alone prevents injection.
pub(crate) fn sanitize_fts_query(s: &str) -> String {
    // Single output buffer: each word is written speculatively and rolled back
    // (truncate) if it turns out to be empty or a keyword, so no per-word
    // allocation is needed.
    let mut out = String::with_capacity(s.len());
    for word in s.split_whitespace() {
        let rollback = out.len();
        if !out.is_empty() {
            out.push(' ');
        }
        let word_start = out.len();
        out.extend(
            word.chars()
                .filter(|c| !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':')),
        );
        // Inspect the word as it will actually appear in the query.
        if matches!(&out[word_start..], "" | "OR" | "AND" | "NOT" | "NEAR") {
            out.truncate(rollback);
        }
    }
    out
}
167
/// Thread-safe SQLite store for chunks and embeddings
///
/// Uses sqlx connection pooling for concurrent reads and WAL mode
/// for crash safety. All methods are synchronous but internally use
/// an async runtime to execute sqlx operations.
///
/// # Memory-mapped I/O
///
/// `open()` sets `PRAGMA mmap_size = 256MB` per connection with a 4-connection pool,
/// reserving up to 1GB of virtual address space. `open_readonly()` uses 64MB × 1.
/// This is intentional and benign on 64-bit systems (128TB virtual address space).
/// Mmap pages are demand-paged from the database file and evicted under memory
/// pressure — actual RSS reflects only accessed pages, not the mmap reservation.
///
/// # Example
///
/// ```no_run
/// use cqs::Store;
/// use std::path::Path;
///
/// let store = Store::open(Path::new(".cqs/index.db"))?;
/// let stats = store.stats()?;
/// println!("Indexed {} chunks", stats.total_chunks);
/// # Ok::<(), anyhow::Error>(())
/// ```
pub struct Store {
    /// sqlx connection pool that every query in this module executes against.
    pub(crate) pool: SqlitePool,
    /// Tokio runtime owned by the store; the synchronous methods drive their
    /// async sqlx calls to completion through `rt.block_on(...)`.
    pub(crate) rt: Runtime,
    /// Whether close() has already been called (skip WAL checkpoint in Drop)
    closed: AtomicBool,
    /// Cached note summaries: `None` until `cached_notes_summaries()` loads them,
    /// and reset to `None` by `invalidate_notes_cache()`.
    notes_summaries_cache: RwLock<Option<Vec<NoteSummary>>>,
    /// Cached call graph — populated on first access, valid for Store lifetime.
    ///
    /// **No invalidation mechanism by design.** `OnceLock` is intentionally write-once:
    /// once populated the cache is never cleared. This is safe because `Store` is opened
    /// per-command (one `open()` → use → `close()` cycle), so the index cannot change
    /// while the cache is live. Long-lived `Store` instances (batch mode, watch mode)
    /// must be re-opened to pick up index changes; the caller is responsible for that
    /// lifecycle. Do not add invalidation logic here — it would be dead code for the
    /// normal case and racy for the long-lived case (use a fresh `Store` instead).
    call_graph_cache: std::sync::OnceLock<CallGraph>,
    /// Cached test chunks — populated on first access, valid for Store lifetime.
    ///
    /// Same no-invalidation contract as `call_graph_cache` above: intentionally
    /// write-once for the per-command `Store` lifetime. Re-open the `Store` if the
    /// underlying index has been updated (e.g., after `cqs index` in watch mode).
    test_chunks_cache: std::sync::OnceLock<Vec<ChunkSummary>>,
}
216
217impl Store {
218    /// Open an existing index with connection pooling
219    pub fn open(path: &Path) -> Result<Self, StoreError> {
220        let _span = tracing::info_span!("store_open", path = %path.display()).entered();
221        let rt = Runtime::new()?;
222
223        // Use SqliteConnectOptions::filename() to avoid URL parsing issues with
224        // special characters in paths (spaces, #, ?, %, unicode).
225        let connect_opts = SqliteConnectOptions::new()
226            .filename(path)
227            .create_if_missing(true)
228            .foreign_keys(true)
229            .journal_mode(SqliteJournalMode::Wal)
230            .busy_timeout(std::time::Duration::from_secs(5))
231            .synchronous(SqliteSynchronous::Normal)
232            .pragma("mmap_size", "268435456") // 256MB memory-mapped I/O
233            .log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_secs(5));
234
235        // SQLite connection pool with WAL mode for concurrent reads
236        let pool = rt.block_on(async {
237            SqlitePoolOptions::new()
238                .max_connections(4) // 4 = typical CLI parallelism (index, search, watch)
239                .idle_timeout(std::time::Duration::from_secs(300)) // Close idle connections after 5 min
240                .after_connect(|conn, _meta| {
241                    Box::pin(async move {
242                        // 16MB page cache per connection (negative = KB, -16384 = 16MB)
243                        sqlx::query("PRAGMA cache_size = -16384")
244                            .execute(&mut *conn)
245                            .await?;
246                        // Keep temp tables in memory
247                        sqlx::query("PRAGMA temp_store = MEMORY")
248                            .execute(&mut *conn)
249                            .await?;
250                        Ok(())
251                    })
252                })
253                .connect_with(connect_opts)
254                .await
255        })?;
256
257        let store = Self {
258            pool,
259            rt,
260            closed: AtomicBool::new(false),
261            notes_summaries_cache: RwLock::new(None),
262            call_graph_cache: std::sync::OnceLock::new(),
263            test_chunks_cache: std::sync::OnceLock::new(),
264        };
265
266        // Set restrictive permissions on database files (Unix only)
267        // These files contain code embeddings - not secrets, but defense-in-depth
268        #[cfg(unix)]
269        {
270            use std::os::unix::fs::PermissionsExt;
271            let restrictive = std::fs::Permissions::from_mode(0o600);
272            // Main database file
273            if let Err(e) = std::fs::set_permissions(path, restrictive.clone()) {
274                tracing::debug!(path = %path.display(), error = %e, "Failed to set permissions");
275            }
276            // WAL and SHM files (may not exist yet, ignore errors)
277            let wal_path = path.with_extension("db-wal");
278            let shm_path = path.with_extension("db-shm");
279            if let Err(e) = std::fs::set_permissions(&wal_path, restrictive.clone()) {
280                tracing::debug!(path = %wal_path.display(), error = %e, "Failed to set permissions");
281            }
282            if let Err(e) = std::fs::set_permissions(&shm_path, restrictive) {
283                tracing::debug!(path = %shm_path.display(), error = %e, "Failed to set permissions");
284            }
285        }
286
287        tracing::info!(path = %path.display(), "Database connected");
288
289        // Quick integrity check — catches B-tree corruption early
290        store.rt.block_on(async {
291            let result: (String,) = sqlx::query_as("PRAGMA quick_check")
292                .fetch_one(&store.pool)
293                .await?;
294            if result.0 != "ok" {
295                return Err(StoreError::Corruption(result.0));
296            }
297            Ok::<_, StoreError>(())
298        })?;
299
300        // Check model version BEFORE schema migration — if model mismatches,
301        // we don't want to commit a schema upgrade on a DB we'll reject anyway
302        store.check_model_version()?;
303        // Check schema version compatibility (may run migrations)
304        store.check_schema_version(path)?;
305        // Warn if index was created by different cqs version
306        store.check_cq_version();
307
308        Ok(store)
309    }
310
311    /// Open an existing index in read-only mode with reduced resources.
312    ///
313    /// Uses minimal connection pool, smaller cache, and single-threaded runtime.
314    /// Suitable for reference stores and background builds that only read data.
315    pub fn open_readonly(path: &Path) -> Result<Self, StoreError> {
316        let _span = tracing::info_span!("store_open_readonly", path = %path.display()).entered();
317        let rt = tokio::runtime::Builder::new_current_thread()
318            .enable_all()
319            .build()?;
320
321        // Use SqliteConnectOptions::filename() to avoid URL parsing issues with
322        // special characters in paths (spaces, #, ?, %, unicode).
323        let connect_opts = SqliteConnectOptions::new()
324            .filename(path)
325            .read_only(true)
326            .foreign_keys(true)
327            .journal_mode(SqliteJournalMode::Wal)
328            .busy_timeout(std::time::Duration::from_secs(5))
329            .synchronous(SqliteSynchronous::Normal)
330            .pragma("mmap_size", "67108864") // 64MB mmap (reduced from 256MB)
331            .log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_secs(5));
332
333        let pool = rt.block_on(async {
334            SqlitePoolOptions::new()
335                .max_connections(1)
336                .idle_timeout(std::time::Duration::from_secs(300))
337                .after_connect(|conn, _meta| {
338                    Box::pin(async move {
339                        // 4MB page cache (reduced from 16MB)
340                        sqlx::query("PRAGMA cache_size = -4096")
341                            .execute(&mut *conn)
342                            .await?;
343                        // Keep temp tables in memory
344                        sqlx::query("PRAGMA temp_store = MEMORY")
345                            .execute(&mut *conn)
346                            .await?;
347                        Ok(())
348                    })
349                })
350                .connect_with(connect_opts)
351                .await
352        })?;
353
354        let store = Self {
355            pool,
356            rt,
357            closed: AtomicBool::new(false),
358            notes_summaries_cache: RwLock::new(None),
359            call_graph_cache: std::sync::OnceLock::new(),
360            test_chunks_cache: std::sync::OnceLock::new(),
361        };
362
363        // Skip permissions setting (read-only, no file creation)
364
365        tracing::info!(path = %path.display(), "Database connected (read-only)");
366
367        // Quick integrity check — catches B-tree corruption early
368        store.rt.block_on(async {
369            let result: (String,) = sqlx::query_as("PRAGMA quick_check")
370                .fetch_one(&store.pool)
371                .await?;
372            if result.0 != "ok" {
373                return Err(StoreError::Corruption(result.0));
374            }
375            Ok::<_, StoreError>(())
376        })?;
377
378        store.check_schema_version(path)?;
379        store.check_model_version()?;
380        store.check_cq_version();
381
382        Ok(store)
383    }
384
385    /// Create a new index
386    ///
387    /// Wraps all DDL and metadata inserts in a single transaction so a
388    /// crash mid-init cannot leave a partial schema.
389    pub fn init(&self, model_info: &ModelInfo) -> Result<(), StoreError> {
390        let _span = tracing::info_span!("Store::init").entered();
391        self.rt.block_on(async {
392            let mut tx = self.pool.begin().await?;
393
394            // Create tables - execute each statement separately
395            let schema = include_str!("../schema.sql");
396            for statement in schema.split(';') {
397                let stmt: String = statement
398                    .lines()
399                    .skip_while(|line| {
400                        let trimmed = line.trim();
401                        trimmed.is_empty() || trimmed.starts_with("--")
402                    })
403                    .collect::<Vec<_>>()
404                    .join("\n");
405                let stmt = stmt.trim();
406                if stmt.is_empty() {
407                    continue;
408                }
409                sqlx::query(stmt).execute(&mut *tx).await?;
410            }
411
412            // Store metadata (OR REPLACE handles re-init after incomplete cleanup)
413            let now = chrono::Utc::now().to_rfc3339();
414            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
415                .bind("schema_version")
416                .bind(CURRENT_SCHEMA_VERSION.to_string())
417                .execute(&mut *tx)
418                .await?;
419            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
420                .bind("model_name")
421                .bind(&model_info.name)
422                .execute(&mut *tx)
423                .await?;
424            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
425                .bind("dimensions")
426                .bind(model_info.dimensions.to_string())
427                .execute(&mut *tx)
428                .await?;
429            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
430                .bind("created_at")
431                .bind(&now)
432                .execute(&mut *tx)
433                .await?;
434            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
435                .bind("cq_version")
436                .bind(env!("CARGO_PKG_VERSION"))
437                .execute(&mut *tx)
438                .await?;
439
440            tx.commit().await?;
441
442            tracing::info!(
443                schema_version = CURRENT_SCHEMA_VERSION,
444                "Schema initialized"
445            );
446
447            Ok(())
448        })
449    }
450
451    fn check_schema_version(&self, path: &Path) -> Result<(), StoreError> {
452        let path_str = path.display().to_string();
453        self.rt.block_on(async {
454            let row: Option<(String,)> =
455                match sqlx::query_as("SELECT value FROM metadata WHERE key = 'schema_version'")
456                    .fetch_optional(&self.pool)
457                    .await
458                {
459                    Ok(r) => r,
460                    Err(sqlx::Error::Database(e)) if e.message().contains("no such table") => {
461                        return Ok(());
462                    }
463                    Err(e) => return Err(e.into()),
464                };
465
466            let version: i32 = match row {
467                Some((s,)) => s.parse().map_err(|e| {
468                    StoreError::Corruption(format!(
469                        "schema_version '{}' is not a valid integer: {}",
470                        s, e
471                    ))
472                })?,
473                None => 0,
474            };
475
476            if version > CURRENT_SCHEMA_VERSION {
477                return Err(StoreError::SchemaNewerThanCq(version));
478            }
479            if version < CURRENT_SCHEMA_VERSION && version > 0 {
480                // Attempt migration instead of failing
481                match migrations::migrate(&self.pool, version, CURRENT_SCHEMA_VERSION).await {
482                    Ok(()) => {
483                        tracing::info!(
484                            path = %path_str,
485                            from = version,
486                            to = CURRENT_SCHEMA_VERSION,
487                            "Schema migrated successfully"
488                        );
489                    }
490                    Err(StoreError::MigrationNotSupported(from, to)) => {
491                        // No migration available, fall back to original error
492                        return Err(StoreError::SchemaMismatch(path_str, from, to));
493                    }
494                    Err(e) => return Err(e),
495                }
496            }
497            Ok(())
498        })
499    }
500
501    fn check_model_version(&self) -> Result<(), StoreError> {
502        self.rt.block_on(async {
503            // Check model name
504            let row: Option<(String,)> =
505                match sqlx::query_as("SELECT value FROM metadata WHERE key = 'model_name'")
506                    .fetch_optional(&self.pool)
507                    .await
508                {
509                    Ok(r) => r,
510                    Err(sqlx::Error::Database(e)) if e.message().contains("no such table") => {
511                        return Ok(());
512                    }
513                    Err(e) => return Err(e.into()),
514                };
515
516            let stored_model = row.map(|(s,)| s).unwrap_or_default();
517
518            if !stored_model.is_empty() && stored_model != MODEL_NAME {
519                return Err(StoreError::ModelMismatch(
520                    stored_model,
521                    MODEL_NAME.to_string(),
522                ));
523            }
524
525            // Check embedding dimensions
526            let dim_row: Option<(String,)> =
527                sqlx::query_as("SELECT value FROM metadata WHERE key = 'dimensions'")
528                    .fetch_optional(&self.pool)
529                    .await?;
530
531            if let Some((dim_str,)) = dim_row {
532                if let Ok(stored_dim) = dim_str.parse::<u32>() {
533                    if stored_dim != EXPECTED_DIMENSIONS {
534                        return Err(StoreError::DimensionMismatch(
535                            stored_dim,
536                            EXPECTED_DIMENSIONS,
537                        ));
538                    }
539                } else {
540                    tracing::warn!(dim = %dim_str, "Failed to parse stored dimension");
541                }
542            }
543
544            Ok(())
545        })
546    }
547
548    fn check_cq_version(&self) {
549        if let Err(e) = self.rt.block_on(async {
550            let row: Option<(String,)> =
551                match sqlx::query_as("SELECT value FROM metadata WHERE key = 'cq_version'")
552                    .fetch_optional(&self.pool)
553                    .await
554                {
555                    Ok(row) => row,
556                    Err(e) => {
557                        tracing::debug!(error = %e, "Failed to read cq_version from metadata");
558                        return Ok::<_, StoreError>(());
559                    }
560                };
561
562            let stored_version = row.map(|(s,)| s).unwrap_or_default();
563            let current_version = env!("CARGO_PKG_VERSION");
564
565            if !stored_version.is_empty() && stored_version != current_version {
566                tracing::info!(
567                    "Index created by cqs v{}, running v{}",
568                    stored_version,
569                    current_version
570                );
571            }
572            Ok::<_, StoreError>(())
573        }) {
574            tracing::debug!(error = %e, "check_cq_version failed");
575        }
576    }
577
578    /// Search FTS5 index for keyword matches.
579    ///
580    /// # Search Method Overview
581    ///
582    /// The Store provides several search methods with different characteristics:
583    ///
584    /// - **`search_fts`**: Full-text keyword search using SQLite FTS5. Returns chunk IDs.
585    ///   Best for: Exact keyword matches, symbol lookup by name fragment.
586    ///
587    /// - **`search_by_name`**: Definition search by function/struct name. Uses FTS5 with
588    ///   heavy weighting on the name column. Returns full `SearchResult` with scores.
589    ///   Best for: "Where is X defined?" queries.
590    ///
591    /// - **`search_filtered`** (in search.rs): Semantic search with optional language/path
592    ///   filters. Can use RRF hybrid search combining semantic + FTS scores.
593    ///   Best for: Natural language queries like "retry with exponential backoff".
594    ///
595    /// - **`search_filtered_with_index`** (in search.rs): Like `search_filtered` but uses
596    ///   HNSW/CAGRA vector index for O(log n) candidate retrieval instead of brute force.
597    ///   Best for: Large indexes (>5k chunks) where brute force is slow.
598    pub fn search_fts(&self, query: &str, limit: usize) -> Result<Vec<String>, StoreError> {
599        let _span = tracing::info_span!("search_fts", limit).entered();
600        let normalized_query = sanitize_fts_query(&normalize_for_fts(query));
601        if normalized_query.is_empty() {
602            tracing::debug!(
603                original_query = %query,
604                "Query normalized to empty string, returning no FTS results"
605            );
606            return Ok(vec![]);
607        }
608
609        self.rt.block_on(async {
610            let rows: Vec<(String,)> = sqlx::query_as(
611                "SELECT id FROM chunks_fts WHERE chunks_fts MATCH ?1 ORDER BY bm25(chunks_fts) LIMIT ?2",
612            )
613            .bind(&normalized_query)
614            .bind(limit as i64)
615            .fetch_all(&self.pool)
616            .await?;
617
618            Ok(rows.into_iter().map(|(id,)| id).collect())
619        })
620    }
621
622    /// Search for chunks by name (definition search).
623    ///
624    /// Searches the FTS5 name column for exact or prefix matches.
625    /// Use this for "where is X defined?" queries instead of semantic search.
626    pub fn search_by_name(
627        &self,
628        name: &str,
629        limit: usize,
630    ) -> Result<Vec<SearchResult>, StoreError> {
631        let _span = tracing::info_span!("search_by_name", %name, limit).entered();
632        let limit = limit.min(100);
633        let normalized = sanitize_fts_query(&normalize_for_fts(name));
634        if normalized.is_empty() {
635            return Ok(vec![]);
636        }
637
638        // Pre-lowercase query once for score_name_match_pre_lower (PF-3)
639        let lower_name = name.to_lowercase();
640
641        // Search name column specifically using FTS5 column filter
642        // Use * for prefix matching (e.g., "parse" matches "parse_config")
643        debug_assert!(
644            !normalized.contains('"'),
645            "sanitized query must not contain double quotes"
646        );
647        if normalized.contains('"') {
648            return Ok(vec![]);
649        }
650        let fts_query = format!("name:\"{}\" OR name:\"{}\"*", normalized, normalized);
651
652        self.rt.block_on(async {
653            let rows: Vec<_> = sqlx::query(
654                "SELECT c.id, c.origin, c.language, c.chunk_type, c.name, c.signature, c.content, c.doc, c.line_start, c.line_end, c.parent_id, c.parent_type_name
655                 FROM chunks c
656                 JOIN chunks_fts f ON c.id = f.id
657                 WHERE chunks_fts MATCH ?1
658                 ORDER BY bm25(chunks_fts, 10.0, 1.0, 1.0, 1.0) -- Heavy weight on name column
659                 LIMIT ?2",
660            )
661            .bind(&fts_query)
662            .bind(limit as i64)
663            .fetch_all(&self.pool)
664            .await?;
665
666            use sqlx::Row;
667            let mut results = rows
668                .into_iter()
669                .map(|row| {
670                    let chunk = ChunkSummary::from(ChunkRow {
671                        id: row.get(0),
672                        origin: row.get(1),
673                        language: row.get(2),
674                        chunk_type: row.get(3),
675                        name: row.get(4),
676                        signature: row.get(5),
677                        content: row.get(6),
678                        doc: row.get(7),
679                        line_start: clamp_line_number(row.get::<i64, _>(8)),
680                        line_end: clamp_line_number(row.get::<i64, _>(9)),
681                        parent_id: row.get(10),
682                        parent_type_name: row.get(11),
683                    });
684                    let name_lower = chunk.name.to_lowercase();
685                    let score = helpers::score_name_match_pre_lower(&name_lower, &lower_name);
686                    SearchResult { chunk, score }
687                })
688                .collect::<Vec<_>>();
689
690            // Re-sort by name-match score (FTS bm25 ordering may differ)
691            results.sort_by(|a, b| b.score.total_cmp(&a.score));
692
693            Ok(results)
694        })
695    }
696
697    /// Compute RRF (Reciprocal Rank Fusion) scores for combining two ranked lists.
698    ///
699    /// Allocates a new HashMap per search. Pre-allocated buffer was considered but:
700    /// - Input size varies (limit*3 semantic + limit*3 FTS = up to 6*limit entries)
701    /// - HashMap with ~30-100 entries costs ~1KB, negligible vs embedding costs (~3KB)
702    /// - Thread-local buffer would add complexity for ~0.1ms savings on typical searches
703    pub(crate) fn rrf_fuse(
704        semantic_ids: &[&str],
705        fts_ids: &[String],
706        limit: usize,
707    ) -> Vec<(String, f32)> {
708        // K=60 is the standard RRF constant from the original paper.
709        // Higher K reduces the impact of rank differences (smoother fusion).
710        const K: f32 = 60.0;
711
712        let mut scores: HashMap<&str, f32> = HashMap::new();
713
714        for (rank, id) in semantic_ids.iter().enumerate() {
715            // RRF formula: 1 / (K + rank). The + 1.0 converts 0-indexed enumerate()
716            // to 1-indexed ranks (first result = rank 1, not rank 0).
717            let contribution = 1.0 / (K + rank as f32 + 1.0);
718            *scores.entry(id).or_insert(0.0) += contribution;
719        }
720
721        for (rank, id) in fts_ids.iter().enumerate() {
722            // Same conversion: enumerate's 0-index -> RRF's 1-indexed rank
723            let contribution = 1.0 / (K + rank as f32 + 1.0);
724            *scores.entry(id.as_str()).or_insert(0.0) += contribution;
725        }
726
727        let mut sorted: Vec<(String, f32)> = scores
728            .into_iter()
729            .map(|(k, v)| (k.to_string(), v))
730            .collect();
731        sorted.sort_by(|a, b| b.1.total_cmp(&a.1));
732        sorted.truncate(limit);
733        sorted
734    }
735
736    /// Exposed for property testing only
737    #[cfg(test)]
738    pub(crate) fn rrf_fuse_test(
739        semantic_ids: &[String],
740        fts_ids: &[String],
741        limit: usize,
742    ) -> Vec<(String, f32)> {
743        let refs: Vec<&str> = semantic_ids.iter().map(|s| s.as_str()).collect();
744        Self::rrf_fuse(&refs, fts_ids, limit)
745    }
746
747    /// Update the `updated_at` metadata timestamp to now.
748    ///
749    /// Call after indexing operations complete (pipeline, watch reindex, note sync)
750    /// to track when the index was last modified.
751    pub fn touch_updated_at(&self) -> Result<(), StoreError> {
752        let now = chrono::Utc::now().to_rfc3339();
753        self.rt.block_on(async {
754            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES ('updated_at', ?1)")
755                .bind(&now)
756                .execute(&self.pool)
757                .await?;
758            Ok(())
759        })
760    }
761
762    /// Get cached notes summaries (loaded on first call, invalidated on mutation).
763    ///
764    /// Returns a cloned Vec rather than a slice reference to avoid holding the
765    /// RwLock read guard across caller code. The clone cost is negligible — notes
766    /// are typically <100 entries with small strings.
767    pub fn cached_notes_summaries(&self) -> Result<Vec<NoteSummary>, StoreError> {
768        {
769            let guard = self.notes_summaries_cache.read().unwrap_or_else(|p| {
770                tracing::warn!("notes cache read lock poisoned, recovering");
771                p.into_inner()
772            });
773            if let Some(ref ns) = *guard {
774                return Ok(ns.clone());
775            }
776        }
777        // Cache miss — load from DB and populate
778        let ns = self.list_notes_summaries()?;
779        {
780            let mut guard = self.notes_summaries_cache.write().unwrap_or_else(|p| {
781                tracing::warn!("notes cache write lock poisoned, recovering");
782                p.into_inner()
783            });
784            *guard = Some(ns.clone());
785        }
786        Ok(ns)
787    }
788
789    /// Invalidate the cached notes summaries.
790    ///
791    /// Must be called after any operation that modifies notes (upsert, replace, delete)
792    /// so subsequent reads see fresh data.
793    pub(crate) fn invalidate_notes_cache(&self) {
794        match self.notes_summaries_cache.write() {
795            Ok(mut guard) => *guard = None,
796            Err(p) => {
797                tracing::warn!("notes cache write lock poisoned during invalidation, recovering");
798                *p.into_inner() = None;
799            }
800        }
801    }
802
803    /// Gracefully close the store, performing WAL checkpoint.
804    ///
805    /// This ensures all WAL changes are written to the main database file,
806    /// reducing startup time for subsequent opens and freeing disk space
807    /// used by WAL files.
808    ///
809    /// Safe to skip (pool will close connections on drop), but recommended
810    /// for clean shutdown in long-running processes.
811    pub fn close(self) -> Result<(), StoreError> {
812        self.closed.store(true, Ordering::Release);
813        self.rt.block_on(async {
814            // TRUNCATE mode: checkpoint and delete WAL file
815            sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
816                .execute(&self.pool)
817                .await?;
818            tracing::debug!("WAL checkpoint completed");
819            self.pool.close().await;
820            Ok(())
821        })
822    }
823}
824
825impl Drop for Store {
826    fn drop(&mut self) {
827        if self.closed.load(Ordering::Acquire) {
828            return; // Already checkpointed in close()
829        }
830        // Best-effort WAL checkpoint on drop to avoid leaving large WAL files.
831        // Errors are logged but not propagated (Drop can't fail).
832        // catch_unwind guards against block_on panicking when called from
833        // within an async context (e.g., if Store is dropped inside a tokio runtime).
834        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
835            if let Err(e) = self.rt.block_on(async {
836                sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
837                    .execute(&self.pool)
838                    .await
839            }) {
840                tracing::warn!(error = %e, "WAL checkpoint on drop failed (non-fatal)");
841            }
842        }));
843        // Pool closes automatically when dropped
844    }
845}
846
/// Property-based and fuzz tests for RRF fusion and the FTS text helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    // ===== Property-based tests for RRF =====

    proptest! {
        /// Property: RRF scores are always positive
        #[test]
        fn prop_rrf_scores_positive(
            semantic in prop::collection::vec("[a-z]{1,5}", 0..20),
            fts in prop::collection::vec("[a-z]{1,5}", 0..20),
            limit in 1usize..50
        ) {
            let result = Store::rrf_fuse_test(&semantic, &fts, limit);
            for (_, score) in &result {
                prop_assert!(*score > 0.0, "RRF score should be positive: {}", score);
            }
        }

        /// Property: RRF scores are bounded
        /// Note: Duplicates in input lists can accumulate extra points, so the
        /// tightest bound that always holds is the sum of 1/(K+r+1) over every
        /// position of both lists (reached when one id fills every slot).
        #[test]
        fn prop_rrf_scores_bounded(
            semantic in prop::collection::vec("[a-z]{1,5}", 0..20),
            fts in prop::collection::vec("[a-z]{1,5}", 0..20),
            limit in 1usize..50
        ) {
            let result = Store::rrf_fuse_test(&semantic, &fts, limit);
            // Mirrors the K constant used inside rrf_fuse.
            const K: f32 = 60.0;
            // Derive the bound from the actual inputs instead of a fixed number:
            // a fixed 0.5 is unsound, since 19 copies of one id in both lists
            // already score about 0.546. Summing in the same order as rrf_fuse
            // (semantic first, then fts) keeps the comparison exact, because a
            // score is the sum of a subset of these positive terms.
            let max_possible: f32 = (0..semantic.len())
                .chain(0..fts.len())
                .map(|rank| 1.0 / (K + rank as f32 + 1.0))
                .sum();
            for (id, score) in &result {
                prop_assert!(
                    *score <= max_possible,
                    "RRF score {} for '{}' exceeds max {}",
                    score, id, max_possible
                );
            }
        }

        /// Property: RRF respects limit
        #[test]
        fn prop_rrf_respects_limit(
            semantic in prop::collection::vec("[a-z]{1,5}", 0..30),
            fts in prop::collection::vec("[a-z]{1,5}", 0..30),
            limit in 1usize..20
        ) {
            let result = Store::rrf_fuse_test(&semantic, &fts, limit);
            prop_assert!(
                result.len() <= limit,
                "Result length {} exceeds limit {}",
                result.len(), limit
            );
        }

        /// Property: RRF results are sorted by score descending
        #[test]
        fn prop_rrf_sorted_descending(
            semantic in prop::collection::vec("[a-z]{1,5}", 1..20),
            fts in prop::collection::vec("[a-z]{1,5}", 1..20),
            limit in 1usize..50
        ) {
            let result = Store::rrf_fuse_test(&semantic, &fts, limit);
            for window in result.windows(2) {
                prop_assert!(
                    window[0].1 >= window[1].1,
                    "Results not sorted: {} < {}",
                    window[0].1, window[1].1
                );
            }
        }

        /// Property: Items appearing in both lists get higher scores
        /// Note: Uses hash_set to ensure unique IDs - duplicates in input lists
        /// accumulate scores which can violate the "overlap wins" property.
        /// The three id alphabets (lowercase, uppercase, digits) are disjoint,
        /// so only `common_id` can appear in both lists.
        #[test]
        fn prop_rrf_rewards_overlap(
            common_id in "[a-z]{3}",
            only_semantic in prop::collection::hash_set("[A-Z]{3}", 1..5),
            only_fts in prop::collection::hash_set("[0-9]{3}", 1..5)
        ) {
            // The shared id is placed first (best rank) in both lists.
            let mut semantic = vec![common_id.clone()];
            semantic.extend(only_semantic);
            let mut fts = vec![common_id.clone()];
            fts.extend(only_fts);

            let result = Store::rrf_fuse_test(&semantic, &fts, 100);

            let common_score = result.iter()
                .find(|(id, _)| id == &common_id)
                .map(|(_, s)| *s)
                .unwrap_or(0.0);

            let max_single = result.iter()
                .filter(|(id, _)| id != &common_id)
                .map(|(_, s)| *s)
                .fold(0.0f32, |a, b| a.max(b));

            prop_assert!(
                common_score >= max_single,
                "Common item score {} should be >= single-list max {}",
                common_score, max_single
            );
        }

        // ===== FTS fuzz tests =====

        /// Fuzz: normalization never panics on arbitrary printable input
        #[test]
        fn fuzz_normalize_for_fts_no_panic(input in "\\PC{0,500}") {
            let _ = normalize_for_fts(&input);
        }

        /// Fuzz: normalized output contains only alphanumerics, spaces and underscores
        #[test]
        fn fuzz_normalize_for_fts_safe_output(input in "\\PC{0,200}") {
            let result = normalize_for_fts(&input);
            for c in result.chars() {
                prop_assert!(
                    c.is_alphanumeric() || c == ' ' || c == '_',
                    "Unexpected char '{}' (U+{:04X}) in output: {}",
                    c, c as u32, result
                );
            }
        }

        /// Fuzz: a single FTS special character embedded in text is stripped
        #[test]
        fn fuzz_normalize_for_fts_special_chars(
            prefix in "[a-z]{0,10}",
            special in prop::sample::select(vec!['*', '"', ':', '^', '(', ')', '-', '+']),
            suffix in "[a-z]{0,10}"
        ) {
            let input = format!("{}{}{}", prefix, special, suffix);
            let result = normalize_for_fts(&input);
            prop_assert!(
                !result.contains(special),
                "Special char '{}' should be stripped from: {} -> {}",
                special, input, result
            );
        }

        /// Fuzz: output byte length stays within 4x the input byte length
        #[test]
        fn fuzz_normalize_for_fts_unicode(input in "[\\p{L}\\p{N}\\s]{0,100}") {
            let result = normalize_for_fts(&input);
            prop_assert!(result.len() <= input.len() * 4);
        }

        // ===== sanitize_fts_query property tests (SEC-4) =====

        /// Output never contains FTS5 special characters
        #[test]
        fn prop_sanitize_no_special_chars(input in "\\PC{0,500}") {
            let result = sanitize_fts_query(&input);
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "FTS5 special char '{}' in sanitized output: {}",
                    c, result
                );
            }
        }

        /// Output never contains standalone boolean operators
        #[test]
        fn prop_sanitize_no_operators(input in "\\PC{0,300}") {
            let result = sanitize_fts_query(&input);
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "FTS5 operator '{}' survived sanitization: {}",
                    word, result
                );
            }
        }

        /// Combined pipeline: normalize + sanitize is safe for arbitrary input
        #[test]
        fn prop_pipeline_safe(input in "\\PC{0,300}") {
            let result = sanitize_fts_query(&normalize_for_fts(&input));
            // No FTS5 special chars
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "Special char '{}' in pipeline output: {}",
                    c, result
                );
            }
            // No boolean operators
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "Operator '{}' in pipeline output: {}",
                    word, result
                );
            }
        }

        /// Targeted: strings composed entirely of special chars produce empty output
        #[test]
        fn prop_sanitize_all_special(
            chars in prop::collection::vec(
                prop::sample::select(vec!['"', '*', '(', ')', '+', '-', '^', ':']),
                1..50
            )
        ) {
            let input: String = chars.into_iter().collect();
            let result = sanitize_fts_query(&input);
            prop_assert!(
                result.is_empty(),
                "All-special input should produce empty output, got: {}",
                result
            );
        }

        /// Targeted: operator words surrounded by normal text are stripped
        #[test]
        fn prop_sanitize_operators_removed(
            pre in "[a-z]{1,10}",
            op in prop::sample::select(vec!["OR", "AND", "NOT", "NEAR"]),
            post in "[a-z]{1,10}"
        ) {
            let input = format!("{} {} {}", pre, op, post);
            let result = sanitize_fts_query(&input);
            prop_assert!(
                !result.split_whitespace().any(|w| w == op),
                "Operator '{}' not stripped from: {} -> {}",
                op, input, result
            );
            // Pre and post words should survive
            prop_assert!(result.contains(&pre), "Pre-text '{}' missing from: {}", pre, result);
            prop_assert!(result.contains(&post), "Post-text '{}' missing from: {}", post, result);
        }

        /// Adversarial: mixed special chars + operators + normal text
        #[test]
        fn prop_sanitize_adversarial(
            normal in "[a-z]{1,10}",
            special in prop::sample::select(vec!['"', '*', '(', ')', '+', '-', '^', ':']),
            op in prop::sample::select(vec!["OR", "AND", "NOT", "NEAR"]),
        ) {
            let input = format!("{}{} {} {}{}", special, normal, op, normal, special);
            let result = sanitize_fts_query(&input);
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "Special char '{}' in adversarial output: {}",
                    c, result
                );
            }
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "Operator '{}' in adversarial output: {}",
                    word, result
                );
            }
        }
    }
}
1106}