1use std::sync::Arc;
39
40use async_trait::async_trait;
41use chrono::{DateTime, TimeZone, Utc};
42use uuid::Uuid;
43
44use khive_score::DeterministicScore;
45use khive_storage::error::StorageError;
46use khive_storage::types::{
47 BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextIndexStats, TextQueryMode,
48 TextSearchHit, TextSearchRequest,
49};
50use khive_storage::StorageCapability;
51use khive_storage::TextSearch;
52use khive_types::SubstrateKind;
53
54use crate::error::SqliteError;
55use crate::pool::ConnectionPool;
56
/// Test-only helper: creates the FTS5 virtual table `fts_<table_key>` if it is missing.
///
/// Only `title` and `body` are tokenized for full-text matching; every other column is
/// declared `UNINDEXED` and is stored purely so it can be filtered on and read back.
#[cfg(test)]
pub(crate) fn ensure_fts5_schema(
    conn: &rusqlite::Connection,
    table_key: &str,
) -> Result<(), rusqlite::Error> {
    const COLUMNS: &str = "subject_id UNINDEXED, kind UNINDEXED, title, body, \
                           tags UNINDEXED, namespace UNINDEXED, metadata UNINDEXED, \
                           updated_at UNINDEXED";
    let create_sql =
        format!("CREATE VIRTUAL TABLE IF NOT EXISTS fts_{table_key} USING fts5({COLUMNS})");
    conn.execute_batch(&create_sql)
}
81
82fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
83 StorageError::driver(StorageCapability::Text, op, e)
84}
85
86fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
87 StorageError::driver(StorageCapability::Text, op, e)
88}
89
90pub struct Fts5TextSearch {
96 pool: Arc<ConnectionPool>,
97 is_file_backed: bool,
98 table_name: String,
99}
100
101impl Fts5TextSearch {
102 pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
106 let table_name = format!("fts_{}", table_key);
107 Self {
108 pool,
109 is_file_backed,
110 table_name,
111 }
112 }
113
114 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
115 let config = self.pool.config();
116 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
117 operation: "fts_writer".into(),
118 message: "in-memory databases do not support standalone connections".into(),
119 })?;
120
121 let conn = rusqlite::Connection::open_with_flags(
122 path,
123 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
124 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
125 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
126 )
127 .map_err(|e| map_err(e, "open_fts_writer"))?;
128
129 conn.busy_timeout(config.busy_timeout)
130 .map_err(|e| map_err(e, "open_fts_writer"))?;
131 conn.pragma_update(None, "foreign_keys", "ON")
132 .map_err(|e| map_err(e, "open_fts_writer"))?;
133 conn.pragma_update(None, "synchronous", "NORMAL")
134 .map_err(|e| map_err(e, "open_fts_writer"))?;
135
136 Ok(conn)
137 }
138
139 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
140 let config = self.pool.config();
141 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
142 operation: "fts_reader".into(),
143 message: "in-memory databases do not support standalone connections".into(),
144 })?;
145
146 let conn = rusqlite::Connection::open_with_flags(
147 path,
148 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
149 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
150 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
151 )
152 .map_err(|e| map_err(e, "open_fts_reader"))?;
153
154 conn.busy_timeout(config.busy_timeout)
155 .map_err(|e| map_err(e, "open_fts_reader"))?;
156 conn.pragma_update(None, "foreign_keys", "ON")
157 .map_err(|e| map_err(e, "open_fts_reader"))?;
158 conn.pragma_update(None, "synchronous", "NORMAL")
159 .map_err(|e| map_err(e, "open_fts_reader"))?;
160
161 Ok(conn)
162 }
163
164 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
165 where
166 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
167 R: Send + 'static,
168 {
169 if self.is_file_backed {
170 let conn = self.open_standalone_writer()?;
171 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
172 .await
173 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
174 } else {
175 let pool = Arc::clone(&self.pool);
176 tokio::task::spawn_blocking(move || {
177 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
178 f(guard.conn()).map_err(|e| map_err(e, op))
179 })
180 .await
181 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
182 }
183 }
184
185 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
186 where
187 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
188 R: Send + 'static,
189 {
190 if self.is_file_backed {
191 let conn = self.open_standalone_reader()?;
192 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
193 .await
194 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
195 } else {
196 let pool = Arc::clone(&self.pool);
197 tokio::task::spawn_blocking(move || {
198 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
199 f(guard.conn()).map_err(|e| map_err(e, op))
200 })
201 .await
202 .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
203 }
204 }
205}
206
207fn tags_to_json(tags: &[String]) -> String {
210 serde_json::to_string(tags).unwrap_or_else(|_| "[]".to_string())
211}
212
213fn tags_from_json(s: &str) -> Vec<String> {
214 serde_json::from_str(s).unwrap_or_default()
215}
216
217fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
218 dt.timestamp_micros()
219}
220
221fn micros_to_dt(micros: i64) -> DateTime<Utc> {
222 Utc.timestamp_micros(micros)
223 .single()
224 .unwrap_or_else(Utc::now)
225}
226
/// Reduces free-form user input to a list of plain FTS5 search terms.
///
/// * The FTS5 operator characters `*`, `"`, `(`, `)`, `+`, `-`, `:`, `^` are removed,
///   together with every non-whitespace control character (including NUL).
/// * Any whitespace separates terms. This includes control whitespace such as `\t`, `\n`
///   and `\r`. Previously these were stripped as control characters, which silently glued
///   neighbouring words together (`"foo\nbar"` became `"foobar"`).
/// * The boolean keywords `AND`, `OR`, `NOT` and `NEAR` (any letter case) are dropped so
///   they cannot act as operators.
///
/// The remaining terms are joined with single spaces. An empty result means the query
/// had no searchable terms.
fn sanitize_fts5_query(query: &str) -> String {
    let cleaned: String = query
        .chars()
        .filter_map(|c| {
            if c.is_whitespace() {
                // Normalise all whitespace (control or not) to a plain term separator.
                Some(' ')
            } else if c.is_control()
                || matches!(c, '*' | '"' | '(' | ')' | '+' | '-' | ':' | '^')
            {
                None
            } else {
                Some(c)
            }
        })
        .collect();

    cleaned
        .split_whitespace()
        .filter(|term| {
            !matches!(
                term.to_ascii_uppercase().as_str(),
                "AND" | "OR" | "NOT" | "NEAR"
            )
        })
        .collect::<Vec<_>>()
        .join(" ")
}
249
250fn build_filter_clause(
255 filter: &TextFilter,
256 table: &str,
257 start_idx: usize,
258) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
259 let mut conditions: Vec<String> = Vec::new();
260 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
261 let mut idx = start_idx;
262
263 if !filter.ids.is_empty() {
264 let placeholders: Vec<String> = filter
265 .ids
266 .iter()
267 .map(|_| {
268 let p = format!("?{}", idx);
269 idx += 1;
270 p
271 })
272 .collect();
273 conditions.push(format!(
274 "{}.subject_id IN ({})",
275 table,
276 placeholders.join(", ")
277 ));
278 for id in &filter.ids {
279 params.push(Box::new(id.to_string()));
280 }
281 }
282
283 if !filter.kinds.is_empty() {
284 let placeholders: Vec<String> = filter
285 .kinds
286 .iter()
287 .map(|_| {
288 let p = format!("?{}", idx);
289 idx += 1;
290 p
291 })
292 .collect();
293 conditions.push(format!("{}.kind IN ({})", table, placeholders.join(", ")));
294 for kind in &filter.kinds {
295 params.push(Box::new(kind.to_string()));
296 }
297 }
298
299 if !filter.namespaces.is_empty() {
300 let placeholders: Vec<String> = filter
301 .namespaces
302 .iter()
303 .map(|_| {
304 let p = format!("?{}", idx);
305 idx += 1;
306 p
307 })
308 .collect();
309 conditions.push(format!(
310 "{}.namespace IN ({})",
311 table,
312 placeholders.join(", ")
313 ));
314 for ns in &filter.namespaces {
315 params.push(Box::new(ns.clone()));
316 }
317 }
318
319 if conditions.is_empty() {
320 (String::new(), params)
321 } else {
322 (format!(" AND {}", conditions.join(" AND ")), params)
323 }
324}
325
326#[async_trait]
327impl TextSearch for Fts5TextSearch {
328 async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
329 let table = self.table_name.clone();
330 let namespace = document.namespace.clone();
331
332 self.with_writer("fts_upsert", move |conn| {
333 conn.execute_batch("BEGIN IMMEDIATE")?;
334
335 let del_sql = format!(
336 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
337 table
338 );
339 if let Err(e) = conn.execute(
340 &del_sql,
341 rusqlite::params![&namespace, document.subject_id.to_string()],
342 ) {
343 let _ = conn.execute_batch("ROLLBACK");
344 return Err(e);
345 }
346
347 let ins_sql = format!(
348 "INSERT INTO {} \
349 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
350 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
351 table
352 );
353 let tags_json = tags_to_json(&document.tags);
354 let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
355
356 if let Err(e) = conn.execute(
357 &ins_sql,
358 rusqlite::params![
359 document.subject_id.to_string(),
360 document.kind.to_string(),
361 document.title.as_deref().unwrap_or(""),
362 document.body,
363 tags_json,
364 &namespace,
365 metadata_json,
366 dt_to_micros(&document.updated_at),
367 ],
368 ) {
369 let _ = conn.execute_batch("ROLLBACK");
370 return Err(e);
371 }
372
373 conn.execute_batch("COMMIT")?;
374 Ok(())
375 })
376 .await
377 }
378
379 async fn upsert_documents(
380 &self,
381 documents: Vec<TextDocument>,
382 ) -> Result<BatchWriteSummary, StorageError> {
383 let table = self.table_name.clone();
384 let attempted = documents.len() as u64;
385
386 self.with_writer("fts_upsert_batch", move |conn| {
387 let del_sql = format!(
388 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
389 table
390 );
391 let ins_sql = format!(
392 "INSERT INTO {} \
393 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
394 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
395 table
396 );
397
398 conn.execute_batch("BEGIN IMMEDIATE")?;
399 let mut affected = 0u64;
400 let mut failed = 0u64;
401
402 for doc in &documents {
403 conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
404 let id_str = doc.subject_id.to_string();
405 let namespace = &doc.namespace;
406 let result = (|| {
407 conn.execute(&del_sql, rusqlite::params![namespace, &id_str])?;
408
409 let tags_json = tags_to_json(&doc.tags);
410 let metadata_json: Option<String> =
411 doc.metadata.as_ref().map(|v| v.to_string());
412
413 conn.execute(
414 &ins_sql,
415 rusqlite::params![
416 &id_str,
417 &doc.kind.to_string(),
418 doc.title.as_deref().unwrap_or(""),
419 &doc.body,
420 &tags_json,
421 namespace,
422 &metadata_json,
423 dt_to_micros(&doc.updated_at),
424 ],
425 )?;
426 Ok::<(), rusqlite::Error>(())
427 })();
428
429 match result {
430 Ok(()) => {
431 conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
432 affected += 1;
433 }
434 Err(_) => {
435 let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc");
436 let _ = conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc");
437 failed += 1;
438 }
439 }
440 }
441
442 conn.execute_batch("COMMIT")?;
443
444 Ok(BatchWriteSummary {
445 attempted,
446 affected,
447 failed,
448 first_error: String::new(),
449 })
450 })
451 .await
452 }
453
454 async fn delete_document(
455 &self,
456 namespace: &str,
457 subject_id: Uuid,
458 ) -> Result<bool, StorageError> {
459 let namespace = namespace.to_string();
460 let table = self.table_name.clone();
461
462 self.with_writer("fts_delete", move |conn| {
463 let sql = format!(
464 "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
465 table
466 );
467 let deleted =
468 conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
469 Ok(deleted > 0)
470 })
471 .await
472 }
473
474 async fn get_document(
475 &self,
476 namespace: &str,
477 subject_id: Uuid,
478 ) -> Result<Option<TextDocument>, StorageError> {
479 let namespace = namespace.to_string();
480 let table = self.table_name.clone();
481
482 self.with_reader("fts_get", move |conn| {
483 let sql = format!(
484 "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
485 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
486 table
487 );
488 let mut stmt = conn.prepare(&sql)?;
489 let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
490
491 match rows.next()? {
492 Some(row) => {
493 let id_str: String = row.get(0)?;
494 let kind_str: String = row.get(1)?;
495 let title: String = row.get(2)?;
496 let body: String = row.get(3)?;
497 let tags_json: String = row.get(4)?;
498 let ns: String = row.get(5)?;
499 let metadata_json: Option<String> = row.get(6)?;
500 let updated_at_micros: i64 = row.get(7)?;
501
502 let sid = Uuid::parse_str(&id_str).map_err(|e| {
503 rusqlite::Error::FromSqlConversionFailure(
504 0,
505 rusqlite::types::Type::Text,
506 Box::new(e),
507 )
508 })?;
509
510 let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
511 rusqlite::Error::FromSqlConversionFailure(
512 1,
513 rusqlite::types::Type::Text,
514 Box::new(e),
515 )
516 })?;
517
518 Ok(Some(TextDocument {
519 subject_id: sid,
520 kind,
521 title: if title.is_empty() { None } else { Some(title) },
522 body,
523 tags: tags_from_json(&tags_json),
524 namespace: ns,
525 metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
526 updated_at: micros_to_dt(updated_at_micros),
527 }))
528 }
529 None => Ok(None),
530 }
531 })
532 .await
533 }
534
535 async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
536 let table = self.table_name.clone();
537
538 self.with_reader("fts_search", move |conn| {
539 let sanitized = sanitize_fts5_query(&request.query);
540 if sanitized.is_empty() {
541 return Ok(Vec::new());
542 }
543
544 let match_expr = match request.mode {
545 TextQueryMode::Phrase => format!("\"{}\"", sanitized),
546 TextQueryMode::Plain => sanitized,
547 };
548
549 let snippet_chars = request.snippet_chars.max(1) as i32;
551
552 let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
553 build_filter_clause(filter, &table, 3)
554 } else {
555 (String::new(), Vec::new())
556 };
557
558 let sql = format!(
559 "SELECT subject_id, rank, title, snippet({table}, 3, '', '', '...', {snippet_chars}) \
560 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
561 ORDER BY rank LIMIT ?2",
562 );
563
564 let mut stmt = conn.prepare(&sql)?;
565 stmt.raw_bind_parameter(1, &match_expr)?;
566 stmt.raw_bind_parameter(2, request.top_k as i64)?;
567
568 for (i, param) in filter_params.iter().enumerate() {
569 param
570 .to_sql()
571 .map(|val| stmt.raw_bind_parameter(3 + i, val))
572 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
573 }
574
575 let mut hits = Vec::new();
576 let mut rows = stmt.raw_query();
577 let mut rank_idx = 0u32;
578
579 while let Some(row) = rows.next()? {
580 let id_str: String = row.get(0)?;
581 let fts_rank: f64 = row.get(1)?;
582 let title: String = row.get(2)?;
583 let snippet: String = row.get(3)?;
584
585 let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
586 rusqlite::Error::FromSqlConversionFailure(
587 0,
588 rusqlite::types::Type::Text,
589 Box::new(e),
590 )
591 })?;
592
593 let score = 1.0 / (1.0 + fts_rank.abs());
596
597 rank_idx += 1;
598 hits.push(TextSearchHit {
599 subject_id,
600 score: DeterministicScore::from_f64(score),
601 rank: rank_idx,
602 title: if title.is_empty() { None } else { Some(title) },
603 snippet: if snippet.is_empty() {
604 None
605 } else {
606 Some(snippet)
607 },
608 });
609 }
610
611 Ok(hits)
612 })
613 .await
614 }
615
616 async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
617 let table = self.table_name.clone();
618
619 self.with_reader("fts_count", move |conn| {
620 let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
621
622 let sql = if filter_clause.is_empty() {
623 format!("SELECT COUNT(*) FROM {}", table)
624 } else {
625 let where_part = filter_clause.trim_start_matches(" AND ");
626 format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part)
627 };
628
629 let mut stmt = conn.prepare(&sql)?;
630
631 for (i, param) in filter_params.iter().enumerate() {
632 param
633 .to_sql()
634 .map(|val| stmt.raw_bind_parameter(1 + i, val))
635 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
636 }
637
638 let mut rows = stmt.raw_query();
639 match rows.next()? {
640 Some(row) => {
641 let count: i64 = row.get(0)?;
642 Ok(count as u64)
643 }
644 None => Ok(0),
645 }
646 })
647 .await
648 }
649
650 async fn stats(&self) -> Result<TextIndexStats, StorageError> {
651 let table = self.table_name.clone();
652
653 self.with_reader("fts_stats", move |conn| {
654 let sql = format!("SELECT COUNT(*) FROM {}", table);
655 let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
656
657 Ok(TextIndexStats {
658 document_count: count as u64,
659 needs_rebuild: false,
660 last_rebuild_at: None,
661 })
662 })
663 .await
664 }
665
666 async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
667 let table = self.table_name.clone();
668
669 self.with_writer("fts_rebuild", move |conn| {
670 let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
672 conn.execute(&sql, [])?;
673
674 let count_sql = format!("SELECT COUNT(*) FROM {}", table);
675 let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
676
677 Ok(TextIndexStats {
678 document_count: count as u64,
679 needs_rebuild: false,
680 last_rebuild_at: Some(Utc::now()),
681 })
682 })
683 .await
684 }
685}
686
687impl Fts5TextSearch {
688 #[allow(dead_code)]
698 pub(crate) async fn rename_namespace(
699 &self,
700 old_namespace: &str,
701 new_namespace: &str,
702 ) -> Result<u64, StorageError> {
703 if old_namespace == new_namespace {
704 return Ok(0);
705 }
706 let table = self.table_name.clone();
707 let old_ns = old_namespace.to_string();
708 let new_ns = new_namespace.to_string();
709
710 self.with_writer("fts_rename_namespace", move |conn| {
711 let sel_sql = format!(
712 "SELECT subject_id, kind, title, body, tags, metadata, updated_at \
713 FROM {} WHERE namespace = ?1",
714 table
715 );
716 struct Row {
717 subject_id: String,
718 kind: String,
719 title: String,
720 body: String,
721 tags: String,
722 metadata: Option<String>,
723 updated_at: i64,
724 }
725 let rows: Vec<Row> = {
726 let mut stmt = conn.prepare(&sel_sql)?;
727 let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
728 Ok(Row {
729 subject_id: row.get(0)?,
730 kind: row.get(1)?,
731 title: row.get(2)?,
732 body: row.get(3)?,
733 tags: row.get(4)?,
734 metadata: row.get(5)?,
735 updated_at: row.get(6)?,
736 })
737 })?;
738 iter.collect::<Result<Vec<_>, _>>()?
739 };
740 let moved = rows.len() as u64;
741 if moved == 0 {
742 return Ok(0u64);
743 }
744
745 conn.execute_batch("BEGIN IMMEDIATE")?;
746
747 let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
748 if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
749 let _ = conn.execute_batch("ROLLBACK");
750 return Err(e);
751 }
752
753 let ins_sql = format!(
754 "INSERT INTO {} \
755 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
756 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
757 table
758 );
759 for row in &rows {
760 if let Err(e) = conn.execute(
761 &ins_sql,
762 rusqlite::params![
763 row.subject_id,
764 row.kind,
765 row.title,
766 row.body,
767 row.tags,
768 &new_ns,
769 row.metadata,
770 row.updated_at,
771 ],
772 ) {
773 let _ = conn.execute_batch("ROLLBACK");
774 return Err(e);
775 }
776 }
777
778 conn.execute_batch("COMMIT")?;
779 Ok(moved)
780 })
781 .await
782 }
783}
784
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::PoolConfig;

    /// Builds an in-memory store whose FTS5 table `fts_<table_key>` already exists.
    fn setup_memory_store(table_key: &str) -> Fts5TextSearch {
        let config = PoolConfig {
            path: None,
            ..PoolConfig::default()
        };
        let pool = Arc::new(ConnectionPool::new(config).unwrap());

        {
            let writer = pool.writer().unwrap();
            ensure_fts5_schema(writer.conn(), table_key).unwrap();
        }

        Fts5TextSearch::new(pool, false, table_key.to_string())
    }

    /// A `Note` document in namespace `test_ns`; an empty `title` becomes `None`.
    fn make_document(subject_id: Uuid, title: &str, body: &str) -> TextDocument {
        TextDocument {
            subject_id,
            kind: SubstrateKind::Note,
            title: if title.is_empty() {
                None
            } else {
                Some(title.to_string())
            },
            body: body.to_string(),
            tags: vec![],
            namespace: "test_ns".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        }
    }

    /// A filter restricted to a single namespace.
    fn ns_filter(namespace: &str) -> TextFilter {
        TextFilter {
            namespaces: vec![namespace.to_string()],
            ..TextFilter::default()
        }
    }

    #[tokio::test]
    async fn test_upsert_and_search() {
        let store = setup_memory_store("upsert_search");

        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Entity,
            title: Some("Rust Programming".to_string()),
            body: "Rust is a systems programming language focused on safety and performance."
                .to_string(),
            tags: vec!["rust".to_string(), "programming".to_string()],
            namespace: "tech".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        };

        store.upsert_document(doc).await.unwrap();

        let hits = store
            .search(TextSearchRequest {
                query: "Rust programming".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("tech")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id);
        assert_eq!(hits[0].rank, 1);
        assert!(hits[0].score.to_f64() > 0.0);
        assert!(hits[0].title.is_some());
    }

    #[tokio::test]
    async fn test_phrase_search() {
        let store = setup_memory_store("phrase");

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();

        store
            .upsert_document(make_document(
                id1,
                "Animals",
                "The quick brown fox jumps over the lazy dog.",
            ))
            .await
            .unwrap();

        store
            .upsert_document(make_document(
                id2,
                "Colors",
                "The brown paint was quick to dry, unlike the fox.",
            ))
            .await
            .unwrap();

        let hits = store
            .search(TextSearchRequest {
                query: "quick brown fox".to_string(),
                mode: TextQueryMode::Phrase,
                filter: Some(ns_filter("test_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id1);

        let hits = store
            .search(TextSearchRequest {
                query: "quick brown fox".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("test_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 2);
    }

    #[tokio::test]
    async fn test_delete_document() {
        let store = setup_memory_store("delete");

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();

        store
            .upsert_document(make_document(id1, "Doc One", "First document content."))
            .await
            .unwrap();
        store
            .upsert_document(make_document(id2, "Doc Two", "Second document content."))
            .await
            .unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 2);

        let deleted = store.delete_document("test_ns", id1).await.unwrap();
        assert!(deleted);

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 1);

        let deleted_again = store.delete_document("test_ns", id1).await.unwrap();
        assert!(!deleted_again);

        let doc = store.get_document("test_ns", id2).await.unwrap();
        assert!(doc.is_some());

        let doc = store.get_document("test_ns", id1).await.unwrap();
        assert!(doc.is_none());
    }

    #[tokio::test]
    async fn test_count_with_filter() {
        let store = setup_memory_store("count_filter");
        let ns = "test_ns".to_string();

        for i in 0..5 {
            let kind = if i % 2 == 0 {
                SubstrateKind::Entity
            } else {
                SubstrateKind::Note
            };
            let doc = TextDocument {
                subject_id: Uuid::new_v4(),
                kind,
                title: Some(format!("Doc {}", i)),
                body: format!("Content for document number {}", i),
                tags: vec![],
                namespace: ns.clone(),
                metadata: None,
                updated_at: Utc::now(),
            };
            store.upsert_document(doc).await.unwrap();
        }

        // An empty filter counts the whole table.
        let all = store.count(TextFilter::default()).await.unwrap();
        assert_eq!(all, 5);

        let total = store
            .count(TextFilter {
                namespaces: vec![ns.clone()],
                ..TextFilter::default()
            })
            .await
            .unwrap();
        assert_eq!(total, 5);

        let entities = store
            .count(TextFilter {
                namespaces: vec![ns.clone()],
                kinds: vec![SubstrateKind::Entity],
                ..TextFilter::default()
            })
            .await
            .unwrap();
        assert_eq!(entities, 3);

        let notes = store
            .count(TextFilter {
                namespaces: vec![ns.clone()],
                kinds: vec![SubstrateKind::Note],
                ..TextFilter::default()
            })
            .await
            .unwrap();
        assert_eq!(notes, 2);
    }

    #[tokio::test]
    async fn test_get_document_roundtrip() {
        let store = setup_memory_store("get_roundtrip");

        let id = Uuid::new_v4();
        let original = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            title: Some("Important Memo".to_string()),
            body: "This memo contains critical information.".to_string(),
            tags: vec!["important".to_string(), "memo".to_string()],
            namespace: "work".to_string(),
            metadata: Some(serde_json::json!({"priority": "high"})),
            updated_at: Utc::now(),
        };

        store.upsert_document(original.clone()).await.unwrap();

        let retrieved = store.get_document("work", id).await.unwrap().unwrap();
        assert_eq!(retrieved.subject_id, id);
        assert_eq!(retrieved.kind, SubstrateKind::Note);
        assert_eq!(retrieved.title, Some("Important Memo".to_string()));
        assert_eq!(retrieved.body, "This memo contains critical information.");
        assert_eq!(retrieved.tags, vec!["important", "memo"]);
        assert_eq!(retrieved.namespace, "work");
        assert_eq!(retrieved.metadata, original.metadata);
        // `updated_at` is stored with microsecond precision.
        assert_eq!(
            retrieved.updated_at.timestamp_micros(),
            original.updated_at.timestamp_micros()
        );
    }

    #[tokio::test]
    async fn test_upsert_replaces_existing() {
        let store = setup_memory_store("replace");

        let id = Uuid::new_v4();
        store
            .upsert_document(make_document(id, "Original", "Original body text."))
            .await
            .unwrap();

        store
            .upsert_document(make_document(id, "Updated", "Updated body text."))
            .await
            .unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 1);

        let doc = store.get_document("test_ns", id).await.unwrap().unwrap();
        assert_eq!(doc.title, Some("Updated".to_string()));
        assert_eq!(doc.body, "Updated body text.");
    }

    #[tokio::test]
    async fn test_batch_upsert() {
        let store = setup_memory_store("batch");

        let docs: Vec<TextDocument> = (0..50)
            .map(|i| TextDocument {
                subject_id: Uuid::new_v4(),
                kind: SubstrateKind::Entity,
                title: Some(format!("Item {}", i)),
                body: format!("This is the body content for item number {}", i),
                tags: vec![format!("tag_{}", i % 5)],
                namespace: "batch_ns".to_string(),
                metadata: None,
                updated_at: Utc::now(),
            })
            .collect();

        let summary = store.upsert_documents(docs).await.unwrap();
        assert_eq!(summary.attempted, 50);
        assert_eq!(summary.affected, 50);
        assert_eq!(summary.failed, 0);

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.document_count, 50);
    }

    #[tokio::test]
    async fn test_empty_search() {
        let store = setup_memory_store("empty");

        let hits = store
            .search(TextSearchRequest {
                query: "nonexistent".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("test_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert!(hits.is_empty());
    }

    #[tokio::test]
    async fn test_rebuild() {
        let store = setup_memory_store("rebuild");

        store
            .upsert_document(make_document(
                Uuid::new_v4(),
                "Test",
                "Test document for rebuild.",
            ))
            .await
            .unwrap();

        let stats = store.rebuild(IndexRebuildScope::Full).await.unwrap();
        assert_eq!(stats.document_count, 1);
        assert!(!stats.needs_rebuild);
        assert!(stats.last_rebuild_at.is_some());
    }

    #[tokio::test]
    async fn test_search_with_kind_filter() {
        let store = setup_memory_store("filter_kind");

        let id_entity = Uuid::new_v4();
        let id_note = Uuid::new_v4();

        store
            .upsert_document(TextDocument {
                subject_id: id_entity,
                kind: SubstrateKind::Entity,
                title: Some("Rust Guide".to_string()),
                body: "A comprehensive guide to Rust programming.".to_string(),
                tags: vec![],
                namespace: "test_ns".to_string(),
                metadata: None,
                updated_at: Utc::now(),
            })
            .await
            .unwrap();

        store
            .upsert_document(TextDocument {
                subject_id: id_note,
                kind: SubstrateKind::Note,
                title: Some("Rust Notes".to_string()),
                body: "Quick notes about Rust concepts.".to_string(),
                tags: vec![],
                namespace: "test_ns".to_string(),
                metadata: None,
                updated_at: Utc::now(),
            })
            .await
            .unwrap();

        let hits = store
            .search(TextSearchRequest {
                query: "Rust".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(TextFilter {
                    kinds: vec![SubstrateKind::Entity],
                    namespaces: vec!["test_ns".to_string()],
                    ..TextFilter::default()
                }),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].subject_id, id_entity);
    }

    // Pure function: no async runtime needed.
    #[test]
    fn test_sanitize_fts5_query() {
        assert_eq!(sanitize_fts5_query("hello world"), "hello world");
        assert_eq!(sanitize_fts5_query("hello*world"), "helloworld");
        assert_eq!(sanitize_fts5_query("\"quoted\""), "quoted");
        assert_eq!(sanitize_fts5_query("(parens)"), "parens");
        assert_eq!(sanitize_fts5_query("a + b - c"), "a b c");
        assert_eq!(sanitize_fts5_query("col:value"), "colvalue");
        assert_eq!(sanitize_fts5_query(""), "");
        assert_eq!(sanitize_fts5_query("***"), "");
        // Boolean keywords are dropped regardless of letter case.
        assert_eq!(sanitize_fts5_query("cats AND dogs or Near not"), "cats dogs");
    }

    #[tokio::test]
    async fn test_score_is_bounded() {
        let store = setup_memory_store("score_bounds");

        for i in 0..5 {
            store
                .upsert_document(make_document(
                    Uuid::new_v4(),
                    &format!("Doc {}", i),
                    &format!("This document discusses topic number {}", i),
                ))
                .await
                .unwrap();
        }

        let hits = store
            .search(TextSearchRequest {
                query: "document topic".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("test_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();

        for hit in &hits {
            let score = hit.score.to_f64();
            assert!(
                score > 0.0 && score <= 1.0,
                "score out of (0, 1] range: {}",
                score
            );
        }

        for (i, hit) in hits.iter().enumerate() {
            assert_eq!(hit.rank, (i + 1) as u32);
        }
    }

    #[tokio::test]
    async fn test_rename_namespace() {
        let store = setup_memory_store("rename_ns");

        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            title: Some("Rename test".to_string()),
            body: "keyword_unique_xyz".to_string(),
            tags: vec![],
            namespace: "old_ns".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let before = store
            .search(TextSearchRequest {
                query: "keyword_unique_xyz".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("old_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();
        assert_eq!(before.len(), 1);

        let moved = store.rename_namespace("old_ns", "new_ns").await.unwrap();
        assert_eq!(moved, 1);

        let after_new = store
            .search(TextSearchRequest {
                query: "keyword_unique_xyz".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("new_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();
        assert_eq!(after_new.len(), 1);

        let after_old = store
            .search(TextSearchRequest {
                query: "keyword_unique_xyz".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("old_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();
        assert!(after_old.is_empty());
    }

    #[tokio::test]
    async fn test_metadata_none_roundtrip() {
        let store = setup_memory_store("meta_none");
        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            namespace: "test_ns".to_string(),
            title: None,
            body: "no metadata".to_string(),
            tags: vec![],
            metadata: None,
            updated_at: Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();
        let fetched = store.get_document("test_ns", id).await.unwrap().unwrap();
        assert!(fetched.metadata.is_none());
    }

    #[tokio::test]
    async fn test_rename_namespace_noop() {
        let store = setup_memory_store("rename_noop");

        let id = Uuid::new_v4();
        let doc = TextDocument {
            subject_id: id,
            kind: SubstrateKind::Note,
            title: None,
            body: "noop_test_content".to_string(),
            tags: vec![],
            namespace: "same_ns".to_string(),
            metadata: None,
            updated_at: Utc::now(),
        };
        store.upsert_document(doc).await.unwrap();

        let moved = store.rename_namespace("same_ns", "same_ns").await.unwrap();
        assert_eq!(moved, 0);

        let hits = store
            .search(TextSearchRequest {
                query: "noop_test_content".to_string(),
                mode: TextQueryMode::Plain,
                filter: Some(ns_filter("same_ns")),
                top_k: 10,
                snippet_chars: 64,
            })
            .await
            .unwrap();
        assert_eq!(hits.len(), 1);
    }
}