Skip to main content

khive_db/stores/
text.rs

1//! FTS5-backed `TextSearch` implementation.
2//!
3//! Each `Fts5TextSearch` manages a single FTS5 virtual table for full-text
4//! search. The table stores document metadata alongside the indexed text
5//! columns (`title` and `body`), with non-searchable columns marked
6//! `UNINDEXED`.
7//!
8//! # FTS5 table layout
9//!
10//! ```sql
11//! CREATE VIRTUAL TABLE fts_{key} USING fts5(
12//!     subject_id UNINDEXED,
13//!     kind UNINDEXED,
14//!     title,
15//!     body,
16//!     tags UNINDEXED,
17//!     namespace UNINDEXED,
18//!     metadata UNINDEXED,
19//!     updated_at UNINDEXED
20//! );
21//! ```
22//!
23//! Only `title` and `body` are full-text indexed. The remaining columns are
24//! stored for retrieval and filtering but do not participate in FTS ranking.
25//!
26//! # Connection strategy
27//!
28//! Follows the same dual-mode pattern as `SqliteVecStore`:
29//! - **File-backed**: Opens standalone connections per operation.
30//! - **In-memory**: Acquires pool connections via `spawn_blocking`.
31//!
32//! # Score normalization
33//!
//! FTS5 `rank` values (bm25) are negative: the more negative the value, the
//! more relevant the match. `search` maps each rank into the unit interval to
//! produce `TextSearchHit::score`; see that method for the exact mapping.
37
38use std::sync::Arc;
39
40use async_trait::async_trait;
41use chrono::{DateTime, TimeZone, Utc};
42use uuid::Uuid;
43
44use khive_score::DeterministicScore;
45use khive_storage::error::StorageError;
46use khive_storage::types::{
47    BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextIndexStats, TextQueryMode,
48    TextSearchHit, TextSearchRequest,
49};
50use khive_storage::StorageCapability;
51use khive_storage::TextSearch;
52use khive_types::SubstrateKind;
53
54use crate::error::SqliteError;
55use crate::pool::ConnectionPool;
56
57/// Ensure the FTS5 virtual table for `table_key` exists.
58///
59/// Used in tests to set up an in-memory FTS5 table without the full `StorageBackend`.
60#[cfg(test)]
61pub(crate) fn ensure_fts5_schema(
62    conn: &rusqlite::Connection,
63    table_key: &str,
64) -> Result<(), rusqlite::Error> {
65    let table_name = format!("fts_{}", table_key);
66    let ddl = format!(
67        "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5(\
68         subject_id UNINDEXED, \
69         kind UNINDEXED, \
70         title, \
71         body, \
72         tags UNINDEXED, \
73         namespace UNINDEXED, \
74         metadata UNINDEXED, \
75         updated_at UNINDEXED\
76         )",
77        table_name
78    );
79    conn.execute_batch(&ddl)
80}
81
82fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
83    StorageError::driver(StorageCapability::Text, op, e)
84}
85
86fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
87    StorageError::driver(StorageCapability::Text, op, e)
88}
89
90/// A TextSearch backed by SQLite FTS5 virtual tables.
91///
92/// Each instance manages one table: `fts_{table_key}`. Documents are stored
93/// with their metadata in UNINDEXED columns; only `title` and `body` are
94/// full-text indexed.
95pub struct Fts5TextSearch {
96    pool: Arc<ConnectionPool>,
97    is_file_backed: bool,
98    table_name: String,
99}
100
101impl Fts5TextSearch {
102    /// Create a new FTS5 text search instance.
103    ///
104    /// The FTS5 virtual table must already exist (created by `StorageBackend::text()`).
105    pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
106        let table_name = format!("fts_{}", table_key);
107        Self {
108            pool,
109            is_file_backed,
110            table_name,
111        }
112    }
113
114    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
115        let config = self.pool.config();
116        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
117            operation: "fts_writer".into(),
118            message: "in-memory databases do not support standalone connections".into(),
119        })?;
120
121        let conn = rusqlite::Connection::open_with_flags(
122            path,
123            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
124                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
125                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
126        )
127        .map_err(|e| map_err(e, "open_fts_writer"))?;
128
129        conn.busy_timeout(config.busy_timeout)
130            .map_err(|e| map_err(e, "open_fts_writer"))?;
131        conn.pragma_update(None, "foreign_keys", "ON")
132            .map_err(|e| map_err(e, "open_fts_writer"))?;
133        conn.pragma_update(None, "synchronous", "NORMAL")
134            .map_err(|e| map_err(e, "open_fts_writer"))?;
135
136        Ok(conn)
137    }
138
139    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
140        let config = self.pool.config();
141        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
142            operation: "fts_reader".into(),
143            message: "in-memory databases do not support standalone connections".into(),
144        })?;
145
146        let conn = rusqlite::Connection::open_with_flags(
147            path,
148            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
149                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
150                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
151        )
152        .map_err(|e| map_err(e, "open_fts_reader"))?;
153
154        conn.busy_timeout(config.busy_timeout)
155            .map_err(|e| map_err(e, "open_fts_reader"))?;
156        conn.pragma_update(None, "foreign_keys", "ON")
157            .map_err(|e| map_err(e, "open_fts_reader"))?;
158        conn.pragma_update(None, "synchronous", "NORMAL")
159            .map_err(|e| map_err(e, "open_fts_reader"))?;
160
161        Ok(conn)
162    }
163
164    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
165    where
166        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
167        R: Send + 'static,
168    {
169        if self.is_file_backed {
170            let conn = self.open_standalone_writer()?;
171            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
172                .await
173                .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
174        } else {
175            let pool = Arc::clone(&self.pool);
176            tokio::task::spawn_blocking(move || {
177                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
178                f(guard.conn()).map_err(|e| map_err(e, op))
179            })
180            .await
181            .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
182        }
183    }
184
185    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
186    where
187        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
188        R: Send + 'static,
189    {
190        if self.is_file_backed {
191            let conn = self.open_standalone_reader()?;
192            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
193                .await
194                .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
195        } else {
196            let pool = Arc::clone(&self.pool);
197            tokio::task::spawn_blocking(move || {
198                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
199                f(guard.conn()).map_err(|e| map_err(e, op))
200            })
201            .await
202            .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
203        }
204    }
205}
206
207// -- Helper functions --
208
209fn tags_to_json(tags: &[String]) -> String {
210    serde_json::to_string(tags).unwrap_or_else(|_| "[]".to_string())
211}
212
213fn tags_from_json(s: &str) -> Vec<String> {
214    serde_json::from_str(s).unwrap_or_default()
215}
216
217fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
218    dt.timestamp_micros()
219}
220
221fn micros_to_dt(micros: i64) -> DateTime<Utc> {
222    Utc.timestamp_micros(micros)
223        .single()
224        .unwrap_or_else(Utc::now)
225}
226
227/// Escape an FTS5 query string to prevent injection.
228///
229/// FTS5 special characters (`*`, `"`, `(`, `)`, `+`, `-`, `:`, `^`) are
230/// stripped. For Phrase mode, the caller wraps the result in double quotes.
231fn sanitize_fts5_query(query: &str) -> String {
232    let sanitized: String = query
233        .chars()
234        .filter(|c| {
235            !matches!(c, '*' | '"' | '(' | ')' | '+' | '-' | ':' | '^' | '\0') && !c.is_control()
236        })
237        .collect();
238    sanitized
239        .split_whitespace()
240        .filter(|t| {
241            !matches!(
242                t.to_ascii_uppercase().as_str(),
243                "AND" | "OR" | "NOT" | "NEAR"
244            )
245        })
246        .collect::<Vec<_>>()
247        .join(" ")
248}
249
250/// Build a WHERE clause fragment and params for a `TextFilter`.
251///
252/// Returns `(clause, params)` where clause is empty if no filters are active.
253/// Parameter indices start at `?{start_idx}`.
254fn build_filter_clause(
255    filter: &TextFilter,
256    table: &str,
257    start_idx: usize,
258) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
259    let mut conditions: Vec<String> = Vec::new();
260    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
261    let mut idx = start_idx;
262
263    if !filter.ids.is_empty() {
264        let placeholders: Vec<String> = filter
265            .ids
266            .iter()
267            .map(|_| {
268                let p = format!("?{}", idx);
269                idx += 1;
270                p
271            })
272            .collect();
273        conditions.push(format!(
274            "{}.subject_id IN ({})",
275            table,
276            placeholders.join(", ")
277        ));
278        for id in &filter.ids {
279            params.push(Box::new(id.to_string()));
280        }
281    }
282
283    if !filter.kinds.is_empty() {
284        let placeholders: Vec<String> = filter
285            .kinds
286            .iter()
287            .map(|_| {
288                let p = format!("?{}", idx);
289                idx += 1;
290                p
291            })
292            .collect();
293        conditions.push(format!("{}.kind IN ({})", table, placeholders.join(", ")));
294        for kind in &filter.kinds {
295            params.push(Box::new(kind.to_string()));
296        }
297    }
298
299    if !filter.namespaces.is_empty() {
300        let placeholders: Vec<String> = filter
301            .namespaces
302            .iter()
303            .map(|_| {
304                let p = format!("?{}", idx);
305                idx += 1;
306                p
307            })
308            .collect();
309        conditions.push(format!(
310            "{}.namespace IN ({})",
311            table,
312            placeholders.join(", ")
313        ));
314        for ns in &filter.namespaces {
315            params.push(Box::new(ns.clone()));
316        }
317    }
318
319    if conditions.is_empty() {
320        (String::new(), params)
321    } else {
322        (format!(" AND {}", conditions.join(" AND ")), params)
323    }
324}
325
326#[async_trait]
327impl TextSearch for Fts5TextSearch {
328    async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
329        let table = self.table_name.clone();
330        let namespace = document.namespace.clone();
331
332        self.with_writer("fts_upsert", move |conn| {
333            conn.execute_batch("BEGIN IMMEDIATE")?;
334
335            let del_sql = format!(
336                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
337                table
338            );
339            if let Err(e) = conn.execute(
340                &del_sql,
341                rusqlite::params![&namespace, document.subject_id.to_string()],
342            ) {
343                let _ = conn.execute_batch("ROLLBACK");
344                return Err(e);
345            }
346
347            let ins_sql = format!(
348                "INSERT INTO {} \
349                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
350                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
351                table
352            );
353            let tags_json = tags_to_json(&document.tags);
354            let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
355
356            if let Err(e) = conn.execute(
357                &ins_sql,
358                rusqlite::params![
359                    document.subject_id.to_string(),
360                    document.kind.to_string(),
361                    document.title.as_deref().unwrap_or(""),
362                    document.body,
363                    tags_json,
364                    &namespace,
365                    metadata_json,
366                    dt_to_micros(&document.updated_at),
367                ],
368            ) {
369                let _ = conn.execute_batch("ROLLBACK");
370                return Err(e);
371            }
372
373            conn.execute_batch("COMMIT")?;
374            Ok(())
375        })
376        .await
377    }
378
379    async fn upsert_documents(
380        &self,
381        documents: Vec<TextDocument>,
382    ) -> Result<BatchWriteSummary, StorageError> {
383        let table = self.table_name.clone();
384        let attempted = documents.len() as u64;
385
386        self.with_writer("fts_upsert_batch", move |conn| {
387            let del_sql = format!(
388                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
389                table
390            );
391            let ins_sql = format!(
392                "INSERT INTO {} \
393                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
394                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
395                table
396            );
397
398            conn.execute_batch("BEGIN IMMEDIATE")?;
399            let mut affected = 0u64;
400            let mut failed = 0u64;
401
402            for doc in &documents {
403                conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
404                let id_str = doc.subject_id.to_string();
405                let namespace = &doc.namespace;
406                let result = (|| {
407                    conn.execute(&del_sql, rusqlite::params![namespace, &id_str])?;
408
409                    let tags_json = tags_to_json(&doc.tags);
410                    let metadata_json: Option<String> =
411                        doc.metadata.as_ref().map(|v| v.to_string());
412
413                    conn.execute(
414                        &ins_sql,
415                        rusqlite::params![
416                            &id_str,
417                            &doc.kind.to_string(),
418                            doc.title.as_deref().unwrap_or(""),
419                            &doc.body,
420                            &tags_json,
421                            namespace,
422                            &metadata_json,
423                            dt_to_micros(&doc.updated_at),
424                        ],
425                    )?;
426                    Ok::<(), rusqlite::Error>(())
427                })();
428
429                match result {
430                    Ok(()) => {
431                        conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
432                        affected += 1;
433                    }
434                    Err(_) => {
435                        let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc");
436                        let _ = conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc");
437                        failed += 1;
438                    }
439                }
440            }
441
442            conn.execute_batch("COMMIT")?;
443
444            Ok(BatchWriteSummary {
445                attempted,
446                affected,
447                failed,
448                first_error: String::new(),
449            })
450        })
451        .await
452    }
453
454    async fn delete_document(
455        &self,
456        namespace: &str,
457        subject_id: Uuid,
458    ) -> Result<bool, StorageError> {
459        let namespace = namespace.to_string();
460        let table = self.table_name.clone();
461
462        self.with_writer("fts_delete", move |conn| {
463            let sql = format!(
464                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
465                table
466            );
467            let deleted =
468                conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
469            Ok(deleted > 0)
470        })
471        .await
472    }
473
474    async fn get_document(
475        &self,
476        namespace: &str,
477        subject_id: Uuid,
478    ) -> Result<Option<TextDocument>, StorageError> {
479        let namespace = namespace.to_string();
480        let table = self.table_name.clone();
481
482        self.with_reader("fts_get", move |conn| {
483            let sql = format!(
484                "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
485                 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
486                table
487            );
488            let mut stmt = conn.prepare(&sql)?;
489            let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
490
491            match rows.next()? {
492                Some(row) => {
493                    let id_str: String = row.get(0)?;
494                    let kind_str: String = row.get(1)?;
495                    let title: String = row.get(2)?;
496                    let body: String = row.get(3)?;
497                    let tags_json: String = row.get(4)?;
498                    let ns: String = row.get(5)?;
499                    let metadata_json: Option<String> = row.get(6)?;
500                    let updated_at_micros: i64 = row.get(7)?;
501
502                    let sid = Uuid::parse_str(&id_str).map_err(|e| {
503                        rusqlite::Error::FromSqlConversionFailure(
504                            0,
505                            rusqlite::types::Type::Text,
506                            Box::new(e),
507                        )
508                    })?;
509
510                    let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
511                        rusqlite::Error::FromSqlConversionFailure(
512                            1,
513                            rusqlite::types::Type::Text,
514                            Box::new(e),
515                        )
516                    })?;
517
518                    Ok(Some(TextDocument {
519                        subject_id: sid,
520                        kind,
521                        title: if title.is_empty() { None } else { Some(title) },
522                        body,
523                        tags: tags_from_json(&tags_json),
524                        namespace: ns,
525                        metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
526                        updated_at: micros_to_dt(updated_at_micros),
527                    }))
528                }
529                None => Ok(None),
530            }
531        })
532        .await
533    }
534
535    async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
536        let table = self.table_name.clone();
537
538        self.with_reader("fts_search", move |conn| {
539            let sanitized = sanitize_fts5_query(&request.query);
540            if sanitized.is_empty() {
541                return Ok(Vec::new());
542            }
543
544            let match_expr = match request.mode {
545                TextQueryMode::Phrase => format!("\"{}\"", sanitized),
546                TextQueryMode::Plain => sanitized,
547            };
548
549            // Snippet column index 3 = body in the FTS5 schema.
550            let snippet_chars = request.snippet_chars.max(1) as i32;
551
552            let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
553                build_filter_clause(filter, &table, 3)
554            } else {
555                (String::new(), Vec::new())
556            };
557
558            let sql = format!(
559                "SELECT subject_id, rank, title, snippet({table}, 3, '', '', '...', {snippet_chars}) \
560                 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
561                 ORDER BY rank LIMIT ?2",
562            );
563
564            let mut stmt = conn.prepare(&sql)?;
565            stmt.raw_bind_parameter(1, &match_expr)?;
566            stmt.raw_bind_parameter(2, request.top_k as i64)?;
567
568            for (i, param) in filter_params.iter().enumerate() {
569                param
570                    .to_sql()
571                    .map(|val| stmt.raw_bind_parameter(3 + i, val))
572                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
573            }
574
575            let mut hits = Vec::new();
576            let mut rows = stmt.raw_query();
577            let mut rank_idx = 0u32;
578
579            while let Some(row) = rows.next()? {
580                let id_str: String = row.get(0)?;
581                let fts_rank: f64 = row.get(1)?;
582                let title: String = row.get(2)?;
583                let snippet: String = row.get(3)?;
584
585                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
586                    rusqlite::Error::FromSqlConversionFailure(
587                        0,
588                        rusqlite::types::Type::Text,
589                        Box::new(e),
590                    )
591                })?;
592
593                // FTS5 rank is negative (more negative = more relevant).
594                // Normalize: score = 1 / (1 + |rank|), giving (0, 1].
595                let score = 1.0 / (1.0 + fts_rank.abs());
596
597                rank_idx += 1;
598                hits.push(TextSearchHit {
599                    subject_id,
600                    score: DeterministicScore::from_f64(score),
601                    rank: rank_idx,
602                    title: if title.is_empty() { None } else { Some(title) },
603                    snippet: if snippet.is_empty() {
604                        None
605                    } else {
606                        Some(snippet)
607                    },
608                });
609            }
610
611            Ok(hits)
612        })
613        .await
614    }
615
616    async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
617        let table = self.table_name.clone();
618
619        self.with_reader("fts_count", move |conn| {
620            let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
621
622            let sql = if filter_clause.is_empty() {
623                format!("SELECT COUNT(*) FROM {}", table)
624            } else {
625                let where_part = filter_clause.trim_start_matches(" AND ");
626                format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part)
627            };
628
629            let mut stmt = conn.prepare(&sql)?;
630
631            for (i, param) in filter_params.iter().enumerate() {
632                param
633                    .to_sql()
634                    .map(|val| stmt.raw_bind_parameter(1 + i, val))
635                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
636            }
637
638            let mut rows = stmt.raw_query();
639            match rows.next()? {
640                Some(row) => {
641                    let count: i64 = row.get(0)?;
642                    Ok(count as u64)
643                }
644                None => Ok(0),
645            }
646        })
647        .await
648    }
649
650    async fn stats(&self) -> Result<TextIndexStats, StorageError> {
651        let table = self.table_name.clone();
652
653        self.with_reader("fts_stats", move |conn| {
654            let sql = format!("SELECT COUNT(*) FROM {}", table);
655            let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
656
657            Ok(TextIndexStats {
658                document_count: count as u64,
659                needs_rebuild: false,
660                last_rebuild_at: None,
661            })
662        })
663        .await
664    }
665
666    async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
667        let table = self.table_name.clone();
668
669        self.with_writer("fts_rebuild", move |conn| {
670            // FTS5 rebuild command: repopulates the internal index structures.
671            let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
672            conn.execute(&sql, [])?;
673
674            let count_sql = format!("SELECT COUNT(*) FROM {}", table);
675            let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
676
677            Ok(TextIndexStats {
678                document_count: count as u64,
679                needs_rebuild: false,
680                last_rebuild_at: Some(Utc::now()),
681            })
682        })
683        .await
684    }
685}
686
687impl Fts5TextSearch {
688    /// Move all FTS5 documents from `old_namespace` to `new_namespace` in a
689    /// single transaction.
690    ///
691    /// FTS5 virtual tables do not support updating indexed columns (`title`,
692    /// `body`) via UPDATE. The correct approach is read-then-delete-then-reinsert.
693    ///
694    /// Callers must invoke this after any SQL-level namespace change on the
695    /// backing entity table so that FTS5 keyword search stays consistent with
696    /// the entity store.
697    #[allow(dead_code)]
698    pub(crate) async fn rename_namespace(
699        &self,
700        old_namespace: &str,
701        new_namespace: &str,
702    ) -> Result<u64, StorageError> {
703        if old_namespace == new_namespace {
704            return Ok(0);
705        }
706        let table = self.table_name.clone();
707        let old_ns = old_namespace.to_string();
708        let new_ns = new_namespace.to_string();
709
710        self.with_writer("fts_rename_namespace", move |conn| {
711            let sel_sql = format!(
712                "SELECT subject_id, kind, title, body, tags, metadata, updated_at \
713                 FROM {} WHERE namespace = ?1",
714                table
715            );
716            struct Row {
717                subject_id: String,
718                kind: String,
719                title: String,
720                body: String,
721                tags: String,
722                metadata: Option<String>,
723                updated_at: i64,
724            }
725            let rows: Vec<Row> = {
726                let mut stmt = conn.prepare(&sel_sql)?;
727                let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
728                    Ok(Row {
729                        subject_id: row.get(0)?,
730                        kind: row.get(1)?,
731                        title: row.get(2)?,
732                        body: row.get(3)?,
733                        tags: row.get(4)?,
734                        metadata: row.get(5)?,
735                        updated_at: row.get(6)?,
736                    })
737                })?;
738                iter.collect::<Result<Vec<_>, _>>()?
739            };
740            let moved = rows.len() as u64;
741            if moved == 0 {
742                return Ok(0u64);
743            }
744
745            conn.execute_batch("BEGIN IMMEDIATE")?;
746
747            let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
748            if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
749                let _ = conn.execute_batch("ROLLBACK");
750                return Err(e);
751            }
752
753            let ins_sql = format!(
754                "INSERT INTO {} \
755                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
756                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
757                table
758            );
759            for row in &rows {
760                if let Err(e) = conn.execute(
761                    &ins_sql,
762                    rusqlite::params![
763                        row.subject_id,
764                        row.kind,
765                        row.title,
766                        row.body,
767                        row.tags,
768                        &new_ns,
769                        row.metadata,
770                        row.updated_at,
771                    ],
772                ) {
773                    let _ = conn.execute_batch("ROLLBACK");
774                    return Err(e);
775                }
776            }
777
778            conn.execute_batch("COMMIT")?;
779            Ok(moved)
780        })
781        .await
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use super::*;
788    use crate::pool::PoolConfig;
789
790    fn setup_memory_store(table_key: &str) -> Fts5TextSearch {
791        let config = PoolConfig {
792            path: None,
793            ..PoolConfig::default()
794        };
795        let pool = Arc::new(ConnectionPool::new(config).unwrap());
796
797        {
798            let writer = pool.writer().unwrap();
799            ensure_fts5_schema(writer.conn(), table_key).unwrap();
800        }
801
802        Fts5TextSearch::new(pool, false, table_key.to_string())
803    }
804
805    fn make_document(subject_id: Uuid, title: &str, body: &str) -> TextDocument {
806        TextDocument {
807            subject_id,
808            kind: SubstrateKind::Note,
809            title: if title.is_empty() {
810                None
811            } else {
812                Some(title.to_string())
813            },
814            body: body.to_string(),
815            tags: vec![],
816            namespace: "test_ns".to_string(),
817            metadata: None,
818            updated_at: Utc::now(),
819        }
820    }
821
822    fn ns_filter(namespace: &str) -> TextFilter {
823        TextFilter {
824            namespaces: vec![namespace.to_string()],
825            ..TextFilter::default()
826        }
827    }
828
829    #[tokio::test]
830    async fn test_upsert_and_search() {
831        let store = setup_memory_store("upsert_search");
832
833        let id = Uuid::new_v4();
834        let doc = TextDocument {
835            subject_id: id,
836            kind: SubstrateKind::Entity,
837            title: Some("Rust Programming".to_string()),
838            body: "Rust is a systems programming language focused on safety and performance."
839                .to_string(),
840            tags: vec!["rust".to_string(), "programming".to_string()],
841            namespace: "tech".to_string(),
842            metadata: None,
843            updated_at: Utc::now(),
844        };
845
846        store.upsert_document(doc).await.unwrap();
847
848        let hits = store
849            .search(TextSearchRequest {
850                query: "Rust programming".to_string(),
851                mode: TextQueryMode::Plain,
852                filter: Some(ns_filter("tech")),
853                top_k: 10,
854                snippet_chars: 64,
855            })
856            .await
857            .unwrap();
858
859        assert_eq!(hits.len(), 1);
860        assert_eq!(hits[0].subject_id, id);
861        assert_eq!(hits[0].rank, 1);
862        assert!(hits[0].score.to_f64() > 0.0);
863        assert!(hits[0].title.is_some());
864    }
865
866    #[tokio::test]
867    async fn test_phrase_search() {
868        let store = setup_memory_store("phrase");
869
870        let id1 = Uuid::new_v4();
871        let id2 = Uuid::new_v4();
872
873        store
874            .upsert_document(make_document(
875                id1,
876                "Animals",
877                "The quick brown fox jumps over the lazy dog.",
878            ))
879            .await
880            .unwrap();
881
882        store
883            .upsert_document(make_document(
884                id2,
885                "Colors",
886                "The brown paint was quick to dry, unlike the fox.",
887            ))
888            .await
889            .unwrap();
890
891        let hits = store
892            .search(TextSearchRequest {
893                query: "quick brown fox".to_string(),
894                mode: TextQueryMode::Phrase,
895                filter: Some(ns_filter("test_ns")),
896                top_k: 10,
897                snippet_chars: 64,
898            })
899            .await
900            .unwrap();
901
902        assert_eq!(hits.len(), 1);
903        assert_eq!(hits[0].subject_id, id1);
904
905        let hits = store
906            .search(TextSearchRequest {
907                query: "quick brown fox".to_string(),
908                mode: TextQueryMode::Plain,
909                filter: Some(ns_filter("test_ns")),
910                top_k: 10,
911                snippet_chars: 64,
912            })
913            .await
914            .unwrap();
915
916        assert_eq!(hits.len(), 2);
917    }
918
919    #[tokio::test]
920    async fn test_delete_document() {
921        let store = setup_memory_store("delete");
922
923        let id1 = Uuid::new_v4();
924        let id2 = Uuid::new_v4();
925
926        store
927            .upsert_document(make_document(id1, "Doc One", "First document content."))
928            .await
929            .unwrap();
930        store
931            .upsert_document(make_document(id2, "Doc Two", "Second document content."))
932            .await
933            .unwrap();
934
935        let stats = store.stats().await.unwrap();
936        assert_eq!(stats.document_count, 2);
937
938        let deleted = store.delete_document("test_ns", id1).await.unwrap();
939        assert!(deleted);
940
941        let stats = store.stats().await.unwrap();
942        assert_eq!(stats.document_count, 1);
943
944        let deleted_again = store.delete_document("test_ns", id1).await.unwrap();
945        assert!(!deleted_again);
946
947        let doc = store.get_document("test_ns", id2).await.unwrap();
948        assert!(doc.is_some());
949
950        let doc = store.get_document("test_ns", id1).await.unwrap();
951        assert!(doc.is_none());
952    }
953
954    #[tokio::test]
955    async fn test_count_with_filter() {
956        let store = setup_memory_store("count_filter");
957        let ns = "test_ns".to_string();
958
959        for i in 0..5 {
960            let kind = if i % 2 == 0 {
961                SubstrateKind::Entity
962            } else {
963                SubstrateKind::Note
964            };
965            let doc = TextDocument {
966                subject_id: Uuid::new_v4(),
967                kind,
968                title: Some(format!("Doc {}", i)),
969                body: format!("Content for document number {}", i),
970                tags: vec![],
971                namespace: ns.clone(),
972                metadata: None,
973                updated_at: Utc::now(),
974            };
975            store.upsert_document(doc).await.unwrap();
976        }
977
978        let total = store
979            .count(TextFilter {
980                namespaces: vec![ns.clone()],
981                ..TextFilter::default()
982            })
983            .await
984            .unwrap();
985        assert_eq!(total, 5);
986
987        let entities = store
988            .count(TextFilter {
989                namespaces: vec![ns.clone()],
990                kinds: vec![SubstrateKind::Entity],
991                ..TextFilter::default()
992            })
993            .await
994            .unwrap();
995        assert_eq!(entities, 3);
996
997        let notes = store
998            .count(TextFilter {
999                namespaces: vec![ns.clone()],
1000                kinds: vec![SubstrateKind::Note],
1001                ..TextFilter::default()
1002            })
1003            .await
1004            .unwrap();
1005        assert_eq!(notes, 2);
1006    }
1007
1008    #[tokio::test]
1009    async fn test_get_document_roundtrip() {
1010        let store = setup_memory_store("get_roundtrip");
1011
1012        let id = Uuid::new_v4();
1013        let original = TextDocument {
1014            subject_id: id,
1015            kind: SubstrateKind::Note,
1016            title: Some("Important Memo".to_string()),
1017            body: "This memo contains critical information.".to_string(),
1018            tags: vec!["important".to_string(), "memo".to_string()],
1019            namespace: "work".to_string(),
1020            metadata: Some(serde_json::json!({"priority": "high"})),
1021            updated_at: Utc::now(),
1022        };
1023
1024        store.upsert_document(original.clone()).await.unwrap();
1025
1026        let retrieved = store.get_document("work", id).await.unwrap().unwrap();
1027        assert_eq!(retrieved.subject_id, id);
1028        assert_eq!(retrieved.kind, SubstrateKind::Note);
1029        assert_eq!(retrieved.title, Some("Important Memo".to_string()));
1030        assert_eq!(retrieved.body, "This memo contains critical information.");
1031        assert_eq!(retrieved.tags, vec!["important", "memo"]);
1032        assert_eq!(retrieved.namespace, "work");
1033    }
1034
1035    #[tokio::test]
1036    async fn test_upsert_replaces_existing() {
1037        let store = setup_memory_store("replace");
1038
1039        let id = Uuid::new_v4();
1040        store
1041            .upsert_document(make_document(id, "Original", "Original body text."))
1042            .await
1043            .unwrap();
1044
1045        store
1046            .upsert_document(make_document(id, "Updated", "Updated body text."))
1047            .await
1048            .unwrap();
1049
1050        let stats = store.stats().await.unwrap();
1051        assert_eq!(stats.document_count, 1);
1052
1053        let doc = store.get_document("test_ns", id).await.unwrap().unwrap();
1054        assert_eq!(doc.title, Some("Updated".to_string()));
1055        assert_eq!(doc.body, "Updated body text.");
1056    }
1057
1058    #[tokio::test]
1059    async fn test_batch_upsert() {
1060        let store = setup_memory_store("batch");
1061
1062        let docs: Vec<TextDocument> = (0..50)
1063            .map(|i| TextDocument {
1064                subject_id: Uuid::new_v4(),
1065                kind: SubstrateKind::Entity,
1066                title: Some(format!("Item {}", i)),
1067                body: format!("This is the body content for item number {}", i),
1068                tags: vec![format!("tag_{}", i % 5)],
1069                namespace: "batch_ns".to_string(),
1070                metadata: None,
1071                updated_at: Utc::now(),
1072            })
1073            .collect();
1074
1075        let summary = store.upsert_documents(docs).await.unwrap();
1076        assert_eq!(summary.attempted, 50);
1077        assert_eq!(summary.affected, 50);
1078        assert_eq!(summary.failed, 0);
1079
1080        let stats = store.stats().await.unwrap();
1081        assert_eq!(stats.document_count, 50);
1082    }
1083
1084    #[tokio::test]
1085    async fn test_empty_search() {
1086        let store = setup_memory_store("empty");
1087
1088        let hits = store
1089            .search(TextSearchRequest {
1090                query: "nonexistent".to_string(),
1091                mode: TextQueryMode::Plain,
1092                filter: Some(ns_filter("test_ns")),
1093                top_k: 10,
1094                snippet_chars: 64,
1095            })
1096            .await
1097            .unwrap();
1098
1099        assert!(hits.is_empty());
1100    }
1101
1102    #[tokio::test]
1103    async fn test_rebuild() {
1104        let store = setup_memory_store("rebuild");
1105
1106        store
1107            .upsert_document(make_document(
1108                Uuid::new_v4(),
1109                "Test",
1110                "Test document for rebuild.",
1111            ))
1112            .await
1113            .unwrap();
1114
1115        let stats = store.rebuild(IndexRebuildScope::Full).await.unwrap();
1116        assert_eq!(stats.document_count, 1);
1117        assert!(!stats.needs_rebuild);
1118        assert!(stats.last_rebuild_at.is_some());
1119    }
1120
1121    #[tokio::test]
1122    async fn test_search_with_kind_filter() {
1123        let store = setup_memory_store("filter_kind");
1124
1125        let id_entity = Uuid::new_v4();
1126        let id_note = Uuid::new_v4();
1127
1128        store
1129            .upsert_document(TextDocument {
1130                subject_id: id_entity,
1131                kind: SubstrateKind::Entity,
1132                title: Some("Rust Guide".to_string()),
1133                body: "A comprehensive guide to Rust programming.".to_string(),
1134                tags: vec![],
1135                namespace: "test_ns".to_string(),
1136                metadata: None,
1137                updated_at: Utc::now(),
1138            })
1139            .await
1140            .unwrap();
1141
1142        store
1143            .upsert_document(TextDocument {
1144                subject_id: id_note,
1145                kind: SubstrateKind::Note,
1146                title: Some("Rust Notes".to_string()),
1147                body: "Quick notes about Rust concepts.".to_string(),
1148                tags: vec![],
1149                namespace: "test_ns".to_string(),
1150                metadata: None,
1151                updated_at: Utc::now(),
1152            })
1153            .await
1154            .unwrap();
1155
1156        let hits = store
1157            .search(TextSearchRequest {
1158                query: "Rust".to_string(),
1159                mode: TextQueryMode::Plain,
1160                filter: Some(TextFilter {
1161                    kinds: vec![SubstrateKind::Entity],
1162                    namespaces: vec!["test_ns".to_string()],
1163                    ..TextFilter::default()
1164                }),
1165                top_k: 10,
1166                snippet_chars: 64,
1167            })
1168            .await
1169            .unwrap();
1170
1171        assert_eq!(hits.len(), 1);
1172        assert_eq!(hits[0].subject_id, id_entity);
1173    }
1174
1175    #[tokio::test]
1176    async fn test_sanitize_fts5_query() {
1177        assert_eq!(sanitize_fts5_query("hello world"), "hello world");
1178        assert_eq!(sanitize_fts5_query("hello*world"), "helloworld");
1179        assert_eq!(sanitize_fts5_query("\"quoted\""), "quoted");
1180        assert_eq!(sanitize_fts5_query("(parens)"), "parens");
1181        assert_eq!(sanitize_fts5_query("a + b - c"), "a b c");
1182        assert_eq!(sanitize_fts5_query("col:value"), "colvalue");
1183        assert_eq!(sanitize_fts5_query(""), "");
1184        assert_eq!(sanitize_fts5_query("***"), "");
1185    }
1186
1187    #[tokio::test]
1188    async fn test_score_is_bounded() {
1189        let store = setup_memory_store("score_bounds");
1190
1191        for i in 0..5 {
1192            store
1193                .upsert_document(make_document(
1194                    Uuid::new_v4(),
1195                    &format!("Doc {}", i),
1196                    &format!("This document discusses topic number {}", i),
1197                ))
1198                .await
1199                .unwrap();
1200        }
1201
1202        let hits = store
1203            .search(TextSearchRequest {
1204                query: "document topic".to_string(),
1205                mode: TextQueryMode::Plain,
1206                filter: Some(ns_filter("test_ns")),
1207                top_k: 10,
1208                snippet_chars: 64,
1209            })
1210            .await
1211            .unwrap();
1212
1213        for hit in &hits {
1214            let score = hit.score.to_f64();
1215            assert!(
1216                score > 0.0 && score <= 1.0,
1217                "score out of (0, 1] range: {}",
1218                score
1219            );
1220        }
1221
1222        for (i, hit) in hits.iter().enumerate() {
1223            assert_eq!(hit.rank, (i + 1) as u32);
1224        }
1225    }
1226
1227    #[tokio::test]
1228    async fn test_rename_namespace() {
1229        let store = setup_memory_store("rename_ns");
1230
1231        let id = Uuid::new_v4();
1232        let doc = TextDocument {
1233            subject_id: id,
1234            kind: SubstrateKind::Note,
1235            title: Some("Rename test".to_string()),
1236            body: "keyword_unique_xyz".to_string(),
1237            tags: vec![],
1238            namespace: "old_ns".to_string(),
1239            metadata: None,
1240            updated_at: Utc::now(),
1241        };
1242        store.upsert_document(doc).await.unwrap();
1243
1244        let before = store
1245            .search(TextSearchRequest {
1246                query: "keyword_unique_xyz".to_string(),
1247                mode: TextQueryMode::Plain,
1248                filter: Some(ns_filter("old_ns")),
1249                top_k: 10,
1250                snippet_chars: 64,
1251            })
1252            .await
1253            .unwrap();
1254        assert_eq!(before.len(), 1);
1255
1256        let moved = store.rename_namespace("old_ns", "new_ns").await.unwrap();
1257        assert_eq!(moved, 1);
1258
1259        let after_new = store
1260            .search(TextSearchRequest {
1261                query: "keyword_unique_xyz".to_string(),
1262                mode: TextQueryMode::Plain,
1263                filter: Some(ns_filter("new_ns")),
1264                top_k: 10,
1265                snippet_chars: 64,
1266            })
1267            .await
1268            .unwrap();
1269        assert_eq!(after_new.len(), 1);
1270
1271        let after_old = store
1272            .search(TextSearchRequest {
1273                query: "keyword_unique_xyz".to_string(),
1274                mode: TextQueryMode::Plain,
1275                filter: Some(ns_filter("old_ns")),
1276                top_k: 10,
1277                snippet_chars: 64,
1278            })
1279            .await
1280            .unwrap();
1281        assert!(after_old.is_empty());
1282    }
1283
1284    #[tokio::test]
1285    async fn test_metadata_none_roundtrip() {
1286        let store = setup_memory_store("meta_none");
1287        let id = uuid::Uuid::new_v4();
1288        let doc = TextDocument {
1289            subject_id: id,
1290            kind: SubstrateKind::Note,
1291            namespace: "test_ns".to_string(),
1292            title: None,
1293            body: "no metadata".to_string(),
1294            tags: vec![],
1295            metadata: None,
1296            updated_at: Utc::now(),
1297        };
1298        store.upsert_document(doc).await.unwrap();
1299        let fetched = store.get_document("test_ns", id).await.unwrap().unwrap();
1300        assert!(fetched.metadata.is_none());
1301    }
1302
1303    #[tokio::test]
1304    async fn test_rename_namespace_noop() {
1305        let store = setup_memory_store("rename_noop");
1306
1307        let id = Uuid::new_v4();
1308        let doc = TextDocument {
1309            subject_id: id,
1310            kind: SubstrateKind::Note,
1311            title: None,
1312            body: "noop_test_content".to_string(),
1313            tags: vec![],
1314            namespace: "same_ns".to_string(),
1315            metadata: None,
1316            updated_at: Utc::now(),
1317        };
1318        store.upsert_document(doc).await.unwrap();
1319
1320        let moved = store.rename_namespace("same_ns", "same_ns").await.unwrap();
1321        assert_eq!(moved, 0);
1322
1323        let hits = store
1324            .search(TextSearchRequest {
1325                query: "noop_test_content".to_string(),
1326                mode: TextQueryMode::Plain,
1327                filter: Some(ns_filter("same_ns")),
1328                top_k: 10,
1329                snippet_chars: 64,
1330            })
1331            .await
1332            .unwrap();
1333        assert_eq!(hits.len(), 1);
1334    }
1335}