1use std::sync::Arc;
43
44use async_trait::async_trait;
45use chrono::{DateTime, TimeZone, Utc};
46use uuid::Uuid;
47
48use khive_score::DeterministicScore;
49use khive_storage::error::StorageError;
50use khive_storage::types::{
51 BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextIndexStats, TextQueryMode,
52 TextSearchHit, TextSearchRequest,
53};
54use khive_storage::StorageCapability;
55use khive_storage::TextSearch;
56use khive_types::SubstrateKind;
57
58use crate::error::SqliteError;
59use crate::pool::ConnectionPool;
60
/// Creates the FTS5 virtual table `fts_<table_key>` on `conn` unless it already exists.
///
/// Only `title` and `body` are declared without `UNINDEXED`; every other column is
/// stored with the row but marked `UNINDEXED`.
///
/// Compiled for test builds only.
#[cfg(test)]
pub(crate) fn ensure_fts5_schema(
    conn: &rusqlite::Connection,
    table_key: &str,
) -> Result<(), rusqlite::Error> {
    // Column definitions in declaration order.
    const COLUMNS: [&str; 8] = [
        "subject_id UNINDEXED",
        "kind UNINDEXED",
        "title",
        "body",
        "tags UNINDEXED",
        "namespace UNINDEXED",
        "metadata UNINDEXED",
        "updated_at UNINDEXED",
    ];

    let ddl = format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{} USING fts5({})",
        table_key,
        COLUMNS.join(", ")
    );
    conn.execute_batch(&ddl)
}
85
86fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
87 StorageError::driver(StorageCapability::Text, op, e)
88}
89
90fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
91 StorageError::driver(StorageCapability::Text, op, e)
92}
93
/// Text-search backend that stores and queries documents in a SQLite FTS5 virtual table.
pub struct Fts5TextSearch {
    /// Shared connection pool. Used directly when the store is not file-backed, and as the
    /// source of the database path and busy timeout when standalone connections are opened.
    pool: Arc<ConnectionPool>,
    /// When `true`, each operation opens its own standalone connection to the database
    /// file; when `false`, operations borrow a connection from `pool`.
    is_file_backed: bool,
    /// Name of the FTS5 table, built as `fts_<table_key>`.
    table_name: String,
}
104
105impl Fts5TextSearch {
106 pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
110 let table_name = format!("fts_{}", table_key);
111 Self {
112 pool,
113 is_file_backed,
114 table_name,
115 }
116 }
117
118 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
119 let config = self.pool.config();
120 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
121 operation: "fts_writer".into(),
122 message: "in-memory databases do not support standalone connections".into(),
123 })?;
124
125 let conn = rusqlite::Connection::open_with_flags(
126 path,
127 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
128 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
129 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
130 )
131 .map_err(|e| map_err(e, "open_fts_writer"))?;
132
133 conn.busy_timeout(config.busy_timeout)
134 .map_err(|e| map_err(e, "open_fts_writer"))?;
135 conn.pragma_update(None, "foreign_keys", "ON")
136 .map_err(|e| map_err(e, "open_fts_writer"))?;
137 conn.pragma_update(None, "synchronous", "NORMAL")
138 .map_err(|e| map_err(e, "open_fts_writer"))?;
139
140 Ok(conn)
141 }
142
143 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
144 let config = self.pool.config();
145 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
146 operation: "fts_reader".into(),
147 message: "in-memory databases do not support standalone connections".into(),
148 })?;
149
150 let conn = rusqlite::Connection::open_with_flags(
151 path,
152 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
153 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
154 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
155 )
156 .map_err(|e| map_err(e, "open_fts_reader"))?;
157
158 conn.busy_timeout(config.busy_timeout)
159 .map_err(|e| map_err(e, "open_fts_reader"))?;
160 conn.pragma_update(None, "foreign_keys", "ON")
161 .map_err(|e| map_err(e, "open_fts_reader"))?;
162 conn.pragma_update(None, "synchronous", "NORMAL")
163 .map_err(|e| map_err(e, "open_fts_reader"))?;
164
165 Ok(conn)
166 }
167
168 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
169 where
170 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
171 R: Send + 'static,
172 {
173 if self.is_file_backed {
174 let conn = self.open_standalone_writer()?;
175 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
176 .await
177 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
178 } else {
179 let pool = Arc::clone(&self.pool);
180 tokio::task::spawn_blocking(move || {
181 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
182 f(guard.conn()).map_err(|e| map_err(e, op))
183 })
184 .await
185 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
186 }
187 }
188
189 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
190 where
191 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
192 R: Send + 'static,
193 {
194 if self.is_file_backed {
195 let conn = self.open_standalone_reader()?;
196 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
197 .await
198 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
199 } else {
200 let pool = Arc::clone(&self.pool);
201 tokio::task::spawn_blocking(move || {
202 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
203 f(guard.conn()).map_err(|e| map_err(e, op))
204 })
205 .await
206 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
207 }
208 }
209}
210
211fn tags_to_json(tags: &[String]) -> String {
214 serde_json::to_string(tags).unwrap_or_else(|_| "[]".to_string())
215}
216
217fn tags_from_json(s: &str) -> Vec<String> {
218 serde_json::from_str(s).unwrap_or_default()
219}
220
221fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
222 dt.timestamp_micros()
223}
224
225fn micros_to_dt(micros: i64) -> DateTime<Utc> {
226 Utc.timestamp_micros(micros)
227 .single()
228 .unwrap_or_else(Utc::now)
229}
230
/// Reduces free-form user input to a string that is always a valid FTS5 query made of
/// plain barewords separated by single spaces.
///
/// Rules, applied per character:
/// * `* " ( ) + - : ^` are deleted outright, so the fragments on either side fuse
///   (`col:value` becomes `colvalue`). This is the long-standing behaviour and is kept.
/// * Whitespace of any kind (including tabs and newlines) separates terms.
/// * Remaining control characters are deleted.
/// * Any other ASCII character that FTS5 does not accept inside a bareword (everything
///   except letters, digits and `_`) is turned into a separator. Without this, input
///   such as `what's up?` or `user@example.com` reaches `MATCH` unquoted and fails with
///   an FTS5 syntax error.
/// * Non-ASCII characters are kept; FTS5 allows them in barewords.
///
/// Afterwards the tokens `AND`, `OR`, `NOT` and `NEAR` are dropped case-insensitively so
/// they cannot act as operators. The result is empty when nothing searchable remains.
fn sanitize_fts5_query(query: &str) -> String {
    const KEYWORDS: [&str; 4] = ["AND", "OR", "NOT", "NEAR"];

    let cleaned: String = query
        .chars()
        .filter_map(|c| match c {
            '*' | '"' | '(' | ')' | '+' | '-' | ':' | '^' => None,
            // Checked before `is_control` so tabs/newlines split terms instead of fusing them.
            c if c.is_whitespace() => Some(' '),
            c if c.is_control() => None,
            // Not a bareword character: neutralize it, but keep the word boundary.
            c if c.is_ascii() && !c.is_ascii_alphanumeric() && c != '_' => Some(' '),
            c => Some(c),
        })
        .collect();

    // Keyword removal runs after punctuation handling so `x.AND.y` cannot smuggle an
    // operator through.
    cleaned
        .split_whitespace()
        .filter(|token| !KEYWORDS.iter().any(|kw| token.eq_ignore_ascii_case(kw)))
        .collect::<Vec<_>>()
        .join(" ")
}
253
254fn build_filter_clause(
259 filter: &TextFilter,
260 table: &str,
261 start_idx: usize,
262) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
263 let mut conditions: Vec<String> = Vec::new();
264 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
265 let mut idx = start_idx;
266
267 if !filter.ids.is_empty() {
268 let placeholders: Vec<String> = filter
269 .ids
270 .iter()
271 .map(|_| {
272 let p = format!("?{}", idx);
273 idx += 1;
274 p
275 })
276 .collect();
277 conditions.push(format!(
278 "{}.subject_id IN ({})",
279 table,
280 placeholders.join(", ")
281 ));
282 for id in &filter.ids {
283 params.push(Box::new(id.to_string()));
284 }
285 }
286
287 if !filter.kinds.is_empty() {
288 let placeholders: Vec<String> = filter
289 .kinds
290 .iter()
291 .map(|_| {
292 let p = format!("?{}", idx);
293 idx += 1;
294 p
295 })
296 .collect();
297 conditions.push(format!("{}.kind IN ({})", table, placeholders.join(", ")));
298 for kind in &filter.kinds {
299 params.push(Box::new(kind.to_string()));
300 }
301 }
302
303 if !filter.namespaces.is_empty() {
304 let placeholders: Vec<String> = filter
305 .namespaces
306 .iter()
307 .map(|_| {
308 let p = format!("?{}", idx);
309 idx += 1;
310 p
311 })
312 .collect();
313 conditions.push(format!(
314 "{}.namespace IN ({})",
315 table,
316 placeholders.join(", ")
317 ));
318 for ns in &filter.namespaces {
319 params.push(Box::new(ns.clone()));
320 }
321 }
322
323 if conditions.is_empty() {
324 (String::new(), params)
325 } else {
326 (format!(" AND {}", conditions.join(" AND ")), params)
327 }
328}
329
330#[async_trait]
331impl TextSearch for Fts5TextSearch {
332 async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
333 let table = self.table_name.clone();
334 let namespace = document.namespace.clone();
335
336 self.with_writer("fts_upsert", move |conn| {
337 conn.execute_batch("BEGIN IMMEDIATE")?;
338
339 let del_sql = format!(
340 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
341 table
342 );
343 if let Err(e) = conn.execute(
344 &del_sql,
345 rusqlite::params![&namespace, document.subject_id.to_string()],
346 ) {
347 let _ = conn.execute_batch("ROLLBACK");
348 return Err(e);
349 }
350
351 let ins_sql = format!(
352 "INSERT INTO {} \
353 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
354 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
355 table
356 );
357 let tags_json = tags_to_json(&document.tags);
358 let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
359
360 if let Err(e) = conn.execute(
361 &ins_sql,
362 rusqlite::params![
363 document.subject_id.to_string(),
364 document.kind.to_string(),
365 document.title.as_deref().unwrap_or(""),
366 document.body,
367 tags_json,
368 &namespace,
369 metadata_json,
370 dt_to_micros(&document.updated_at),
371 ],
372 ) {
373 let _ = conn.execute_batch("ROLLBACK");
374 return Err(e);
375 }
376
377 conn.execute_batch("COMMIT")?;
378 Ok(())
379 })
380 .await
381 }
382
383 async fn upsert_documents(
384 &self,
385 documents: Vec<TextDocument>,
386 ) -> Result<BatchWriteSummary, StorageError> {
387 let table = self.table_name.clone();
388 let attempted = documents.len() as u64;
389
390 self.with_writer("fts_upsert_batch", move |conn| {
391 let del_sql = format!(
392 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
393 table
394 );
395 let ins_sql = format!(
396 "INSERT INTO {} \
397 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
398 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
399 table
400 );
401
402 conn.execute_batch("BEGIN IMMEDIATE")?;
403 let mut affected = 0u64;
404 let mut failed = 0u64;
405
406 for doc in &documents {
407 conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
408 let id_str = doc.subject_id.to_string();
409 let namespace = &doc.namespace;
410 let result = (|| {
411 conn.execute(&del_sql, rusqlite::params![namespace, &id_str])?;
412
413 let tags_json = tags_to_json(&doc.tags);
414 let metadata_json: Option<String> =
415 doc.metadata.as_ref().map(|v| v.to_string());
416
417 conn.execute(
418 &ins_sql,
419 rusqlite::params![
420 &id_str,
421 &doc.kind.to_string(),
422 doc.title.as_deref().unwrap_or(""),
423 &doc.body,
424 &tags_json,
425 namespace,
426 &metadata_json,
427 dt_to_micros(&doc.updated_at),
428 ],
429 )?;
430 Ok::<(), rusqlite::Error>(())
431 })();
432
433 match result {
434 Ok(()) => {
435 conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
436 affected += 1;
437 }
438 Err(_) => {
439 let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc");
440 let _ = conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc");
441 failed += 1;
442 }
443 }
444 }
445
446 conn.execute_batch("COMMIT")?;
447
448 Ok(BatchWriteSummary {
449 attempted,
450 affected,
451 failed,
452 first_error: String::new(),
453 })
454 })
455 .await
456 }
457
458 async fn delete_document(
459 &self,
460 namespace: &str,
461 subject_id: Uuid,
462 ) -> Result<bool, StorageError> {
463 let namespace = namespace.to_string();
464 let table = self.table_name.clone();
465
466 self.with_writer("fts_delete", move |conn| {
467 let sql = format!(
468 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
469 table
470 );
471 let deleted =
472 conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
473 Ok(deleted > 0)
474 })
475 .await
476 }
477
478 async fn get_document(
479 &self,
480 namespace: &str,
481 subject_id: Uuid,
482 ) -> Result<Option<TextDocument>, StorageError> {
483 let namespace = namespace.to_string();
484 let table = self.table_name.clone();
485
486 self.with_reader("fts_get", move |conn| {
487 let sql = format!(
488 "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
489 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
490 table
491 );
492 let mut stmt = conn.prepare(&sql)?;
493 let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
494
495 match rows.next()? {
496 Some(row) => {
497 let id_str: String = row.get(0)?;
498 let kind_str: String = row.get(1)?;
499 let title: String = row.get(2)?;
500 let body: String = row.get(3)?;
501 let tags_json: String = row.get(4)?;
502 let ns: String = row.get(5)?;
503 let metadata_json: Option<String> = row.get(6)?;
504 let updated_at_micros: i64 = row.get(7)?;
505
506 let sid = Uuid::parse_str(&id_str).map_err(|e| {
507 rusqlite::Error::FromSqlConversionFailure(
508 0,
509 rusqlite::types::Type::Text,
510 Box::new(e),
511 )
512 })?;
513
514 let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
515 rusqlite::Error::FromSqlConversionFailure(
516 1,
517 rusqlite::types::Type::Text,
518 Box::new(e),
519 )
520 })?;
521
522 Ok(Some(TextDocument {
523 subject_id: sid,
524 kind,
525 title: if title.is_empty() { None } else { Some(title) },
526 body,
527 tags: tags_from_json(&tags_json),
528 namespace: ns,
529 metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
530 updated_at: micros_to_dt(updated_at_micros),
531 }))
532 }
533 None => Ok(None),
534 }
535 })
536 .await
537 }
538
539 async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
540 let table = self.table_name.clone();
541
542 self.with_reader("fts_search", move |conn| {
543 let sanitized = sanitize_fts5_query(&request.query);
544 if sanitized.is_empty() {
545 return Ok(Vec::new());
546 }
547
548 let match_expr = match request.mode {
549 TextQueryMode::Phrase => format!("\"{}\"", sanitized),
550 TextQueryMode::Plain => sanitized,
551 };
552
553 let snippet_chars = request.snippet_chars.max(1) as i32;
555
556 let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
557 build_filter_clause(filter, &table, 3)
558 } else {
559 (String::new(), Vec::new())
560 };
561
562 let sql = format!(
563 "SELECT subject_id, rank, title, snippet({table}, 3, '', '', '...', {snippet_chars}) \
564 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
565 ORDER BY rank LIMIT ?2",
566 );
567
568 let mut stmt = conn.prepare(&sql)?;
569 stmt.raw_bind_parameter(1, &match_expr)?;
570 stmt.raw_bind_parameter(2, request.top_k as i64)?;
571
572 for (i, param) in filter_params.iter().enumerate() {
573 param
574 .to_sql()
575 .map(|val| stmt.raw_bind_parameter(3 + i, val))
576 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
577 }
578
579 let mut hits = Vec::new();
580 let mut rows = stmt.raw_query();
581 let mut rank_idx = 0u32;
582
583 while let Some(row) = rows.next()? {
584 let id_str: String = row.get(0)?;
585 let fts_rank: f64 = row.get(1)?;
586 let title: String = row.get(2)?;
587 let snippet: String = row.get(3)?;
588
589 let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
590 rusqlite::Error::FromSqlConversionFailure(
591 0,
592 rusqlite::types::Type::Text,
593 Box::new(e),
594 )
595 })?;
596
597 rank_idx += 1;
598 hits.push((subject_id, fts_rank, rank_idx, title, snippet));
599 }
600
601 let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
604 let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
605 let range = max_rank - min_rank;
606
607 let results = hits
608 .into_iter()
609 .map(|(subject_id, raw_rank, rank, title, snippet)| {
610 let score = if range.abs() < 1e-12 {
611 1.0
612 } else {
613 let t = (max_rank - raw_rank) / range;
614 0.05 + 0.95 * t
615 };
616 TextSearchHit {
617 subject_id,
618 score: DeterministicScore::from_f64(score),
619 rank,
620 title: if title.is_empty() { None } else { Some(title) },
621 snippet: if snippet.is_empty() { None } else { Some(snippet) },
622 }
623 })
624 .collect();
625
626 Ok(results)
627 })
628 .await
629 }
630
631 async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
632 let table = self.table_name.clone();
633
634 self.with_reader("fts_count", move |conn| {
635 let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
636
637 let sql = if filter_clause.is_empty() {
638 format!("SELECT COUNT(*) FROM {}", table)
639 } else {
640 let where_part = filter_clause.trim_start_matches(" AND ");
641 format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part)
642 };
643
644 let mut stmt = conn.prepare(&sql)?;
645
646 for (i, param) in filter_params.iter().enumerate() {
647 param
648 .to_sql()
649 .map(|val| stmt.raw_bind_parameter(1 + i, val))
650 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
651 }
652
653 let mut rows = stmt.raw_query();
654 match rows.next()? {
655 Some(row) => {
656 let count: i64 = row.get(0)?;
657 Ok(count as u64)
658 }
659 None => Ok(0),
660 }
661 })
662 .await
663 }
664
665 async fn stats(&self) -> Result<TextIndexStats, StorageError> {
666 let table = self.table_name.clone();
667
668 self.with_reader("fts_stats", move |conn| {
669 let sql = format!("SELECT COUNT(*) FROM {}", table);
670 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
671
672 Ok(TextIndexStats {
673 document_count: count as u64,
674 needs_rebuild: false,
675 last_rebuild_at: None,
676 })
677 })
678 .await
679 }
680
681 async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
682 let table = self.table_name.clone();
683
684 self.with_writer("fts_rebuild", move |conn| {
685 let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
687 conn.execute(&sql, [])?;
688
689 let count_sql = format!("SELECT COUNT(*) FROM {}", table);
690 let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
691
692 Ok(TextIndexStats {
693 document_count: count as u64,
694 needs_rebuild: false,
695 last_rebuild_at: Some(Utc::now()),
696 })
697 })
698 .await
699 }
700}
701
702impl Fts5TextSearch {
703 #[allow(dead_code)]
713 pub(crate) async fn rename_namespace(
714 &self,
715 old_namespace: &str,
716 new_namespace: &str,
717 ) -> Result<u64, StorageError> {
718 if old_namespace == new_namespace {
719 return Ok(0);
720 }
721 let table = self.table_name.clone();
722 let old_ns = old_namespace.to_string();
723 let new_ns = new_namespace.to_string();
724
725 self.with_writer("fts_rename_namespace", move |conn| {
726 let sel_sql = format!(
727 "SELECT subject_id, kind, title, body, tags, metadata, updated_at \
728 FROM {} WHERE namespace = ?1",
729 table
730 );
731 struct Row {
732 subject_id: String,
733 kind: String,
734 title: String,
735 body: String,
736 tags: String,
737 metadata: Option<String>,
738 updated_at: i64,
739 }
740 let rows: Vec<Row> = {
741 let mut stmt = conn.prepare(&sel_sql)?;
742 let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
743 Ok(Row {
744 subject_id: row.get(0)?,
745 kind: row.get(1)?,
746 title: row.get(2)?,
747 body: row.get(3)?,
748 tags: row.get(4)?,
749 metadata: row.get(5)?,
750 updated_at: row.get(6)?,
751 })
752 })?;
753 iter.collect::<Result<Vec<_>, _>>()?
754 };
755 let moved = rows.len() as u64;
756 if moved == 0 {
757 return Ok(0u64);
758 }
759
760 conn.execute_batch("BEGIN IMMEDIATE")?;
761
762 let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
763 if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
764 let _ = conn.execute_batch("ROLLBACK");
765 return Err(e);
766 }
767
768 let ins_sql = format!(
769 "INSERT INTO {} \
770 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
771 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
772 table
773 );
774 for row in &rows {
775 if let Err(e) = conn.execute(
776 &ins_sql,
777 rusqlite::params![
778 row.subject_id,
779 row.kind,
780 row.title,
781 row.body,
782 row.tags,
783 &new_ns,
784 row.metadata,
785 row.updated_at,
786 ],
787 ) {
788 let _ = conn.execute_batch("ROLLBACK");
789 return Err(e);
790 }
791 }
792
793 conn.execute_batch("COMMIT")?;
794 Ok(moved)
795 })
796 .await
797 }
798}
799
800#[cfg(test)]
801mod tests {
802 use super::*;
803 use crate::pool::PoolConfig;
804
805 fn setup_memory_store(table_key: &str) -> Fts5TextSearch {
806 let config = PoolConfig {
807 path: None,
808 ..PoolConfig::default()
809 };
810 let pool = Arc::new(ConnectionPool::new(config).unwrap());
811
812 {
813 let writer = pool.writer().unwrap();
814 ensure_fts5_schema(writer.conn(), table_key).unwrap();
815 }
816
817 Fts5TextSearch::new(pool, false, table_key.to_string())
818 }
819
820 fn make_document(subject_id: Uuid, title: &str, body: &str) -> TextDocument {
821 TextDocument {
822 subject_id,
823 kind: SubstrateKind::Note,
824 title: if title.is_empty() {
825 None
826 } else {
827 Some(title.to_string())
828 },
829 body: body.to_string(),
830 tags: vec![],
831 namespace: "test_ns".to_string(),
832 metadata: None,
833 updated_at: Utc::now(),
834 }
835 }
836
837 fn ns_filter(namespace: &str) -> TextFilter {
838 TextFilter {
839 namespaces: vec![namespace.to_string()],
840 ..TextFilter::default()
841 }
842 }
843
844 #[tokio::test]
845 async fn test_upsert_and_search() {
846 let store = setup_memory_store("upsert_search");
847
848 let id = Uuid::new_v4();
849 let doc = TextDocument {
850 subject_id: id,
851 kind: SubstrateKind::Entity,
852 title: Some("Rust Programming".to_string()),
853 body: "Rust is a systems programming language focused on safety and performance."
854 .to_string(),
855 tags: vec!["rust".to_string(), "programming".to_string()],
856 namespace: "tech".to_string(),
857 metadata: None,
858 updated_at: Utc::now(),
859 };
860
861 store.upsert_document(doc).await.unwrap();
862
863 let hits = store
864 .search(TextSearchRequest {
865 query: "Rust programming".to_string(),
866 mode: TextQueryMode::Plain,
867 filter: Some(ns_filter("tech")),
868 top_k: 10,
869 snippet_chars: 64,
870 })
871 .await
872 .unwrap();
873
874 assert_eq!(hits.len(), 1);
875 assert_eq!(hits[0].subject_id, id);
876 assert_eq!(hits[0].rank, 1);
877 assert!(hits[0].score.to_f64() > 0.0);
878 assert!(hits[0].title.is_some());
879 }
880
881 #[tokio::test]
882 async fn test_phrase_search() {
883 let store = setup_memory_store("phrase");
884
885 let id1 = Uuid::new_v4();
886 let id2 = Uuid::new_v4();
887
888 store
889 .upsert_document(make_document(
890 id1,
891 "Animals",
892 "The quick brown fox jumps over the lazy dog.",
893 ))
894 .await
895 .unwrap();
896
897 store
898 .upsert_document(make_document(
899 id2,
900 "Colors",
901 "The brown paint was quick to dry, unlike the fox.",
902 ))
903 .await
904 .unwrap();
905
906 let hits = store
907 .search(TextSearchRequest {
908 query: "quick brown fox".to_string(),
909 mode: TextQueryMode::Phrase,
910 filter: Some(ns_filter("test_ns")),
911 top_k: 10,
912 snippet_chars: 64,
913 })
914 .await
915 .unwrap();
916
917 assert_eq!(hits.len(), 1);
918 assert_eq!(hits[0].subject_id, id1);
919
920 let hits = store
921 .search(TextSearchRequest {
922 query: "quick brown fox".to_string(),
923 mode: TextQueryMode::Plain,
924 filter: Some(ns_filter("test_ns")),
925 top_k: 10,
926 snippet_chars: 64,
927 })
928 .await
929 .unwrap();
930
931 assert_eq!(hits.len(), 2);
932 }
933
934 #[tokio::test]
935 async fn test_delete_document() {
936 let store = setup_memory_store("delete");
937
938 let id1 = Uuid::new_v4();
939 let id2 = Uuid::new_v4();
940
941 store
942 .upsert_document(make_document(id1, "Doc One", "First document content."))
943 .await
944 .unwrap();
945 store
946 .upsert_document(make_document(id2, "Doc Two", "Second document content."))
947 .await
948 .unwrap();
949
950 let stats = store.stats().await.unwrap();
951 assert_eq!(stats.document_count, 2);
952
953 let deleted = store.delete_document("test_ns", id1).await.unwrap();
954 assert!(deleted);
955
956 let stats = store.stats().await.unwrap();
957 assert_eq!(stats.document_count, 1);
958
959 let deleted_again = store.delete_document("test_ns", id1).await.unwrap();
960 assert!(!deleted_again);
961
962 let doc = store.get_document("test_ns", id2).await.unwrap();
963 assert!(doc.is_some());
964
965 let doc = store.get_document("test_ns", id1).await.unwrap();
966 assert!(doc.is_none());
967 }
968
969 #[tokio::test]
970 async fn test_count_with_filter() {
971 let store = setup_memory_store("count_filter");
972 let ns = "test_ns".to_string();
973
974 for i in 0..5 {
975 let kind = if i % 2 == 0 {
976 SubstrateKind::Entity
977 } else {
978 SubstrateKind::Note
979 };
980 let doc = TextDocument {
981 subject_id: Uuid::new_v4(),
982 kind,
983 title: Some(format!("Doc {}", i)),
984 body: format!("Content for document number {}", i),
985 tags: vec![],
986 namespace: ns.clone(),
987 metadata: None,
988 updated_at: Utc::now(),
989 };
990 store.upsert_document(doc).await.unwrap();
991 }
992
993 let total = store
994 .count(TextFilter {
995 namespaces: vec![ns.clone()],
996 ..TextFilter::default()
997 })
998 .await
999 .unwrap();
1000 assert_eq!(total, 5);
1001
1002 let entities = store
1003 .count(TextFilter {
1004 namespaces: vec![ns.clone()],
1005 kinds: vec![SubstrateKind::Entity],
1006 ..TextFilter::default()
1007 })
1008 .await
1009 .unwrap();
1010 assert_eq!(entities, 3);
1011
1012 let notes = store
1013 .count(TextFilter {
1014 namespaces: vec![ns.clone()],
1015 kinds: vec![SubstrateKind::Note],
1016 ..TextFilter::default()
1017 })
1018 .await
1019 .unwrap();
1020 assert_eq!(notes, 2);
1021 }
1022
1023 #[tokio::test]
1024 async fn test_get_document_roundtrip() {
1025 let store = setup_memory_store("get_roundtrip");
1026
1027 let id = Uuid::new_v4();
1028 let original = TextDocument {
1029 subject_id: id,
1030 kind: SubstrateKind::Note,
1031 title: Some("Important Memo".to_string()),
1032 body: "This memo contains critical information.".to_string(),
1033 tags: vec!["important".to_string(), "memo".to_string()],
1034 namespace: "work".to_string(),
1035 metadata: Some(serde_json::json!({"priority": "high"})),
1036 updated_at: Utc::now(),
1037 };
1038
1039 store.upsert_document(original.clone()).await.unwrap();
1040
1041 let retrieved = store.get_document("work", id).await.unwrap().unwrap();
1042 assert_eq!(retrieved.subject_id, id);
1043 assert_eq!(retrieved.kind, SubstrateKind::Note);
1044 assert_eq!(retrieved.title, Some("Important Memo".to_string()));
1045 assert_eq!(retrieved.body, "This memo contains critical information.");
1046 assert_eq!(retrieved.tags, vec!["important", "memo"]);
1047 assert_eq!(retrieved.namespace, "work");
1048 }
1049
1050 #[tokio::test]
1051 async fn test_upsert_replaces_existing() {
1052 let store = setup_memory_store("replace");
1053
1054 let id = Uuid::new_v4();
1055 store
1056 .upsert_document(make_document(id, "Original", "Original body text."))
1057 .await
1058 .unwrap();
1059
1060 store
1061 .upsert_document(make_document(id, "Updated", "Updated body text."))
1062 .await
1063 .unwrap();
1064
1065 let stats = store.stats().await.unwrap();
1066 assert_eq!(stats.document_count, 1);
1067
1068 let doc = store.get_document("test_ns", id).await.unwrap().unwrap();
1069 assert_eq!(doc.title, Some("Updated".to_string()));
1070 assert_eq!(doc.body, "Updated body text.");
1071 }
1072
1073 #[tokio::test]
1074 async fn test_batch_upsert() {
1075 let store = setup_memory_store("batch");
1076
1077 let docs: Vec<TextDocument> = (0..50)
1078 .map(|i| TextDocument {
1079 subject_id: Uuid::new_v4(),
1080 kind: SubstrateKind::Entity,
1081 title: Some(format!("Item {}", i)),
1082 body: format!("This is the body content for item number {}", i),
1083 tags: vec![format!("tag_{}", i % 5)],
1084 namespace: "batch_ns".to_string(),
1085 metadata: None,
1086 updated_at: Utc::now(),
1087 })
1088 .collect();
1089
1090 let summary = store.upsert_documents(docs).await.unwrap();
1091 assert_eq!(summary.attempted, 50);
1092 assert_eq!(summary.affected, 50);
1093 assert_eq!(summary.failed, 0);
1094
1095 let stats = store.stats().await.unwrap();
1096 assert_eq!(stats.document_count, 50);
1097 }
1098
1099 #[tokio::test]
1100 async fn test_empty_search() {
1101 let store = setup_memory_store("empty");
1102
1103 let hits = store
1104 .search(TextSearchRequest {
1105 query: "nonexistent".to_string(),
1106 mode: TextQueryMode::Plain,
1107 filter: Some(ns_filter("test_ns")),
1108 top_k: 10,
1109 snippet_chars: 64,
1110 })
1111 .await
1112 .unwrap();
1113
1114 assert!(hits.is_empty());
1115 }
1116
1117 #[tokio::test]
1118 async fn test_rebuild() {
1119 let store = setup_memory_store("rebuild");
1120
1121 store
1122 .upsert_document(make_document(
1123 Uuid::new_v4(),
1124 "Test",
1125 "Test document for rebuild.",
1126 ))
1127 .await
1128 .unwrap();
1129
1130 let stats = store.rebuild(IndexRebuildScope::Full).await.unwrap();
1131 assert_eq!(stats.document_count, 1);
1132 assert!(!stats.needs_rebuild);
1133 assert!(stats.last_rebuild_at.is_some());
1134 }
1135
1136 #[tokio::test]
1137 async fn test_search_with_kind_filter() {
1138 let store = setup_memory_store("filter_kind");
1139
1140 let id_entity = Uuid::new_v4();
1141 let id_note = Uuid::new_v4();
1142
1143 store
1144 .upsert_document(TextDocument {
1145 subject_id: id_entity,
1146 kind: SubstrateKind::Entity,
1147 title: Some("Rust Guide".to_string()),
1148 body: "A comprehensive guide to Rust programming.".to_string(),
1149 tags: vec![],
1150 namespace: "test_ns".to_string(),
1151 metadata: None,
1152 updated_at: Utc::now(),
1153 })
1154 .await
1155 .unwrap();
1156
1157 store
1158 .upsert_document(TextDocument {
1159 subject_id: id_note,
1160 kind: SubstrateKind::Note,
1161 title: Some("Rust Notes".to_string()),
1162 body: "Quick notes about Rust concepts.".to_string(),
1163 tags: vec![],
1164 namespace: "test_ns".to_string(),
1165 metadata: None,
1166 updated_at: Utc::now(),
1167 })
1168 .await
1169 .unwrap();
1170
1171 let hits = store
1172 .search(TextSearchRequest {
1173 query: "Rust".to_string(),
1174 mode: TextQueryMode::Plain,
1175 filter: Some(TextFilter {
1176 kinds: vec![SubstrateKind::Entity],
1177 namespaces: vec!["test_ns".to_string()],
1178 ..TextFilter::default()
1179 }),
1180 top_k: 10,
1181 snippet_chars: 64,
1182 })
1183 .await
1184 .unwrap();
1185
1186 assert_eq!(hits.len(), 1);
1187 assert_eq!(hits[0].subject_id, id_entity);
1188 }
1189
1190 #[tokio::test]
1191 async fn test_sanitize_fts5_query() {
1192 assert_eq!(sanitize_fts5_query("hello world"), "hello world");
1193 assert_eq!(sanitize_fts5_query("hello*world"), "helloworld");
1194 assert_eq!(sanitize_fts5_query("\"quoted\""), "quoted");
1195 assert_eq!(sanitize_fts5_query("(parens)"), "parens");
1196 assert_eq!(sanitize_fts5_query("a + b - c"), "a b c");
1197 assert_eq!(sanitize_fts5_query("col:value"), "colvalue");
1198 assert_eq!(sanitize_fts5_query(""), "");
1199 assert_eq!(sanitize_fts5_query("***"), "");
1200 }
1201
1202 #[tokio::test]
1203 async fn test_score_is_bounded() {
1204 let store = setup_memory_store("score_bounds");
1205
1206 for i in 0..5 {
1207 store
1208 .upsert_document(make_document(
1209 Uuid::new_v4(),
1210 &format!("Doc {}", i),
1211 &format!("This document discusses topic number {}", i),
1212 ))
1213 .await
1214 .unwrap();
1215 }
1216
1217 let hits = store
1218 .search(TextSearchRequest {
1219 query: "document topic".to_string(),
1220 mode: TextQueryMode::Plain,
1221 filter: Some(ns_filter("test_ns")),
1222 top_k: 10,
1223 snippet_chars: 64,
1224 })
1225 .await
1226 .unwrap();
1227
1228 for hit in &hits {
1229 let score = hit.score.to_f64();
1230 assert!(
1231 score > 0.0 && score <= 1.0,
1232 "score out of (0, 1] range: {}",
1233 score
1234 );
1235 }
1236
1237 for (i, hit) in hits.iter().enumerate() {
1238 assert_eq!(hit.rank, (i + 1) as u32);
1239 }
1240 }
1241
1242 #[tokio::test]
1243 async fn test_rename_namespace() {
1244 let store = setup_memory_store("rename_ns");
1245
1246 let id = Uuid::new_v4();
1247 let doc = TextDocument {
1248 subject_id: id,
1249 kind: SubstrateKind::Note,
1250 title: Some("Rename test".to_string()),
1251 body: "keyword_unique_xyz".to_string(),
1252 tags: vec![],
1253 namespace: "old_ns".to_string(),
1254 metadata: None,
1255 updated_at: Utc::now(),
1256 };
1257 store.upsert_document(doc).await.unwrap();
1258
1259 let before = store
1260 .search(TextSearchRequest {
1261 query: "keyword_unique_xyz".to_string(),
1262 mode: TextQueryMode::Plain,
1263 filter: Some(ns_filter("old_ns")),
1264 top_k: 10,
1265 snippet_chars: 64,
1266 })
1267 .await
1268 .unwrap();
1269 assert_eq!(before.len(), 1);
1270
1271 let moved = store.rename_namespace("old_ns", "new_ns").await.unwrap();
1272 assert_eq!(moved, 1);
1273
1274 let after_new = store
1275 .search(TextSearchRequest {
1276 query: "keyword_unique_xyz".to_string(),
1277 mode: TextQueryMode::Plain,
1278 filter: Some(ns_filter("new_ns")),
1279 top_k: 10,
1280 snippet_chars: 64,
1281 })
1282 .await
1283 .unwrap();
1284 assert_eq!(after_new.len(), 1);
1285
1286 let after_old = store
1287 .search(TextSearchRequest {
1288 query: "keyword_unique_xyz".to_string(),
1289 mode: TextQueryMode::Plain,
1290 filter: Some(ns_filter("old_ns")),
1291 top_k: 10,
1292 snippet_chars: 64,
1293 })
1294 .await
1295 .unwrap();
1296 assert!(after_old.is_empty());
1297 }
1298
1299 #[tokio::test]
1300 async fn test_metadata_none_roundtrip() {
1301 let store = setup_memory_store("meta_none");
1302 let id = uuid::Uuid::new_v4();
1303 let doc = TextDocument {
1304 subject_id: id,
1305 kind: SubstrateKind::Note,
1306 namespace: "test_ns".to_string(),
1307 title: None,
1308 body: "no metadata".to_string(),
1309 tags: vec![],
1310 metadata: None,
1311 updated_at: Utc::now(),
1312 };
1313 store.upsert_document(doc).await.unwrap();
1314 let fetched = store.get_document("test_ns", id).await.unwrap().unwrap();
1315 assert!(fetched.metadata.is_none());
1316 }
1317
1318 #[tokio::test]
1319 async fn test_rename_namespace_noop() {
1320 let store = setup_memory_store("rename_noop");
1321
1322 let id = Uuid::new_v4();
1323 let doc = TextDocument {
1324 subject_id: id,
1325 kind: SubstrateKind::Note,
1326 title: None,
1327 body: "noop_test_content".to_string(),
1328 tags: vec![],
1329 namespace: "same_ns".to_string(),
1330 metadata: None,
1331 updated_at: Utc::now(),
1332 };
1333 store.upsert_document(doc).await.unwrap();
1334
1335 let moved = store.rename_namespace("same_ns", "same_ns").await.unwrap();
1336 assert_eq!(moved, 0);
1337
1338 let hits = store
1339 .search(TextSearchRequest {
1340 query: "noop_test_content".to_string(),
1341 mode: TextQueryMode::Plain,
1342 filter: Some(ns_filter("same_ns")),
1343 top_k: 10,
1344 snippet_chars: 64,
1345 })
1346 .await
1347 .unwrap();
1348 assert_eq!(hits.len(), 1);
1349 }
1350
1351 #[tokio::test]
1355 async fn test_score_normalization_range() {
1356 let store = setup_memory_store("score_range");
1357
1358 let id1 = Uuid::new_v4();
1360 let id2 = Uuid::new_v4();
1361 let id3 = Uuid::new_v4();
1362 store
1363 .upsert_document(make_document(
1364 id1,
1365 "normtest topic",
1366 "normtest normtest normtest",
1367 ))
1368 .await
1369 .unwrap();
1370 store
1371 .upsert_document(make_document(
1372 id2,
1373 "normtest light",
1374 "other content without the keyword",
1375 ))
1376 .await
1377 .unwrap();
1378 store
1379 .upsert_document(make_document(
1380 id3,
1381 "irrelevant title",
1382 "completely different document content",
1383 ))
1384 .await
1385 .unwrap();
1386
1387 let hits = store
1388 .search(TextSearchRequest {
1389 query: "normtest".to_string(),
1390 mode: TextQueryMode::Plain,
1391 filter: Some(ns_filter("test_ns")),
1392 top_k: 10,
1393 snippet_chars: 64,
1394 })
1395 .await
1396 .unwrap();
1397
1398 assert!(!hits.is_empty(), "at least one doc must match");
1400 assert!(
1401 hits.iter().all(|h| h.subject_id != id3),
1402 "id3 must not appear"
1403 );
1404
1405 for h in &hits {
1407 let s = h.score.to_f64();
1408 assert!(s > 0.0 && s <= 1.0, "score out of (0,1]: {s}");
1409 }
1410 for (i, h) in hits.iter().enumerate() {
1412 assert_eq!(h.rank, (i + 1) as u32, "rank must equal position+1");
1413 }
1414 assert!(
1417 hits[0].score.to_f64() > 0.99,
1418 "top hit must score ≈ 1.0, got {}",
1419 hits[0].score.to_f64()
1420 );
1421
1422 let single_id = Uuid::new_v4();
1425 store
1426 .upsert_document(make_document(
1427 single_id,
1428 "xqzplurp_unique_marker",
1429 "xqzplurp_unique_marker body",
1430 ))
1431 .await
1432 .unwrap();
1433 let single = store
1434 .search(TextSearchRequest {
1435 query: "xqzplurp_unique_marker".to_string(),
1436 mode: TextQueryMode::Plain,
1437 filter: Some(ns_filter("test_ns")),
1438 top_k: 10,
1439 snippet_chars: 64,
1440 })
1441 .await
1442 .unwrap();
1443 assert_eq!(single.len(), 1);
1444 assert!(
1445 single[0].score.to_f64() > 0.99,
1446 "single-hit must score ≈ 1.0, got {}",
1447 single[0].score.to_f64()
1448 );
1449 }
1450}