Skip to main content

khive_db/stores/
text.rs

1//! FTS5-backed `TextSearch` implementation.
2//!
3//! Each `Fts5TextSearch` manages a single FTS5 virtual table for full-text
4//! search. The table stores document metadata alongside the indexed text
5//! columns (`title` and `body`), with non-searchable columns marked
6//! `UNINDEXED`.
7//!
8//! # FTS5 table layout
9//!
10//! ```sql
11//! CREATE VIRTUAL TABLE fts_{key} USING fts5(
12//!     subject_id UNINDEXED,
13//!     kind UNINDEXED,
14//!     title,
15//!     body,
16//!     tags UNINDEXED,
17//!     namespace UNINDEXED,
18//!     metadata UNINDEXED,
19//!     updated_at UNINDEXED
20//! );
21//! ```
22//!
23//! Only `title` and `body` are full-text indexed. The remaining columns are
24//! stored for retrieval and filtering but do not participate in FTS ranking.
25//!
26//! # Connection strategy
27//!
28//! Follows the same dual-mode pattern as `SqliteVecStore`:
29//! - **File-backed**: Opens standalone connections per operation.
30//! - **In-memory**: Acquires pool connections via `spawn_blocking`.
31//!
32//! # Score normalization
33//!
//! FTS5 `rank` values are negative (more negative = more relevant). We
//! normalize within each result set so scores span `[0.05, 1.0]`, preserving
//! relative ordering across all tokenizers (including trigram, which produces
//! a narrow absolute range that the old `1/(1+|rank|)` formula collapsed into
//! near-uniform noise). Scores are interpolated linearly in `rank`: the best
//! hit in a result set receives `1.0` and the worst receives `0.05`. When all
//! hits share the same rank (a single hit or a degenerate match), every score
//! is `1.0`. Because normalization is per result set, scores are not
//! comparable across different queries.
41
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use chrono::{DateTime, TimeZone, Utc};
46use uuid::Uuid;
47
48use khive_score::DeterministicScore;
49use khive_storage::error::StorageError;
50use khive_storage::types::{
51    BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextIndexStats, TextQueryMode,
52    TextSearchHit, TextSearchRequest,
53};
54use khive_storage::StorageCapability;
55use khive_storage::TextSearch;
56use khive_types::SubstrateKind;
57
58use crate::error::SqliteError;
59use crate::pool::ConnectionPool;
60
/// Create the FTS5 virtual table `fts_{table_key}` unless it already exists.
///
/// Test-only helper: it lets unit tests build the table on a bare connection
/// without going through the full `StorageBackend`. Only `title` and `body`
/// are full-text indexed; every other column is declared `UNINDEXED`.
#[cfg(test)]
pub(crate) fn ensure_fts5_schema(
    conn: &rusqlite::Connection,
    table_key: &str,
) -> Result<(), rusqlite::Error> {
    // Column declarations in table order; joined with ", " they form the
    // argument list of the `fts5(...)` module.
    const COLUMNS: [&str; 8] = [
        "subject_id UNINDEXED",
        "kind UNINDEXED",
        "title",
        "body",
        "tags UNINDEXED",
        "namespace UNINDEXED",
        "metadata UNINDEXED",
        "updated_at UNINDEXED",
    ];

    let ddl = format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5({})",
        table_key,
        COLUMNS.join(", ")
    );
    conn.execute_batch(&ddl)
}
85
86fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
87    StorageError::driver(StorageCapability::Text, op, e)
88}
89
90fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
91    StorageError::driver(StorageCapability::Text, op, e)
92}
93
94/// A TextSearch backed by SQLite FTS5 virtual tables.
95///
96/// Each instance manages one table: `fts_{table_key}`. Documents are stored
97/// with their metadata in UNINDEXED columns; only `title` and `body` are
98/// full-text indexed.
99pub struct Fts5TextSearch {
100    pool: Arc<ConnectionPool>,
101    is_file_backed: bool,
102    table_name: String,
103}
104
105impl Fts5TextSearch {
106    /// Create a new FTS5 text search instance.
107    ///
108    /// The FTS5 virtual table must already exist (created by `StorageBackend::text()`).
109    pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
110        let table_name = format!("fts_{}", table_key);
111        Self {
112            pool,
113            is_file_backed,
114            table_name,
115        }
116    }
117
118    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
119        let config = self.pool.config();
120        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
121            operation: "fts_writer".into(),
122            message: "in-memory databases do not support standalone connections".into(),
123        })?;
124
125        let conn = rusqlite::Connection::open_with_flags(
126            path,
127            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
128                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
129                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
130        )
131        .map_err(|e| map_err(e, "open_fts_writer"))?;
132
133        conn.busy_timeout(config.busy_timeout)
134            .map_err(|e| map_err(e, "open_fts_writer"))?;
135        conn.pragma_update(None, "foreign_keys", "ON")
136            .map_err(|e| map_err(e, "open_fts_writer"))?;
137        conn.pragma_update(None, "synchronous", "NORMAL")
138            .map_err(|e| map_err(e, "open_fts_writer"))?;
139
140        Ok(conn)
141    }
142
143    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
144        let config = self.pool.config();
145        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
146            operation: "fts_reader".into(),
147            message: "in-memory databases do not support standalone connections".into(),
148        })?;
149
150        let conn = rusqlite::Connection::open_with_flags(
151            path,
152            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
153                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
154                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
155        )
156        .map_err(|e| map_err(e, "open_fts_reader"))?;
157
158        conn.busy_timeout(config.busy_timeout)
159            .map_err(|e| map_err(e, "open_fts_reader"))?;
160        conn.pragma_update(None, "foreign_keys", "ON")
161            .map_err(|e| map_err(e, "open_fts_reader"))?;
162        conn.pragma_update(None, "synchronous", "NORMAL")
163            .map_err(|e| map_err(e, "open_fts_reader"))?;
164
165        Ok(conn)
166    }
167
168    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
169    where
170        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
171        R: Send + 'static,
172    {
173        if self.is_file_backed {
174            let conn = self.open_standalone_writer()?;
175            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
176                .await
177                .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
178        } else {
179            let pool = Arc::clone(&self.pool);
180            tokio::task::spawn_blocking(move || {
181                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
182                f(guard.conn()).map_err(|e| map_err(e, op))
183            })
184            .await
185            .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
186        }
187    }
188
189    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
190    where
191        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
192        R: Send + 'static,
193    {
194        if self.is_file_backed {
195            let conn = self.open_standalone_reader()?;
196            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
197                .await
198                .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
199        } else {
200            let pool = Arc::clone(&self.pool);
201            tokio::task::spawn_blocking(move || {
202                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
203                f(guard.conn()).map_err(|e| map_err(e, op))
204            })
205            .await
206            .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
207        }
208    }
209}
210
211// -- Helper functions --
212
213fn tags_to_json(tags: &[String]) -> String {
214    serde_json::to_string(tags).unwrap_or_else(|_| "[]".to_string())
215}
216
217fn tags_from_json(s: &str) -> Vec<String> {
218    serde_json::from_str(s).unwrap_or_default()
219}
220
221fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
222    dt.timestamp_micros()
223}
224
225fn micros_to_dt(micros: i64) -> DateTime<Utc> {
226    Utc.timestamp_micros(micros)
227        .single()
228        .unwrap_or_else(Utc::now)
229}
230
/// Reduce free-form user text to a safe FTS5 query of space-separated barewords.
///
/// Inside an FTS5 bareword, only ASCII letters, ASCII digits, `_`, and
/// non-ASCII characters are allowed. Every other ASCII character is either
/// query syntax (`*`, `"`, `(`, `)`, `{`, `}`, `+`, `-`, `:`, `^`, `,`) or
/// causes a syntax error (`.`, `'`, `?`, ...). Each such character, and every
/// control character, is turned into a token separator. For example,
/// `self-driving` becomes `self driving`, and `hello, world` becomes
/// `hello world` instead of failing to parse.
///
/// Tokens that are exactly `AND`, `OR`, `NOT` or `NEAR` are dropped. FTS5
/// treats these as keywords only in upper case, so lower- or mixed-case words
/// such as `or`, `not` and `near` are kept as ordinary search terms.
///
/// Returns an empty string when nothing searchable remains. In Phrase mode the
/// caller wraps the result in double quotes; the output never contains `"`,
/// so that wrapping cannot be broken out of.
fn sanitize_fts5_query(query: &str) -> String {
    let separated: String = query
        .chars()
        .map(|c| if is_fts5_bareword_char(c) { c } else { ' ' })
        .collect();
    separated
        .split_whitespace()
        .filter(|token| !matches!(*token, "AND" | "OR" | "NOT" | "NEAR"))
        .collect::<Vec<_>>()
        .join(" ")
}

/// Whether `c` may appear unquoted inside an FTS5 bareword (control characters excluded).
fn is_fts5_bareword_char(c: char) -> bool {
    if c.is_ascii() {
        c.is_ascii_alphanumeric() || c == '_'
    } else {
        !c.is_control()
    }
}
253
254/// Build a WHERE clause fragment and params for a `TextFilter`.
255///
256/// Returns `(clause, params)` where clause is empty if no filters are active.
257/// Parameter indices start at `?{start_idx}`.
258fn build_filter_clause(
259    filter: &TextFilter,
260    table: &str,
261    start_idx: usize,
262) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
263    let mut conditions: Vec<String> = Vec::new();
264    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
265    let mut idx = start_idx;
266
267    if !filter.ids.is_empty() {
268        let placeholders: Vec<String> = filter
269            .ids
270            .iter()
271            .map(|_| {
272                let p = format!("?{}", idx);
273                idx += 1;
274                p
275            })
276            .collect();
277        conditions.push(format!(
278            "{}.subject_id IN ({})",
279            table,
280            placeholders.join(", ")
281        ));
282        for id in &filter.ids {
283            params.push(Box::new(id.to_string()));
284        }
285    }
286
287    if !filter.kinds.is_empty() {
288        let placeholders: Vec<String> = filter
289            .kinds
290            .iter()
291            .map(|_| {
292                let p = format!("?{}", idx);
293                idx += 1;
294                p
295            })
296            .collect();
297        conditions.push(format!("{}.kind IN ({})", table, placeholders.join(", ")));
298        for kind in &filter.kinds {
299            params.push(Box::new(kind.to_string()));
300        }
301    }
302
303    if !filter.namespaces.is_empty() {
304        let placeholders: Vec<String> = filter
305            .namespaces
306            .iter()
307            .map(|_| {
308                let p = format!("?{}", idx);
309                idx += 1;
310                p
311            })
312            .collect();
313        conditions.push(format!(
314            "{}.namespace IN ({})",
315            table,
316            placeholders.join(", ")
317        ));
318        for ns in &filter.namespaces {
319            params.push(Box::new(ns.clone()));
320        }
321    }
322
323    if conditions.is_empty() {
324        (String::new(), params)
325    } else {
326        (format!(" AND {}", conditions.join(" AND ")), params)
327    }
328}
329
330#[async_trait]
331impl TextSearch for Fts5TextSearch {
332    async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
333        let table = self.table_name.clone();
334        let namespace = document.namespace.clone();
335
336        self.with_writer("fts_upsert", move |conn| {
337            conn.execute_batch("BEGIN IMMEDIATE")?;
338
339            let del_sql = format!(
340                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
341                table
342            );
343            if let Err(e) = conn.execute(
344                &del_sql,
345                rusqlite::params![&namespace, document.subject_id.to_string()],
346            ) {
347                let _ = conn.execute_batch("ROLLBACK");
348                return Err(e);
349            }
350
351            let ins_sql = format!(
352                "INSERT INTO {} \
353                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
354                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
355                table
356            );
357            let tags_json = tags_to_json(&document.tags);
358            let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
359
360            if let Err(e) = conn.execute(
361                &ins_sql,
362                rusqlite::params![
363                    document.subject_id.to_string(),
364                    document.kind.to_string(),
365                    document.title.as_deref().unwrap_or(""),
366                    document.body,
367                    tags_json,
368                    &namespace,
369                    metadata_json,
370                    dt_to_micros(&document.updated_at),
371                ],
372            ) {
373                let _ = conn.execute_batch("ROLLBACK");
374                return Err(e);
375            }
376
377            conn.execute_batch("COMMIT")?;
378            Ok(())
379        })
380        .await
381    }
382
383    async fn upsert_documents(
384        &self,
385        documents: Vec<TextDocument>,
386    ) -> Result<BatchWriteSummary, StorageError> {
387        let table = self.table_name.clone();
388        let attempted = documents.len() as u64;
389
390        self.with_writer("fts_upsert_batch", move |conn| {
391            let del_sql = format!(
392                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
393                table
394            );
395            let ins_sql = format!(
396                "INSERT INTO {} \
397                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
398                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
399                table
400            );
401
402            conn.execute_batch("BEGIN IMMEDIATE")?;
403            let mut affected = 0u64;
404            let mut failed = 0u64;
405
406            for doc in &documents {
407                conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
408                let id_str = doc.subject_id.to_string();
409                let namespace = &doc.namespace;
410                let result = (|| {
411                    conn.execute(&del_sql, rusqlite::params![namespace, &id_str])?;
412
413                    let tags_json = tags_to_json(&doc.tags);
414                    let metadata_json: Option<String> =
415                        doc.metadata.as_ref().map(|v| v.to_string());
416
417                    conn.execute(
418                        &ins_sql,
419                        rusqlite::params![
420                            &id_str,
421                            &doc.kind.to_string(),
422                            doc.title.as_deref().unwrap_or(""),
423                            &doc.body,
424                            &tags_json,
425                            namespace,
426                            &metadata_json,
427                            dt_to_micros(&doc.updated_at),
428                        ],
429                    )?;
430                    Ok::<(), rusqlite::Error>(())
431                })();
432
433                match result {
434                    Ok(()) => {
435                        conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
436                        affected += 1;
437                    }
438                    Err(_) => {
439                        let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc");
440                        let _ = conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc");
441                        failed += 1;
442                    }
443                }
444            }
445
446            conn.execute_batch("COMMIT")?;
447
448            Ok(BatchWriteSummary {
449                attempted,
450                affected,
451                failed,
452                first_error: String::new(),
453            })
454        })
455        .await
456    }
457
458    async fn delete_document(
459        &self,
460        namespace: &str,
461        subject_id: Uuid,
462    ) -> Result<bool, StorageError> {
463        let namespace = namespace.to_string();
464        let table = self.table_name.clone();
465
466        self.with_writer("fts_delete", move |conn| {
467            let sql = format!(
468                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
469                table
470            );
471            let deleted =
472                conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
473            Ok(deleted > 0)
474        })
475        .await
476    }
477
478    async fn get_document(
479        &self,
480        namespace: &str,
481        subject_id: Uuid,
482    ) -> Result<Option<TextDocument>, StorageError> {
483        let namespace = namespace.to_string();
484        let table = self.table_name.clone();
485
486        self.with_reader("fts_get", move |conn| {
487            let sql = format!(
488                "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
489                 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
490                table
491            );
492            let mut stmt = conn.prepare(&sql)?;
493            let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
494
495            match rows.next()? {
496                Some(row) => {
497                    let id_str: String = row.get(0)?;
498                    let kind_str: String = row.get(1)?;
499                    let title: String = row.get(2)?;
500                    let body: String = row.get(3)?;
501                    let tags_json: String = row.get(4)?;
502                    let ns: String = row.get(5)?;
503                    let metadata_json: Option<String> = row.get(6)?;
504                    let updated_at_micros: i64 = row.get(7)?;
505
506                    let sid = Uuid::parse_str(&id_str).map_err(|e| {
507                        rusqlite::Error::FromSqlConversionFailure(
508                            0,
509                            rusqlite::types::Type::Text,
510                            Box::new(e),
511                        )
512                    })?;
513
514                    let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
515                        rusqlite::Error::FromSqlConversionFailure(
516                            1,
517                            rusqlite::types::Type::Text,
518                            Box::new(e),
519                        )
520                    })?;
521
522                    Ok(Some(TextDocument {
523                        subject_id: sid,
524                        kind,
525                        title: if title.is_empty() { None } else { Some(title) },
526                        body,
527                        tags: tags_from_json(&tags_json),
528                        namespace: ns,
529                        metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
530                        updated_at: micros_to_dt(updated_at_micros),
531                    }))
532                }
533                None => Ok(None),
534            }
535        })
536        .await
537    }
538
539    async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
540        let table = self.table_name.clone();
541
542        self.with_reader("fts_search", move |conn| {
543            let sanitized = sanitize_fts5_query(&request.query);
544            if sanitized.is_empty() {
545                return Ok(Vec::new());
546            }
547
548            let match_expr = match request.mode {
549                TextQueryMode::Phrase => format!("\"{}\"", sanitized),
550                TextQueryMode::Plain => sanitized,
551            };
552
553            // Snippet column index 3 = body in the FTS5 schema.
554            let snippet_chars = request.snippet_chars.max(1) as i32;
555
556            let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
557                build_filter_clause(filter, &table, 3)
558            } else {
559                (String::new(), Vec::new())
560            };
561
562            let sql = format!(
563                "SELECT subject_id, rank, title, snippet({table}, 3, '', '', '...', {snippet_chars}) \
564                 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
565                 ORDER BY rank LIMIT ?2",
566            );
567
568            let mut stmt = conn.prepare(&sql)?;
569            stmt.raw_bind_parameter(1, &match_expr)?;
570            stmt.raw_bind_parameter(2, request.top_k as i64)?;
571
572            for (i, param) in filter_params.iter().enumerate() {
573                param
574                    .to_sql()
575                    .map(|val| stmt.raw_bind_parameter(3 + i, val))
576                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
577            }
578
579            let mut hits = Vec::new();
580            let mut rows = stmt.raw_query();
581            let mut rank_idx = 0u32;
582
583            while let Some(row) = rows.next()? {
584                let id_str: String = row.get(0)?;
585                let fts_rank: f64 = row.get(1)?;
586                let title: String = row.get(2)?;
587                let snippet: String = row.get(3)?;
588
589                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
590                    rusqlite::Error::FromSqlConversionFailure(
591                        0,
592                        rusqlite::types::Type::Text,
593                        Box::new(e),
594                    )
595                })?;
596
597                rank_idx += 1;
598                hits.push((subject_id, fts_rank, rank_idx, title, snippet));
599            }
600
601            // Normalize scores within the result set to (0.05, 1.0].
602            // Best rank (most negative) maps to 1.0, worst to 0.05.
603            let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
604            let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
605            let range = max_rank - min_rank;
606
607            let results = hits
608                .into_iter()
609                .map(|(subject_id, raw_rank, rank, title, snippet)| {
610                    let score = if range.abs() < 1e-12 {
611                        1.0
612                    } else {
613                        let t = (max_rank - raw_rank) / range;
614                        0.05 + 0.95 * t
615                    };
616                    TextSearchHit {
617                        subject_id,
618                        score: DeterministicScore::from_f64(score),
619                        rank,
620                        title: if title.is_empty() { None } else { Some(title) },
621                        snippet: if snippet.is_empty() { None } else { Some(snippet) },
622                    }
623                })
624                .collect();
625
626            Ok(results)
627        })
628        .await
629    }
630
631    async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
632        let table = self.table_name.clone();
633
634        self.with_reader("fts_count", move |conn| {
635            let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
636
637            let sql = if filter_clause.is_empty() {
638                format!("SELECT COUNT(*) FROM {}", table)
639            } else {
640                let where_part = filter_clause.trim_start_matches(" AND ");
641                format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part)
642            };
643
644            let mut stmt = conn.prepare(&sql)?;
645
646            for (i, param) in filter_params.iter().enumerate() {
647                param
648                    .to_sql()
649                    .map(|val| stmt.raw_bind_parameter(1 + i, val))
650                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
651            }
652
653            let mut rows = stmt.raw_query();
654            match rows.next()? {
655                Some(row) => {
656                    let count: i64 = row.get(0)?;
657                    Ok(count as u64)
658                }
659                None => Ok(0),
660            }
661        })
662        .await
663    }
664
665    async fn stats(&self) -> Result<TextIndexStats, StorageError> {
666        let table = self.table_name.clone();
667
668        self.with_reader("fts_stats", move |conn| {
669            let sql = format!("SELECT COUNT(*) FROM {}", table);
670            let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
671
672            Ok(TextIndexStats {
673                document_count: count as u64,
674                needs_rebuild: false,
675                last_rebuild_at: None,
676            })
677        })
678        .await
679    }
680
681    async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
682        let table = self.table_name.clone();
683
684        self.with_writer("fts_rebuild", move |conn| {
685            // FTS5 rebuild command: repopulates the internal index structures.
686            let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
687            conn.execute(&sql, [])?;
688
689            let count_sql = format!("SELECT COUNT(*) FROM {}", table);
690            let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
691
692            Ok(TextIndexStats {
693                document_count: count as u64,
694                needs_rebuild: false,
695                last_rebuild_at: Some(Utc::now()),
696            })
697        })
698        .await
699    }
700}
701
702impl Fts5TextSearch {
703    /// Move all FTS5 documents from `old_namespace` to `new_namespace` in a
704    /// single transaction.
705    ///
706    /// FTS5 virtual tables do not support updating indexed columns (`title`,
707    /// `body`) via UPDATE. The correct approach is read-then-delete-then-reinsert.
708    ///
709    /// Callers must invoke this after any SQL-level namespace change on the
710    /// backing entity table so that FTS5 keyword search stays consistent with
711    /// the entity store.
712    #[allow(dead_code)]
713    pub(crate) async fn rename_namespace(
714        &self,
715        old_namespace: &str,
716        new_namespace: &str,
717    ) -> Result<u64, StorageError> {
718        if old_namespace == new_namespace {
719            return Ok(0);
720        }
721        let table = self.table_name.clone();
722        let old_ns = old_namespace.to_string();
723        let new_ns = new_namespace.to_string();
724
725        self.with_writer("fts_rename_namespace", move |conn| {
726            let sel_sql = format!(
727                "SELECT subject_id, kind, title, body, tags, metadata, updated_at \
728                 FROM {} WHERE namespace = ?1",
729                table
730            );
731            struct Row {
732                subject_id: String,
733                kind: String,
734                title: String,
735                body: String,
736                tags: String,
737                metadata: Option<String>,
738                updated_at: i64,
739            }
740            let rows: Vec<Row> = {
741                let mut stmt = conn.prepare(&sel_sql)?;
742                let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
743                    Ok(Row {
744                        subject_id: row.get(0)?,
745                        kind: row.get(1)?,
746                        title: row.get(2)?,
747                        body: row.get(3)?,
748                        tags: row.get(4)?,
749                        metadata: row.get(5)?,
750                        updated_at: row.get(6)?,
751                    })
752                })?;
753                iter.collect::<Result<Vec<_>, _>>()?
754            };
755            let moved = rows.len() as u64;
756            if moved == 0 {
757                return Ok(0u64);
758            }
759
760            conn.execute_batch("BEGIN IMMEDIATE")?;
761
762            let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
763            if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
764                let _ = conn.execute_batch("ROLLBACK");
765                return Err(e);
766            }
767
768            let ins_sql = format!(
769                "INSERT INTO {} \
770                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
771                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
772                table
773            );
774            for row in &rows {
775                if let Err(e) = conn.execute(
776                    &ins_sql,
777                    rusqlite::params![
778                        row.subject_id,
779                        row.kind,
780                        row.title,
781                        row.body,
782                        row.tags,
783                        &new_ns,
784                        row.metadata,
785                        row.updated_at,
786                    ],
787                ) {
788                    let _ = conn.execute_batch("ROLLBACK");
789                    return Err(e);
790                }
791            }
792
793            conn.execute_batch("COMMIT")?;
794            Ok(moved)
795        })
796        .await
797    }
798}
799
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::PoolConfig;

    /// Creates an `Fts5TextSearch` over a fresh in-memory pool with the FTS5
    /// table for `table_key` already created.
    ///
    /// Every test passes its own `table_key`, so tables never collide.
    fn setup_memory_store(table_key: &str) -> Fts5TextSearch {
        let config = PoolConfig {
            path: None,
            ..PoolConfig::default()
        };
        let pool = Arc::new(ConnectionPool::new(config).unwrap());

        {
            // Scoped so the writer handle is dropped before the pool is moved
            // into the store.
            let writer = pool.writer().unwrap();
            ensure_fts5_schema(writer.conn(), table_key).unwrap();
        }

        // NOTE(review): `false` presumably selects the pool-backed (in-memory)
        // connection mode; confirm against `Fts5TextSearch::new`.
        Fts5TextSearch::new(pool, false, table_key.to_string())
    }

    /// Builds a `Note` document in the `test_ns` namespace; an empty `title`
    /// becomes `None`.
    fn make_document(subject_id: Uuid, title: &str, body: &str) -> TextDocument {
        TextDocument {
            subject_id,
            kind: SubstrateKind::Note,
            title: if title.is_empty() {
                None
            } else {
                Some(title.to_string())
            },
            body: body.to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        }
    }

    /// A filter restricted to a single namespace.
    fn ns_filter(namespace: &str) -> TextFilter {
        TextFilter {
            namespaces: vec![namespace.to_string()],
            ..TextFilter::default()
        }
    }

    /// Builds a search request scoped to `namespace`, using the settings shared
    /// by every test in this module (`top_k = 10`, 64-char snippets).
    fn search_request(query: &str, mode: TextQueryMode, namespace: &str) -> TextSearchRequest {
        TextSearchRequest {
            query: query.to_string(),
            mode,
            filter: Some(ns_filter(namespace)),
            top_k: 10,
            snippet_chars: 64,
        }
    }

    #[tokio::test]
    async fn test_upsert_and_search() {
        let store = setup_memory_store("upsert_search");

        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Entity,
            title: Some("Rust Programming".to_string()),
            body: "Rust is a systems programming language focused on safety and performance."
                .to_string(),
            tags: vec!["rust".to_string(), "programming".to_string()],
            namespace: "tech".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        };

        store.upsert_document(doc).await.unwrap();

        let hits = store
            .search(search_request("Rust programming", TextQueryMode::Plain, "tech"))
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert_eq!(hits[0].rank, 1);
        assert!(hits[0].score.to_f64() > 0.0);
        assert!(hits[0].title.is_some());
    }

    #[tokio::test]
    async fn test_phrase_search() {
        let store = setup_memory_store("phrase");

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();

        store
            .upsert_document(make_document(
                id1,
                "Animals",
                "The quick brown fox jumps over the lazy dog.",
            ))
            .await
            .unwrap();

        store
            .upsert_document(make_document(
                id2,
                "Colors",
                "The brown paint was quick to dry, unlike the fox.",
            ))
            .await
            .unwrap();

        // Phrase mode: only the document with the contiguous phrase matches.
        let hits = store
            .search(search_request("quick brown fox", TextQueryMode::Phrase, "test_ns"))
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id1);

        // Plain mode: both documents contain all three words somewhere.
        let hits = store
            .search(search_request("quick brown fox", TextQueryMode::Plain, "test_ns"))
            .await
            .unwrap();

        assert_eq!(hits.len(), 2);
    }

    #[tokio::test]
    async fn test_delete_document() {
        let store = setup_memory_store("delete");

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();

        store
            .upsert_document(make_document(id1, "Doc One", "First document content."))
            .await
            .unwrap();
        store
            .upsert_document(make_document(id2, "Doc Two", "Second document content."))
            .await
            .unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 2);

        let deleted = store.delete_document("test_ns", id1).await.unwrap();
        assert!(deleted);

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 1);

        // Deleting an already-deleted document reports `false`.
        let deleted_again = store.delete_document("test_ns", id1).await.unwrap();
        assert!(!deleted_again);

        let doc = store.get_document("test_ns", id2).await.unwrap();
        assert!(doc.is_some());

        let doc = store.get_document("test_ns", id1).await.unwrap();
        assert!(doc.is_none());
    }

    #[tokio::test]
    async fn test_count_with_filter() {
        let store = setup_memory_store("count_filter");
        let ns = "test_ns".to_string();

        // Even indices (0, 2, 4) are entities, odd indices (1, 3) are notes.
        for i in 0..5 {
            let kind = if i % 2 == 0 {
                SubstrateKind::Entity
            } else {
                SubstrateKind::Note
            };
            let doc = TextDocument {
                subject_id: Uuid::new_v4(),
                kind,
                title: Some(format!("Doc {}", i)),
                body: format!("Content for document number {}", i),
                tags: vec![],
                namespace: ns.clone(),
                metadata: None,
                updated_at: Utc::now(),
            };
            store.upsert_document(doc).await.unwrap();
        }

        let total = store
            .count(TextFilter {
                namespaces: vec![ns.clone()],
                ..TextFilter::default()
            })
            .await
            .unwrap();
        assert_eq!(total, 5);

        let entities = store
            .count(TextFilter {
                namespaces: vec![ns.clone()],
                kinds: vec![SubstrateKind::Entity],
                ..TextFilter::default()
            })
            .await
            .unwrap();
        assert_eq!(entities, 3);

        let notes = store
            .count(TextFilter {
                namespaces: vec![ns],
                kinds: vec![SubstrateKind::Note],
                ..TextFilter::default()
            })
            .await
            .unwrap();
        assert_eq!(notes, 2);
    }

    #[tokio::test]
    async fn test_get_document_roundtrip() {
        let store = setup_memory_store("get_roundtrip");

        let id = Uuid::new_v4();
        let original = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            title: Some("Important Memo".to_string()),
            body: "This memo contains critical information.".to_string(),
            tags: vec!["important".to_string(), "memo".to_string()],
            namespace: "work".to_string(),
            metadata: Some(serde_json::json!({"priority": "high"})),
            updated_at: Utc::now(),
        };

        store.upsert_document(original.clone()).await.unwrap();

        let retrieved = store.get_document("work", id).await.unwrap().unwrap();
        assert_eq!(retrieved.subject_id, id);
        assert_eq!(retrieved.kind, SubstrateKind::Note);
        assert_eq!(retrieved.title, Some("Important Memo".to_string()));
        assert_eq!(retrieved.body, "This memo contains critical information.");
        assert_eq!(retrieved.tags, vec!["important", "memo"]);
        assert_eq!(retrieved.namespace, "work");
        // Non-empty metadata must survive the round-trip, not only `None`.
        assert_eq!(retrieved.metadata, original.metadata);
    }

    #[tokio::test]
    async fn test_upsert_replaces_existing() {
        let store = setup_memory_store("replace");

        let id = Uuid::new_v4();
        store
            .upsert_document(make_document(id, "Original", "Original body text."))
            .await
            .unwrap();

        store
            .upsert_document(make_document(id, "Updated", "Updated body text."))
            .await
            .unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 1);

        let doc = store.get_document("test_ns", id).await.unwrap().unwrap();
        assert_eq!(doc.title, Some("Updated".to_string()));
        assert_eq!(doc.body, "Updated body text.");
    }

    #[tokio::test]
    async fn test_batch_upsert() {
        let store = setup_memory_store("batch");

        let docs: Vec<TextDocument> = (0..50)
            .map(|i| TextDocument {
                subject_id: Uuid::new_v4(),
                kind: SubstrateKind::Entity,
                title: Some(format!("Item {}", i)),
                body: format!("This is the body content for item number {}", i),
                tags: vec![format!("tag_{}", i % 5)],
                namespace: "batch_ns".to_string(),
                metadata: None,
                updated_at: Utc::now(),
            })
            .collect();

        let summary = store.upsert_documents(docs).await.unwrap();
        assert_eq!(summary.attempted, 50);
        assert_eq!(summary.affected, 50);
        assert_eq!(summary.failed, 0);

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 50);
    }

    #[tokio::test]
    async fn test_empty_search() {
        let store = setup_memory_store("empty");

        let hits = store
            .search(search_request("nonexistent", TextQueryMode::Plain, "test_ns"))
            .await
            .unwrap();

        assert!(hits.is_empty());
    }

    #[tokio::test]
    async fn test_rebuild() {
        let store = setup_memory_store("rebuild");

        store
            .upsert_document(make_document(
                Uuid::new_v4(),
                "Test",
                "Test document for rebuild.",
            ))
            .await
            .unwrap();

        let stats = store.rebuild(IndexRebuildScope::Full).await.unwrap();
        assert_eq!(stats.document_count, 1);
        assert!(!stats.needs_rebuild);
        assert!(stats.last_rebuild_at.is_some());
    }

    #[tokio::test]
    async fn test_search_with_kind_filter() {
        let store = setup_memory_store("filter_kind");

        let id_entity = Uuid::new_v4();
        let id_note = Uuid::new_v4();

        store
            .upsert_document(TextDocument {
                subject_id: id_entity,
                kind: SubstrateKind::Entity,
                title: Some("Rust Guide".to_string()),
                body: "A comprehensive guide to Rust programming.".to_string(),
                tags: vec![],
                namespace: "test_ns".to_string(),
                metadata: None,
                updated_at: Utc::now(),
            })
            .await
            .unwrap();

        store
            .upsert_document(TextDocument {
                subject_id: id_note,
                kind: SubstrateKind::Note,
                title: Some("Rust Notes".to_string()),
                body: "Quick notes about Rust concepts.".to_string(),
                tags: vec![],
                namespace: "test_ns".to_string(),
                metadata: None,
                updated_at: Utc::now(),
            })
            .await
            .unwrap();

        // Both documents mention "Rust"; the kind filter must keep only the entity.
        let hits = store
            .search(TextSearchRequest {
                filter: Some(TextFilter {
                    kinds: vec![SubstrateKind::Entity],
                    namespaces: vec!["test_ns".to_string()],
                    ..TextFilter::default()
                }),
                ..search_request("Rust", TextQueryMode::Plain, "test_ns")
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id_entity);
    }

    /// Pure function: no async runtime needed.
    #[test]
    fn test_sanitize_fts5_query() {
        assert_eq!(sanitize_fts5_query("hello world"), "hello world");
        assert_eq!(sanitize_fts5_query("hello*world"), "helloworld");
        assert_eq!(sanitize_fts5_query("\"quoted\""), "quoted");
        assert_eq!(sanitize_fts5_query("(parens)"), "parens");
        assert_eq!(sanitize_fts5_query("a + b - c"), "a b c");
        assert_eq!(sanitize_fts5_query("col:value"), "colvalue");
        assert_eq!(sanitize_fts5_query(""), "");
        assert_eq!(sanitize_fts5_query("***"), "");
    }

    #[tokio::test]
    async fn test_score_is_bounded() {
        let store = setup_memory_store("score_bounds");

        for i in 0..5 {
            store
                .upsert_document(make_document(
                    Uuid::new_v4(),
                    &format!("Doc {}", i),
                    &format!("This document discusses topic number {}", i),
                ))
                .await
                .unwrap();
        }

        let hits = store
            .search(search_request("document topic", TextQueryMode::Plain, "test_ns"))
            .await
            .unwrap();

        // Every body contains both query terms; pin the count so the bound
        // checks below cannot pass vacuously on an empty result.
        assert_eq!(hits.len(), 5);

        for hit in &hits {
            let score = hit.score.to_f64();
            assert!(
                score > 0.0 && score <= 1.0,
                "score out of (0, 1] range: {}",
                score
            );
        }

        for (i, hit) in hits.iter().enumerate() {
            assert_eq!(hit.rank, (i + 1) as u32);
        }
    }

    #[tokio::test]
    async fn test_rename_namespace() {
        let store = setup_memory_store("rename_ns");

        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            title: Some("Rename test".to_string()),
            body: "keyword_unique_xyz".to_string(),
            tags: vec![],
            namespace: "old_ns".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let before = store
            .search(search_request("keyword_unique_xyz", TextQueryMode::Plain, "old_ns"))
            .await
            .unwrap();
        assert_eq!(before.len(), 1);

        let moved = store.rename_namespace("old_ns", "new_ns").await.unwrap();
        assert_eq!(moved, 1);

        let after_new = store
            .search(search_request("keyword_unique_xyz", TextQueryMode::Plain, "new_ns"))
            .await
            .unwrap();
        assert_eq!(after_new.len(), 1);

        let after_old = store
            .search(search_request("keyword_unique_xyz", TextQueryMode::Plain, "old_ns"))
            .await
            .unwrap();
        assert!(after_old.is_empty());
    }

    #[tokio::test]
    async fn test_metadata_none_roundtrip() {
        let store = setup_memory_store("meta_none");
        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            namespace: "test_ns".to_string(),
            title: None,
            body: "no metadata".to_string(),
            tags: vec![],
            metadata: None,
            updated_at: Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();
        let fetched = store.get_document("test_ns", id).await.unwrap().unwrap();
        assert!(fetched.metadata.is_none());
    }

    #[tokio::test]
    async fn test_rename_namespace_noop() {
        let store = setup_memory_store("rename_noop");

        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            title: None,
            body: "noop_test_content".to_string(),
            tags: vec![],
            namespace: "same_ns".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        // Renaming a namespace onto itself moves nothing and leaves data intact.
        let moved = store.rename_namespace("same_ns", "same_ns").await.unwrap();
        assert_eq!(moved, 0);

        let hits = store
            .search(search_request("noop_test_content", TextQueryMode::Plain, "same_ns"))
            .await
            .unwrap();
        assert_eq!(hits.len(), 1);
    }

    /// Score normalization: all scores stay in (0, 1], the top hit scores
    /// ≈ 1.0, and a single-hit result scores ≈ 1.0. Relative order between the
    /// two matching documents is deliberately not asserted; ordering is
    /// covered via the `rank` field.
    #[tokio::test]
    async fn test_score_normalization_range() {
        let store = setup_memory_store("score_range");

        // Insert three documents; only two match the query.
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        let id3 = Uuid::new_v4();
        store
            .upsert_document(make_document(
                id1,
                "normtest topic",
                "normtest normtest normtest",
            ))
            .await
            .unwrap();
        store
            .upsert_document(make_document(
                id2,
                "normtest light",
                "other content without the keyword",
            ))
            .await
            .unwrap();
        store
            .upsert_document(make_document(
                id3,
                "irrelevant title",
                "completely different document content",
            ))
            .await
            .unwrap();

        let hits = store
            .search(search_request("normtest", TextQueryMode::Plain, "test_ns"))
            .await
            .unwrap();

        // id1 (title + body) and id2 (title only) match; id3 does not.
        assert_eq!(hits.len(), 2, "exactly id1 and id2 must match");
        assert!(hits.iter().any(|h| h.subject_id == id1), "id1 must appear");
        assert!(hits.iter().any(|h| h.subject_id == id2), "id2 must appear");
        assert!(
            hits.iter().all(|h| h.subject_id != id3),
            "id3 must not appear"
        );

        // All scores must be in (0, 1].
        for h in &hits {
            let s = h.score.to_f64();
            assert!(s > 0.0 && s <= 1.0, "score out of (0,1]: {s}");
        }
        // Rank field must be 1-indexed and contiguous.
        for (i, h) in hits.iter().enumerate() {
            assert_eq!(h.rank, (i + 1) as u32, "rank must equal position+1");
        }
        // Best hit (rank=1) must score ≈ 1.0 — normalization anchors the best
        // rank to 1.0 regardless of absolute BM25 magnitude.
        assert!(
            hits[0].score.to_f64() > 0.99,
            "top hit must score ≈ 1.0, got {}",
            hits[0].score.to_f64()
        );

        // Single-hit result: the only match scores ≈ 1.0 (degenerate case:
        // range == 0 → all hits get 1.0).
        let single_id = Uuid::new_v4();
        store
            .upsert_document(make_document(
                single_id,
                "xqzplurp_unique_marker",
                "xqzplurp_unique_marker body",
            ))
            .await
            .unwrap();
        let single = store
            .search(search_request(
                "xqzplurp_unique_marker",
                TextQueryMode::Plain,
                "test_ns",
            ))
            .await
            .unwrap();
        assert_eq!(single.len(), 1);
        assert!(
            single[0].score.to_f64() > 0.99,
            "single-hit must score ≈ 1.0, got {}",
            single[0].score.to_f64()
        );
    }
}