use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use uuid::Uuid;
use khive_score::DeterministicScore;
use khive_storage::error::StorageError;
use khive_storage::types::{
BatchWriteSummary, IndexRebuildScope, TextDocument, TextFilter, TextGatherMode, TextIndexStats,
TextQueryMode, TextSearchHit, TextSearchOptions, TextSearchRequest, TextTermStats,
TextTermStatsRequest,
};
use khive_storage::StorageCapability;
use khive_storage::TextSearch;
use khive_types::SubstrateKind;
use crate::error::SqliteError;
use crate::pool::ConnectionPool;
#[cfg(test)]
/// Test helper: creates the FTS5 virtual table `fts_{table_key}` if it is missing.
///
/// Only `title` and `body` are tokenized; every other column is stored
/// `UNINDEXED` so it can be filtered/returned without entering the index.
pub(crate) fn ensure_fts5_schema(
    conn: &rusqlite::Connection,
    table_key: &str,
) -> Result<(), rusqlite::Error> {
    const COLUMNS: [&str; 8] = [
        "subject_id UNINDEXED",
        "kind UNINDEXED",
        "title",
        "body",
        "tags UNINDEXED",
        "namespace UNINDEXED",
        "metadata UNINDEXED",
        "updated_at UNINDEXED",
    ];
    conn.execute_batch(&format!(
        "CREATE VIRTUAL TABLE IF NOT EXISTS fts_{table_key} USING fts5({})",
        COLUMNS.join(", ")
    ))
}
/// Wraps a rusqlite error as a text-capability driver error tagged with `op`.
fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
    let capability = StorageCapability::Text;
    StorageError::driver(capability, op, e)
}
/// Wraps a crate-level [`SqliteError`] (e.g. from the connection pool) as a
/// text-capability driver error tagged with `op`.
fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
    let capability = StorageCapability::Text;
    StorageError::driver(capability, op, e)
}
/// SQLite FTS5 implementation of [`TextSearch`] backed by one virtual table.
///
/// A document is identified by its `(namespace, subject_id)` pair: upserts and
/// deletes are keyed on both columns.
pub struct Fts5TextSearch {
    /// Name of the FTS5 virtual table (`fts_{table_key}`).
    table_name: String,
    /// Connection pool; also supplies the database path and busy timeout.
    pool: Arc<ConnectionPool>,
    /// When `true`, each operation opens a standalone connection to the
    /// database file instead of borrowing one from `pool`.
    is_file_backed: bool,
}
impl Fts5TextSearch {
    /// Creates a text-search handle bound to the FTS5 table `fts_{table_key}`.
    ///
    /// When `is_file_backed` is `true`, every operation opens its own standalone
    /// connection to the pool's database file; otherwise connections are taken
    /// from `pool` (in-memory databases cannot be reopened by path).
    pub(crate) fn new(pool: Arc<ConnectionPool>, is_file_backed: bool, table_key: String) -> Self {
        let table_name = format!("fts_{}", table_key);
        Self {
            pool,
            is_file_backed,
            table_name,
        }
    }

    /// Opens a new connection to `pool`'s database file.
    ///
    /// `access` selects the access mode (read-write or read-only); `NO_MUTEX`
    /// and `URI` are always added. The connection gets the pool's busy timeout
    /// plus `foreign_keys = ON` and `synchronous = NORMAL`.
    ///
    /// This performs blocking file I/O, so call it from a blocking context.
    ///
    /// # Errors
    /// * `StorageError::Pool` (operation `pool_op`) when the pool has no file path.
    /// * A driver error tagged `open_op` when opening or configuring fails.
    fn open_standalone(
        pool: &ConnectionPool,
        access: rusqlite::OpenFlags,
        pool_op: &'static str,
        open_op: &'static str,
    ) -> Result<rusqlite::Connection, StorageError> {
        let config = pool.config();
        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
            operation: pool_op.into(),
            message: "in-memory databases do not support standalone connections".into(),
        })?;
        let conn = rusqlite::Connection::open_with_flags(
            path,
            access
                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
        )
        .map_err(|e| map_err(e, open_op))?;
        conn.busy_timeout(config.busy_timeout)
            .map_err(|e| map_err(e, open_op))?;
        conn.pragma_update(None, "foreign_keys", "ON")
            .map_err(|e| map_err(e, open_op))?;
        conn.pragma_update(None, "synchronous", "NORMAL")
            .map_err(|e| map_err(e, open_op))?;
        Ok(conn)
    }

    /// Opens a standalone read-write connection to the database file.
    ///
    /// Kept for existing callers of the `&self` form; `with_writer` now opens
    /// its connection inside the blocking task instead.
    #[allow(dead_code)]
    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
        Self::open_standalone(
            &self.pool,
            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE,
            "fts_writer",
            "open_fts_writer",
        )
    }

    /// Opens a standalone read-only connection to the database file.
    ///
    /// Kept for existing callers of the `&self` form; `with_reader` now opens
    /// its connection inside the blocking task instead.
    #[allow(dead_code)]
    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
        Self::open_standalone(
            &self.pool,
            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
            "fts_reader",
            "open_fts_reader",
        )
    }

    /// Runs `f` against a writable connection on Tokio's blocking thread pool.
    ///
    /// File-backed stores open a fresh standalone connection *inside* the
    /// blocking task, so no file I/O runs on the async executor. In-memory
    /// stores use the pool's writer via `try_writer`. Errors from `f`, from
    /// acquiring the connection, and from the task join are tagged with `op`.
    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
    where
        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
        R: Send + 'static,
    {
        let pool = Arc::clone(&self.pool);
        let file_backed = self.is_file_backed;
        tokio::task::spawn_blocking(move || {
            if file_backed {
                let conn = Self::open_standalone(
                    &pool,
                    rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE,
                    "fts_writer",
                    "open_fts_writer",
                )?;
                f(&conn).map_err(|e| map_err(e, op))
            } else {
                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
                f(guard.conn()).map_err(|e| map_err(e, op))
            }
        })
        .await
        .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
    }

    /// Runs `f` against a read connection on Tokio's blocking thread pool.
    ///
    /// File-backed stores open a fresh read-only connection inside the blocking
    /// task; in-memory stores borrow a pooled reader. Errors are tagged with `op`.
    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
    where
        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
        R: Send + 'static,
    {
        let pool = Arc::clone(&self.pool);
        let file_backed = self.is_file_backed;
        tokio::task::spawn_blocking(move || {
            if file_backed {
                let conn = Self::open_standalone(
                    &pool,
                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
                    "fts_reader",
                    "open_fts_reader",
                )?;
                f(&conn).map_err(|e| map_err(e, op))
            } else {
                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
                f(guard.conn()).map_err(|e| map_err(e, op))
            }
        })
        .await
        .map_err(|e| StorageError::driver(StorageCapability::Text, op, e))?
    }
}
/// Serializes tags as a JSON array string, falling back to `"[]"`.
fn tags_to_json(tags: &[String]) -> String {
    match serde_json::to_string(tags) {
        Ok(json) => json,
        Err(_) => String::from("[]"),
    }
}
/// Parses a JSON array of strings; malformed input yields an empty list.
fn tags_from_json(s: &str) -> Vec<String> {
    serde_json::from_str::<Vec<String>>(s).unwrap_or_else(|_| Vec::new())
}
fn dt_to_micros(dt: &DateTime<Utc>) -> i64 {
dt.timestamp_micros()
}
fn micros_to_dt(micros: i64) -> DateTime<Utc> {
Utc.timestamp_micros(micros)
.single()
.unwrap_or_else(Utc::now)
}
/// Turns free-form user text into a space-separated list of FTS5 barewords.
///
/// FTS5 accepts only barewords (ASCII letters/digits, `_`, and any non-ASCII
/// character) outside of quoted strings; any other character is either an
/// operator or a syntax error. This function therefore:
/// * drops `* " ' + - ^ . ~ !`, so e.g. `"e-mail"` becomes the single term `email`;
/// * treats whitespace (including tab/newline/CR, which FTS5 also treats as
///   whitespace) as a term separator;
/// * drops all other control characters;
/// * turns every other non-bareword ASCII character (`( ) , :` as well as
///   `@ # $ % & / ; < = > ? [ \ ] { | }` and backtick) into a separator instead
///   of passing it through, where FTS5 would reject the whole query;
/// * removes the operator words `AND`, `OR`, `NOT`, `NEAR` (case-insensitively).
///
/// The result contains no quotes, so callers may wrap it in `"..."` for a
/// phrase query. An empty string means nothing searchable remained.
fn sanitize_fts5_query(query: &str) -> String {
    let spaced: String = query
        .chars()
        .filter_map(|c| match c {
            '*' | '"' | '\'' | '+' | '-' | '^' | '.' | '~' | '!' => None,
            c if c.is_whitespace() => Some(' '),
            c if c.is_control() => None,
            c if c.is_ascii_alphanumeric() || c == '_' || !c.is_ascii() => Some(c),
            _ => Some(' '),
        })
        .collect();
    spaced
        .split_whitespace()
        .filter(|t| {
            !matches!(
                t.to_ascii_uppercase().as_str(),
                "AND" | "OR" | "NOT" | "NEAR"
            )
        })
        .collect::<Vec<_>>()
        .join(" ")
}
/// Builds the ` AND <table>.<column> IN (...)` suffix for a [`TextFilter`].
///
/// Placeholders are numbered `?start_idx`, `?start_idx + 1`, ... in the order
/// ids, then kinds, then namespaces, and the returned parameters list the
/// values in that same order. Empty lists are skipped; if all are empty the
/// clause is `""`. A non-empty clause begins with `" AND "` so it can follow an
/// existing `WHERE` condition.
fn build_filter_clause(
    filter: &TextFilter,
    table: &str,
    start_idx: usize,
) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
    type Param = Box<dyn rusqlite::types::ToSql>;

    let groups: [(&str, Vec<Param>); 3] = [
        (
            "subject_id",
            filter
                .ids
                .iter()
                .map(|id| Box::new(id.to_string()) as Param)
                .collect(),
        ),
        (
            "kind",
            filter
                .kinds
                .iter()
                .map(|kind| Box::new(kind.to_string()) as Param)
                .collect(),
        ),
        (
            "namespace",
            filter
                .namespaces
                .iter()
                .map(|ns| Box::new(ns.clone()) as Param)
                .collect(),
        ),
    ];

    let mut conditions: Vec<String> = Vec::new();
    let mut params: Vec<Param> = Vec::new();
    for (column, values) in groups {
        if values.is_empty() {
            continue;
        }
        // The next placeholder number is start_idx plus the values bound so far.
        let first = start_idx + params.len();
        let placeholders = (first..first + values.len())
            .map(|n| format!("?{}", n))
            .collect::<Vec<_>>()
            .join(", ");
        conditions.push(format!("{}.{} IN ({})", table, column, placeholders));
        params.extend(values);
    }

    if conditions.is_empty() {
        (String::new(), params)
    } else {
        (format!(" AND {}", conditions.join(" AND ")), params)
    }
}
/// Runs `body` inside a `BEGIN IMMEDIATE` transaction on `conn`.
///
/// Commits when `body` succeeds. If `body` or the `COMMIT` itself fails, a
/// best-effort `ROLLBACK` is issued so the connection is not left inside an
/// open transaction (a pooled connection may be reused by later calls), and
/// the original error is returned.
fn run_immediate_tx<T>(
    conn: &rusqlite::Connection,
    body: impl FnOnce(&rusqlite::Connection) -> Result<T, rusqlite::Error>,
) -> Result<T, rusqlite::Error> {
    conn.execute_batch("BEGIN IMMEDIATE")?;
    let outcome = body(conn).and_then(|value| conn.execute_batch("COMMIT").map(|()| value));
    if outcome.is_err() {
        // The error from `body`/COMMIT is the one worth reporting; ROLLBACK may
        // fail harmlessly if SQLite has already ended the transaction.
        let _ = conn.execute_batch("ROLLBACK");
    }
    outcome
}

/// Binds `params` to consecutive one-based placeholders starting at `first_idx`.
fn bind_filter_params(
    stmt: &mut rusqlite::Statement<'_>,
    first_idx: usize,
    params: &[Box<dyn rusqlite::types::ToSql>],
) -> Result<(), rusqlite::Error> {
    for (offset, param) in params.iter().enumerate() {
        stmt.raw_bind_parameter(first_idx + offset, param.to_sql()?)?;
    }
    Ok(())
}

#[async_trait]
impl TextSearch for Fts5TextSearch {
    /// Inserts or replaces the document keyed by `(namespace, subject_id)`.
    ///
    /// The delete of any existing row and the insert run in one immediate
    /// transaction, so they apply atomically.
    async fn upsert_document(&self, document: TextDocument) -> Result<(), StorageError> {
        let table = self.table_name.clone();
        self.with_writer("fts_upsert", move |conn| {
            let del_sql = format!(
                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
                table
            );
            let ins_sql = format!(
                "INSERT INTO {} \
                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                table
            );
            let id_str = document.subject_id.to_string();
            let tags_json = tags_to_json(&document.tags);
            let metadata_json: Option<String> = document.metadata.as_ref().map(|v| v.to_string());
            run_immediate_tx(conn, |conn| {
                conn.execute(&del_sql, rusqlite::params![&document.namespace, &id_str])?;
                conn.execute(
                    &ins_sql,
                    rusqlite::params![
                        &id_str,
                        document.kind.to_string(),
                        document.title.as_deref().unwrap_or(""),
                        &document.body,
                        &tags_json,
                        &document.namespace,
                        &metadata_json,
                        dt_to_micros(&document.updated_at),
                    ],
                )?;
                Ok(())
            })
        })
        .await
    }

    /// Upserts many documents in a single transaction.
    ///
    /// Each document is written under its own savepoint. A document that fails
    /// is rolled back individually and counted in `failed`; the first such
    /// failure is reported in `first_error` as `"<subject_id>: <error>"`. If
    /// SQLite cannot roll back a savepoint, or the transaction cannot begin or
    /// commit, the whole batch fails and is rolled back.
    async fn upsert_documents(
        &self,
        documents: Vec<TextDocument>,
    ) -> Result<BatchWriteSummary, StorageError> {
        let table = self.table_name.clone();
        let attempted = documents.len() as u64;
        self.with_writer("fts_upsert_batch", move |conn| {
            let del_sql = format!(
                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
                table
            );
            let ins_sql = format!(
                "INSERT INTO {} \
                 (subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                table
            );
            run_immediate_tx(conn, |conn| {
                let mut affected = 0u64;
                let mut failed = 0u64;
                let mut first_error: Option<String> = None;
                for doc in &documents {
                    conn.execute_batch("SAVEPOINT fts_upsert_doc")?;
                    let id_str = doc.subject_id.to_string();
                    let written = (|| {
                        conn.execute(&del_sql, rusqlite::params![&doc.namespace, &id_str])?;
                        let tags_json = tags_to_json(&doc.tags);
                        let metadata_json: Option<String> =
                            doc.metadata.as_ref().map(|v| v.to_string());
                        conn.execute(
                            &ins_sql,
                            rusqlite::params![
                                &id_str,
                                doc.kind.to_string(),
                                doc.title.as_deref().unwrap_or(""),
                                &doc.body,
                                &tags_json,
                                &doc.namespace,
                                &metadata_json,
                                dt_to_micros(&doc.updated_at),
                            ],
                        )?;
                        Ok::<(), rusqlite::Error>(())
                    })();
                    match written {
                        Ok(()) => {
                            conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
                            affected += 1;
                        }
                        Err(e) => {
                            // Undo only this document. If even that fails, abort the
                            // batch rather than commit a half-written document.
                            conn.execute_batch("ROLLBACK TO SAVEPOINT fts_upsert_doc")?;
                            conn.execute_batch("RELEASE SAVEPOINT fts_upsert_doc")?;
                            if first_error.is_none() {
                                first_error = Some(format!("{}: {}", id_str, e));
                            }
                            failed += 1;
                        }
                    }
                }
                Ok(BatchWriteSummary {
                    attempted,
                    affected,
                    failed,
                    first_error: first_error.unwrap_or_default(),
                })
            })
        })
        .await
    }

    /// Deletes the document keyed by `(namespace, subject_id)`.
    ///
    /// Returns `true` when at least one row was removed.
    async fn delete_document(
        &self,
        namespace: &str,
        subject_id: Uuid,
    ) -> Result<bool, StorageError> {
        let namespace = namespace.to_string();
        let table = self.table_name.clone();
        self.with_writer("fts_delete", move |conn| {
            let sql = format!(
                "DELETE FROM {} WHERE namespace = ?1 AND subject_id = ?2",
                table
            );
            let deleted =
                conn.execute(&sql, rusqlite::params![namespace, subject_id.to_string()])?;
            Ok(deleted > 0)
        })
        .await
    }

    /// Loads the document keyed by `(namespace, subject_id)`, if present.
    ///
    /// An empty stored title maps back to `None`; unparsable metadata JSON maps
    /// to `None`; unparsable tags JSON maps to an empty list.
    async fn get_document(
        &self,
        namespace: &str,
        subject_id: Uuid,
    ) -> Result<Option<TextDocument>, StorageError> {
        let namespace = namespace.to_string();
        let table = self.table_name.clone();
        self.with_reader("fts_get", move |conn| {
            let sql = format!(
                "SELECT subject_id, kind, title, body, tags, namespace, metadata, updated_at \
                 FROM {} WHERE namespace = ?1 AND subject_id = ?2",
                table
            );
            let mut stmt = conn.prepare(&sql)?;
            let mut rows = stmt.query(rusqlite::params![namespace, subject_id.to_string()])?;
            match rows.next()? {
                Some(row) => {
                    let id_str: String = row.get(0)?;
                    let kind_str: String = row.get(1)?;
                    let title: String = row.get(2)?;
                    let body: String = row.get(3)?;
                    let tags_json: String = row.get(4)?;
                    let ns: String = row.get(5)?;
                    let metadata_json: Option<String> = row.get(6)?;
                    let updated_at_micros: i64 = row.get(7)?;
                    let sid = Uuid::parse_str(&id_str).map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            0,
                            rusqlite::types::Type::Text,
                            Box::new(e),
                        )
                    })?;
                    let kind = kind_str.parse::<SubstrateKind>().map_err(|e| {
                        rusqlite::Error::FromSqlConversionFailure(
                            1,
                            rusqlite::types::Type::Text,
                            Box::new(e),
                        )
                    })?;
                    Ok(Some(TextDocument {
                        subject_id: sid,
                        kind,
                        title: if title.is_empty() { None } else { Some(title) },
                        body,
                        tags: tags_from_json(&tags_json),
                        namespace: ns,
                        metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
                        updated_at: micros_to_dt(updated_at_micros),
                    }))
                }
                None => Ok(None),
            }
        })
        .await
    }

    /// Full-text search ordered by FTS5 `rank` (best first), at most `top_k` hits.
    ///
    /// Scores are min-max normalized over the returned hits into `[0.05, 1.0]`
    /// (best hit = 1.0); when all ranks are equal every score is 1.0.
    async fn search(&self, request: TextSearchRequest) -> Result<Vec<TextSearchHit>, StorageError> {
        let table = self.table_name.clone();
        self.with_reader("fts_search", move |conn| {
            // Sanitized query, or None when nothing searchable remains.
            let sanitized =
                || Some(sanitize_fts5_query(&request.query)).filter(|s| !s.is_empty());
            let match_expr = match request.mode {
                TextQueryMode::AnyTerm => Fts5TextSearch::build_any_term_expr(&request.query),
                TextQueryMode::Phrase => sanitized().map(|s| format!("\"{}\"", s)),
                TextQueryMode::Plain => sanitized(),
            };
            let match_expr = match match_expr {
                Some(expr) => expr,
                None => return Ok(Vec::new()),
            };
            let snippet_expr = if request.snippet_chars == 0 {
                "NULL AS snippet".to_string()
            } else {
                let chars = i32::try_from(request.snippet_chars).unwrap_or(i32::MAX);
                format!("snippet({table}, 3, '', '', '...', {chars})")
            };
            // ?1 = MATCH expression, ?2 = top_k, ?3.. = filter values.
            let (filter_clause, filter_params) = match request.filter.as_ref() {
                Some(filter) => build_filter_clause(filter, &table, 3),
                None => (String::new(), Vec::new()),
            };
            let sql = format!(
                "SELECT subject_id, rank, title, {snippet_expr} \
                 FROM {table} WHERE {table} MATCH ?1{filter_clause} \
                 ORDER BY rank LIMIT ?2",
            );
            let mut stmt = conn.prepare(&sql)?;
            stmt.raw_bind_parameter(1, &match_expr)?;
            stmt.raw_bind_parameter(2, request.top_k as i64)?;
            bind_filter_params(&mut stmt, 3, &filter_params)?;
            let mut hits = Vec::new();
            let mut rows = stmt.raw_query();
            let mut rank_idx = 0u32;
            while let Some(row) = rows.next()? {
                let id_str: String = row.get(0)?;
                let fts_rank: f64 = row.get(1)?;
                let title: String = row.get(2)?;
                let snippet: Option<String> = row.get(3)?;
                let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
                    rusqlite::Error::FromSqlConversionFailure(
                        0,
                        rusqlite::types::Type::Text,
                        Box::new(e),
                    )
                })?;
                rank_idx += 1;
                hits.push((subject_id, fts_rank, rank_idx, title, snippet));
            }
            // FTS5 rank is "lower is better", so the minimum rank is the best hit.
            let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
            let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
            let range = max_rank - min_rank;
            let results = hits
                .into_iter()
                .map(|(subject_id, raw_rank, rank, title, snippet)| {
                    let score = if range.abs() < 1e-12 {
                        1.0
                    } else {
                        let t = (max_rank - raw_rank) / range;
                        0.05 + 0.95 * t
                    };
                    TextSearchHit {
                        subject_id,
                        score: DeterministicScore::from_f64(score),
                        rank,
                        title: if title.is_empty() { None } else { Some(title) },
                        snippet: snippet.filter(|s| !s.is_empty()),
                    }
                })
                .collect();
            Ok(results)
        })
        .await
    }

    /// Counts documents matching `filter` (all documents for an empty filter).
    async fn count(&self, filter: TextFilter) -> Result<u64, StorageError> {
        let table = self.table_name.clone();
        self.with_reader("fts_count", move |conn| {
            let (filter_clause, filter_params) = build_filter_clause(&filter, &table, 1);
            // The clause is either empty or starts with " AND "; here it becomes
            // the whole WHERE condition.
            let sql = match filter_clause.strip_prefix(" AND ") {
                Some(where_part) => format!("SELECT COUNT(*) FROM {} WHERE {}", table, where_part),
                None => format!("SELECT COUNT(*) FROM {}", table),
            };
            let mut stmt = conn.prepare(&sql)?;
            bind_filter_params(&mut stmt, 1, &filter_params)?;
            let mut rows = stmt.raw_query();
            match rows.next()? {
                Some(row) => Ok(row.get::<_, i64>(0)? as u64),
                None => Ok(0),
            }
        })
        .await
    }

    /// Reports the total document count; this backend never flags a rebuild.
    async fn stats(&self) -> Result<TextIndexStats, StorageError> {
        let table = self.table_name.clone();
        self.with_reader("fts_stats", move |conn| {
            let sql = format!("SELECT COUNT(*) FROM {}", table);
            let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
            Ok(TextIndexStats {
                document_count: count as u64,
                needs_rebuild: false,
                last_rebuild_at: None,
            })
        })
        .await
    }

    /// Dispatches to ranked, unranked, or rank-within-cap gathering.
    ///
    /// For `RankWithinCap`, the gather limit defaults to `top_k` and is never
    /// smaller than `top_k`.
    async fn search_with_options(
        &self,
        request: TextSearchRequest,
        options: TextSearchOptions,
    ) -> Result<Vec<TextSearchHit>, StorageError> {
        match options.gather_mode {
            TextGatherMode::Ranked => self.search(request).await,
            TextGatherMode::Unranked => self.search_unranked(request).await,
            TextGatherMode::RankWithinCap => {
                let gather_limit = options
                    .gather_limit
                    .unwrap_or(request.top_k)
                    .max(request.top_k);
                self.search_rank_within_cap(request, gather_limit).await
            }
        }
    }

    /// Computes document frequency and BM25 IDF for each requested term.
    ///
    /// `document_count` is the number of documents matching the optional
    /// filter; each term's frequency is counted under the same filter. Terms
    /// that sanitize to nothing report zero frequency and zero IDF.
    async fn term_stats(
        &self,
        request: TextTermStatsRequest,
    ) -> Result<Vec<TextTermStats>, StorageError> {
        let table = self.table_name.clone();
        self.with_reader("fts_term_stats", move |conn| {
            let filter = request.filter.as_ref();
            let clause_from = |start_idx: usize| match filter {
                Some(f) => build_filter_clause(f, &table, start_idx),
                None => (String::new(), Vec::new()),
            };

            let document_count: u64 = {
                let (count_filter_clause, count_filter_params) = clause_from(1);
                let count_sql = match count_filter_clause.strip_prefix(" AND ") {
                    Some(where_part) => format!("SELECT COUNT(*) FROM {table} WHERE {where_part}"),
                    None => format!("SELECT COUNT(*) FROM {table}"),
                };
                let mut stmt = conn.prepare(&count_sql)?;
                bind_filter_params(&mut stmt, 1, &count_filter_params)?;
                let mut rows = stmt.raw_query();
                match rows.next()? {
                    Some(row) => row.get::<_, i64>(0)? as u64,
                    None => 0,
                }
            };

            // The per-term query does not depend on the term, so build and
            // prepare it once; only the MATCH argument (?1) changes per term.
            let (term_filter_clause, term_filter_params) = clause_from(2);
            let df_sql = format!(
                "SELECT COUNT(*) FROM {table} WHERE {table} MATCH ?1{term_filter_clause}"
            );
            let mut df_stmt = conn.prepare(&df_sql)?;

            let mut results = Vec::with_capacity(request.terms.len());
            for term in &request.terms {
                let sanitized = sanitize_fts5_query(term);
                if sanitized.is_empty() {
                    results.push(TextTermStats {
                        term: term.clone(),
                        sanitized_term: sanitized,
                        document_frequency: 0,
                        document_count,
                        inverse_document_frequency: 0.0,
                    });
                    continue;
                }
                df_stmt.raw_bind_parameter(1, &sanitized)?;
                bind_filter_params(&mut df_stmt, 2, &term_filter_params)?;
                let df: u64 = {
                    // Dropping `rows` resets the statement for the next term.
                    let mut rows = df_stmt.raw_query();
                    match rows.next()? {
                        Some(row) => row.get::<_, i64>(0)? as u64,
                        None => 0,
                    }
                };
                let idf = Fts5TextSearch::bm25_idf(df, document_count);
                results.push(TextTermStats {
                    term: term.clone(),
                    sanitized_term: sanitized,
                    document_frequency: df,
                    document_count,
                    inverse_document_frequency: idf,
                });
            }
            Ok(results)
        })
        .await
    }

    /// Runs the FTS5 `'rebuild'` command on the table and reports the new count.
    async fn rebuild(&self, _scope: IndexRebuildScope) -> Result<TextIndexStats, StorageError> {
        let table = self.table_name.clone();
        self.with_writer("fts_rebuild", move |conn| {
            let sql = format!("INSERT INTO {}({}) VALUES('rebuild')", table, table);
            conn.execute(&sql, [])?;
            let count_sql = format!("SELECT COUNT(*) FROM {}", table);
            let count: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
            Ok(TextIndexStats {
                document_count: count as u64,
                needs_rebuild: false,
                last_rebuild_at: Some(Utc::now()),
            })
        })
        .await
    }
}
impl Fts5TextSearch {
fn bm25_idf(df: u64, document_count: u64) -> f64 {
let n = document_count as f64;
let f = df as f64;
((n - f + 0.5) / (f + 0.5) + 1.0).ln()
}
fn build_any_term_expr(query: &str) -> Option<String> {
let parts: Vec<String> = query
.split_whitespace()
.map(sanitize_fts5_query)
.filter(|t| !t.is_empty())
.collect();
if parts.is_empty() {
None
} else {
Some(parts.join(" OR "))
}
}
async fn search_unranked(
&self,
request: TextSearchRequest,
) -> Result<Vec<TextSearchHit>, StorageError> {
let table = self.table_name.clone();
self.with_reader("fts_search_unranked", move |conn| {
let match_expr = match request.mode {
TextQueryMode::AnyTerm => match Self::build_any_term_expr(&request.query) {
Some(e) => e,
None => return Ok(Vec::new()),
},
_ => {
let sanitized = sanitize_fts5_query(&request.query);
if sanitized.is_empty() {
return Ok(Vec::new());
}
match request.mode {
TextQueryMode::Phrase => format!("\"{}\"", sanitized),
TextQueryMode::Plain => sanitized,
TextQueryMode::AnyTerm => unreachable!(),
}
}
};
let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
build_filter_clause(filter, &table, 3)
} else {
(String::new(), Vec::new())
};
let sql = format!(
"SELECT subject_id, title \
FROM {table} WHERE {table} MATCH ?1{filter_clause} \
LIMIT ?2",
);
let mut stmt = conn.prepare(&sql)?;
stmt.raw_bind_parameter(1, &match_expr)?;
stmt.raw_bind_parameter(2, request.top_k as i64)?;
for (i, param) in filter_params.iter().enumerate() {
param
.to_sql()
.map(|val| stmt.raw_bind_parameter(3 + i, val))
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
}
let mut results = Vec::new();
let mut rows = stmt.raw_query();
let mut rank_idx = 0u32;
while let Some(row) = rows.next()? {
let id_str: String = row.get(0)?;
let title: String = row.get(1)?;
let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
Box::new(e),
)
})?;
rank_idx += 1;
results.push(TextSearchHit {
subject_id,
score: DeterministicScore::from_f64(1.0),
rank: rank_idx,
title: if title.is_empty() { None } else { Some(title) },
snippet: None,
});
}
Ok(results)
})
.await
}
async fn search_rank_within_cap(
&self,
request: TextSearchRequest,
gather_limit: u32,
) -> Result<Vec<TextSearchHit>, StorageError> {
let table = self.table_name.clone();
self.with_reader("fts_search_rank_within_cap", move |conn| {
let match_expr = match request.mode {
TextQueryMode::AnyTerm => match Self::build_any_term_expr(&request.query) {
Some(e) => e,
None => return Ok(Vec::new()),
},
_ => {
let sanitized = sanitize_fts5_query(&request.query);
if sanitized.is_empty() {
return Ok(Vec::new());
}
match request.mode {
TextQueryMode::Phrase => format!("\"{}\"", sanitized),
TextQueryMode::Plain => sanitized,
TextQueryMode::AnyTerm => unreachable!(),
}
}
};
let (filter_clause, filter_params) = if let Some(ref filter) = request.filter {
build_filter_clause(filter, &table, 3)
} else {
(String::new(), Vec::new())
};
let gather_sql = format!(
"SELECT subject_id FROM {table} WHERE {table} MATCH ?1{filter_clause} LIMIT ?2"
);
let mut stmt = conn.prepare(&gather_sql)?;
stmt.raw_bind_parameter(1, &match_expr)?;
stmt.raw_bind_parameter(2, gather_limit as i64)?;
for (i, param) in filter_params.iter().enumerate() {
param
.to_sql()
.map(|val| stmt.raw_bind_parameter(3 + i, val))
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))??;
}
let mut gathered_ids: Vec<String> = Vec::new();
let mut rows = stmt.raw_query();
while let Some(row) = rows.next()? {
gathered_ids.push(row.get::<_, String>(0)?);
}
if gathered_ids.is_empty() {
return Ok(Vec::new());
}
let snippet_expr = if request.snippet_chars == 0 {
"NULL AS snippet".to_string()
} else {
let chars = i32::try_from(request.snippet_chars).unwrap_or(i32::MAX);
format!("snippet({table}, 3, '', '', '...', {chars})")
};
let id_placeholders: Vec<String> = gathered_ids
.iter()
.enumerate()
.map(|(i, _)| format!("?{}", 3 + i))
.collect();
let in_clause = id_placeholders.join(", ");
let rank_sql = format!(
"SELECT subject_id, rank, title, {snippet_expr} \
FROM {table} WHERE {table} MATCH ?1 AND subject_id IN ({in_clause}) \
ORDER BY rank LIMIT ?2"
);
let mut stmt2 = conn.prepare(&rank_sql)?;
stmt2.raw_bind_parameter(1, &match_expr)?;
stmt2.raw_bind_parameter(2, request.top_k as i64)?;
for (i, id_str) in gathered_ids.iter().enumerate() {
stmt2.raw_bind_parameter(3 + i, id_str.as_str())?;
}
let mut hits = Vec::new();
let mut rows2 = stmt2.raw_query();
let mut rank_idx = 0u32;
while let Some(row) = rows2.next()? {
let id_str: String = row.get(0)?;
let fts_rank: f64 = row.get(1)?;
let title: String = row.get(2)?;
let snippet: Option<String> = row.get(3)?;
let subject_id = Uuid::parse_str(&id_str).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
Box::new(e),
)
})?;
rank_idx += 1;
hits.push((subject_id, fts_rank, rank_idx, title, snippet));
}
let min_rank = hits.iter().map(|h| h.1).fold(f64::INFINITY, f64::min);
let max_rank = hits.iter().map(|h| h.1).fold(f64::NEG_INFINITY, f64::max);
let range = max_rank - min_rank;
let results = hits
.into_iter()
.map(|(subject_id, raw_rank, rank, title, snippet)| {
let score = if range.abs() < 1e-12 {
1.0
} else {
let t = (max_rank - raw_rank) / range;
0.05 + 0.95 * t
};
TextSearchHit {
subject_id,
score: DeterministicScore::from_f64(score),
rank,
title: if title.is_empty() { None } else { Some(title) },
snippet: snippet.filter(|s| !s.is_empty()),
}
})
.collect();
Ok(results)
})
.await
}
#[allow(dead_code)]
pub(crate) async fn rename_namespace(
&self,
old_namespace: &str,
new_namespace: &str,
) -> Result<u64, StorageError> {
if old_namespace == new_namespace {
return Ok(0);
}
let table = self.table_name.clone();
let old_ns = old_namespace.to_string();
let new_ns = new_namespace.to_string();
self.with_writer("fts_rename_namespace", move |conn| {
let sel_sql = format!(
"SELECT subject_id, kind, title, body, tags, metadata, updated_at \
FROM {} WHERE namespace = ?1",
table
);
struct Row {
subject_id: String,
kind: String,
title: String,
body: String,
tags: String,
metadata: Option<String>,
updated_at: i64,
}
let rows: Vec<Row> = {
let mut stmt = conn.prepare(&sel_sql)?;
let iter = stmt.query_map(rusqlite::params![&old_ns], |row| {
Ok(Row {
subject_id: row.get(0)?,
kind: row.get(1)?,
title: row.get(2)?,
body: row.get(3)?,
tags: row.get(4)?,
metadata: row.get(5)?,
updated_at: row.get(6)?,
})
})?;
iter.collect::<Result<Vec<_>, _>>()?
};
let moved = rows.len() as u64;
if moved == 0 {
return Ok(0u64);
}
conn.execute_batch("BEGIN IMMEDIATE")?;
let del_sql = format!("DELETE FROM {} WHERE namespace = ?1", table);
if let Err(e) = conn.execute(&del_sql, rusqlite::params![&old_ns]) {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
let ins_sql = format!(
"INSERT INTO {} \
(subject_id, kind, title, body, tags, namespace, metadata, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
table
);
for row in &rows {
if let Err(e) = conn.execute(
&ins_sql,
rusqlite::params![
row.subject_id,
row.kind,
row.title,
row.body,
row.tags,
&new_ns,
row.metadata,
row.updated_at,
],
) {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
}
conn.execute_batch("COMMIT")?;
Ok(moved)
})
.await
}
}
#[cfg(test)]
#[path = "text_tests.rs"]
mod tests;