Skip to main content

khive_db/stores/
text.rs

//! FTS5-backed `TextSearch`: one virtual table per model, scores normalized per query to `[0.05, 1.0]`.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::{DateTime, TimeZone, Utc};
7use uuid::Uuid;
8
9use khive_score::DeterministicScore;
10use khive_storage::error::StorageError;
11use khive_storage::types::{
12    BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextGatherMode, TextIndexStats,
13    TextQueryMode, TextSearchHit, TextSearchOptions, TextSearchRequest, TextTermStats,
14    TextTermStatsRequest,
15};
16use khive_storage::StorageCapability;
17use khive_storage::TextSearch;
18use khive_types::SubstrateKind;
19
20use crate::error::SqliteError;
21use crate::pool::ConnectionPool;
22
/// Create the FTS5 virtual table `fts_{table_key}` if it is not already present.
///
/// Test-only helper: lets unit tests stand up an FTS5 table (for example on an
/// in-memory connection) without constructing the full `StorageBackend`.
/// `title` and `body` are the only full-text indexed columns; every other column
/// is stored `UNINDEXED`.
#[cfg(test)]
pub(crate) fn ensure_fts5_schema(
    conn: &rusqlite::Connection,
    table_key: &str,
) -> Result<(), rusqlite::Error> {
    const COLUMNS: &str = "subject_id UNINDEXED, kind UNINDEXED, title, body, \
                           tags UNINDEXED, namespace UNINDEXED, metadata UNINDEXED, \
                           updated_at UNINDEXED";
    let create_table = format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5({})",
        table_key, COLUMNS
    );
    conn.execute_batch(&create_table)
}
47
48fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
49    StorageError::driver(StorageCapability::Text, op, e)
50}
51
52fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
53    StorageError::driver(StorageCapability::Text, op, e)
54}
55
56/// A TextSearch backed by SQLite FTS5 virtual tables.
57///
58/// Each instance manages one table: `fts_{table_key}`. Documents are stored
59/// with their metadata in UNINDEXED columns; only `title` and `body` are
60/// full-text indexed.
61pub struct Fts5TextSearch {
62    pool: Arc<ConnectionPool>,
63    is_file_backed: bool,
64    table_name: String,
65}
66
67impl Fts5TextSearch {
68    /// Create a new FTS5 text search instance.
69    ///
70    /// The FTS5 virtual table must already exist (created by `StorageBackend::text()`).
71    pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
72        let table_name = format!("fts_{}", table_key);
73        Self {
74            pool,
75            is_file_backed,
76            table_name,
77        }
78    }
79
80    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
81        let config = self.pool.config();
82        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
83            operation: "fts_writer".into(),
84            message: "in-memory databases do not support standalone connections".into(),
85        })?;
86
87        let conn = rusqlite::Connection::open_with_flags(
88            path,
89            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
90                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
91                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
92        )
93        .map_err(|e| map_err(e, "open_fts_writer"))?;
94
95        conn.busy_timeout(config.busy_timeout)
96            .map_err(|e| map_err(e, "open_fts_writer"))?;
97        conn.pragma_update(None, "foreign_keys", "ON")
98            .map_err(|e| map_err(e, "open_fts_writer"))?;
99        conn.pragma_update(None, "synchronous", "NORMAL")
100            .map_err(|e| map_err(e, "open_fts_writer"))?;
101
102        Ok(conn)
103    }
104
105    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
106        let config = self.pool.config();
107        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
108            operation: "fts_reader".into(),
109            message: "in-memory databases do not support standalone connections".into(),
110        })?;
111
112        let conn = rusqlite::Connection::open_with_flags(
113            path,
114            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
115                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
116                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
117        )
118        .map_err(|e| map_err(e, "open_fts_reader"))?;
119
120        conn.busy_timeout(config.busy_timeout)
121            .map_err(|e| map_err(e, "open_fts_reader"))?;
122        conn.pragma_update(None, "foreign_keys", "ON")
123            .map_err(|e| map_err(e, "open_fts_reader"))?;
124        conn.pragma_update(None, "synchronous", "NORMAL")
125            .map_err(|e| map_err(e, "open_fts_reader"))?;
126
127        Ok(conn)
128    }
129
130    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
131    where
132        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
133        R: Send + 'static,
134    {
135        if self.is_file_backed {
136            let conn = self.open_standalone_writer()?;
137            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
138                .await
139                .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
140        } else {
141            let pool = Arc::clone(&self.pool);
142            tokio::task::spawn_blocking(move || {
143                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
144                f(guard.conn()).map_err(|e| map_err(e, op))
145            })
146            .await
147            .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
148        }
149    }
150
151    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
152    where
153        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
154        R: Send + 'static,
155    {
156        if self.is_file_backed {
157            let conn = self.open_standalone_reader()?;
158            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
159                .await
160                .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
161        } else {
162            let pool = Arc::clone(&self.pool);
163            tokio::task::spawn_blocking(move || {
164                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
165                f(guard.conn()).map_err(|e| map_err(e, op))
166            })
167            .await
168            .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
169        }
170    }
171}
172
173// -- Helper functions --
174
175fn tags_to_json(tags: &[String]) -> String {
176    serde_json::to_string(tags).unwrap_or_else(|_| "[]".to_string())
177}
178
179fn tags_from_json(s: &str) -> Vec<String> {
180    serde_json::from_str(s).unwrap_or_default()
181}
182
183fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
184    dt.timestamp_micros()
185}
186
187fn micros_to_dt(micros: i64) -> DateTime<Utc> {
188    Utc.timestamp_micros(micros)
189        .single()
190        .unwrap_or_else(Utc::now)
191}
192
/// Sanitize free text into an FTS5 query made only of barewords.
///
/// FTS5 accepts a bareword only if every character is an ASCII letter, an ASCII
/// digit, `_`, or a non-ASCII code point. Every other ASCII character (`"`, `*`,
/// `(`, `:`, `-`, `@`, `/`, `#`, `?`, `'`, ...) is either query syntax or a syntax
/// error. Sanitizing therefore works as follows:
///
/// 1. Every character that cannot appear in a bareword, including control
///    characters such as tab and newline, is **replaced with a space**.
///    Replacing rather than deleting keeps neighbouring words apart:
///    `NEAR(smile,5)` → `NEAR smile 5`, `tenant:isolation` → `tenant isolation`,
///    `state-of-the-art` → `state of the art`. This is also how the default
///    `unicode61` tokenizer (used by the schema in `ensure_fts5_schema`) splits
///    such text when it is indexed, so the query tokens line up with indexed ones.
/// 2. The result is split on whitespace, and the operator keywords `AND`, `OR`,
///    `NOT` and `NEAR` are dropped. FTS5 only treats the upper-case forms as
///    operators; lower-case forms are dropped too, out of caution.
///
/// Returns the surviving tokens joined by single spaces (implicit AND in FTS5), or
/// an empty string when nothing searchable remains. For Phrase mode the caller
/// wraps the result in double quotes; the output never contains `"`.
fn sanitize_fts5_query(query: &str) -> String {
    // Pass 1: anything that cannot be part of an FTS5 bareword becomes a separator.
    let spaced: String = query
        .chars()
        .map(|c| {
            let bareword_char = if c.is_ascii() {
                c.is_ascii_alphanumeric() || c == '_'
            } else {
                !c.is_control()
            };
            if bareword_char {
                c
            } else {
                ' '
            }
        })
        .collect();

    // Pass 2: drop operator keywords so user text can never act as query syntax.
    spaced
        .split_whitespace()
        .filter(|t| {
            !matches!(
                t.to_ascii_uppercase().as_str(),
                "AND" | "OR" | "NOT" | "NEAR"
            )
        })
        .collect::<Vec<_>>()
        .join(" ")
}
246
247/// Build a WHERE clause fragment and params for a `TextFilter`.
248///
249/// Returns `(clause, params)` where clause is empty if no filters are active.
250/// Parameter indices start at `?{start_idx}`.
251fn build_filter_clause(
252    filter: &TextFilter,
253    table: &str,
254    start_idx: usize,
255) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
256    let mut conditions: Vec<String> = Vec::new();
257    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
258    let mut idx = start_idx;
259
260    if !filter.ids.is_empty() {
261        let placeholders: Vec<String> = filter
262            .ids
263            .iter()
264            .map(|_| {
265                let p = format!("?{}", idx);
266                idx += 1;
267                p
268            })
269            .collect();
270        conditions.push(format!(
271            "{}.subject_id IN ({})",
272            table,
273            placeholders.join(", ")
274        ));
275        for id in &filter.ids {
276            params.push(Box::new(id.to_string()));
277        }
278    }
279
280    if !filter.kinds.is_empty() {
281        let placeholders: Vec<String> = filter
282            .kinds
283            .iter()
284            .map(|_| {
285                let p = format!("?{}", idx);
286                idx += 1;
287                p
288            })
289            .collect();
290        conditions.push(format!("{}.kind IN ({})", table, placeholders.join(", ")));
291        for kind in &filter.kinds {
292            params.push(Box::new(kind.to_string()));
293        }
294    }
295
296    if !filter.namespaces.is_empty() {
297        let placeholders: Vec<String> = filter
298            .namespaces
299            .iter()
300            .map(|_| {
301                let p = format!("?{}", idx);
302                idx += 1;
303                p
304            })
305            .collect();
306        conditions.push(format!(
307            "{}.namespace IN ({})",
308            table,
309            placeholders.join(", ")
310        ));
311        for ns in &filter.namespaces {
312            params.push(Box::new(ns.clone()));
313        }
314    }
315
316    if conditions.is_empty() {
317        (String::new(), params)
318    } else {
319        (format!(" AND {}", conditions.join(" AND ")), params)
320    }
321}
322
323#[async_trait]
324impl TextSearch for Fts5TextSearch {
325    async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
326        let table = self.table_name.clone();
327        let namespace = document.namespace.clone();
328
329        self.with_writer("fts_upsert", move |conn| {
330            conn.execute_batch("BEGIN IMMEDIATE")?;
331
332            let del_sql = format!(
333                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
334                table
335            );
336            if let Err(e) = conn.execute(
337                &del_sql,
338                rusqlite::params![&namespace, document.subject_id.to_string()],
339            ) {
340                let _ = conn.execute_batch("ROLLBACK");
341                return Err(e);
342            }
343
344            let ins_sql = format!(
345                "INSERT INTO {} \
346                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
347                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
348                table
349            );
350            let tags_json = tags_to_json(&document.tags);
351            let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
352
353            if let Err(e) = conn.execute(
354                &ins_sql,
355                rusqlite::params![
356                    document.subject_id.to_string(),
357                    document.kind.to_string(),
358                    document.title.as_deref().unwrap_or(""),
359                    document.body,
360                    tags_json,
361                    &namespace,
362                    metadata_json,
363                    dt_to_micros(&document.updated_at),
364                ],
365            ) {
366                let _ = conn.execute_batch("ROLLBACK");
367                return Err(e);
368            }
369
370            conn.execute_batch("COMMIT")?;
371            Ok(())
372        })
373        .await
374    }
375
376    async fn upsert_documents(
377        &self,
378        documents: Vec<TextDocument>,
379    ) -> Result<BatchWriteSummary, StorageError> {
380        let table = self.table_name.clone();
381        let attempted = documents.len() as u64;
382
383        self.with_writer("fts_upsert_batch", move |conn| {
384            let del_sql = format!(
385                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
386                table
387            );
388            let ins_sql = format!(
389                "INSERT INTO {} \
390                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
391                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
392                table
393            );
394
395            conn.execute_batch("BEGIN IMMEDIATE")?;
396            let mut affected = 0u64;
397            let mut failed = 0u64;
398
399            for doc in &documents {
400                conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
401                let id_str = doc.subject_id.to_string();
402                let namespace = &doc.namespace;
403                let result = (|| {
404                    conn.execute(&del_sql, rusqlite::params![namespace, &id_str])?;
405
406                    let tags_json = tags_to_json(&doc.tags);
407                    let metadata_json: Option<String> =
408                        doc.metadata.as_ref().map(|v| v.to_string());
409
410                    conn.execute(
411                        &ins_sql,
412                        rusqlite::params![
413                            &id_str,
414                            &doc.kind.to_string(),
415                            doc.title.as_deref().unwrap_or(""),
416                            &doc.body,
417                            &tags_json,
418                            namespace,
419                            &metadata_json,
420                            dt_to_micros(&doc.updated_at),
421                        ],
422                    )?;
423                    Ok::<(), rusqlite::Error>(())
424                })();
425
426                match result {
427                    Ok(()) => {
428                        conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
429                        affected += 1;
430                    }
431                    Err(_) => {
432                        let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc");
433                        let _ = conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc");
434                        failed += 1;
435                    }
436                }
437            }
438
439            conn.execute_batch("COMMIT")?;
440
441            Ok(BatchWriteSummary {
442                attempted,
443                affected,
444                failed,
445                first_error: String::new(),
446            })
447        })
448        .await
449    }
450
451    async fn delete_document(
452        &self,
453        namespace: &str,
454        subject_id: Uuid,
455    ) -> Result<bool, StorageError> {
456        let namespace = namespace.to_string();
457        let table = self.table_name.clone();
458
459        self.with_writer("fts_delete", move |conn| {
460            let sql = format!(
461                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
462                table
463            );
464            let deleted =
465                conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
466            Ok(deleted > 0)
467        })
468        .await
469    }
470
471    async fn get_document(
472        &self,
473        namespace: &str,
474        subject_id: Uuid,
475    ) -> Result<Option<TextDocument>, StorageError> {
476        let namespace = namespace.to_string();
477        let table = self.table_name.clone();
478
479        self.with_reader("fts_get", move |conn| {
480            let sql = format!(
481                "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
482                 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
483                table
484            );
485            let mut stmt = conn.prepare(&sql)?;
486            let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
487
488            match rows.next()? {
489                Some(row) => {
490                    let id_str: String = row.get(0)?;
491                    let kind_str: String = row.get(1)?;
492                    let title: String = row.get(2)?;
493                    let body: String = row.get(3)?;
494                    let tags_json: String = row.get(4)?;
495                    let ns: String = row.get(5)?;
496                    let metadata_json: Option<String> = row.get(6)?;
497                    let updated_at_micros: i64 = row.get(7)?;
498
499                    let sid = Uuid::parse_str(&id_str).map_err(|e| {
500                        rusqlite::Error::FromSqlConversionFailure(
501                            0,
502                            rusqlite::types::Type::Text,
503                            Box::new(e),
504                        )
505                    })?;
506
507                    let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
508                        rusqlite::Error::FromSqlConversionFailure(
509                            1,
510                            rusqlite::types::Type::Text,
511                            Box::new(e),
512                        )
513                    })?;
514
515                    Ok(Some(TextDocument {
516                        subject_id: sid,
517                        kind,
518                        title: if title.is_empty() { None } else { Some(title) },
519                        body,
520                        tags: tags_from_json(&tags_json),
521                        namespace: ns,
522                        metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
523                        updated_at: micros_to_dt(updated_at_micros),
524                    }))
525                }
526                None => Ok(None),
527            }
528        })
529        .await
530    }
531
532    async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
533        let table = self.table_name.clone();
534
535        self.with_reader("fts_search", move |conn| {
536            let match_expr = match request.mode {
537                TextQueryMode::AnyTerm => {
538                    // Sanitize each token independently so the OR joiner isn't
539                    // stripped by sanitize_fts5_query's keyword filter.
540                    let parts: Vec<String> = request
541                        .query
542                        .split_whitespace()
543                        .map(sanitize_fts5_query)
544                        .filter(|t| !t.is_empty())
545                        .collect();
546                    if parts.is_empty() {
547                        return Ok(Vec::new());
548                    }
549                    parts.join(" OR ")
550                }
551                _ => {
552                    let sanitized = sanitize_fts5_query(&request.query);
553                    if sanitized.is_empty() {
554                        return Ok(Vec::new());
555                    }
556                    match request.mode {
557                        TextQueryMode::Phrase => format!("\"{}\"", sanitized),
558                        TextQueryMode::Plain => sanitized,
559                        TextQueryMode::AnyTerm => unreachable!(),
560                    }
561                }
562            };
563
564            // Snippet column index 3 = body in the FTS5 schema.
565            // snippet_chars == 0 is the sentinel for "no snippet" — skip the
566            // snippet(...) call entirely and return NULL instead.  This avoids
567            // the ~12ms BM25 snippet computation on the hot recall path where
568            // snippets are unused.  Callers that need snippets (diagnostics) pass
569            // snippet_chars > 0 and get the same behaviour as before.
570            let snippet_expr = if request.snippet_chars == 0 {
571                "NULL AS snippet".to_string()
572            } else {
573                let chars = i32::try_from(request.snippet_chars).unwrap_or(i32::MAX);
574                format!("snippet({table}, 3, '', '', '...', {chars})")
575            };
576
577            let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
578                build_filter_clause(filter, &table, 3)
579            } else {
580                (String::new(), Vec::new())
581            };
582
583            let sql = format!(
584                "SELECT subject_id, rank, title, {snippet_expr} \
585                 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
586                 ORDER BY rank LIMIT ?2",
587            );
588
589            let mut stmt = conn.prepare(&sql)?;
590            stmt.raw_bind_parameter(1, &match_expr)?;
591            stmt.raw_bind_parameter(2, request.top_k as i64)?;
592
593            for (i, param) in filter_params.iter().enumerate() {
594                param
595                    .to_sql()
596                    .map(|val| stmt.raw_bind_parameter(3 + i, val))
597                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
598            }
599
600            let mut hits = Vec::new();
601            let mut rows = stmt.raw_query();
602            let mut rank_idx = 0u32;
603
604            while let Some(row) = rows.next()? {
605                let id_str: String = row.get(0)?;
606                let fts_rank: f64 = row.get(1)?;
607                let title: String = row.get(2)?;
608                let snippet: Option<String> = row.get(3)?;
609
610                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
611                    rusqlite::Error::FromSqlConversionFailure(
612                        0,
613                        rusqlite::types::Type::Text,
614                        Box::new(e),
615                    )
616                })?;
617
618                rank_idx += 1;
619                hits.push((subject_id, fts_rank, rank_idx, title, snippet));
620            }
621
622            // Normalize scores within the result set to (0.05, 1.0].
623            // Best rank (most negative) maps to 1.0, worst to 0.05.
624            let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
625            let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
626            let range = max_rank - min_rank;
627
628            let results = hits
629                .into_iter()
630                .map(|(subject_id, raw_rank, rank, title, snippet)| {
631                    let score = if range.abs() < 1e-12 {
632                        1.0
633                    } else {
634                        let t = (max_rank - raw_rank) / range;
635                        0.05 + 0.95 * t
636                    };
637                    TextSearchHit {
638                        subject_id,
639                        score: DeterministicScore::from_f64(score),
640                        rank,
641                        title: if title.is_empty() { None } else { Some(title) },
642                        snippet: snippet.filter(|s| !s.is_empty()),
643                    }
644                })
645                .collect();
646
647            Ok(results)
648        })
649        .await
650    }
651
652    async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
653        let table = self.table_name.clone();
654
655        self.with_reader("fts_count", move |conn| {
656            let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
657
658            let sql = if filter_clause.is_empty() {
659                format!("SELECT COUNT(*) FROM {}", table)
660            } else {
661                let where_part = filter_clause.trim_start_matches(" AND ");
662                format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part)
663            };
664
665            let mut stmt = conn.prepare(&sql)?;
666
667            for (i, param) in filter_params.iter().enumerate() {
668                param
669                    .to_sql()
670                    .map(|val| stmt.raw_bind_parameter(1 + i, val))
671                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
672            }
673
674            let mut rows = stmt.raw_query();
675            match rows.next()? {
676                Some(row) => {
677                    let count: i64 = row.get(0)?;
678                    Ok(count as u64)
679                }
680                None => Ok(0),
681            }
682        })
683        .await
684    }
685
686    async fn stats(&self) -> Result<TextIndexStats, StorageError> {
687        let table = self.table_name.clone();
688
689        self.with_reader("fts_stats", move |conn| {
690            let sql = format!("SELECT COUNT(*) FROM {}", table);
691            let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
692
693            Ok(TextIndexStats {
694                document_count: count as u64,
695                needs_rebuild: false,
696                last_rebuild_at: None,
697            })
698        })
699        .await
700    }
701
702    async fn search_with_options(
703        &self,
704        request: TextSearchRequest,
705        options: TextSearchOptions,
706    ) -> Result<Vec<TextSearchHit>, StorageError> {
707        match options.gather_mode {
708            TextGatherMode::Ranked => self.search(request).await,
709            TextGatherMode::Unranked => self.search_unranked(request).await,
710            TextGatherMode::RankWithinCap => {
711                let gather_limit = options
712                    .gather_limit
713                    .unwrap_or(request.top_k)
714                    .max(request.top_k);
715                self.search_rank_within_cap(request, gather_limit).await
716            }
717        }
718    }
719
720    async fn term_stats(
721        &self,
722        request: TextTermStatsRequest,
723    ) -> Result<Vec<TextTermStats>, StorageError> {
724        let table = self.table_name.clone();
725
726        self.with_reader("fts_term_stats", move |conn| {
727            let filter = request.filter.as_ref();
728
729            // Document count uses params starting at ?1 (no MATCH expression).
730            let (count_filter_clause, count_filter_params) = if let Some(f) = filter {
731                build_filter_clause(f, &table, 1)
732            } else {
733                (String::new(), Vec::new())
734            };
735
736            let document_count: u64 = {
737                let count_sql = if count_filter_clause.is_empty() {
738                    format!("SELECT COUNT(*) FROM {table}")
739                } else {
740                    let where_part = count_filter_clause.trim_start_matches(" AND ");
741                    format!("SELECT COUNT(*) FROM {table} WHERE {where_part}")
742                };
743                let mut stmt = conn.prepare(&count_sql)?;
744                for (i, param) in count_filter_params.iter().enumerate() {
745                    param
746                        .to_sql()
747                        .map(|val| stmt.raw_bind_parameter(1 + i, val))
748                        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
749                }
750                let mut rows = stmt.raw_query();
751                match rows.next()? {
752                    Some(row) => {
753                        let c: i64 = row.get(0)?;
754                        c as u64
755                    }
756                    None => 0,
757                }
758            };
759
760            let mut results = Vec::with_capacity(request.terms.len());
761            for term in &request.terms {
762                let sanitized = sanitize_fts5_query(term);
763                if sanitized.is_empty() {
764                    results.push(TextTermStats {
765                        term: term.clone(),
766                        sanitized_term: sanitized,
767                        document_frequency: 0,
768                        document_count,
769                        inverse_document_frequency: 0.0,
770                    });
771                    continue;
772                }
773
774                // Per-term count: MATCH is ?1, so filter params start at ?2.
775                let (term_filter_clause, term_filter_params) = if let Some(f) = filter {
776                    build_filter_clause(f, &table, 2)
777                } else {
778                    (String::new(), Vec::new())
779                };
780
781                let count_sql = format!(
782                    "SELECT COUNT(*) FROM {table} WHERE {table} MATCH ?1{term_filter_clause}"
783                );
784                let mut stmt = conn.prepare(&count_sql)?;
785                stmt.raw_bind_parameter(1, &sanitized)?;
786                for (i, param) in term_filter_params.iter().enumerate() {
787                    param
788                        .to_sql()
789                        .map(|val| stmt.raw_bind_parameter(2 + i, val))
790                        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
791                }
792
793                let df: u64 = {
794                    let mut rows = stmt.raw_query();
795                    match rows.next()? {
796                        Some(row) => {
797                            let c: i64 = row.get(0)?;
798                            c as u64
799                        }
800                        None => 0,
801                    }
802                };
803
804                let idf = Fts5TextSearch::bm25_idf(df, document_count);
805                results.push(TextTermStats {
806                    term: term.clone(),
807                    sanitized_term: sanitized,
808                    document_frequency: df,
809                    document_count,
810                    inverse_document_frequency: idf,
811                });
812            }
813
814            Ok(results)
815        })
816        .await
817    }
818
819    async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
820        let table = self.table_name.clone();
821
822        self.with_writer("fts_rebuild", move |conn| {
823            // FTS5 rebuild command: repopulates the internal index structures.
824            let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
825            conn.execute(&sql, [])?;
826
827            let count_sql = format!("SELECT COUNT(*) FROM {}", table);
828            let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
829
830            Ok(TextIndexStats {
831                document_count: count as u64,
832                needs_rebuild: false,
833                last_rebuild_at: Some(Utc::now()),
834            })
835        })
836        .await
837    }
838}
839
840impl Fts5TextSearch {
841    /// Robertson-Walker BM25 IDF: ln(((N - df + 0.5) / (df + 0.5)) + 1)
842    fn bm25_idf(df: u64, document_count: u64) -> f64 {
843        let n = document_count as f64;
844        let f = df as f64;
845        ((n - f + 0.5) / (f + 0.5) + 1.0).ln()
846    }
847
848    /// Build the OR match expression from a query string (shared logic).
849    fn build_any_term_expr(query: &str) -> Option<String> {
850        let parts: Vec<String> = query
851            .split_whitespace()
852            .map(sanitize_fts5_query)
853            .filter(|t| !t.is_empty())
854            .collect();
855        if parts.is_empty() {
856            None
857        } else {
858            Some(parts.join(" OR "))
859        }
860    }
861
862    /// Gather candidates without BM25 ranking; return with uniform score 1.0.
863    async fn search_unranked(
864        &self,
865        request: TextSearchRequest,
866    ) -> Result<Vec<TextSearchHit>, StorageError> {
867        let table = self.table_name.clone();
868
869        self.with_reader("fts_search_unranked", move |conn| {
870            let match_expr = match request.mode {
871                TextQueryMode::AnyTerm => match Self::build_any_term_expr(&request.query) {
872                    Some(e) => e,
873                    None => return Ok(Vec::new()),
874                },
875                _ => {
876                    let sanitized = sanitize_fts5_query(&request.query);
877                    if sanitized.is_empty() {
878                        return Ok(Vec::new());
879                    }
880                    match request.mode {
881                        TextQueryMode::Phrase => format!("\"{}\"", sanitized),
882                        TextQueryMode::Plain => sanitized,
883                        TextQueryMode::AnyTerm => unreachable!(),
884                    }
885                }
886            };
887
888            let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
889                build_filter_clause(filter, &table, 3)
890            } else {
891                (String::new(), Vec::new())
892            };
893
894            // No rank column, no ORDER BY — avoids BM25 computation entirely.
895            let sql = format!(
896                "SELECT subject_id, title \
897                 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
898                 LIMIT ?2",
899            );
900
901            let mut stmt = conn.prepare(&sql)?;
902            stmt.raw_bind_parameter(1, &match_expr)?;
903            stmt.raw_bind_parameter(2, request.top_k as i64)?;
904
905            for (i, param) in filter_params.iter().enumerate() {
906                param
907                    .to_sql()
908                    .map(|val| stmt.raw_bind_parameter(3 + i, val))
909                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
910            }
911
912            let mut results = Vec::new();
913            let mut rows = stmt.raw_query();
914            let mut rank_idx = 0u32;
915
916            while let Some(row) = rows.next()? {
917                let id_str: String = row.get(0)?;
918                let title: String = row.get(1)?;
919
920                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
921                    rusqlite::Error::FromSqlConversionFailure(
922                        0,
923                        rusqlite::types::Type::Text,
924                        Box::new(e),
925                    )
926                })?;
927
928                rank_idx += 1;
929                results.push(TextSearchHit {
930                    subject_id,
931                    score: DeterministicScore::from_f64(1.0),
932                    rank: rank_idx,
933                    title: if title.is_empty() { None } else { Some(title) },
934                    snippet: None,
935                });
936            }
937
938            Ok(results)
939        })
940        .await
941    }
942
943    /// Two-stage gather: cheap unranked LIMIT gather_limit, then BM25-rank the subset.
944    async fn search_rank_within_cap(
945        &self,
946        request: TextSearchRequest,
947        gather_limit: u32,
948    ) -> Result<Vec<TextSearchHit>, StorageError> {
949        let table = self.table_name.clone();
950
951        self.with_reader("fts_search_rank_within_cap", move |conn| {
952            let match_expr = match request.mode {
953                TextQueryMode::AnyTerm => match Self::build_any_term_expr(&request.query) {
954                    Some(e) => e,
955                    None => return Ok(Vec::new()),
956                },
957                _ => {
958                    let sanitized = sanitize_fts5_query(&request.query);
959                    if sanitized.is_empty() {
960                        return Ok(Vec::new());
961                    }
962                    match request.mode {
963                        TextQueryMode::Phrase => format!("\"{}\"", sanitized),
964                        TextQueryMode::Plain => sanitized,
965                        TextQueryMode::AnyTerm => unreachable!(),
966                    }
967                }
968            };
969
970            let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
971                build_filter_clause(filter, &table, 3)
972            } else {
973                (String::new(), Vec::new())
974            };
975
976            // Stage 1: cheap unranked gather of rowids.
977            let gather_sql = format!(
978                "SELECT subject_id FROM {table} WHERE {table} MATCH ?1{filter_clause} LIMIT ?2"
979            );
980
981            let mut stmt = conn.prepare(&gather_sql)?;
982            stmt.raw_bind_parameter(1, &match_expr)?;
983            stmt.raw_bind_parameter(2, gather_limit as i64)?;
984            for (i, param) in filter_params.iter().enumerate() {
985                param
986                    .to_sql()
987                    .map(|val| stmt.raw_bind_parameter(3 + i, val))
988                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
989            }
990
991            let mut gathered_ids: Vec<String> = Vec::new();
992            let mut rows = stmt.raw_query();
993            while let Some(row) = rows.next()? {
994                gathered_ids.push(row.get::<_, String>(0)?);
995            }
996
997            if gathered_ids.is_empty() {
998                return Ok(Vec::new());
999            }
1000
1001            // Stage 2: BM25-rank only the gathered subset via subject_id IN (...).
1002            let snippet_expr = if request.snippet_chars == 0 {
1003                "NULL AS snippet".to_string()
1004            } else {
1005                let chars = i32::try_from(request.snippet_chars).unwrap_or(i32::MAX);
1006                format!("snippet({table}, 3, '', '', '...', {chars})")
1007            };
1008
1009            // Build IN clause for the gathered IDs.
1010            let id_placeholders: Vec<String> = gathered_ids
1011                .iter()
1012                .enumerate()
1013                .map(|(i, _)| format!("?{}", 3 + i))
1014                .collect();
1015            let in_clause = id_placeholders.join(", ");
1016
1017            let rank_sql = format!(
1018                "SELECT subject_id, rank, title, {snippet_expr} \
1019                 FROM {table} WHERE {table} MATCH ?1 AND subject_id IN ({in_clause}) \
1020                 ORDER BY rank LIMIT ?2"
1021            );
1022
1023            let mut stmt2 = conn.prepare(&rank_sql)?;
1024            stmt2.raw_bind_parameter(1, &match_expr)?;
1025            stmt2.raw_bind_parameter(2, request.top_k as i64)?;
1026            for (i, id_str) in gathered_ids.iter().enumerate() {
1027                stmt2.raw_bind_parameter(3 + i, id_str.as_str())?;
1028            }
1029
1030            let mut hits = Vec::new();
1031            let mut rows2 = stmt2.raw_query();
1032            let mut rank_idx = 0u32;
1033
1034            while let Some(row) = rows2.next()? {
1035                let id_str: String = row.get(0)?;
1036                let fts_rank: f64 = row.get(1)?;
1037                let title: String = row.get(2)?;
1038                let snippet: Option<String> = row.get(3)?;
1039
1040                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
1041                    rusqlite::Error::FromSqlConversionFailure(
1042                        0,
1043                        rusqlite::types::Type::Text,
1044                        Box::new(e),
1045                    )
1046                })?;
1047
1048                rank_idx += 1;
1049                hits.push((subject_id, fts_rank, rank_idx, title, snippet));
1050            }
1051
1052            // Normalize scores within the ranked subset (same formula as search()).
1053            let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
1054            let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
1055            let range = max_rank - min_rank;
1056
1057            let results = hits
1058                .into_iter()
1059                .map(|(subject_id, raw_rank, rank, title, snippet)| {
1060                    let score = if range.abs() < 1e-12 {
1061                        1.0
1062                    } else {
1063                        let t = (max_rank - raw_rank) / range;
1064                        0.05 + 0.95 * t
1065                    };
1066                    TextSearchHit {
1067                        subject_id,
1068                        score: DeterministicScore::from_f64(score),
1069                        rank,
1070                        title: if title.is_empty() { None } else { Some(title) },
1071                        snippet: snippet.filter(|s| !s.is_empty()),
1072                    }
1073                })
1074                .collect();
1075
1076            Ok(results)
1077        })
1078        .await
1079    }
1080
1081    /// Move all FTS5 documents from `old_namespace` to `new_namespace` in a
1082    /// single transaction.
1083    ///
1084    /// FTS5 virtual tables do not support updating indexed columns (`title`,
1085    /// `body`) via UPDATE. The correct approach is read-then-delete-then-reinsert.
1086    ///
1087    /// Callers must invoke this after any SQL-level namespace change on the
1088    /// backing entity table so that FTS5 keyword search stays consistent with
1089    /// the entity store.
1090    // REASON: reserved for namespace migration operations
1091    #[allow(dead_code)]
1092    pub(crate) async fn rename_namespace(
1093        &self,
1094        old_namespace: &str,
1095        new_namespace: &str,
1096    ) -> Result<u64, StorageError> {
1097        if old_namespace == new_namespace {
1098            return Ok(0);
1099        }
1100        let table = self.table_name.clone();
1101        let old_ns = old_namespace.to_string();
1102        let new_ns = new_namespace.to_string();
1103
1104        self.with_writer("fts_rename_namespace", move |conn| {
1105            let sel_sql = format!(
1106                "SELECT subject_id, kind, title, body, tags, metadata, updated_at \
1107                 FROM {} WHERE namespace = ?1",
1108                table
1109            );
1110            struct Row {
1111                subject_id: String,
1112                kind: String,
1113                title: String,
1114                body: String,
1115                tags: String,
1116                metadata: Option<String>,
1117                updated_at: i64,
1118            }
1119            let rows: Vec<Row> = {
1120                let mut stmt = conn.prepare(&sel_sql)?;
1121                let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
1122                    Ok(Row {
1123                        subject_id: row.get(0)?,
1124                        kind: row.get(1)?,
1125                        title: row.get(2)?,
1126                        body: row.get(3)?,
1127                        tags: row.get(4)?,
1128                        metadata: row.get(5)?,
1129                        updated_at: row.get(6)?,
1130                    })
1131                })?;
1132                iter.collect::<Result<Vec<_>, _>>()?
1133            };
1134            let moved = rows.len() as u64;
1135            if moved == 0 {
1136                return Ok(0u64);
1137            }
1138
1139            conn.execute_batch("BEGIN IMMEDIATE")?;
1140
1141            let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
1142            if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
1143                let _ = conn.execute_batch("ROLLBACK");
1144                return Err(e);
1145            }
1146
1147            let ins_sql = format!(
1148                "INSERT INTO {} \
1149                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1150                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1151                table
1152            );
1153            for row in &rows {
1154                if let Err(e) = conn.execute(
1155                    &ins_sql,
1156                    rusqlite::params![
1157                        row.subject_id,
1158                        row.kind,
1159                        row.title,
1160                        row.body,
1161                        row.tags,
1162                        &new_ns,
1163                        row.metadata,
1164                        row.updated_at,
1165                    ],
1166                ) {
1167                    let _ = conn.execute_batch("ROLLBACK");
1168                    return Err(e);
1169                }
1170            }
1171
1172            conn.execute_batch("COMMIT")?;
1173            Ok(moved)
1174        })
1175        .await
1176    }
1177}
1178
// Unit tests for this module live in the sibling file `text_tests.rs`,
// compiled only for test builds.
#[cfg(test)]
#[path = "text_tests.rs"]
mod tests;