1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::{DateTime, TimeZone, Utc};
7use uuid::Uuid;
8
9use khive_score::DeterministicScore;
10use khive_storage::error::StorageError;
11use khive_storage::types::{
12 BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextGatherMode, TextIndexStats,
13 TextQueryMode, TextSearchHit, TextSearchOptions, TextSearchRequest, TextTermStats,
14 TextTermStatsRequest,
15};
16use khive_storage::StorageCapability;
17use khive_storage::TextSearch;
18use khive_types::SubstrateKind;
19
20use crate::error::SqliteError;
21use crate::pool::ConnectionPool;
22
#[cfg(test)]
/// Creates the FTS5 virtual table `fts_<table_key>` unless it already exists.
///
/// `title` and `body` are the only columns that take part in the full-text
/// index; every other column is declared `UNINDEXED`.
///
/// `table_key` is spliced into the DDL verbatim, so callers must pass a
/// trusted identifier fragment.
pub(crate) fn ensure_fts5_schema(
    conn: &rusqlite::Connection,
    table_key: &str,
) -> Result<(), rusqlite::Error> {
    let columns = "subject_id UNINDEXED, kind UNINDEXED, title, body, \
                   tags UNINDEXED, namespace UNINDEXED, metadata UNINDEXED, \
                   updated_at UNINDEXED";
    let create_table =
        format!("CREATE VIRTUAL TABLE IF NOT EXISTS fts_{table_key} USING fts5({columns})");

    conn.execute_batch(&create_table)
}
47
/// Wraps a `rusqlite` error as a driver-level [`StorageError`] attributed to
/// the text capability and tagged with the failing operation name `op`.
fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
    StorageError::driver(StorageCapability::Text, op, e)
}
51
/// Wraps a crate-level [`SqliteError`] (e.g. from the connection pool) as a
/// driver-level [`StorageError`] attributed to the text capability and tagged
/// with the failing operation name `op`.
fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
    StorageError::driver(StorageCapability::Text, op, e)
}
55
/// [`TextSearch`] implementation backed by a single SQLite FTS5 virtual table.
pub struct Fts5TextSearch {
    /// Shared connection pool; its config also supplies the database path and
    /// busy timeout used when opening standalone connections.
    pool: Arc<ConnectionPool>,
    /// When `true`, every operation opens its own standalone connection
    /// instead of borrowing one from `pool`.
    is_file_backed: bool,
    /// Name of the FTS5 table, built as `fts_<table_key>`.
    table_name: String,
}
66
67impl Fts5TextSearch {
68 pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
72 let table_name = format!("fts_{}", table_key);
73 Self {
74 pool,
75 is_file_backed,
76 table_name,
77 }
78 }
79
80 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
81 let config = self.pool.config();
82 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
83 operation: "fts_writer".into(),
84 message: "in-memory databases do not support standalone connections".into(),
85 })?;
86
87 let conn = rusqlite::Connection::open_with_flags(
88 path,
89 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
90 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
91 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
92 )
93 .map_err(|e| map_err(e, "open_fts_writer"))?;
94
95 conn.busy_timeout(config.busy_timeout)
96 .map_err(|e| map_err(e, "open_fts_writer"))?;
97 conn.pragma_update(None, "foreign_keys", "ON")
98 .map_err(|e| map_err(e, "open_fts_writer"))?;
99 conn.pragma_update(None, "synchronous", "NORMAL")
100 .map_err(|e| map_err(e, "open_fts_writer"))?;
101
102 Ok(conn)
103 }
104
105 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
106 let config = self.pool.config();
107 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
108 operation: "fts_reader".into(),
109 message: "in-memory databases do not support standalone connections".into(),
110 })?;
111
112 let conn = rusqlite::Connection::open_with_flags(
113 path,
114 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
115 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
116 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
117 )
118 .map_err(|e| map_err(e, "open_fts_reader"))?;
119
120 conn.busy_timeout(config.busy_timeout)
121 .map_err(|e| map_err(e, "open_fts_reader"))?;
122 conn.pragma_update(None, "foreign_keys", "ON")
123 .map_err(|e| map_err(e, "open_fts_reader"))?;
124 conn.pragma_update(None, "synchronous", "NORMAL")
125 .map_err(|e| map_err(e, "open_fts_reader"))?;
126
127 Ok(conn)
128 }
129
130 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
131 where
132 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
133 R: Send + 'static,
134 {
135 if self.is_file_backed {
136 let conn = self.open_standalone_writer()?;
137 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
138 .await
139 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
140 } else {
141 let pool = Arc::clone(&self.pool);
142 tokio::task::spawn_blocking(move || {
143 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
144 f(guard.conn()).map_err(|e| map_err(e, op))
145 })
146 .await
147 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
148 }
149 }
150
151 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
152 where
153 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
154 R: Send + 'static,
155 {
156 if self.is_file_backed {
157 let conn = self.open_standalone_reader()?;
158 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
159 .await
160 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
161 } else {
162 let pool = Arc::clone(&self.pool);
163 tokio::task::spawn_blocking(move || {
164 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
165 f(guard.conn()).map_err(|e| map_err(e, op))
166 })
167 .await
168 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
169 }
170 }
171}
172
/// Serializes `tags` as a JSON array string for storage in the `tags` column.
///
/// Falls back to the literal `"[]"` if serialization fails.
fn tags_to_json(tags: &[String]) -> String {
    serde_json::to_string(tags).unwrap_or_else(|_| "[]".to_string())
}
178
/// Parses the stored `tags` column back into a list of strings.
///
/// Any parse failure is swallowed and yields an empty vector.
fn tags_from_json(s: &str) -> Vec<String> {
    serde_json::from_str(s).unwrap_or_default()
}
182
/// Converts a UTC timestamp to the integer microsecond value stored in the
/// `updated_at` column (inverse of [`micros_to_dt`]).
fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
    dt.timestamp_micros()
}
186
/// Converts a stored microsecond value back into a UTC timestamp.
///
/// NOTE(review): when `micros` does not map to exactly one valid instant the
/// function silently substitutes the current time rather than reporting an
/// error — confirm this fallback is intended.
fn micros_to_dt(micros: i64) -> DateTime<Utc> {
    Utc.timestamp_micros(micros)
        .single()
        .unwrap_or_else(Utc::now)
}
192
/// Reduces free-form user text to a string that is safe to pass to FTS5
/// `MATCH` as a sequence of barewords.
///
/// Rules, applied per character:
/// * `* " ' + - ^ . ~ !` and control characters such as NUL are deleted
///   outright (the surrounding characters are joined);
/// * any whitespace — including tab and newline, which are control
///   characters — acts as a word separator;
/// * every other ASCII character that FTS5 does not allow in a bareword
///   (anything but letters, digits and `_`) becomes a word separator, so input
///   such as `what?` or `a/b` cannot trigger an FTS5 syntax error;
/// * non-ASCII characters are kept as-is.
///
/// Afterwards the words `AND`, `OR`, `NOT` and `NEAR` (in any letter case) are
/// dropped and the remaining words are joined with single spaces. The result
/// may be empty.
fn sanitize_fts5_query(query: &str) -> String {
    const OPERATORS: [&str; 4] = ["AND", "OR", "NOT", "NEAR"];

    let mut cleaned = String::with_capacity(query.len());
    for c in query.chars() {
        match c {
            // FTS5 operator/quote characters that have always been stripped.
            '*' | '"' | '\'' | '+' | '-' | '^' | '.' | '~' | '!' => {}
            // Must be tested before `is_control`: '\t' and '\n' are both.
            c if c.is_whitespace() => cleaned.push(' '),
            c if c.is_control() => {}
            // Remaining ASCII punctuation is not valid inside a bareword.
            c if c.is_ascii() && !c.is_ascii_alphanumeric() && c != '_' => cleaned.push(' '),
            c => cleaned.push(c),
        }
    }

    cleaned
        .split_whitespace()
        .filter(|word| !OPERATORS.iter().any(|kw| word.eq_ignore_ascii_case(kw)))
        .collect::<Vec<_>>()
        .join(" ")
}
246
247fn build_filter_clause(
252 filter: &TextFilter,
253 table: &str,
254 start_idx: usize,
255) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
256 let mut conditions: Vec<String> = Vec::new();
257 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
258 let mut idx = start_idx;
259
260 if !filter.ids.is_empty() {
261 let placeholders: Vec<String> = filter
262 .ids
263 .iter()
264 .map(|_| {
265 let p = format!("?{}", idx);
266 idx += 1;
267 p
268 })
269 .collect();
270 conditions.push(format!(
271 "{}.subject_id IN ({})",
272 table,
273 placeholders.join(", ")
274 ));
275 for id in &filter.ids {
276 params.push(Box::new(id.to_string()));
277 }
278 }
279
280 if !filter.kinds.is_empty() {
281 let placeholders: Vec<String> = filter
282 .kinds
283 .iter()
284 .map(|_| {
285 let p = format!("?{}", idx);
286 idx += 1;
287 p
288 })
289 .collect();
290 conditions.push(format!("{}.kind IN ({})", table, placeholders.join(", ")));
291 for kind in &filter.kinds {
292 params.push(Box::new(kind.to_string()));
293 }
294 }
295
296 if !filter.namespaces.is_empty() {
297 let placeholders: Vec<String> = filter
298 .namespaces
299 .iter()
300 .map(|_| {
301 let p = format!("?{}", idx);
302 idx += 1;
303 p
304 })
305 .collect();
306 conditions.push(format!(
307 "{}.namespace IN ({})",
308 table,
309 placeholders.join(", ")
310 ));
311 for ns in &filter.namespaces {
312 params.push(Box::new(ns.clone()));
313 }
314 }
315
316 if conditions.is_empty() {
317 (String::new(), params)
318 } else {
319 (format!(" AND {}", conditions.join(" AND ")), params)
320 }
321}
322
323#[async_trait]
324impl TextSearch for Fts5TextSearch {
325 async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
326 let table = self.table_name.clone();
327 let namespace = document.namespace.clone();
328
329 self.with_writer("fts_upsert", move |conn| {
330 conn.execute_batch("BEGIN IMMEDIATE")?;
331
332 let del_sql = format!(
333 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
334 table
335 );
336 if let Err(e) = conn.execute(
337 &del_sql,
338 rusqlite::params![&namespace, document.subject_id.to_string()],
339 ) {
340 let _ = conn.execute_batch("ROLLBACK");
341 return Err(e);
342 }
343
344 let ins_sql = format!(
345 "INSERT INTO {} \
346 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
347 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
348 table
349 );
350 let tags_json = tags_to_json(&document.tags);
351 let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
352
353 if let Err(e) = conn.execute(
354 &ins_sql,
355 rusqlite::params![
356 document.subject_id.to_string(),
357 document.kind.to_string(),
358 document.title.as_deref().unwrap_or(""),
359 document.body,
360 tags_json,
361 &namespace,
362 metadata_json,
363 dt_to_micros(&document.updated_at),
364 ],
365 ) {
366 let _ = conn.execute_batch("ROLLBACK");
367 return Err(e);
368 }
369
370 conn.execute_batch("COMMIT")?;
371 Ok(())
372 })
373 .await
374 }
375
376 async fn upsert_documents(
377 &self,
378 documents: Vec<TextDocument>,
379 ) -> Result<BatchWriteSummary, StorageError> {
380 let table = self.table_name.clone();
381 let attempted = documents.len() as u64;
382
383 self.with_writer("fts_upsert_batch", move |conn| {
384 let del_sql = format!(
385 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
386 table
387 );
388 let ins_sql = format!(
389 "INSERT INTO {} \
390 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
391 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
392 table
393 );
394
395 conn.execute_batch("BEGIN IMMEDIATE")?;
396 let mut affected = 0u64;
397 let mut failed = 0u64;
398
399 for doc in &documents {
400 conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
401 let id_str = doc.subject_id.to_string();
402 let namespace = &doc.namespace;
403 let result = (|| {
404 conn.execute(&del_sql, rusqlite::params![namespace, &id_str])?;
405
406 let tags_json = tags_to_json(&doc.tags);
407 let metadata_json: Option<String> =
408 doc.metadata.as_ref().map(|v| v.to_string());
409
410 conn.execute(
411 &ins_sql,
412 rusqlite::params![
413 &id_str,
414 &doc.kind.to_string(),
415 doc.title.as_deref().unwrap_or(""),
416 &doc.body,
417 &tags_json,
418 namespace,
419 &metadata_json,
420 dt_to_micros(&doc.updated_at),
421 ],
422 )?;
423 Ok::<(), rusqlite::Error>(())
424 })();
425
426 match result {
427 Ok(()) => {
428 conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
429 affected += 1;
430 }
431 Err(_) => {
432 let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc");
433 let _ = conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc");
434 failed += 1;
435 }
436 }
437 }
438
439 conn.execute_batch("COMMIT")?;
440
441 Ok(BatchWriteSummary {
442 attempted,
443 affected,
444 failed,
445 first_error: String::new(),
446 })
447 })
448 .await
449 }
450
451 async fn delete_document(
452 &self,
453 namespace: &str,
454 subject_id: Uuid,
455 ) -> Result<bool, StorageError> {
456 let namespace = namespace.to_string();
457 let table = self.table_name.clone();
458
459 self.with_writer("fts_delete", move |conn| {
460 let sql = format!(
461 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
462 table
463 );
464 let deleted =
465 conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
466 Ok(deleted > 0)
467 })
468 .await
469 }
470
471 async fn get_document(
472 &self,
473 namespace: &str,
474 subject_id: Uuid,
475 ) -> Result<Option<TextDocument>, StorageError> {
476 let namespace = namespace.to_string();
477 let table = self.table_name.clone();
478
479 self.with_reader("fts_get", move |conn| {
480 let sql = format!(
481 "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
482 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
483 table
484 );
485 let mut stmt = conn.prepare(&sql)?;
486 let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
487
488 match rows.next()? {
489 Some(row) => {
490 let id_str: String = row.get(0)?;
491 let kind_str: String = row.get(1)?;
492 let title: String = row.get(2)?;
493 let body: String = row.get(3)?;
494 let tags_json: String = row.get(4)?;
495 let ns: String = row.get(5)?;
496 let metadata_json: Option<String> = row.get(6)?;
497 let updated_at_micros: i64 = row.get(7)?;
498
499 let sid = Uuid::parse_str(&id_str).map_err(|e| {
500 rusqlite::Error::FromSqlConversionFailure(
501 0,
502 rusqlite::types::Type::Text,
503 Box::new(e),
504 )
505 })?;
506
507 let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
508 rusqlite::Error::FromSqlConversionFailure(
509 1,
510 rusqlite::types::Type::Text,
511 Box::new(e),
512 )
513 })?;
514
515 Ok(Some(TextDocument {
516 subject_id: sid,
517 kind,
518 title: if title.is_empty() { None } else { Some(title) },
519 body,
520 tags: tags_from_json(&tags_json),
521 namespace: ns,
522 metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
523 updated_at: micros_to_dt(updated_at_micros),
524 }))
525 }
526 None => Ok(None),
527 }
528 })
529 .await
530 }
531
532 async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
533 let table = self.table_name.clone();
534
535 self.with_reader("fts_search", move |conn| {
536 let match_expr = match request.mode {
537 TextQueryMode::AnyTerm => {
538 let parts: Vec<String> = request
541 .query
542 .split_whitespace()
543 .map(sanitize_fts5_query)
544 .filter(|t| !t.is_empty())
545 .collect();
546 if parts.is_empty() {
547 return Ok(Vec::new());
548 }
549 parts.join(" OR ")
550 }
551 _ => {
552 let sanitized = sanitize_fts5_query(&request.query);
553 if sanitized.is_empty() {
554 return Ok(Vec::new());
555 }
556 match request.mode {
557 TextQueryMode::Phrase => format!("\"{}\"", sanitized),
558 TextQueryMode::Plain => sanitized,
559 TextQueryMode::AnyTerm => unreachable!(),
560 }
561 }
562 };
563
564 let snippet_expr = if request.snippet_chars == 0 {
571 "NULL AS snippet".to_string()
572 } else {
573 let chars = i32::try_from(request.snippet_chars).unwrap_or(i32::MAX);
574 format!("snippet({table}, 3, '', '', '...', {chars})")
575 };
576
577 let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
578 build_filter_clause(filter, &table, 3)
579 } else {
580 (String::new(), Vec::new())
581 };
582
583 let sql = format!(
584 "SELECT subject_id, rank, title, {snippet_expr} \
585 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
586 ORDER BY rank LIMIT ?2",
587 );
588
589 let mut stmt = conn.prepare(&sql)?;
590 stmt.raw_bind_parameter(1, &match_expr)?;
591 stmt.raw_bind_parameter(2, request.top_k as i64)?;
592
593 for (i, param) in filter_params.iter().enumerate() {
594 param
595 .to_sql()
596 .map(|val| stmt.raw_bind_parameter(3 + i, val))
597 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
598 }
599
600 let mut hits = Vec::new();
601 let mut rows = stmt.raw_query();
602 let mut rank_idx = 0u32;
603
604 while let Some(row) = rows.next()? {
605 let id_str: String = row.get(0)?;
606 let fts_rank: f64 = row.get(1)?;
607 let title: String = row.get(2)?;
608 let snippet: Option<String> = row.get(3)?;
609
610 let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
611 rusqlite::Error::FromSqlConversionFailure(
612 0,
613 rusqlite::types::Type::Text,
614 Box::new(e),
615 )
616 })?;
617
618 rank_idx += 1;
619 hits.push((subject_id, fts_rank, rank_idx, title, snippet));
620 }
621
622 let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
625 let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
626 let range = max_rank - min_rank;
627
628 let results = hits
629 .into_iter()
630 .map(|(subject_id, raw_rank, rank, title, snippet)| {
631 let score = if range.abs() < 1e-12 {
632 1.0
633 } else {
634 let t = (max_rank - raw_rank) / range;
635 0.05 + 0.95 * t
636 };
637 TextSearchHit {
638 subject_id,
639 score: DeterministicScore::from_f64(score),
640 rank,
641 title: if title.is_empty() { None } else { Some(title) },
642 snippet: snippet.filter(|s| !s.is_empty()),
643 }
644 })
645 .collect();
646
647 Ok(results)
648 })
649 .await
650 }
651
652 async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
653 let table = self.table_name.clone();
654
655 self.with_reader("fts_count", move |conn| {
656 let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
657
658 let sql = if filter_clause.is_empty() {
659 format!("SELECT COUNT(*) FROM {}", table)
660 } else {
661 let where_part = filter_clause.trim_start_matches(" AND ");
662 format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part)
663 };
664
665 let mut stmt = conn.prepare(&sql)?;
666
667 for (i, param) in filter_params.iter().enumerate() {
668 param
669 .to_sql()
670 .map(|val| stmt.raw_bind_parameter(1 + i, val))
671 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
672 }
673
674 let mut rows = stmt.raw_query();
675 match rows.next()? {
676 Some(row) => {
677 let count: i64 = row.get(0)?;
678 Ok(count as u64)
679 }
680 None => Ok(0),
681 }
682 })
683 .await
684 }
685
686 async fn stats(&self) -> Result<TextIndexStats, StorageError> {
687 let table = self.table_name.clone();
688
689 self.with_reader("fts_stats", move |conn| {
690 let sql = format!("SELECT COUNT(*) FROM {}", table);
691 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
692
693 Ok(TextIndexStats {
694 document_count: count as u64,
695 needs_rebuild: false,
696 last_rebuild_at: None,
697 })
698 })
699 .await
700 }
701
702 async fn search_with_options(
703 &self,
704 request: TextSearchRequest,
705 options: TextSearchOptions,
706 ) -> Result<Vec<TextSearchHit>, StorageError> {
707 match options.gather_mode {
708 TextGatherMode::Ranked => self.search(request).await,
709 TextGatherMode::Unranked => self.search_unranked(request).await,
710 TextGatherMode::RankWithinCap => {
711 let gather_limit = options
712 .gather_limit
713 .unwrap_or(request.top_k)
714 .max(request.top_k);
715 self.search_rank_within_cap(request, gather_limit).await
716 }
717 }
718 }
719
720 async fn term_stats(
721 &self,
722 request: TextTermStatsRequest,
723 ) -> Result<Vec<TextTermStats>, StorageError> {
724 let table = self.table_name.clone();
725
726 self.with_reader("fts_term_stats", move |conn| {
727 let filter = request.filter.as_ref();
728
729 let (count_filter_clause, count_filter_params) = if let Some(f) = filter {
731 build_filter_clause(f, &table, 1)
732 } else {
733 (String::new(), Vec::new())
734 };
735
736 let document_count: u64 = {
737 let count_sql = if count_filter_clause.is_empty() {
738 format!("SELECT COUNT(*) FROM {table}")
739 } else {
740 let where_part = count_filter_clause.trim_start_matches(" AND ");
741 format!("SELECT COUNT(*) FROM {table} WHERE {where_part}")
742 };
743 let mut stmt = conn.prepare(&count_sql)?;
744 for (i, param) in count_filter_params.iter().enumerate() {
745 param
746 .to_sql()
747 .map(|val| stmt.raw_bind_parameter(1 + i, val))
748 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
749 }
750 let mut rows = stmt.raw_query();
751 match rows.next()? {
752 Some(row) => {
753 let c: i64 = row.get(0)?;
754 c as u64
755 }
756 None => 0,
757 }
758 };
759
760 let mut results = Vec::with_capacity(request.terms.len());
761 for term in &request.terms {
762 let sanitized = sanitize_fts5_query(term);
763 if sanitized.is_empty() {
764 results.push(TextTermStats {
765 term: term.clone(),
766 sanitized_term: sanitized,
767 document_frequency: 0,
768 document_count,
769 inverse_document_frequency: 0.0,
770 });
771 continue;
772 }
773
774 let (term_filter_clause, term_filter_params) = if let Some(f) = filter {
776 build_filter_clause(f, &table, 2)
777 } else {
778 (String::new(), Vec::new())
779 };
780
781 let count_sql = format!(
782 "SELECT COUNT(*) FROM {table} WHERE {table} MATCH ?1{term_filter_clause}"
783 );
784 let mut stmt = conn.prepare(&count_sql)?;
785 stmt.raw_bind_parameter(1, &sanitized)?;
786 for (i, param) in term_filter_params.iter().enumerate() {
787 param
788 .to_sql()
789 .map(|val| stmt.raw_bind_parameter(2 + i, val))
790 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
791 }
792
793 let df: u64 = {
794 let mut rows = stmt.raw_query();
795 match rows.next()? {
796 Some(row) => {
797 let c: i64 = row.get(0)?;
798 c as u64
799 }
800 None => 0,
801 }
802 };
803
804 let idf = Fts5TextSearch::bm25_idf(df, document_count);
805 results.push(TextTermStats {
806 term: term.clone(),
807 sanitized_term: sanitized,
808 document_frequency: df,
809 document_count,
810 inverse_document_frequency: idf,
811 });
812 }
813
814 Ok(results)
815 })
816 .await
817 }
818
819 async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
820 let table = self.table_name.clone();
821
822 self.with_writer("fts_rebuild", move |conn| {
823 let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
825 conn.execute(&sql, [])?;
826
827 let count_sql = format!("SELECT COUNT(*) FROM {}", table);
828 let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
829
830 Ok(TextIndexStats {
831 document_count: count as u64,
832 needs_rebuild: false,
833 last_rebuild_at: Some(Utc::now()),
834 })
835 })
836 .await
837 }
838}
839
840impl Fts5TextSearch {
841 fn bm25_idf(df: u64, document_count: u64) -> f64 {
843 let n = document_count as f64;
844 let f = df as f64;
845 ((n - f + 0.5) / (f + 0.5) + 1.0).ln()
846 }
847
848 fn build_any_term_expr(query: &str) -> Option<String> {
850 let parts: Vec<String> = query
851 .split_whitespace()
852 .map(sanitize_fts5_query)
853 .filter(|t| !t.is_empty())
854 .collect();
855 if parts.is_empty() {
856 None
857 } else {
858 Some(parts.join(" OR "))
859 }
860 }
861
862 async fn search_unranked(
864 &self,
865 request: TextSearchRequest,
866 ) -> Result<Vec<TextSearchHit>, StorageError> {
867 let table = self.table_name.clone();
868
869 self.with_reader("fts_search_unranked", move |conn| {
870 let match_expr = match request.mode {
871 TextQueryMode::AnyTerm => match Self::build_any_term_expr(&request.query) {
872 Some(e) => e,
873 None => return Ok(Vec::new()),
874 },
875 _ => {
876 let sanitized = sanitize_fts5_query(&request.query);
877 if sanitized.is_empty() {
878 return Ok(Vec::new());
879 }
880 match request.mode {
881 TextQueryMode::Phrase => format!("\"{}\"", sanitized),
882 TextQueryMode::Plain => sanitized,
883 TextQueryMode::AnyTerm => unreachable!(),
884 }
885 }
886 };
887
888 let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
889 build_filter_clause(filter, &table, 3)
890 } else {
891 (String::new(), Vec::new())
892 };
893
894 let sql = format!(
896 "SELECT subject_id, title \
897 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
898 LIMIT ?2",
899 );
900
901 let mut stmt = conn.prepare(&sql)?;
902 stmt.raw_bind_parameter(1, &match_expr)?;
903 stmt.raw_bind_parameter(2, request.top_k as i64)?;
904
905 for (i, param) in filter_params.iter().enumerate() {
906 param
907 .to_sql()
908 .map(|val| stmt.raw_bind_parameter(3 + i, val))
909 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
910 }
911
912 let mut results = Vec::new();
913 let mut rows = stmt.raw_query();
914 let mut rank_idx = 0u32;
915
916 while let Some(row) = rows.next()? {
917 let id_str: String = row.get(0)?;
918 let title: String = row.get(1)?;
919
920 let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
921 rusqlite::Error::FromSqlConversionFailure(
922 0,
923 rusqlite::types::Type::Text,
924 Box::new(e),
925 )
926 })?;
927
928 rank_idx += 1;
929 results.push(TextSearchHit {
930 subject_id,
931 score: DeterministicScore::from_f64(1.0),
932 rank: rank_idx,
933 title: if title.is_empty() { None } else { Some(title) },
934 snippet: None,
935 });
936 }
937
938 Ok(results)
939 })
940 .await
941 }
942
943 async fn search_rank_within_cap(
945 &self,
946 request: TextSearchRequest,
947 gather_limit: u32,
948 ) -> Result<Vec<TextSearchHit>, StorageError> {
949 let table = self.table_name.clone();
950
951 self.with_reader("fts_search_rank_within_cap", move |conn| {
952 let match_expr = match request.mode {
953 TextQueryMode::AnyTerm => match Self::build_any_term_expr(&request.query) {
954 Some(e) => e,
955 None => return Ok(Vec::new()),
956 },
957 _ => {
958 let sanitized = sanitize_fts5_query(&request.query);
959 if sanitized.is_empty() {
960 return Ok(Vec::new());
961 }
962 match request.mode {
963 TextQueryMode::Phrase => format!("\"{}\"", sanitized),
964 TextQueryMode::Plain => sanitized,
965 TextQueryMode::AnyTerm => unreachable!(),
966 }
967 }
968 };
969
970 let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
971 build_filter_clause(filter, &table, 3)
972 } else {
973 (String::new(), Vec::new())
974 };
975
976 let gather_sql = format!(
978 "SELECT subject_id FROM {table} WHERE {table} MATCH ?1{filter_clause} LIMIT ?2"
979 );
980
981 let mut stmt = conn.prepare(&gather_sql)?;
982 stmt.raw_bind_parameter(1, &match_expr)?;
983 stmt.raw_bind_parameter(2, gather_limit as i64)?;
984 for (i, param) in filter_params.iter().enumerate() {
985 param
986 .to_sql()
987 .map(|val| stmt.raw_bind_parameter(3 + i, val))
988 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
989 }
990
991 let mut gathered_ids: Vec<String> = Vec::new();
992 let mut rows = stmt.raw_query();
993 while let Some(row) = rows.next()? {
994 gathered_ids.push(row.get::<_, String>(0)?);
995 }
996
997 if gathered_ids.is_empty() {
998 return Ok(Vec::new());
999 }
1000
1001 let snippet_expr = if request.snippet_chars == 0 {
1003 "NULL AS snippet".to_string()
1004 } else {
1005 let chars = i32::try_from(request.snippet_chars).unwrap_or(i32::MAX);
1006 format!("snippet({table}, 3, '', '', '...', {chars})")
1007 };
1008
1009 let id_placeholders: Vec<String> = gathered_ids
1011 .iter()
1012 .enumerate()
1013 .map(|(i, _)| format!("?{}", 3 + i))
1014 .collect();
1015 let in_clause = id_placeholders.join(", ");
1016
1017 let rank_sql = format!(
1018 "SELECT subject_id, rank, title, {snippet_expr} \
1019 FROM {table} WHERE {table} MATCH ?1 AND subject_id IN ({in_clause}) \
1020 ORDER BY rank LIMIT ?2"
1021 );
1022
1023 let mut stmt2 = conn.prepare(&rank_sql)?;
1024 stmt2.raw_bind_parameter(1, &match_expr)?;
1025 stmt2.raw_bind_parameter(2, request.top_k as i64)?;
1026 for (i, id_str) in gathered_ids.iter().enumerate() {
1027 stmt2.raw_bind_parameter(3 + i, id_str.as_str())?;
1028 }
1029
1030 let mut hits = Vec::new();
1031 let mut rows2 = stmt2.raw_query();
1032 let mut rank_idx = 0u32;
1033
1034 while let Some(row) = rows2.next()? {
1035 let id_str: String = row.get(0)?;
1036 let fts_rank: f64 = row.get(1)?;
1037 let title: String = row.get(2)?;
1038 let snippet: Option<String> = row.get(3)?;
1039
1040 let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
1041 rusqlite::Error::FromSqlConversionFailure(
1042 0,
1043 rusqlite::types::Type::Text,
1044 Box::new(e),
1045 )
1046 })?;
1047
1048 rank_idx += 1;
1049 hits.push((subject_id, fts_rank, rank_idx, title, snippet));
1050 }
1051
1052 let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
1054 let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
1055 let range = max_rank - min_rank;
1056
1057 let results = hits
1058 .into_iter()
1059 .map(|(subject_id, raw_rank, rank, title, snippet)| {
1060 let score = if range.abs() < 1e-12 {
1061 1.0
1062 } else {
1063 let t = (max_rank - raw_rank) / range;
1064 0.05 + 0.95 * t
1065 };
1066 TextSearchHit {
1067 subject_id,
1068 score: DeterministicScore::from_f64(score),
1069 rank,
1070 title: if title.is_empty() { None } else { Some(title) },
1071 snippet: snippet.filter(|s| !s.is_empty()),
1072 }
1073 })
1074 .collect();
1075
1076 Ok(results)
1077 })
1078 .await
1079 }
1080
1081 #[allow(dead_code)]
1092 pub(crate) async fn rename_namespace(
1093 &self,
1094 old_namespace: &str,
1095 new_namespace: &str,
1096 ) -> Result<u64, StorageError> {
1097 if old_namespace == new_namespace {
1098 return Ok(0);
1099 }
1100 let table = self.table_name.clone();
1101 let old_ns = old_namespace.to_string();
1102 let new_ns = new_namespace.to_string();
1103
1104 self.with_writer("fts_rename_namespace", move |conn| {
1105 let sel_sql = format!(
1106 "SELECT subject_id, kind, title, body, tags, metadata, updated_at \
1107 FROM {} WHERE namespace = ?1",
1108 table
1109 );
1110 struct Row {
1111 subject_id: String,
1112 kind: String,
1113 title: String,
1114 body: String,
1115 tags: String,
1116 metadata: Option<String>,
1117 updated_at: i64,
1118 }
1119 let rows: Vec<Row> = {
1120 let mut stmt = conn.prepare(&sel_sql)?;
1121 let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
1122 Ok(Row {
1123 subject_id: row.get(0)?,
1124 kind: row.get(1)?,
1125 title: row.get(2)?,
1126 body: row.get(3)?,
1127 tags: row.get(4)?,
1128 metadata: row.get(5)?,
1129 updated_at: row.get(6)?,
1130 })
1131 })?;
1132 iter.collect::<Result<Vec<_>, _>>()?
1133 };
1134 let moved = rows.len() as u64;
1135 if moved == 0 {
1136 return Ok(0u64);
1137 }
1138
1139 conn.execute_batch("BEGIN IMMEDIATE")?;
1140
1141 let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
1142 if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
1143 let _ = conn.execute_batch("ROLLBACK");
1144 return Err(e);
1145 }
1146
1147 let ins_sql = format!(
1148 "INSERT INTO {} \
1149 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
1150 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1151 table
1152 );
1153 for row in &rows {
1154 if let Err(e) = conn.execute(
1155 &ins_sql,
1156 rusqlite::params![
1157 row.subject_id,
1158 row.kind,
1159 row.title,
1160 row.body,
1161 row.tags,
1162 &new_ns,
1163 row.metadata,
1164 row.updated_at,
1165 ],
1166 ) {
1167 let _ = conn.execute_batch("ROLLBACK");
1168 return Err(e);
1169 }
1170 }
1171
1172 conn.execute_batch("COMMIT")?;
1173 Ok(moved)
1174 })
1175 .await
1176 }
1177}
1178
1179#[cfg(test)]
1180#[path = "text_tests.rs"]
1181mod tests;