use std::collections::HashMap;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use fathomdb_query::{
BindValue, ComparisonOp, CompiledGroupedQuery, CompiledQuery, CompiledRetrievalPlan,
CompiledSearch, CompiledSearchPlan, CompiledVectorSearch, DrivingTable, EdgeExpansionSlot,
ExpansionSlot, FALLBACK_TRIGGER_K, HitAttribution, Predicate, RetrievalModality, ScalarValue,
SearchBranch, SearchHit, SearchHitSource, SearchMatchMode, SearchRows, ShapeHash,
render_text_query_fts5,
};
use fathomdb_schema::SchemaManager;
use rusqlite::{Connection, OptionalExtension, params_from_iter, types::Value};
use crate::embedder::QueryEmbedder;
use crate::telemetry::{SqliteCacheStatus, TelemetryCounters, read_db_cache_status};
use crate::{EngineError, sqlite};
/// Upper bound on shape-hash -> SQL cache entries; once reached the cache is
/// cleared wholesale (see `execute_compiled_read`).
const MAX_SHAPE_CACHE_SIZE: usize = 4096;
/// Chunk size for batched operations — used outside this view; TODO confirm caller.
const BATCH_CHUNK_SIZE: usize = 200;
/// Renders a `json_extract(...) IN (...)` fragment over the node alias `n`.
///
/// Placeholder `?p` carries the JSON path; the values occupy placeholders
/// `?(p+1)` through `?(p+len)`. Returns the SQL fragment together with the
/// bind list (path first, then the values in order).
fn compile_expansion_in_filter(
    p: usize,
    path: &str,
    value_binds: Vec<Value>,
) -> (String, Vec<Value>) {
    let placeholders: Vec<String> = value_binds
        .iter()
        .enumerate()
        .map(|(offset, _)| format!("?{}", p + 1 + offset))
        .collect();
    let clause = format!(
        "\n AND json_extract(n.properties, ?{p}) IN ({})",
        placeholders.join(", ")
    );
    let mut binds = Vec::with_capacity(value_binds.len() + 1);
    binds.push(Value::Text(path.to_owned()));
    binds.extend(value_binds);
    (clause, binds)
}
/// Compiles an optional node predicate into a SQL fragment ANDed onto an
/// expansion query (node alias `n`), plus the bind values it consumes.
///
/// Placeholder numbering starts at `first_param` (`?p`): path/value arms
/// consume two placeholders (`?p` = JSON path, `?(p+1)` = value), IN arms
/// consume one per value after the path, and direct column arms consume one.
/// `None` yields an empty fragment with no binds.
///
/// # Panics
/// Edge-property predicates must go through `compile_edge_filter`; reaching
/// them here is a caller bug and hits `unreachable!`.
#[allow(clippy::too_many_lines)]
fn compile_expansion_filter(
    filter: Option<&Predicate>,
    first_param: usize,
) -> (String, Vec<Value>) {
    let Some(predicate) = filter else {
        return (String::new(), vec![]);
    };
    let p = first_param;
    match predicate {
        Predicate::JsonPathEq { path, value } => {
            let val = match value {
                ScalarValue::Text(t) => Value::Text(t.clone()),
                ScalarValue::Integer(i) => Value::Integer(*i),
                // Booleans are bound as 0/1 integers.
                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
            };
            (
                format!(
                    "\n AND json_extract(n.properties, ?{p}) = ?{}",
                    p + 1
                ),
                vec![Value::Text(path.clone()), val],
            )
        }
        Predicate::JsonPathCompare { path, op, value } => {
            let val = match value {
                ScalarValue::Text(t) => Value::Text(t.clone()),
                ScalarValue::Integer(i) => Value::Integer(*i),
                ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
            };
            let operator = match op {
                ComparisonOp::Gt => ">",
                ComparisonOp::Gte => ">=",
                ComparisonOp::Lt => "<",
                ComparisonOp::Lte => "<=",
            };
            (
                format!(
                    "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
                    p + 1
                ),
                vec![Value::Text(path.clone()), val],
            )
        }
        // Fused variants carry values already normalized by the planner
        // (text / i64 timestamp / bool-as-integer).
        Predicate::JsonPathFusedEq { path, value } => (
            format!(
                "\n AND json_extract(n.properties, ?{p}) = ?{}",
                p + 1
            ),
            vec![Value::Text(path.clone()), Value::Text(value.clone())],
        ),
        Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
            let operator = match op {
                ComparisonOp::Gt => ">",
                ComparisonOp::Gte => ">=",
                ComparisonOp::Lt => "<",
                ComparisonOp::Lte => "<=",
            };
            (
                format!(
                    "\n AND json_extract(n.properties, ?{p}) {operator} ?{}",
                    p + 1
                ),
                vec![Value::Text(path.clone()), Value::Integer(*value)],
            )
        }
        Predicate::JsonPathFusedBoolEq { path, value } => (
            format!(
                "\n AND json_extract(n.properties, ?{p}) = ?{}",
                p + 1
            ),
            vec![Value::Text(path.clone()), Value::Integer(i64::from(*value))],
        ),
        // Direct column filters consume a single placeholder.
        Predicate::KindEq(kind) => (
            format!("\n AND n.kind = ?{p}"),
            vec![Value::Text(kind.clone())],
        ),
        Predicate::LogicalIdEq(logical_id) => (
            format!("\n AND n.logical_id = ?{p}"),
            vec![Value::Text(logical_id.clone())],
        ),
        Predicate::SourceRefEq(source_ref) => (
            format!("\n AND n.source_ref = ?{p}"),
            vec![Value::Text(source_ref.clone())],
        ),
        Predicate::ContentRefEq(uri) => (
            format!("\n AND n.content_ref = ?{p}"),
            vec![Value::Text(uri.clone())],
        ),
        Predicate::ContentRefNotNull => (
            "\n AND n.content_ref IS NOT NULL".to_owned(),
            vec![],
        ),
        Predicate::EdgePropertyEq { .. } | Predicate::EdgePropertyCompare { .. } => {
            unreachable!(
                "compile_expansion_filter: EdgeProperty* variants must use compile_edge_filter"
            );
        }
        Predicate::JsonPathFusedIn { path, values } => compile_expansion_in_filter(
            p,
            path,
            values.iter().map(|v| Value::Text(v.clone())).collect(),
        ),
        Predicate::JsonPathIn { path, values } => compile_expansion_in_filter(
            p,
            path,
            values
                .iter()
                .map(|v| match v {
                    ScalarValue::Text(t) => Value::Text(t.clone()),
                    ScalarValue::Integer(i) => Value::Integer(*i),
                    ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
                })
                .collect(),
        ),
    }
}
fn compile_edge_filter(filter: Option<&Predicate>, first_param: usize) -> (String, Vec<Value>) {
let Some(predicate) = filter else {
return (String::new(), vec![]);
};
let p = first_param;
match predicate {
Predicate::EdgePropertyEq { path, value } => {
let val = match value {
ScalarValue::Text(t) => Value::Text(t.clone()),
ScalarValue::Integer(i) => Value::Integer(*i),
ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
};
(
format!(
"\n AND json_extract(e.properties, ?{p}) = ?{}",
p + 1
),
vec![Value::Text(path.clone()), val],
)
}
Predicate::EdgePropertyCompare { path, op, value } => {
let val = match value {
ScalarValue::Text(t) => Value::Text(t.clone()),
ScalarValue::Integer(i) => Value::Integer(*i),
ScalarValue::Bool(b) => Value::Integer(i64::from(*b)),
};
let operator = match op {
ComparisonOp::Gt => ">",
ComparisonOp::Gte => ">=",
ComparisonOp::Lt => "<",
ComparisonOp::Lte => "<=",
};
(
format!(
"\n AND json_extract(e.properties, ?{p}) {operator} ?{}",
p + 1
),
vec![Value::Text(path.clone()), val],
)
}
_ => {
unreachable!("compile_edge_filter: non-edge predicate {predicate:?}");
}
}
}
/// Tokenizer configuration families recognized in FTS projection profiles.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TokenizerStrategy {
    /// `porter unicode61 remove_diacritics 2` — stemmed English tokens.
    RecallOptimizedEnglish,
    /// `unicode61 remove_diacritics 2` — exact tokens, no stemming.
    PrecisionOptimized,
    /// `trigram` — substring matching.
    SubstringTrigram,
    /// `icu` — ICU tokenizing.
    GlobalCjk,
    /// Any `unicode61 tokenchars …` variant.
    SourceCode,
    /// Unrecognized tokenizer spec, kept verbatim.
    Custom(String),
}
impl TokenizerStrategy {
    /// Classifies a raw FTS5 tokenizer string into a strategy.
    ///
    /// The `unicode61 tokenchars` prefix check is safe to run first: none of
    /// the exact-match specs start with that prefix.
    pub fn from_str(s: &str) -> Self {
        if s.starts_with("unicode61 tokenchars") {
            Self::SourceCode
        } else if s == "porter unicode61 remove_diacritics 2" {
            Self::RecallOptimizedEnglish
        } else if s == "unicode61 remove_diacritics 2" {
            Self::PrecisionOptimized
        } else if s == "trigram" {
            Self::SubstringTrigram
        } else if s == "icu" {
            Self::GlobalCjk
        } else {
            Self::Custom(s.to_string())
        }
    }
}
/// Fixed-size pool of read-only SQLite connections, each behind its own
/// mutex so callers can grab whichever one is free.
struct ReadPool {
    connections: Vec<Mutex<Connection>>,
}
/// Manual `Debug` that reports only the pool size (connections themselves
/// are not printed).
impl fmt::Debug for ReadPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadPool")
            .field("size", &self.connections.len())
            .finish()
    }
}
impl ReadPool {
    /// Opens `pool_size` read-only connections (vec-enabled when requested
    /// and compiled with the `sqlite-vec` feature) and runs the schema
    /// manager's reader initialization on each.
    ///
    /// # Errors
    /// Fails if any connection cannot be opened or reader initialization
    /// fails.
    fn new(
        db_path: &Path,
        pool_size: usize,
        schema_manager: &SchemaManager,
        vector_enabled: bool,
    ) -> Result<Self, EngineError> {
        let mut connections = Vec::with_capacity(pool_size);
        for _ in 0..pool_size {
            let conn = if vector_enabled {
                #[cfg(feature = "sqlite-vec")]
                {
                    sqlite::open_readonly_connection_with_vec(db_path)?
                }
                #[cfg(not(feature = "sqlite-vec"))]
                {
                    sqlite::open_readonly_connection(db_path)?
                }
            } else {
                sqlite::open_readonly_connection(db_path)?
            };
            schema_manager
                .initialize_reader_connection(&conn)
                .map_err(EngineError::Schema)?;
            connections.push(Mutex::new(conn));
        }
        Ok(Self { connections })
    }
    /// Hands out a pooled connection: first sweeps the pool with
    /// non-blocking `try_lock`, then falls back to blocking on the first
    /// connection when all are busy.
    ///
    /// # Errors
    /// Returns an error when the pool is empty or the fallback mutex is
    /// poisoned.
    fn acquire(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        for conn in &self.connections {
            if let Ok(guard) = conn.try_lock() {
                return Ok(guard);
            }
        }
        // Fix: indexing `self.connections[0]` panicked when the pool was
        // constructed with `pool_size == 0`; surface an error instead.
        let Some(first) = self.connections.first() else {
            trace_error!("read pool: no connections configured");
            return Err(EngineError::Bridge("read pool is empty".to_owned()));
        };
        first.lock().map_err(|_| {
            trace_error!("read pool: connection mutex poisoned");
            EngineError::Bridge("connection mutex poisoned".to_owned())
        })
    }
    #[cfg(test)]
    fn size(&self) -> usize {
        self.connections.len()
    }
}
/// Introspection view of a compiled query: final SQL, bind arity, driving
/// table, shape hash, and whether the shape cache was hit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryPlan {
    pub sql: String,
    pub bind_count: usize,
    pub driving_table: DrivingTable,
    pub shape_hash: ShapeHash,
    pub cache_hit: bool,
}
/// A node row as projected by read queries (`properties` is the raw JSON
/// text; `last_accessed_at` comes from access metadata and may be absent).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRow {
    pub row_id: String,
    pub logical_id: String,
    pub kind: String,
    pub properties: String,
    pub content_ref: Option<String>,
    pub last_accessed_at: Option<i64>,
}
/// An edge row connecting two logical node ids.
/// Not `Eq` because `confidence` is an `f64`.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeRow {
    pub row_id: String,
    pub logical_id: String,
    pub source_logical_id: String,
    pub target_logical_id: String,
    pub kind: String,
    pub properties: String,
    pub source_ref: Option<String>,
    pub confidence: Option<f64>,
}
/// A run row (top level of the run/step/action hierarchy).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunRow {
    pub id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
}
/// A step row, keyed back to its run via `run_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepRow {
    pub id: String,
    pub run_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
}
/// An action row, keyed back to its step via `step_id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionRow {
    pub id: String,
    pub step_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
}
/// A provenance log entry (`metadata_json` is raw JSON text).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProvenanceEvent {
    pub id: String,
    pub event_type: String,
    pub subject: String,
    pub source_ref: Option<String>,
    pub metadata_json: String,
    pub created_at: i64,
}
/// Result set of a compiled read. `was_degraded` is set when a vector-backed
/// query fell back to an empty result because its vector table was absent.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueryRows {
    pub nodes: Vec<NodeRow>,
    pub runs: Vec<RunRow>,
    pub steps: Vec<StepRow>,
    pub actions: Vec<ActionRow>,
    pub was_degraded: bool,
}
/// Nodes reached by expanding from a single root node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionRootRows {
    pub root_logical_id: String,
    pub nodes: Vec<NodeRow>,
}
/// All roots' node expansions for one named expansion slot.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpansionSlotRows {
    pub slot: String,
    pub roots: Vec<ExpansionRootRows>,
}
/// Edge/node pairs reached by edge-expanding from a single root node.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionRootRows {
    pub root_logical_id: String,
    pub pairs: Vec<(EdgeRow, NodeRow)>,
}
/// All roots' edge expansions for one named edge-expansion slot.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeExpansionSlotRows {
    pub slot: String,
    pub roots: Vec<EdgeExpansionRootRows>,
}
/// Result of a grouped query: root nodes plus per-slot node and edge
/// expansions. `was_degraded` mirrors `QueryRows::was_degraded`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GroupedQueryRows {
    pub roots: Vec<NodeRow>,
    pub expansions: Vec<ExpansionSlotRows>,
    pub edge_expansions: Vec<EdgeExpansionSlotRows>,
    pub was_degraded: bool,
}
/// Read-side query executor: owns a pool of read-only SQLite connections, a
/// shape-hash -> SQL cache, and optional vector/FTS search support.
pub struct ExecutionCoordinator {
    database_path: PathBuf,
    schema_manager: Arc<SchemaManager>,
    pool: ReadPool,
    // Final SQL keyed by query shape; cleared wholesale once it reaches
    // MAX_SHAPE_CACHE_SIZE (see execute_compiled_read).
    shape_sql_map: Mutex<HashMap<ShapeHash, String>>,
    vector_enabled: bool,
    // Ensures the "vector table absent" degradation warning logs only once.
    vec_degradation_warned: AtomicBool,
    telemetry: Arc<TelemetryCounters>,
    query_embedder: Option<Arc<dyn QueryEmbedder>>,
    // Per-kind FTS tokenizer strategies snapshotted at open time.
    fts_strategies: HashMap<String, TokenizerStrategy>,
}
/// Manual `Debug` that prints only the database path and elides the rest
/// via `finish_non_exhaustive`.
impl fmt::Debug for ExecutionCoordinator {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("ExecutionCoordinator");
        dbg.field("database_path", &self.database_path);
        dbg.finish_non_exhaustive()
    }
}
impl ExecutionCoordinator {
    /// Opens the database and builds the coordinator.
    ///
    /// Bootstraps the schema on a temporary writable connection, runs
    /// open-time FTS guards, snapshots per-kind FTS tokenizer strategies
    /// from `projection_profiles`, optionally identity-checks the query
    /// embedder against the database, then opens the read pool.
    ///
    /// `vector_dimension: Some(_)` forces vector support on when compiled
    /// with the `sqlite-vec` feature; without the feature vectors stay off.
    ///
    /// # Errors
    /// Fails if a connection cannot be opened, schema bootstrap fails, the
    /// embedder identity check fails, or the tokenizer-strategy query fails.
    pub fn open(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        vector_dimension: Option<usize>,
        pool_size: usize,
        telemetry: Arc<TelemetryCounters>,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let path = path.as_ref().to_path_buf();
        #[cfg(feature = "sqlite-vec")]
        let mut conn = if vector_dimension.is_some() {
            sqlite::open_connection_with_vec(&path)?
        } else {
            sqlite::open_connection(&path)?
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let mut conn = sqlite::open_connection(&path)?;
        let report = schema_manager.bootstrap(&conn)?;
        run_open_time_fts_guards(&mut conn)?;
        // With `sqlite-vec`, vector support follows the bootstrap report
        // (possibly forced on below); otherwise it is hard-wired off.
        #[cfg(feature = "sqlite-vec")]
        let mut vector_enabled = report.vector_profile_enabled;
        #[cfg(not(feature = "sqlite-vec"))]
        let vector_enabled = {
            let _ = &report;
            false
        };
        if vector_dimension.is_some() {
            #[cfg(feature = "sqlite-vec")]
            {
                vector_enabled = true;
            }
        }
        if let Some(ref emb) = query_embedder {
            check_vec_identity_at_open(&conn, emb.as_ref())?;
        }
        // Snapshot each kind's FTS tokenizer so searches can special-case
        // strategies (e.g. trigram minimum query length) without re-reading
        // the profiles table per query.
        let fts_strategies: HashMap<String, TokenizerStrategy> = {
            let mut map = HashMap::new();
            let mut stmt = conn
                .prepare("SELECT kind, config_json FROM projection_profiles WHERE facet='fts'")?;
            let rows = stmt.query_map([], |row| {
                let kind: String = row.get(0)?;
                let config_json: String = row.get(1)?;
                Ok((kind, config_json))
            })?;
            // Rows with malformed JSON or without a "tokenizer" key are
            // silently skipped.
            for row in rows.flatten() {
                let (kind, config_json) = row;
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&config_json)
                    && let Some(tok) = v["tokenizer"].as_str()
                {
                    map.insert(kind, TokenizerStrategy::from_str(tok));
                }
            }
            map
        };
        // Close the bootstrap connection before opening the read pool.
        drop(conn);
        let pool = ReadPool::new(&path, pool_size, &schema_manager, vector_enabled)?;
        Ok(Self {
            database_path: path,
            schema_manager,
            pool,
            shape_sql_map: Mutex::new(HashMap::new()),
            vector_enabled,
            vec_degradation_warned: AtomicBool::new(false),
            telemetry,
            query_embedder,
            fts_strategies,
        })
    }
    /// Path of the SQLite database file backing this coordinator.
    pub fn database_path(&self) -> &Path {
        &self.database_path
    }
    /// Whether vector search support was enabled at open time.
    #[must_use]
    pub fn vector_enabled(&self) -> bool {
        self.vector_enabled
    }
    /// The configured query embedder, if any.
    #[must_use]
    pub fn query_embedder(&self) -> Option<&Arc<dyn QueryEmbedder>> {
        self.query_embedder.as_ref()
    }
    /// Acquires a read connection from the pool.
    fn lock_connection(&self) -> Result<MutexGuard<'_, Connection>, EngineError> {
        self.pool.acquire()
    }
#[must_use]
pub fn aggregate_cache_status(&self) -> SqliteCacheStatus {
let mut total = SqliteCacheStatus::default();
for conn_mutex in &self.pool.connections {
if let Ok(conn) = conn_mutex.try_lock() {
total.add(&read_db_cache_status(&conn));
}
}
total
}
    /// Executes a compiled read query and returns the matching node rows.
    ///
    /// FTS-driven plans may be served by a table-scan fallback right after a
    /// kind's first FTS registration, and otherwise have their SQL adapted
    /// to the per-kind FTS tables. Vector-driven plans have
    /// `vec_nodes_active` rewritten to the per-kind vector table; if that
    /// table is missing the query degrades to an empty result with
    /// `was_degraded = true` (warned only once per coordinator).
    ///
    /// # Errors
    /// Propagates connection, prepare, and row-mapping failures, bumping the
    /// telemetry error counter on each failure path.
    #[allow(clippy::expect_used, clippy::too_many_lines)]
    pub fn execute_compiled_read(
        &self,
        compiled: &CompiledQuery,
    ) -> Result<QueryRows, EngineError> {
        // binds[1] carries the root kind for FTS plans (bind 0 presumably
        // holds the MATCH expression — set by the compiler, not in view).
        if compiled.driving_table == DrivingTable::FtsNodes
            && let Some(BindValue::Text(root_kind)) = compiled.binds.get(1)
            && let Some(nodes) = self.scan_fallback_if_first_registration(root_kind)?
        {
            self.telemetry.increment_queries();
            return Ok(QueryRows {
                nodes,
                runs: Vec::new(),
                steps: Vec::new(),
                actions: Vec::new(),
                was_degraded: false,
            });
        }
        // Rewrite generic FTS/vector table references to per-kind tables.
        let (adapted_sql, adapted_binds) = if compiled.driving_table == DrivingTable::FtsNodes {
            let conn_check = match self.lock_connection() {
                Ok(g) => g,
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            };
            let result = adapt_fts_nodes_sql_for_per_kind_tables(compiled, &conn_check);
            drop(conn_check);
            result?
        } else if compiled.driving_table == DrivingTable::VecNodes {
            let root_kind = compiled
                .binds
                .get(1)
                .and_then(|b| {
                    if let BindValue::Text(k) = b {
                        Some(k.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("");
            // An unknown kind maps to a nonexistent table name so the
            // prepare step below takes the degradation path.
            let vec_table = if root_kind.is_empty() {
                "vec__unknown".to_owned()
            } else {
                fathomdb_schema::vec_kind_table_name(root_kind)
            };
            let new_sql = compiled.sql.replace("vec_nodes_active", &vec_table);
            (new_sql, compiled.binds.clone())
        } else {
            (compiled.sql.clone(), compiled.binds.clone())
        };
        let row_sql = wrap_node_row_projection_sql(&adapted_sql);
        // Record the final SQL under the query's shape hash. The cache is
        // cleared wholesale at MAX_SHAPE_CACHE_SIZE, and a poisoned lock is
        // recovered rather than propagated.
        {
            let mut cache = self
                .shape_sql_map
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if cache.len() >= MAX_SHAPE_CACHE_SIZE {
                trace_debug!(evicted = cache.len(), "shape cache full, clearing");
                cache.clear();
            }
            cache.insert(compiled.shape_hash, row_sql.clone());
        }
        let bind_values = adapted_binds
            .iter()
            .map(bind_value_to_sql)
            .collect::<Vec<_>>();
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&row_sql) {
            Ok(stmt) => stmt,
            // Missing vector table: degrade to an empty result, not an error.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading to non-vector query");
                }
                return Ok(QueryRows {
                    was_degraded: true,
                    ..Default::default()
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let nodes = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok(NodeRow {
                    row_id: row.get(0)?,
                    logical_id: row.get(1)?,
                    kind: row.get(2)?,
                    properties: row.get(3)?,
                    content_ref: row.get(4)?,
                    last_accessed_at: row.get(5)?,
                })
            })
            .and_then(Iterator::collect)
        {
            Ok(rows) => rows,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        self.telemetry.increment_queries();
        Ok(QueryRows {
            nodes,
            runs: Vec::new(),
            steps: Vec::new(),
            actions: Vec::new(),
            was_degraded: false,
        })
    }
pub fn execute_compiled_search(
&self,
compiled: &CompiledSearch,
) -> Result<SearchRows, EngineError> {
let (relaxed_query, was_degraded_at_plan_time) =
fathomdb_query::derive_relaxed(&compiled.text_query);
let relaxed = relaxed_query.map(|q| CompiledSearch {
root_kind: compiled.root_kind.clone(),
text_query: q,
limit: compiled.limit,
fusable_filters: compiled.fusable_filters.clone(),
residual_filters: compiled.residual_filters.clone(),
attribution_requested: compiled.attribution_requested,
});
let plan = CompiledSearchPlan {
strict: compiled.clone(),
relaxed,
was_degraded_at_plan_time,
};
self.execute_compiled_search_plan(&plan)
}
pub fn execute_compiled_search_plan(
&self,
plan: &CompiledSearchPlan,
) -> Result<SearchRows, EngineError> {
let strict = &plan.strict;
let limit = strict.limit;
let strict_hits = self.run_search_branch(strict, SearchBranch::Strict)?;
let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
let strict_underfilled = strict_hits.len() < fallback_threshold;
let mut relaxed_hits: Vec<SearchHit> = Vec::new();
let mut fallback_used = false;
let mut was_degraded = false;
if let Some(relaxed) = plan.relaxed.as_ref()
&& strict_underfilled
{
relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
fallback_used = true;
was_degraded = plan.was_degraded_at_plan_time;
}
let mut merged = merge_search_branches(strict_hits, relaxed_hits, limit);
if strict.attribution_requested {
let relaxed_text_query = plan.relaxed.as_ref().map(|r| &r.text_query);
self.populate_attribution_for_hits(
&mut merged,
&strict.text_query,
relaxed_text_query,
)?;
}
let strict_hit_count = merged
.iter()
.filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
.count();
let relaxed_hit_count = merged
.iter()
.filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
.count();
let vector_hit_count = 0;
Ok(SearchRows {
hits: merged,
strict_hit_count,
relaxed_hit_count,
vector_hit_count,
fallback_used,
was_degraded,
})
}
    /// Executes a KNN vector search against the per-kind sqlite-vec table.
    ///
    /// The query vector arrives pre-rendered in `query_text` (bound as ?1).
    /// Fusable predicates are applied inside the candidate CTE (alias
    /// `src`); residual JSON-path predicates are applied to the joined hits
    /// (alias `h`). Hits are ordered by ascending distance and scored as the
    /// negated distance. A missing vector table degrades to an empty
    /// `was_degraded` result (warned once per coordinator).
    ///
    /// # Errors
    /// Propagates SQLite errors other than a missing vector table.
    #[allow(clippy::too_many_lines)]
    pub fn execute_compiled_vector_search(
        &self,
        compiled: &CompiledVectorSearch,
    ) -> Result<SearchRows, EngineError> {
        use std::fmt::Write as _;
        if compiled.limit == 0 {
            return Ok(SearchRows::default());
        }
        let filter_by_kind = !compiled.root_kind.is_empty();
        // ?1 = query vector; ?2 = root kind (only present when filtering by
        // kind — the static `kind_clause` below hard-codes ?2).
        let mut binds: Vec<BindValue> = Vec::new();
        binds.push(BindValue::Text(compiled.query_text.clone()));
        if filter_by_kind {
            binds.push(BindValue::Text(compiled.root_kind.clone()));
        }
        // Fusable predicates become AND clauses inside the vector_hits CTE.
        // Placeholder numbers always equal the bind's 1-based position.
        let mut fused_clauses = String::new();
        for predicate in &compiled.fusable_filters {
            match predicate {
                Predicate::KindEq(kind) => {
                    binds.push(BindValue::Text(kind.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.kind = ?{idx}"
                    );
                }
                Predicate::LogicalIdEq(logical_id) => {
                    binds.push(BindValue::Text(logical_id.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.logical_id = ?{idx}"
                    );
                }
                Predicate::SourceRefEq(source_ref) => {
                    binds.push(BindValue::Text(source_ref.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.source_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefEq(uri) => {
                    binds.push(BindValue::Text(uri.clone()));
                    let idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND src.content_ref = ?{idx}"
                    );
                }
                Predicate::ContentRefNotNull => {
                    fused_clauses
                        .push_str("\n AND src.content_ref IS NOT NULL");
                }
                Predicate::JsonPathFusedEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Text(value.clone()));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(*value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedBoolEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(BindValue::Integer(i64::from(*value)));
                    let value_idx = binds.len();
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathFusedIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(BindValue::Text(v.clone()));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        fused_clauses,
                        "\n AND json_extract(src.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::JsonPathEq { .. }
                | Predicate::JsonPathCompare { .. }
                | Predicate::JsonPathIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Handled as residual filters below, or edge-only and
                    // never applicable to the vector CTE.
                }
            }
        }
        // Residual JSON-path predicates filter the joined hit set.
        let mut filter_clauses = String::new();
        for predicate in &compiled.residual_filters {
            match predicate {
                Predicate::JsonPathEq { path, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
                    );
                }
                Predicate::JsonPathCompare { path, op, value } => {
                    binds.push(BindValue::Text(path.clone()));
                    let path_idx = binds.len();
                    binds.push(scalar_to_bind(value));
                    let value_idx = binds.len();
                    let operator = match op {
                        ComparisonOp::Gt => ">",
                        ComparisonOp::Gte => ">=",
                        ComparisonOp::Lt => "<",
                        ComparisonOp::Lte => "<=",
                    };
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
                    );
                }
                Predicate::JsonPathIn { path, values } => {
                    binds.push(BindValue::Text(path.clone()));
                    let first_param = binds.len();
                    for v in values {
                        binds.push(scalar_to_bind(v));
                    }
                    let placeholders = (1..=values.len())
                        .map(|i| format!("?{}", first_param + i))
                        .collect::<Vec<_>>()
                        .join(", ");
                    let _ = write!(
                        filter_clauses,
                        "\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
                    );
                }
                Predicate::KindEq(_)
                | Predicate::LogicalIdEq(_)
                | Predicate::SourceRefEq(_)
                | Predicate::ContentRefEq(_)
                | Predicate::ContentRefNotNull
                | Predicate::JsonPathFusedEq { .. }
                | Predicate::JsonPathFusedTimestampCmp { .. }
                | Predicate::JsonPathFusedBoolEq { .. }
                | Predicate::JsonPathFusedIn { .. }
                | Predicate::EdgePropertyEq { .. }
                | Predicate::EdgePropertyCompare { .. } => {
                    // Fusable predicates were handled in the CTE loop above.
                }
            }
        }
        let limit = compiled.limit;
        // The outer LIMIT is bound; the inner KNN LIMIT is inlined.
        binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
        let limit_idx = binds.len();
        let base_limit = limit;
        let kind_clause = if filter_by_kind {
            "\n AND src.kind = ?2"
        } else {
            ""
        };
        // An empty kind targets a nonexistent table so prepare takes the
        // degradation path below.
        let vec_table = if compiled.root_kind.is_empty() {
            "vec__unknown".to_owned()
        } else {
            fathomdb_schema::vec_kind_table_name(&compiled.root_kind)
        };
        // NOTE(review): the inner LIMIT caps KNN candidates before the fused
        // filters apply, so heavily filtered searches can underfill the
        // outer limit — confirm this is intended.
        let sql = format!(
            "WITH vector_hits AS (
SELECT
src.row_id AS row_id,
src.logical_id AS logical_id,
src.kind AS kind,
src.properties AS properties,
src.source_ref AS source_ref,
src.content_ref AS content_ref,
src.created_at AS created_at,
vc.distance AS distance,
vc.chunk_id AS chunk_id
FROM (
SELECT chunk_id, distance
FROM {vec_table}
WHERE embedding MATCH ?1
LIMIT {base_limit}
) vc
JOIN chunks c ON c.id = vc.chunk_id
JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
WHERE 1 = 1{kind_clause}{fused_clauses}
)
SELECT
h.row_id,
h.logical_id,
h.kind,
h.properties,
h.content_ref,
am.last_accessed_at,
h.created_at,
h.distance,
h.chunk_id
FROM vector_hits h
LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
WHERE 1 = 1{filter_clauses}
ORDER BY h.distance ASC
LIMIT ?{limit_idx}"
        );
        let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let mut statement = match conn_guard.prepare_cached(&sql) {
            Ok(stmt) => stmt,
            // Missing vector table at prepare time: empty, degraded result.
            Err(e) if is_vec_table_absent(&e) => {
                if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                    trace_warn!("vector table absent, degrading vector_search to empty result");
                }
                return Ok(SearchRows {
                    hits: Vec::new(),
                    strict_hit_count: 0,
                    relaxed_hit_count: 0,
                    vector_hit_count: 0,
                    fallback_used: false,
                    was_degraded: true,
                });
            }
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        let attribution_requested = compiled.attribution_requested;
        let hits = match statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let distance: f64 = row.get(7)?;
                // Smaller distance is better; negate so larger score wins.
                let score = -distance;
                Ok(SearchHit {
                    node: fathomdb_query::NodeRowLite {
                        row_id: row.get(0)?,
                        logical_id: row.get(1)?,
                        kind: row.get(2)?,
                        properties: row.get(3)?,
                        content_ref: row.get(4)?,
                        last_accessed_at: row.get(5)?,
                    },
                    written_at: row.get(6)?,
                    score,
                    modality: RetrievalModality::Vector,
                    source: SearchHitSource::Vector,
                    match_mode: None,
                    snippet: None,
                    projection_row_id: row.get::<_, Option<String>>(8)?,
                    vector_distance: Some(distance),
                    // Attribution is requested-but-empty here; matched paths
                    // are not derivable from a vector hit.
                    attribution: if attribution_requested {
                        Some(HitAttribution {
                            matched_paths: Vec::new(),
                        })
                    } else {
                        None
                    },
                })
            })
            .and_then(Iterator::collect::<Result<Vec<_>, _>>)
        {
            Ok(rows) => rows,
            Err(e) => {
                // The table can also vanish between prepare and step; treat
                // that the same as the prepare-time degradation.
                if is_vec_table_absent(&e) {
                    if !self.vec_degradation_warned.swap(true, Ordering::Relaxed) {
                        trace_warn!(
                            "vector table absent at query time, degrading vector_search to empty result"
                        );
                    }
                    drop(statement);
                    drop(conn_guard);
                    return Ok(SearchRows {
                        hits: Vec::new(),
                        strict_hit_count: 0,
                        relaxed_hit_count: 0,
                        vector_hit_count: 0,
                        fallback_used: false,
                        was_degraded: true,
                    });
                }
                self.telemetry.increment_errors();
                return Err(EngineError::Sqlite(e));
            }
        };
        drop(statement);
        drop(conn_guard);
        self.telemetry.increment_queries();
        let vector_hit_count = hits.len();
        Ok(SearchRows {
            hits,
            strict_hit_count: 0,
            relaxed_hit_count: 0,
            vector_hit_count,
            fallback_used: false,
            was_degraded: false,
        })
    }
    /// Executes a full retrieval plan: strict text search first, relaxed
    /// text fallback when strict underfills, and a vector branch only when
    /// both text branches come back empty.
    ///
    /// `raw_query` is the original user query text; it is embedded lazily
    /// (via `fill_vector_branch`) when no vector branch was planned up
    /// front.
    ///
    /// # Errors
    /// Propagates errors from branch execution and attribution.
    pub fn execute_retrieval_plan(
        &self,
        plan: &CompiledRetrievalPlan,
        raw_query: &str,
    ) -> Result<SearchRows, EngineError> {
        // Cloned so the vector branch can be filled in lazily below.
        let mut plan = plan.clone();
        let limit = plan.text.strict.limit;
        let strict_hits = self.run_search_branch(&plan.text.strict, SearchBranch::Strict)?;
        let fallback_threshold = FALLBACK_TRIGGER_K.min(limit);
        let strict_underfilled = strict_hits.len() < fallback_threshold;
        let mut relaxed_hits: Vec<SearchHit> = Vec::new();
        let mut fallback_used = false;
        let mut was_degraded = false;
        if let Some(relaxed) = plan.text.relaxed.as_ref()
            && strict_underfilled
        {
            relaxed_hits = self.run_search_branch(relaxed, SearchBranch::Relaxed)?;
            fallback_used = true;
            was_degraded = plan.was_degraded_at_plan_time;
        }
        let text_branches_empty = strict_hits.is_empty() && relaxed_hits.is_empty();
        // Build a vector branch on demand only when text found nothing and
        // an embedder is configured.
        if text_branches_empty && self.query_embedder.is_some() {
            self.fill_vector_branch(&mut plan, raw_query);
        }
        let mut vector_hits: Vec<SearchHit> = Vec::new();
        if let Some(vector) = plan.vector.as_ref()
            && strict_hits.is_empty()
            && relaxed_hits.is_empty()
        {
            let vector_rows = self.execute_compiled_vector_search(vector)?;
            vector_hits = vector_rows.hits;
            if vector_rows.was_degraded {
                was_degraded = true;
            }
        }
        // If a vector branch should exist but could not be built (plan-time
        // or fill-time embedding failure), surface the degradation.
        if text_branches_empty
            && plan.was_degraded_at_plan_time
            && plan.vector.is_none()
            && self.query_embedder.is_some()
        {
            was_degraded = true;
        }
        let strict = &plan.text.strict;
        let mut merged = merge_search_branches_three(strict_hits, relaxed_hits, vector_hits, limit);
        if strict.attribution_requested {
            let relaxed_text_query = plan.text.relaxed.as_ref().map(|r| &r.text_query);
            self.populate_attribution_for_hits(
                &mut merged,
                &strict.text_query,
                relaxed_text_query,
            )?;
        }
        // Per-branch counts are taken from the merged, limit-bounded list.
        let strict_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict)))
            .count();
        let relaxed_hit_count = merged
            .iter()
            .filter(|h| matches!(h.match_mode, Some(SearchMatchMode::Relaxed)))
            .count();
        let vector_hit_count = merged
            .iter()
            .filter(|h| matches!(h.modality, RetrievalModality::Vector))
            .count();
        Ok(SearchRows {
            hits: merged,
            strict_hit_count,
            relaxed_hit_count,
            vector_hit_count,
            fallback_used,
            was_degraded,
        })
    }
    /// Lazily adds a vector branch to a retrieval plan by embedding the raw
    /// query text. Any failure (embedder error or vector serialization)
    /// marks the plan degraded and leaves `plan.vector` unset. No-op when no
    /// embedder is configured.
    fn fill_vector_branch(&self, plan: &mut CompiledRetrievalPlan, raw_query: &str) {
        let Some(embedder) = self.query_embedder.as_ref() else {
            return;
        };
        match embedder.embed_query(raw_query) {
            Ok(vec) => {
                // The embedding is serialized as a JSON array; it becomes
                // the `query_text` bound to the vector MATCH placeholder.
                let literal = match serde_json::to_string(&vec) {
                    Ok(s) => s,
                    Err(err) => {
                        trace_warn!(
                            error = %err,
                            "query embedder vector serialization failed; skipping vector branch"
                        );
                        // `let _ = err;` keeps `err` "used" — presumably for
                        // builds where the trace macro compiles away;
                        // NOTE(review): confirm the macro's cfg behavior.
                        let _ = err; plan.was_degraded_at_plan_time = true;
                        return;
                    }
                };
                // The vector branch mirrors the strict text branch's scope
                // (kind, limit, filters, attribution).
                let strict = &plan.text.strict;
                plan.vector = Some(CompiledVectorSearch {
                    root_kind: strict.root_kind.clone(),
                    query_text: literal,
                    limit: strict.limit,
                    fusable_filters: strict.fusable_filters.clone(),
                    residual_filters: strict.residual_filters.clone(),
                    attribution_requested: strict.attribution_requested,
                });
            }
            Err(err) => {
                trace_warn!(
                    error = %err,
                    "query embedder unavailable, skipping vector branch"
                );
                let _ = err; plan.was_degraded_at_plan_time = true;
            }
        }
    }
#[allow(clippy::too_many_lines)]
fn run_search_branch(
&self,
compiled: &CompiledSearch,
branch: SearchBranch,
) -> Result<Vec<SearchHit>, EngineError> {
use std::fmt::Write as _;
if matches!(
compiled.text_query,
fathomdb_query::TextQuery::Empty | fathomdb_query::TextQuery::Not(_)
) {
return Ok(Vec::new());
}
let rendered_base = render_text_query_fts5(&compiled.text_query);
let strategy = self.fts_strategies.get(compiled.root_kind.as_str());
if matches!(strategy, Some(TokenizerStrategy::SubstringTrigram))
&& rendered_base
.chars()
.filter(|c| c.is_alphanumeric())
.count()
< 3
{
return Ok(Vec::new());
}
let rendered = rendered_base;
let filter_by_kind = !compiled.root_kind.is_empty();
let conn_guard = match self.lock_connection() {
Ok(g) => g,
Err(e) => {
self.telemetry.increment_errors();
return Err(e);
}
};
let prop_fts_tables: Vec<(String, String)> = if filter_by_kind {
let kind = compiled.root_kind.clone();
let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
let exists: bool = conn_guard
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
rusqlite::params![prop_table],
|_| Ok(true),
)
.optional()
.map_err(EngineError::Sqlite)?
.unwrap_or(false);
if exists {
vec![(kind, prop_table)]
} else {
vec![]
}
} else {
let kind_eq_values: Vec<String> = compiled
.fusable_filters
.iter()
.filter_map(|p| match p {
Predicate::KindEq(k) => Some(k.clone()),
_ => None,
})
.collect();
if kind_eq_values.len() == 1 {
let kind = kind_eq_values[0].clone();
let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
let exists: bool = conn_guard
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
rusqlite::params![prop_table],
|_| Ok(true),
)
.optional()
.map_err(EngineError::Sqlite)?
.unwrap_or(false);
if exists {
vec![(kind, prop_table)]
} else {
vec![]
}
} else {
let mut stmt = conn_guard
.prepare("SELECT kind FROM fts_property_schemas")
.map_err(EngineError::Sqlite)?;
let all_kinds: Vec<String> = stmt
.query_map([], |r| r.get::<_, String>(0))
.map_err(EngineError::Sqlite)?
.collect::<Result<Vec<_>, _>>()
.map_err(EngineError::Sqlite)?;
drop(stmt);
let mut result = Vec::new();
for kind in all_kinds {
let prop_table = fathomdb_schema::fts_kind_table_name(&kind);
let exists: bool = conn_guard
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
rusqlite::params![prop_table],
|_| Ok(true),
)
.optional()
.map_err(EngineError::Sqlite)?
.unwrap_or(false);
if exists {
result.push((kind, prop_table));
}
}
result
}
};
let use_prop_fts = !prop_fts_tables.is_empty();
let mut binds: Vec<BindValue> = if filter_by_kind {
if use_prop_fts {
vec![
BindValue::Text(rendered.clone()),
BindValue::Text(compiled.root_kind.clone()),
BindValue::Text(rendered),
]
} else {
vec![
BindValue::Text(rendered.clone()),
BindValue::Text(compiled.root_kind.clone()),
]
}
} else if use_prop_fts {
vec![BindValue::Text(rendered.clone()), BindValue::Text(rendered)]
} else {
vec![BindValue::Text(rendered)]
};
let mut fused_clauses = String::new();
for predicate in &compiled.fusable_filters {
match predicate {
Predicate::KindEq(kind) => {
binds.push(BindValue::Text(kind.clone()));
let idx = binds.len();
let _ = write!(fused_clauses, "\n AND u.kind = ?{idx}");
}
Predicate::LogicalIdEq(logical_id) => {
binds.push(BindValue::Text(logical_id.clone()));
let idx = binds.len();
let _ = write!(
fused_clauses,
"\n AND u.logical_id = ?{idx}"
);
}
Predicate::SourceRefEq(source_ref) => {
binds.push(BindValue::Text(source_ref.clone()));
let idx = binds.len();
let _ = write!(
fused_clauses,
"\n AND u.source_ref = ?{idx}"
);
}
Predicate::ContentRefEq(uri) => {
binds.push(BindValue::Text(uri.clone()));
let idx = binds.len();
let _ = write!(
fused_clauses,
"\n AND u.content_ref = ?{idx}"
);
}
Predicate::ContentRefNotNull => {
fused_clauses.push_str("\n AND u.content_ref IS NOT NULL");
}
Predicate::JsonPathFusedEq { path, value } => {
binds.push(BindValue::Text(path.clone()));
let path_idx = binds.len();
binds.push(BindValue::Text(value.clone()));
let value_idx = binds.len();
let _ = write!(
fused_clauses,
"\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
);
}
Predicate::JsonPathFusedTimestampCmp { path, op, value } => {
binds.push(BindValue::Text(path.clone()));
let path_idx = binds.len();
binds.push(BindValue::Integer(*value));
let value_idx = binds.len();
let operator = match op {
ComparisonOp::Gt => ">",
ComparisonOp::Gte => ">=",
ComparisonOp::Lt => "<",
ComparisonOp::Lte => "<=",
};
let _ = write!(
fused_clauses,
"\n AND json_extract(u.properties, ?{path_idx}) {operator} ?{value_idx}"
);
}
Predicate::JsonPathFusedBoolEq { path, value } => {
binds.push(BindValue::Text(path.clone()));
let path_idx = binds.len();
binds.push(BindValue::Integer(i64::from(*value)));
let value_idx = binds.len();
let _ = write!(
fused_clauses,
"\n AND json_extract(u.properties, ?{path_idx}) = ?{value_idx}"
);
}
Predicate::JsonPathFusedIn { path, values } => {
binds.push(BindValue::Text(path.clone()));
let first_param = binds.len();
for v in values {
binds.push(BindValue::Text(v.clone()));
}
let placeholders = (1..=values.len())
.map(|i| format!("?{}", first_param + i))
.collect::<Vec<_>>()
.join(", ");
let _ = write!(
fused_clauses,
"\n AND json_extract(u.properties, ?{first_param}) IN ({placeholders})"
);
}
Predicate::JsonPathEq { .. }
| Predicate::JsonPathCompare { .. }
| Predicate::JsonPathIn { .. }
| Predicate::EdgePropertyEq { .. }
| Predicate::EdgePropertyCompare { .. } => {
}
}
}
let mut filter_clauses = String::new();
for predicate in &compiled.residual_filters {
match predicate {
Predicate::JsonPathEq { path, value } => {
binds.push(BindValue::Text(path.clone()));
let path_idx = binds.len();
binds.push(scalar_to_bind(value));
let value_idx = binds.len();
let _ = write!(
filter_clauses,
"\n AND json_extract(h.properties, ?{path_idx}) = ?{value_idx}"
);
}
Predicate::JsonPathCompare { path, op, value } => {
binds.push(BindValue::Text(path.clone()));
let path_idx = binds.len();
binds.push(scalar_to_bind(value));
let value_idx = binds.len();
let operator = match op {
ComparisonOp::Gt => ">",
ComparisonOp::Gte => ">=",
ComparisonOp::Lt => "<",
ComparisonOp::Lte => "<=",
};
let _ = write!(
filter_clauses,
"\n AND json_extract(h.properties, ?{path_idx}) {operator} ?{value_idx}"
);
}
Predicate::JsonPathIn { path, values } => {
binds.push(BindValue::Text(path.clone()));
let first_param = binds.len();
for v in values {
binds.push(scalar_to_bind(v));
}
let placeholders = (1..=values.len())
.map(|i| format!("?{}", first_param + i))
.collect::<Vec<_>>()
.join(", ");
let _ = write!(
filter_clauses,
"\n AND json_extract(h.properties, ?{first_param}) IN ({placeholders})"
);
}
Predicate::KindEq(_)
| Predicate::LogicalIdEq(_)
| Predicate::SourceRefEq(_)
| Predicate::ContentRefEq(_)
| Predicate::ContentRefNotNull
| Predicate::JsonPathFusedEq { .. }
| Predicate::JsonPathFusedTimestampCmp { .. }
| Predicate::JsonPathFusedBoolEq { .. }
| Predicate::JsonPathFusedIn { .. }
| Predicate::EdgePropertyEq { .. }
| Predicate::EdgePropertyCompare { .. } => {
}
}
}
let limit = compiled.limit;
binds.push(BindValue::Integer(i64::try_from(limit).unwrap_or(i64::MAX)));
let limit_idx = binds.len();
let prop_bind_idx: usize = if filter_by_kind { 3 } else { 2 };
let prop_arm_sql: String = if use_prop_fts {
prop_fts_tables.iter().fold(String::new(), |mut acc, (kind, prop_table)| {
let bm25_expr = conn_guard
.query_row(
"SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
rusqlite::params![kind],
|r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
)
.ok()
.map_or_else(
|| format!("bm25({prop_table})"),
|(json, sep)| build_bm25_expr(prop_table, &json, &sep),
);
let is_weighted = bm25_expr != format!("bm25({prop_table})");
let snippet_expr = if is_weighted {
"'' AS snippet".to_owned()
} else {
"substr(fp.text_content, 1, 200) AS snippet".to_owned()
};
let _ = write!(
acc,
"
UNION ALL
SELECT
src.row_id AS row_id,
fp.node_logical_id AS logical_id,
src.kind AS kind,
src.properties AS properties,
src.source_ref AS source_ref,
src.content_ref AS content_ref,
src.created_at AS created_at,
-{bm25_expr} AS score,
'property' AS source,
{snippet_expr},
CAST(fp.rowid AS TEXT) AS projection_row_id
FROM {prop_table} fp
JOIN nodes src ON src.logical_id = fp.node_logical_id AND src.superseded_at IS NULL
WHERE {prop_table} MATCH ?{prop_bind_idx}"
);
acc
})
} else {
String::new()
};
let (chunk_fts_bind, chunk_kind_clause) = if filter_by_kind {
("?1", "\n AND src.kind = ?2")
} else {
("?1", "")
};
let sql = format!(
"WITH search_hits AS (
SELECT
u.row_id AS row_id,
u.logical_id AS logical_id,
u.kind AS kind,
u.properties AS properties,
u.source_ref AS source_ref,
u.content_ref AS content_ref,
u.created_at AS created_at,
u.score AS score,
u.source AS source,
u.snippet AS snippet,
u.projection_row_id AS projection_row_id
FROM (
SELECT
src.row_id AS row_id,
c.node_logical_id AS logical_id,
src.kind AS kind,
src.properties AS properties,
src.source_ref AS source_ref,
src.content_ref AS content_ref,
src.created_at AS created_at,
-bm25(fts_nodes) AS score,
'chunk' AS source,
snippet(fts_nodes, 3, '[', ']', '…', 32) AS snippet,
f.chunk_id AS projection_row_id
FROM fts_nodes f
JOIN chunks c ON c.id = f.chunk_id
JOIN nodes src ON src.logical_id = c.node_logical_id AND src.superseded_at IS NULL
WHERE fts_nodes MATCH {chunk_fts_bind}{chunk_kind_clause}{prop_arm_sql}
) u
WHERE 1 = 1{fused_clauses}
ORDER BY u.score DESC
LIMIT ?{limit_idx}
)
SELECT
h.row_id,
h.logical_id,
h.kind,
h.properties,
h.content_ref,
am.last_accessed_at,
h.created_at,
h.score,
h.source,
h.snippet,
h.projection_row_id
FROM search_hits h
LEFT JOIN node_access_metadata am ON am.logical_id = h.logical_id
WHERE 1 = 1{filter_clauses}
ORDER BY h.score DESC"
);
let bind_values = binds.iter().map(bind_value_to_sql).collect::<Vec<_>>();
let mut statement = match conn_guard.prepare_cached(&sql) {
Ok(stmt) => stmt,
Err(e) => {
self.telemetry.increment_errors();
return Err(EngineError::Sqlite(e));
}
};
let hits = match statement
.query_map(params_from_iter(bind_values.iter()), |row| {
let source_str: String = row.get(8)?;
let source = if source_str == "property" {
SearchHitSource::Property
} else {
SearchHitSource::Chunk
};
let match_mode = match branch {
SearchBranch::Strict => SearchMatchMode::Strict,
SearchBranch::Relaxed => SearchMatchMode::Relaxed,
};
Ok(SearchHit {
node: fathomdb_query::NodeRowLite {
row_id: row.get(0)?,
logical_id: row.get(1)?,
kind: row.get(2)?,
properties: row.get(3)?,
content_ref: row.get(4)?,
last_accessed_at: row.get(5)?,
},
written_at: row.get(6)?,
score: row.get(7)?,
modality: RetrievalModality::Text,
source,
match_mode: Some(match_mode),
snippet: row.get(9)?,
projection_row_id: row.get(10)?,
vector_distance: None,
attribution: None,
})
})
.and_then(Iterator::collect::<Result<Vec<_>, _>>)
{
Ok(rows) => rows,
Err(e) => {
self.telemetry.increment_errors();
return Err(EngineError::Sqlite(e));
}
};
drop(statement);
drop(conn_guard);
self.telemetry.increment_queries();
Ok(hits)
}
    /// Resolves and attaches per-hit attribution (which property/chunk text
    /// matched) for every hit that has a recorded match mode.
    ///
    /// The FTS match expression is re-rendered from the strict query, or from
    /// the relaxed query for relaxed-mode hits (falling back to the strict
    /// expression when no relaxed query was supplied). Hits with
    /// `match_mode == None` are skipped untouched.
    ///
    /// Increments the telemetry error counter and returns the first error on
    /// any lock or attribution failure.
    fn populate_attribution_for_hits(
        &self,
        hits: &mut [SearchHit],
        strict_text_query: &fathomdb_query::TextQuery,
        relaxed_text_query: Option<&fathomdb_query::TextQuery>,
    ) -> Result<(), EngineError> {
        let conn_guard = match self.lock_connection() {
            Ok(g) => g,
            Err(e) => {
                self.telemetry.increment_errors();
                return Err(e);
            }
        };
        let strict_expr = render_text_query_fts5(strict_text_query);
        let relaxed_expr = relaxed_text_query.map(render_text_query_fts5);
        for hit in hits.iter_mut() {
            // Pick the expression matching how this hit was originally found.
            let match_expr = match hit.match_mode {
                Some(SearchMatchMode::Strict) => strict_expr.as_str(),
                Some(SearchMatchMode::Relaxed) => {
                    relaxed_expr.as_deref().unwrap_or(strict_expr.as_str())
                }
                None => continue,
            };
            match resolve_hit_attribution(&conn_guard, hit, match_expr) {
                Ok(att) => hit.attribution = Some(att),
                Err(e) => {
                    self.telemetry.increment_errors();
                    return Err(e);
                }
            }
        }
        Ok(())
    }
pub fn execute_compiled_grouped_read(
&self,
compiled: &CompiledGroupedQuery,
) -> Result<GroupedQueryRows, EngineError> {
let root_rows = self.execute_compiled_read(&compiled.root)?;
if root_rows.was_degraded {
return Ok(GroupedQueryRows {
roots: Vec::new(),
expansions: Vec::new(),
edge_expansions: Vec::new(),
was_degraded: true,
});
}
let roots = root_rows.nodes;
let mut expansions = Vec::with_capacity(compiled.expansions.len());
for expansion in &compiled.expansions {
let slot_rows = if roots.is_empty() {
Vec::new()
} else {
self.read_expansion_nodes_chunked(&roots, expansion, compiled.hints.hard_limit)?
};
expansions.push(ExpansionSlotRows {
slot: expansion.slot.clone(),
roots: slot_rows,
});
}
let mut edge_expansions = Vec::with_capacity(compiled.edge_expansions.len());
for edge_expansion in &compiled.edge_expansions {
let slot_rows = if roots.is_empty() {
Vec::new()
} else {
self.read_edge_expansion_chunked(&roots, edge_expansion, compiled.hints.hard_limit)?
};
edge_expansions.push(EdgeExpansionSlotRows {
slot: edge_expansion.slot.clone(),
roots: slot_rows,
});
}
Ok(GroupedQueryRows {
roots,
expansions,
edge_expansions,
was_degraded: false,
})
}
fn read_expansion_nodes_chunked(
&self,
roots: &[NodeRow],
expansion: &ExpansionSlot,
hard_limit: usize,
) -> Result<Vec<ExpansionRootRows>, EngineError> {
if roots.len() <= BATCH_CHUNK_SIZE {
return self.read_expansion_nodes_batched(roots, expansion, hard_limit);
}
let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
for group in self.read_expansion_nodes_batched(chunk, expansion, hard_limit)? {
per_root
.entry(group.root_logical_id)
.or_default()
.extend(group.nodes);
}
}
Ok(roots
.iter()
.map(|root| ExpansionRootRows {
root_logical_id: root.logical_id.clone(),
nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
})
.collect())
}
    /// Reads expansion nodes for one batch of roots (≤ `BATCH_CHUNK_SIZE`)
    /// with a single recursive-CTE statement.
    ///
    /// The CTE walks `expansion.label` edges from every root up to
    /// `expansion.max_depth`, tracking visited ids in a comma-delimited
    /// string to break cycles, then joins reached ids back to live
    /// (non-superseded) nodes and caps each root's group at `hard_limit`
    /// via `ROW_NUMBER()`. Results come back in the same order as `roots`,
    /// with an empty group for roots that expanded to nothing.
    ///
    /// Positional parameter layout: roots occupy `?1..?N`, the edge kind is
    /// `?N+1`, then edge-filter binds, then node-filter binds — the bind
    /// vector below is assembled in exactly that order.
    #[allow(clippy::too_many_lines)]
    fn read_expansion_nodes_batched(
        &self,
        roots: &[NodeRow],
        expansion: &ExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<ExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction decides which edge endpoint joins the frontier and which
        // endpoint becomes the next traversal id.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };
        // Fused predicates require a registered property-FTS schema for every
        // reachable kind; fail fast before building the query.
        if expansion.filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }
        // Seed rows: one positional placeholder per root id (?1..?N).
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;
        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);
        let filter_param_start = edge_filter_param_start + edge_filter_binds.len();
        let (filter_sql, filter_binds) =
            compile_expansion_filter(expansion.filter.as_ref(), filter_param_start);
        // `max_depth` and `hard_limit` are numeric values interpolated into
        // the SQL text directly (not user-controlled strings).
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
        traversed(root_id, logical_id, depth, visited, emitted) AS (
            SELECT rid, rid, 0, printf(',%s,', rid), 0
            FROM root_ids
            UNION ALL
            SELECT
                t.root_id,
                {next_logical_id},
                t.depth + 1,
                t.visited || {next_logical_id} || ',',
                t.emitted + 1
            FROM traversed t
            JOIN edges e ON {join_condition}
                AND e.kind = ?{edge_kind_param}
                AND e.superseded_at IS NULL{edge_filter_sql}
            WHERE t.depth < {max_depth}
                AND t.emitted < {hard_limit}
                AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
        ),
        numbered AS (
            SELECT t.root_id, n.row_id, n.logical_id, n.kind, n.properties
                , n.content_ref, am.last_accessed_at
                , ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id) AS rn
            FROM traversed t
            JOIN nodes n ON n.logical_id = t.logical_id
                AND n.superseded_at IS NULL
            LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
            WHERE t.depth > 0{filter_sql}
        )
        SELECT root_id, row_id, logical_id, kind, properties, content_ref, last_accessed_at
        FROM numbered
        WHERE rn <= {hard_limit}
        ORDER BY root_id, logical_id",
            max_depth = expansion.max_depth,
        );
        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;
        // Bind order must mirror the parameter layout documented above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(filter_binds);
        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                Ok((
                    row.get::<_, String>(0)?, NodeRow {
                        row_id: row.get(1)?,
                        logical_id: row.get(2)?,
                        kind: row.get(3)?,
                        properties: row.get(4)?,
                        content_ref: row.get(5)?,
                        last_accessed_at: row.get(6)?,
                    },
                ))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;
        // Group flat rows by root, then re-emit in the caller's root order.
        let mut per_root: HashMap<String, Vec<NodeRow>> = HashMap::new();
        for (root_id, node) in rows {
            per_root.entry(root_id).or_default().push(node);
        }
        let root_groups = roots
            .iter()
            .map(|root| ExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                nodes: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();
        Ok(root_groups)
    }
fn read_edge_expansion_chunked(
&self,
roots: &[NodeRow],
expansion: &EdgeExpansionSlot,
hard_limit: usize,
) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
if roots.len() <= BATCH_CHUNK_SIZE {
return self.read_edge_expansion_batched(roots, expansion, hard_limit);
}
let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
for chunk in roots.chunks(BATCH_CHUNK_SIZE) {
for group in self.read_edge_expansion_batched(chunk, expansion, hard_limit)? {
per_root
.entry(group.root_logical_id)
.or_default()
.extend(group.pairs);
}
}
Ok(roots
.iter()
.map(|root| EdgeExpansionRootRows {
root_logical_id: root.logical_id.clone(),
pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
})
.collect())
}
    /// Reads (edge, endpoint-node) pairs for one batch of roots
    /// (≤ `BATCH_CHUNK_SIZE`) with a single recursive-CTE statement.
    ///
    /// Like `read_expansion_nodes_batched`, but the traversal also carries the
    /// `row_id` of the edge used for each hop so the exact edge can be joined
    /// back and returned alongside the reached node. Per-root groups are
    /// capped at `hard_limit` via `ROW_NUMBER()` and returned in the caller's
    /// root order.
    ///
    /// Positional parameter layout: roots occupy `?1..?N`, the edge kind is
    /// `?N+1`, then edge-filter binds, then endpoint-filter binds.
    #[allow(clippy::too_many_lines)]
    fn read_edge_expansion_batched(
        &self,
        roots: &[NodeRow],
        expansion: &EdgeExpansionSlot,
        hard_limit: usize,
    ) -> Result<Vec<EdgeExpansionRootRows>, EngineError> {
        let root_ids: Vec<&str> = roots.iter().map(|r| r.logical_id.as_str()).collect();
        // Direction decides which endpoint joins the frontier and which
        // becomes the next traversal id.
        let (join_condition, next_logical_id) = match expansion.direction {
            fathomdb_query::TraverseDirection::Out => {
                ("e.source_logical_id = t.logical_id", "e.target_logical_id")
            }
            fathomdb_query::TraverseDirection::In => {
                ("e.target_logical_id = t.logical_id", "e.source_logical_id")
            }
        };
        // Fused endpoint predicates require registered property-FTS schemas.
        if expansion.endpoint_filter.as_ref().is_some_and(|f| {
            matches!(
                f,
                Predicate::JsonPathFusedEq { .. }
                    | Predicate::JsonPathFusedTimestampCmp { .. }
                    | Predicate::JsonPathFusedIn { .. }
            )
        }) {
            self.validate_fused_filter_for_edge_label(&expansion.label)?;
        }
        // Seed rows: one positional placeholder per root id (?1..?N).
        let root_seed_union: String = (1..=root_ids.len())
            .map(|i| format!("SELECT ?{i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");
        let edge_kind_param = root_ids.len() + 1;
        let edge_filter_param_start = root_ids.len() + 2;
        let (edge_filter_sql, edge_filter_binds) =
            compile_edge_filter(expansion.edge_filter.as_ref(), edge_filter_param_start);
        let endpoint_filter_param_start = edge_filter_param_start + edge_filter_binds.len();
        let (endpoint_filter_sql, endpoint_filter_binds) = compile_expansion_filter(
            expansion.endpoint_filter.as_ref(),
            endpoint_filter_param_start,
        );
        // `max_depth`/`hard_limit` are numeric values interpolated directly.
        // Seed rows have edge_row_id = NULL (no edge led to a root), which is
        // why the final join on t.edge_row_id implicitly drops depth-0 rows.
        let sql = format!(
            "WITH RECURSIVE root_ids(rid) AS ({root_seed_union}),
        traversed(root_id, logical_id, depth, visited, emitted, edge_row_id) AS (
            SELECT rid, rid, 0, printf(',%s,', rid), 0, NULL AS edge_row_id
            FROM root_ids
            UNION ALL
            SELECT
                t.root_id,
                {next_logical_id},
                t.depth + 1,
                t.visited || {next_logical_id} || ',',
                t.emitted + 1,
                e.row_id AS edge_row_id
            FROM traversed t
            JOIN edges e ON {join_condition}
                AND e.kind = ?{edge_kind_param}
                AND e.superseded_at IS NULL{edge_filter_sql}
            WHERE t.depth < {max_depth}
                AND t.emitted < {hard_limit}
                AND instr(t.visited, printf(',%s,', {next_logical_id})) = 0
        ),
        numbered AS (
            SELECT t.root_id,
                e.row_id AS e_row_id,
                e.logical_id AS e_logical_id,
                e.source_logical_id AS e_source,
                e.target_logical_id AS e_target,
                e.kind AS e_kind,
                e.properties AS e_properties,
                e.source_ref AS e_source_ref,
                e.confidence AS e_confidence,
                n.row_id AS n_row_id,
                n.logical_id AS n_logical_id,
                n.kind AS n_kind,
                n.properties AS n_properties,
                n.content_ref AS n_content_ref,
                am.last_accessed_at AS n_last_accessed_at,
                ROW_NUMBER() OVER (PARTITION BY t.root_id ORDER BY n.logical_id, e.row_id) AS rn
            FROM traversed t
            JOIN edges e ON e.row_id = t.edge_row_id
            JOIN nodes n ON n.logical_id = t.logical_id
                AND n.superseded_at IS NULL
            LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id
            WHERE t.depth > 0{endpoint_filter_sql}
        )
        SELECT root_id,
            e_row_id, e_logical_id, e_source, e_target, e_kind, e_properties,
            e_source_ref, e_confidence,
            n_row_id, n_logical_id, n_kind, n_properties, n_content_ref,
            n_last_accessed_at
        FROM numbered
        WHERE rn <= {hard_limit}
        ORDER BY root_id, n_logical_id, e_row_id",
            max_depth = expansion.max_depth,
        );
        let conn_guard = self.lock_connection()?;
        let mut statement = conn_guard
            .prepare_cached(&sql)
            .map_err(EngineError::Sqlite)?;
        // Bind order must mirror the parameter layout documented above.
        let mut bind_values: Vec<Value> = root_ids
            .iter()
            .map(|id| Value::Text((*id).to_owned()))
            .collect();
        bind_values.push(Value::Text(expansion.label.clone()));
        bind_values.extend(edge_filter_binds);
        bind_values.extend(endpoint_filter_binds);
        let rows = statement
            .query_map(params_from_iter(bind_values.iter()), |row| {
                let root_id: String = row.get(0)?;
                let edge_row = EdgeRow {
                    row_id: row.get(1)?,
                    logical_id: row.get(2)?,
                    source_logical_id: row.get(3)?,
                    target_logical_id: row.get(4)?,
                    kind: row.get(5)?,
                    properties: row.get(6)?,
                    source_ref: row.get(7)?,
                    confidence: row.get(8)?,
                };
                let node_row = NodeRow {
                    row_id: row.get(9)?,
                    logical_id: row.get(10)?,
                    kind: row.get(11)?,
                    properties: row.get(12)?,
                    content_ref: row.get(13)?,
                    last_accessed_at: row.get(14)?,
                };
                Ok((root_id, edge_row, node_row))
            })
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;
        // Group flat rows by root, then re-emit in the caller's root order.
        let mut per_root: HashMap<String, Vec<(EdgeRow, NodeRow)>> = HashMap::new();
        for (root_id, edge, node) in rows {
            per_root.entry(root_id).or_default().push((edge, node));
        }
        let root_groups = roots
            .iter()
            .map(|root| EdgeExpansionRootRows {
                root_logical_id: root.logical_id.clone(),
                pairs: per_root.remove(&root.logical_id).unwrap_or_default(),
            })
            .collect();
        Ok(root_groups)
    }
    /// Guards fused-filter expansion slots: every node kind reachable via
    /// `edge_label` edges must have a registered property-FTS schema,
    /// otherwise the fused predicate is rejected with a configuration error.
    ///
    /// NOTE(review): reachable kinds are collected from edge *targets* only;
    /// `TraverseDirection::In` expansions traverse to edge *sources* — confirm
    /// target-side validation is intended for that direction too.
    fn validate_fused_filter_for_edge_label(&self, edge_label: &str) -> Result<(), EngineError> {
        let conn = self.lock_connection()?;
        let mut stmt = conn
            .prepare_cached(
                "SELECT DISTINCT n.kind \
                 FROM edges e \
                 JOIN nodes n ON n.logical_id = e.target_logical_id \
                 WHERE e.kind = ?1 AND e.superseded_at IS NULL",
            )
            .map_err(EngineError::Sqlite)?;
        let target_kinds: Vec<String> = stmt
            .query_map(rusqlite::params![edge_label], |row| row.get(0))
            .map_err(EngineError::Sqlite)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(EngineError::Sqlite)?;
        for kind in &target_kinds {
            let has_schema: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM fts_property_schemas WHERE kind = ?1",
                    rusqlite::params![kind],
                    |row| row.get(0),
                )
                .map_err(EngineError::Sqlite)?;
            if !has_schema {
                return Err(EngineError::InvalidConfig(format!(
                    "kind {kind:?} has no registered property-FTS schema; register one with \
                     admin.register_fts_property_schema(..) before using fused filters on \
                     expansion slots, or use JsonPathEq for non-fused semantics \
                     (expand slot uses edge label {edge_label:?})"
                )));
            }
        }
        Ok(())
    }
pub fn read_run(&self, id: &str) -> Result<Option<RunRow>, EngineError> {
let conn = self.lock_connection()?;
conn.query_row(
"SELECT id, kind, status, properties FROM runs WHERE id = ?1",
rusqlite::params![id],
|row| {
Ok(RunRow {
id: row.get(0)?,
kind: row.get(1)?,
status: row.get(2)?,
properties: row.get(3)?,
})
},
)
.optional()
.map_err(EngineError::Sqlite)
}
pub fn read_step(&self, id: &str) -> Result<Option<StepRow>, EngineError> {
let conn = self.lock_connection()?;
conn.query_row(
"SELECT id, run_id, kind, status, properties FROM steps WHERE id = ?1",
rusqlite::params![id],
|row| {
Ok(StepRow {
id: row.get(0)?,
run_id: row.get(1)?,
kind: row.get(2)?,
status: row.get(3)?,
properties: row.get(4)?,
})
},
)
.optional()
.map_err(EngineError::Sqlite)
}
pub fn read_action(&self, id: &str) -> Result<Option<ActionRow>, EngineError> {
let conn = self.lock_connection()?;
conn.query_row(
"SELECT id, step_id, kind, status, properties FROM actions WHERE id = ?1",
rusqlite::params![id],
|row| {
Ok(ActionRow {
id: row.get(0)?,
step_id: row.get(1)?,
kind: row.get(2)?,
status: row.get(3)?,
properties: row.get(4)?,
})
},
)
.optional()
.map_err(EngineError::Sqlite)
}
pub fn read_active_runs(&self) -> Result<Vec<RunRow>, EngineError> {
let conn = self.lock_connection()?;
let mut stmt = conn
.prepare_cached(
"SELECT id, kind, status, properties FROM runs WHERE superseded_at IS NULL",
)
.map_err(EngineError::Sqlite)?;
let rows = stmt
.query_map([], |row| {
Ok(RunRow {
id: row.get(0)?,
kind: row.get(1)?,
status: row.get(2)?,
properties: row.get(3)?,
})
})
.map_err(EngineError::Sqlite)?
.collect::<Result<Vec<_>, _>>()
.map_err(EngineError::Sqlite)?;
Ok(rows)
}
#[must_use]
#[allow(clippy::expect_used)]
pub fn shape_sql_count(&self) -> usize {
self.shape_sql_map
.lock()
.unwrap_or_else(PoisonError::into_inner)
.len()
}
#[must_use]
pub fn schema_manager(&self) -> Arc<SchemaManager> {
Arc::clone(&self.schema_manager)
}
#[must_use]
pub fn explain_compiled_read(&self, compiled: &CompiledQuery) -> QueryPlan {
let cache_hit = self
.shape_sql_map
.lock()
.unwrap_or_else(PoisonError::into_inner)
.contains_key(&compiled.shape_hash);
QueryPlan {
sql: wrap_node_row_projection_sql(&compiled.sql),
bind_count: compiled.binds.len(),
driving_table: compiled.driving_table,
shape_hash: compiled.shape_hash,
cache_hit,
}
}
    /// Reads a single PRAGMA value and renders it as a string.
    ///
    /// `name` is interpolated directly into the SQL text (PRAGMA statements
    /// cannot take bound parameters). This helper is `#[doc(hidden)]` and
    /// meant for trusted internal callers — never route user input into it.
    ///
    /// # Errors
    /// Returns `EngineError::Sqlite` on query failure and
    /// `EngineError::InvalidWrite` when the PRAGMA yields a BLOB value.
    #[doc(hidden)]
    pub fn raw_pragma(&self, name: &str) -> Result<String, EngineError> {
        let conn = self.lock_connection()?;
        let result = conn
            .query_row(&format!("PRAGMA {name}"), [], |row| {
                row.get::<_, rusqlite::types::Value>(0)
            })
            .map_err(EngineError::Sqlite)?;
        // Normalize every non-BLOB SQLite value into a display string;
        // NULL becomes the empty string.
        let s = match result {
            rusqlite::types::Value::Text(t) => t,
            rusqlite::types::Value::Integer(i) => i.to_string(),
            rusqlite::types::Value::Real(f) => f.to_string(),
            rusqlite::types::Value::Blob(_) => {
                return Err(EngineError::InvalidWrite(format!(
                    "PRAGMA {name} returned an unexpected BLOB value"
                )));
            }
            rusqlite::types::Value::Null => String::new(),
        };
        Ok(s)
    }
pub fn query_provenance_events(
&self,
subject: &str,
) -> Result<Vec<ProvenanceEvent>, EngineError> {
let conn = self.lock_connection()?;
let mut stmt = conn
.prepare_cached(
"SELECT id, event_type, subject, source_ref, metadata_json, created_at \
FROM provenance_events WHERE subject = ?1 ORDER BY created_at",
)
.map_err(EngineError::Sqlite)?;
let events = stmt
.query_map(rusqlite::params![subject], |row| {
Ok(ProvenanceEvent {
id: row.get(0)?,
event_type: row.get(1)?,
subject: row.get(2)?,
source_ref: row.get(3)?,
metadata_json: row.get(4)?,
created_at: row.get(5)?,
})
})
.map_err(EngineError::Sqlite)?
.collect::<Result<Vec<_>, _>>()
.map_err(EngineError::Sqlite)?;
Ok(events)
}
fn scan_fallback_if_first_registration(
&self,
kind: &str,
) -> Result<Option<Vec<NodeRow>>, EngineError> {
let conn = self.lock_connection()?;
let prop_table = fathomdb_schema::fts_kind_table_name(kind);
let table_exists: bool = conn
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
rusqlite::params![prop_table],
|_| Ok(true),
)
.optional()?
.unwrap_or(false);
let prop_empty = if table_exists {
let cnt: i64 =
conn.query_row(&format!("SELECT COUNT(*) FROM {prop_table}"), [], |r| {
r.get(0)
})?;
cnt == 0
} else {
true
};
let needs_scan: bool = if prop_empty {
conn.query_row(
"SELECT 1 FROM fts_property_rebuild_state \
WHERE kind = ?1 AND is_first_registration = 1 \
AND state IN ('PENDING','BUILDING','SWAPPING') \
LIMIT 1",
rusqlite::params![kind],
|_| Ok(true),
)
.optional()?
.unwrap_or(false)
} else {
false
};
if !needs_scan {
return Ok(None);
}
let mut stmt = conn
.prepare_cached(
"SELECT n.row_id, n.logical_id, n.kind, n.properties, n.content_ref, \
am.last_accessed_at \
FROM nodes n \
LEFT JOIN node_access_metadata am ON am.logical_id = n.logical_id \
WHERE n.kind = ?1 AND n.superseded_at IS NULL",
)
.map_err(EngineError::Sqlite)?;
let nodes = stmt
.query_map(rusqlite::params![kind], |row| {
Ok(NodeRow {
row_id: row.get(0)?,
logical_id: row.get(1)?,
kind: row.get(2)?,
properties: row.get(3)?,
content_ref: row.get(4)?,
last_accessed_at: row.get(5)?,
})
})
.map_err(EngineError::Sqlite)?
.collect::<Result<Vec<_>, _>>()
.map_err(EngineError::Sqlite)?;
Ok(Some(nodes))
}
pub fn get_property_fts_rebuild_progress(
&self,
kind: &str,
) -> Result<Option<crate::rebuild_actor::RebuildProgress>, EngineError> {
let conn = self.lock_connection()?;
let row = conn
.query_row(
"SELECT state, rows_total, rows_done, started_at, last_progress_at, error_message \
FROM fts_property_rebuild_state WHERE kind = ?1",
rusqlite::params![kind],
|r| {
Ok(crate::rebuild_actor::RebuildProgress {
state: r.get(0)?,
rows_total: r.get(1)?,
rows_done: r.get(2)?,
started_at: r.get(3)?,
last_progress_at: r.get(4)?,
error_message: r.get(5)?,
})
},
)
.optional()?;
Ok(row)
}
}
/// Rewrites a compiled FTS query to target the per-kind property table when
/// one exists for the query's root kind, returning the adapted SQL and binds.
fn adapt_fts_nodes_sql_for_per_kind_tables(
    compiled: &CompiledQuery,
    conn: &rusqlite::Connection,
) -> Result<(String, Vec<BindValue>), EngineError> {
    // Bind slot 1 carries the root kind for kind-filtered FTS queries;
    // anything else falls back to the empty kind.
    let root_kind = match compiled.binds.get(1) {
        Some(BindValue::Text(kind)) => kind.as_str(),
        _ => "",
    };
    let prop_table = fathomdb_schema::fts_kind_table_name(root_kind);
    let prop_table_exists = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
            rusqlite::params![prop_table],
            |_| Ok(true),
        )
        .optional()
        .map_err(EngineError::Sqlite)?
        .is_some();
    Ok(compiled.adapt_fts_for_kind(prop_table_exists, &prop_table))
}
/// Compares the stored vec projection profile (kind='*', facet='vec') against
/// the live embedder's identity at open time, logging a warning on mismatch.
///
/// Mismatches are warn-only by design: the function never fails, and a
/// missing or unparsable profile row means "nothing to check".
#[allow(clippy::unnecessary_wraps)]
fn check_vec_identity_at_open(
    conn: &rusqlite::Connection,
    embedder: &dyn QueryEmbedder,
) -> Result<(), EngineError> {
    // NOTE(review): query errors are swallowed here (treated the same as a
    // missing profile row) — confirm that is intentional.
    let row: Option<String> = conn
        .query_row(
            "SELECT config_json FROM projection_profiles WHERE kind='*' AND facet='vec'",
            [],
            |row| row.get(0),
        )
        .optional()
        .unwrap_or(None);
    let Some(config_json) = row else {
        return Ok(());
    };
    // Unparsable JSON is also treated as "nothing to check".
    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&config_json) else {
        return Ok(());
    };
    let identity = embedder.identity();
    if let Some(stored_model) = parsed
        .get("model_identity")
        .and_then(serde_json::Value::as_str)
        && stored_model != identity.model_identity
    {
        trace_warn!(
            stored_model_identity = stored_model,
            embedder_model_identity = %identity.model_identity,
            "vec identity mismatch at open: model_identity differs"
        );
    }
    if let Some(stored_dim) = parsed.get("dimensions").and_then(serde_json::Value::as_u64) {
        // A dimension too large for usize can never equal the embedder's, so
        // saturating to usize::MAX still triggers the mismatch warning.
        let stored_dim = usize::try_from(stored_dim).unwrap_or(usize::MAX);
        if stored_dim != identity.dimension {
            trace_warn!(
                stored_dimensions = stored_dim,
                embedder_dimensions = identity.dimension,
                "vec identity mismatch at open: dimensions differ"
            );
        }
    }
    Ok(())
}
/// Open-time self-heal for property-FTS projections.
///
/// When property-FTS schemas are registered but either (a) some per-kind FTS
/// table is empty while live nodes of that kind exist, or (b) recursive-mode
/// schemas exist with no recorded positions, this wipes every per-kind FTS
/// table plus the position side table and re-inserts all property rows in a
/// single IMMEDIATE transaction.
fn run_open_time_fts_guards(conn: &mut rusqlite::Connection) -> Result<(), EngineError> {
    let schema_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM fts_property_schemas", [], |row| {
            row.get(0)
        })
        .map_err(EngineError::Sqlite)?;
    if schema_count == 0 {
        // No schemas registered: nothing to guard.
        return Ok(());
    }
    let needs_fts_rebuild = open_guard_check_fts_empty(conn)?;
    // A full rebuild re-creates positions too, so only check for a missing
    // position backfill when no rebuild is needed.
    let needs_position_backfill = if needs_fts_rebuild {
        false
    } else {
        open_guard_check_positions_empty(conn)?
    };
    if needs_fts_rebuild || needs_position_backfill {
        // Enumerate the per-kind virtual tables before opening the
        // transaction (the prepared statement borrows `conn`).
        let per_kind_tables: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT name FROM sqlite_master \
                     WHERE type='table' AND name LIKE 'fts_props_%' \
                     AND sql LIKE 'CREATE VIRTUAL TABLE%'",
                )
                .map_err(EngineError::Sqlite)?;
            stmt.query_map([], |r| r.get::<_, String>(0))
                .map_err(EngineError::Sqlite)?
                .collect::<Result<Vec<_>, _>>()
                .map_err(EngineError::Sqlite)?
        };
        // IMMEDIATE grabs the write lock up front so the wipe + reinsert is
        // atomic with respect to other writers.
        let tx = conn
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .map_err(EngineError::Sqlite)?;
        for table in &per_kind_tables {
            tx.execute_batch(&format!("DELETE FROM {table}"))
                .map_err(EngineError::Sqlite)?;
        }
        tx.execute("DELETE FROM fts_node_property_positions", [])
            .map_err(EngineError::Sqlite)?;
        // NOTE(review): the SELECT is parameterized by kind (?1); presumably
        // insert_property_fts_rows iterates registered kinds and binds ?1
        // itself — confirm against crate::projection.
        crate::projection::insert_property_fts_rows(
            &tx,
            "SELECT logical_id, properties FROM nodes \
             WHERE kind = ?1 AND superseded_at IS NULL",
        )
        .map_err(EngineError::Sqlite)?;
        tx.commit().map_err(EngineError::Sqlite)?;
    }
    Ok(())
}
/// Returns true when any kind with a registered property-FTS schema has live
/// nodes but an empty (or missing) per-kind FTS table — i.e. a rebuild is due.
fn open_guard_check_fts_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
    let mut schema_stmt = conn
        .prepare("SELECT kind FROM fts_property_schemas")
        .map_err(EngineError::Sqlite)?;
    let kinds = schema_stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(EngineError::Sqlite)?
        .collect::<Result<Vec<String>, _>>()
        .map_err(EngineError::Sqlite)?;
    drop(schema_stmt);
    for kind in &kinds {
        let table = fathomdb_schema::fts_kind_table_name(kind);
        let table_exists = conn
            .query_row(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
                rusqlite::params![table],
                |_| Ok(true),
            )
            .optional()
            .map_err(EngineError::Sqlite)?
            .unwrap_or(false);
        // A missing table counts as empty.
        let fts_count: i64 = if table_exists {
            conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
                row.get(0)
            })
            .map_err(EngineError::Sqlite)?
        } else {
            0
        };
        if fts_count != 0 {
            continue;
        }
        let node_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
                rusqlite::params![kind],
                |row| row.get(0),
            )
            .map_err(EngineError::Sqlite)?;
        if node_count > 0 {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Returns true when recursive-mode property schemas are registered but the
/// position side table is empty, meaning offsets still need a backfill.
fn open_guard_check_positions_empty(conn: &rusqlite::Connection) -> Result<bool, EngineError> {
    let recursive_schemas: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM fts_property_schemas \
             WHERE property_paths_json LIKE '%\"mode\":\"recursive\"%'",
            [],
            |row| row.get(0),
        )
        .map_err(EngineError::Sqlite)?;
    if recursive_schemas == 0 {
        // Positions are only recorded for recursive-mode schemas.
        return Ok(false);
    }
    let position_rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM fts_node_property_positions",
            [],
            |row| row.get(0),
        )
        .map_err(EngineError::Sqlite)?;
    Ok(position_rows == 0)
}
/// Wraps a node-row SELECT so each result row is left-joined against its
/// access metadata, projecting the standard NodeRow column set.
fn wrap_node_row_projection_sql(base_sql: &str) -> String {
    let mut sql = String::with_capacity(base_sql.len() + 176);
    sql.push_str(
        "SELECT q.row_id, q.logical_id, q.kind, q.properties, q.content_ref, am.last_accessed_at FROM (",
    );
    sql.push_str(base_sql);
    sql.push_str(") q LEFT JOIN node_access_metadata am ON am.logical_id = q.logical_id");
    sql
}
pub(crate) fn is_vec_table_absent(err: &rusqlite::Error) -> bool {
match err {
rusqlite::Error::SqliteFailure(_, Some(msg)) => {
(msg.contains("no such table: vec_") && !msg.contains("vec_embedding"))
|| msg.contains("no such module: vec0")
}
_ => false,
}
}
/// Converts a query-layer scalar into the engine's bind representation.
fn scalar_to_bind(value: &ScalarValue) -> BindValue {
    match *value {
        ScalarValue::Text(ref text) => BindValue::Text(text.clone()),
        ScalarValue::Integer(number) => BindValue::Integer(number),
        ScalarValue::Bool(flag) => BindValue::Bool(flag),
    }
}
/// Two-branch convenience wrapper over [`merge_search_branches_three`] with
/// an empty vector branch.
fn merge_search_branches(
    strict: Vec<SearchHit>,
    relaxed: Vec<SearchHit>,
    limit: usize,
) -> Vec<SearchHit> {
    let no_vector_hits = Vec::new();
    merge_search_branches_three(strict, relaxed, no_vector_hits, limit)
}
/// Merges strict, relaxed, and vector search branches with strict-first
/// priority: each branch is deduplicated internally, then hits are appended
/// in branch order, skipping logical ids already present, and the merged
/// list is truncated to `limit`.
fn merge_search_branches_three(
    strict: Vec<SearchHit>,
    relaxed: Vec<SearchHit>,
    vector: Vec<SearchHit>,
    limit: usize,
) -> Vec<SearchHit> {
    let mut merged = dedup_branch_hits(strict);
    let mut seen: std::collections::HashSet<String> = merged
        .iter()
        .map(|hit| hit.node.logical_id.clone())
        .collect();
    // Lower-priority branches only contribute ids not already claimed.
    for branch in [dedup_branch_hits(relaxed), dedup_branch_hits(vector)] {
        for hit in branch {
            if seen.insert(hit.node.logical_id.clone()) {
                merged.push(hit);
            }
        }
    }
    merged.truncate(limit);
    merged
}
/// Sorts a branch's hits and keeps only the first occurrence of each node.
///
/// Sort order: score descending (NaN-safe — incomparable scores compare
/// equal), then logical_id, then source rank, so the retained duplicate is
/// deterministic (chunk beats property beats vector at equal score).
fn dedup_branch_hits(mut hits: Vec<SearchHit>) -> Vec<SearchHit> {
    use std::cmp::Ordering;
    hits.sort_by(|left, right| {
        match right.score.partial_cmp(&left.score) {
            Some(Ordering::Equal) | None => {}
            Some(unequal) => return unequal,
        }
        left.node
            .logical_id
            .cmp(&right.node.logical_id)
            .then_with(|| source_priority(left.source).cmp(&source_priority(right.source)))
    });
    let mut first_seen = std::collections::HashSet::new();
    hits.retain(|hit| first_seen.insert(hit.node.logical_id.clone()));
    hits
}
/// Tie-break rank used by `dedup_branch_hits` when two hits share a score and
/// sort adjacently: lower wins, so chunk-backed hits beat property-backed
/// hits, which beat vector hits.
fn source_priority(source: SearchHitSource) -> u8 {
    match source {
        SearchHitSource::Chunk => 0,
        SearchHitSource::Property => 1,
        SearchHitSource::Vector => 2,
    }
}
// Control bytes (SOH / STX) passed to FTS5 `highlight()` as match delimiters
// so `parse_highlight_offsets` can recover match positions by scanning;
// presumably these bytes never occur in indexed text.
const ATTRIBUTION_HIGHLIGHT_OPEN: &str = "\x01";
const ATTRIBUTION_HIGHLIGHT_CLOSE: &str = "\x02";
/// Loads the `(start, end, leaf_path)` position rows for one node/kind pair.
///
/// Position rows map byte ranges of the concatenated FTS document back to the
/// JSON leaf path that produced them; they are returned ordered by start
/// offset so `find_leaf_for_offset` can binary-search them.
fn load_position_map(
    conn: &Connection,
    logical_id: &str,
    kind: &str,
) -> Result<Vec<(usize, usize, String)>, EngineError> {
    let mut stmt = conn
        .prepare_cached(
            "SELECT start_offset, end_offset, leaf_path \
             FROM fts_node_property_positions \
             WHERE node_logical_id = ?1 AND kind = ?2 \
             ORDER BY start_offset ASC",
        )
        .map_err(EngineError::Sqlite)?;
    let mapped = stmt
        .query_map(rusqlite::params![logical_id, kind], |row| {
            let start_raw: i64 = row.get(0)?;
            let end_raw: i64 = row.get(1)?;
            let leaf: String = row.get(2)?;
            // Offsets are stored as i64; negative values should not occur, so
            // clamp to 0 rather than fail the whole read.
            Ok((
                usize::try_from(start_raw).unwrap_or(0),
                usize::try_from(end_raw).unwrap_or(0),
                leaf,
            ))
        })
        .map_err(EngineError::Sqlite)?;
    mapped
        .collect::<Result<Vec<_>, _>>()
        .map_err(EngineError::Sqlite)
}
/// Recovers the byte offsets, in the ORIGINAL (marker-free) text, at which
/// highlight spans begin, given text wrapped with `open`/`close` markers.
///
/// Close markers are skipped and produce no offset; only opens are recorded.
fn parse_highlight_offsets(wrapped: &str, open: &str, close: &str) -> Vec<usize> {
    let data = wrapped.as_bytes();
    let open_marker = open.as_bytes();
    let close_marker = close.as_bytes();
    let mut starts = Vec::new();
    let mut cursor = 0usize;
    // Position in the unwrapped text: advances only on non-marker bytes.
    let mut plain_pos = 0usize;
    while cursor < data.len() {
        let rest = &data[cursor..];
        if rest.starts_with(open_marker) {
            starts.push(plain_pos);
            cursor += open_marker.len();
        } else if rest.starts_with(close_marker) {
            cursor += close_marker.len();
        } else {
            cursor += 1;
            plain_pos += 1;
        }
    }
    starts
}
/// Maps a byte offset in the concatenated FTS document to the JSON leaf path
/// whose `[start, end)` range contains it.
///
/// `positions` must be sorted ascending by start offset (as produced by
/// `load_position_map`); returns `None` when the offset lands in a gap
/// between ranges or past the last one.
fn find_leaf_for_offset(positions: &[(usize, usize, String)], offset: usize) -> Option<&str> {
    // Index one past the last entry whose start is <= offset; zero means the
    // offset precedes every range.
    let upper = positions.partition_point(|entry| entry.0 <= offset);
    let (start, end, path) = positions.get(upper.checked_sub(1)?)?;
    if (*start..*end).contains(&offset) {
        Some(path.as_str())
    } else {
        None
    }
}
/// Determines which logical fields of a search hit matched the FTS query.
///
/// Chunk hits always attribute to `text_content`. Property hits are resolved
/// by re-running FTS5 `highlight()` on the hit's projection row with control
/// bytes as delimiters, then mapping highlighted byte offsets back to JSON
/// leaf paths via the stored position map. Every fallback path returns an
/// empty attribution instead of an error, so attribution never fails a read.
fn resolve_hit_attribution(
    conn: &Connection,
    hit: &SearchHit,
    match_expr: &str,
) -> Result<HitAttribution, EngineError> {
    // Chunk-backed hits matched on the chunk body itself.
    if matches!(hit.source, SearchHitSource::Chunk) {
        return Ok(HitAttribution {
            matched_paths: vec!["text_content".to_owned()],
        });
    }
    // Non-property (e.g. vector) hits carry no text attribution.
    if !matches!(hit.source, SearchHitSource::Property) {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    // Without a projection rowid we cannot re-run highlight(); degrade softly.
    let Some(rowid_str) = hit.projection_row_id.as_deref() else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    let rowid: i64 = match rowid_str.parse() {
        Ok(v) => v,
        Err(_) => {
            return Ok(HitAttribution {
                matched_paths: Vec::new(),
            });
        }
    };
    // Table identifiers cannot be bound as parameters, hence the format!; the
    // name comes from the schema helper, not from user input.
    let prop_table = fathomdb_schema::fts_kind_table_name(&hit.node.kind);
    let highlight_sql = format!(
        "SELECT highlight({prop_table}, 1, ?1, ?2) \
         FROM {prop_table} \
         WHERE rowid = ?3 AND {prop_table} MATCH ?4"
    );
    let mut stmt = conn.prepare(&highlight_sql).map_err(EngineError::Sqlite)?;
    // .optional(): no row means the rowid no longer matches the expression —
    // treated as "no attribution" rather than an error.
    let wrapped: Option<String> = stmt
        .query_row(
            rusqlite::params![
                ATTRIBUTION_HIGHLIGHT_OPEN,
                ATTRIBUTION_HIGHLIGHT_CLOSE,
                rowid,
                match_expr,
            ],
            |row| row.get(0),
        )
        .optional()
        .map_err(EngineError::Sqlite)?;
    let Some(wrapped) = wrapped else {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    };
    let offsets = parse_highlight_offsets(
        &wrapped,
        ATTRIBUTION_HIGHLIGHT_OPEN,
        ATTRIBUTION_HIGHLIGHT_CLOSE,
    );
    if offsets.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    // NOTE(review): an empty position map presumably means positions were
    // never backfilled for this node — degrade to empty attribution.
    let positions = load_position_map(conn, &hit.node.logical_id, &hit.node.kind)?;
    if positions.is_empty() {
        return Ok(HitAttribution {
            matched_paths: Vec::new(),
        });
    }
    // Deduplicate paths while preserving first-seen order.
    let mut matched_paths: Vec<String> = Vec::new();
    for offset in offsets {
        if let Some(path) = find_leaf_for_offset(&positions, offset)
            && !matched_paths.iter().any(|p| p == path)
        {
            matched_paths.push(path.to_owned());
        }
    }
    Ok(HitAttribution { matched_paths })
}
/// Builds the `bm25(...)` ranking expression for a per-kind FTS table.
///
/// When no path declares a weight, the plain `bm25(table)` form is emitted.
/// Otherwise the first weight is pinned to 0.0 — column 0 appears to be the
/// UNINDEXED `node_logical_id` column — and the remaining weights follow the
/// schema's path order, defaulting to 1.0 where unspecified.
fn build_bm25_expr(table: &str, schema_json: &str, sep: &str) -> String {
    let schema = crate::writer::parse_property_schema_json(schema_json, sep);
    if schema.paths.iter().all(|p| p.weight.is_none()) {
        return format!("bm25({table})");
    }
    let mut weights = vec!["0.0".to_owned()];
    weights.extend(
        schema
            .paths
            .iter()
            .map(|p| format!("{:.1}", p.weight.unwrap_or(1.0))),
    );
    format!("bm25({table}, {})", weights.join(", "))
}
fn bind_value_to_sql(value: &fathomdb_query::BindValue) -> Value {
match value {
fathomdb_query::BindValue::Text(text) => Value::Text(text.clone()),
fathomdb_query::BindValue::Integer(integer) => Value::Integer(*integer),
fathomdb_query::BindValue::Bool(boolean) => Value::Integer(i64::from(*boolean)),
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;
use fathomdb_query::{BindValue, QueryBuilder};
use fathomdb_schema::SchemaManager;
use rusqlite::types::Value;
use tempfile::NamedTempFile;
use crate::{EngineError, ExecutionCoordinator, TelemetryCounters};
use fathomdb_query::{
NodeRowLite, RetrievalModality, SearchHit, SearchHitSource, SearchMatchMode,
};
use super::{
bind_value_to_sql, is_vec_table_absent, merge_search_branches, merge_search_branches_three,
wrap_node_row_projection_sql,
};
    // Builds a minimal SearchHit for merge/dedup tests: a "Goal" node with
    // empty properties, text modality, and no snippet/attribution.
    fn mk_hit(
        logical_id: &str,
        score: f64,
        match_mode: SearchMatchMode,
        source: SearchHitSource,
    ) -> SearchHit {
        SearchHit {
            node: NodeRowLite {
                row_id: format!("{logical_id}-row"),
                logical_id: logical_id.to_owned(),
                kind: "Goal".to_owned(),
                properties: "{}".to_owned(),
                content_ref: None,
                last_accessed_at: None,
            },
            score,
            modality: RetrievalModality::Text,
            source,
            match_mode: Some(match_mode),
            snippet: None,
            written_at: 0,
            projection_row_id: None,
            vector_distance: None,
            attribution: None,
        }
    }
#[test]
fn merge_places_strict_block_before_relaxed_regardless_of_score() {
let strict = vec![mk_hit(
"a",
1.0,
SearchMatchMode::Strict,
SearchHitSource::Chunk,
)];
let relaxed = vec![mk_hit(
"b",
9.9,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
)];
let merged = merge_search_branches(strict, relaxed, 10);
assert_eq!(merged.len(), 2);
assert_eq!(merged[0].node.logical_id, "a");
assert!(matches!(
merged[0].match_mode,
Some(SearchMatchMode::Strict)
));
assert_eq!(merged[1].node.logical_id, "b");
assert!(matches!(
merged[1].match_mode,
Some(SearchMatchMode::Relaxed)
));
}
#[test]
fn merge_dedup_keeps_strict_over_relaxed_for_same_logical_id() {
let strict = vec![mk_hit(
"shared",
1.0,
SearchMatchMode::Strict,
SearchHitSource::Chunk,
)];
let relaxed = vec![
mk_hit(
"shared",
9.9,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
),
mk_hit(
"other",
2.0,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
),
];
let merged = merge_search_branches(strict, relaxed, 10);
assert_eq!(merged.len(), 2);
assert_eq!(merged[0].node.logical_id, "shared");
assert!(matches!(
merged[0].match_mode,
Some(SearchMatchMode::Strict)
));
assert_eq!(merged[1].node.logical_id, "other");
assert!(matches!(
merged[1].match_mode,
Some(SearchMatchMode::Relaxed)
));
}
#[test]
fn merge_sorts_within_block_by_score_desc_then_logical_id() {
let strict = vec![
mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
mk_hit("c", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
];
let merged = merge_search_branches(strict, vec![], 10);
assert_eq!(
merged
.iter()
.map(|h| &h.node.logical_id)
.collect::<Vec<_>>(),
vec!["a", "c", "b"]
);
}
#[test]
fn merge_dedup_within_branch_prefers_chunk_over_property_at_equal_score() {
let strict = vec![
mk_hit(
"shared",
1.0,
SearchMatchMode::Strict,
SearchHitSource::Property,
),
mk_hit(
"shared",
1.0,
SearchMatchMode::Strict,
SearchHitSource::Chunk,
),
];
let merged = merge_search_branches(strict, vec![], 10);
assert_eq!(merged.len(), 1);
assert!(matches!(merged[0].source, SearchHitSource::Chunk));
}
#[test]
fn merge_truncates_to_limit_after_block_merge() {
let strict = vec![
mk_hit("a", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
mk_hit("b", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
];
let relaxed = vec![mk_hit(
"c",
9.0,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
)];
let merged = merge_search_branches(strict, relaxed, 2);
assert_eq!(merged.len(), 2);
assert_eq!(merged[0].node.logical_id, "a");
assert_eq!(merged[1].node.logical_id, "b");
}
#[test]
fn search_architecturally_supports_three_branch_fusion() {
let strict = vec![mk_hit(
"alpha",
1.0,
SearchMatchMode::Strict,
SearchHitSource::Chunk,
)];
let relaxed = vec![mk_hit(
"bravo",
5.0,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
)];
let mut vector_hit = mk_hit(
"charlie",
9.9,
SearchMatchMode::Strict,
SearchHitSource::Vector,
);
vector_hit.match_mode = None;
vector_hit.modality = RetrievalModality::Vector;
let vector = vec![vector_hit];
let merged = merge_search_branches_three(strict, relaxed, vector, 10);
assert_eq!(merged.len(), 3);
assert_eq!(merged[0].node.logical_id, "alpha");
assert_eq!(merged[1].node.logical_id, "bravo");
assert_eq!(merged[2].node.logical_id, "charlie");
assert!(matches!(merged[2].source, SearchHitSource::Vector));
let strict2 = vec![mk_hit(
"shared",
0.5,
SearchMatchMode::Strict,
SearchHitSource::Chunk,
)];
let relaxed2 = vec![mk_hit(
"shared",
5.0,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
)];
let mut vshared = mk_hit(
"shared",
9.9,
SearchMatchMode::Strict,
SearchHitSource::Vector,
);
vshared.match_mode = None;
vshared.modality = RetrievalModality::Vector;
let merged2 = merge_search_branches_three(strict2, relaxed2, vec![vshared], 10);
assert_eq!(merged2.len(), 1, "shared logical_id must dedup to one row");
assert!(matches!(
merged2[0].match_mode,
Some(SearchMatchMode::Strict)
));
assert!(matches!(merged2[0].source, SearchHitSource::Chunk));
let mut vshared2 = mk_hit(
"shared",
9.9,
SearchMatchMode::Strict,
SearchHitSource::Vector,
);
vshared2.match_mode = None;
vshared2.modality = RetrievalModality::Vector;
let merged3 = merge_search_branches_three(
vec![],
vec![mk_hit(
"shared",
1.0,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
)],
vec![vshared2],
10,
);
assert_eq!(merged3.len(), 1);
assert!(matches!(
merged3[0].match_mode,
Some(SearchMatchMode::Relaxed)
));
}
#[test]
fn merge_search_branches_three_vector_only_preserves_vector_block() {
let mut vector_hit = mk_hit(
"solo",
0.75,
SearchMatchMode::Strict,
SearchHitSource::Vector,
);
vector_hit.match_mode = None;
vector_hit.modality = RetrievalModality::Vector;
let merged = merge_search_branches_three(vec![], vec![], vec![vector_hit], 10);
assert_eq!(merged.len(), 1);
assert_eq!(merged[0].node.logical_id, "solo");
assert!(matches!(merged[0].source, SearchHitSource::Vector));
assert!(matches!(merged[0].modality, RetrievalModality::Vector));
assert!(
merged[0].match_mode.is_none(),
"vector hits carry match_mode=None per addendum 1"
);
}
#[test]
fn merge_search_branches_three_limit_truncates_preserving_block_precedence() {
let strict = vec![
mk_hit("a", 3.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
mk_hit("b", 2.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
mk_hit("c", 1.0, SearchMatchMode::Strict, SearchHitSource::Chunk),
];
let relaxed = vec![mk_hit(
"d",
9.0,
SearchMatchMode::Relaxed,
SearchHitSource::Chunk,
)];
let mut vector_hit = mk_hit("e", 9.5, SearchMatchMode::Strict, SearchHitSource::Vector);
vector_hit.match_mode = None;
vector_hit.modality = RetrievalModality::Vector;
let vector = vec![vector_hit];
let merged = merge_search_branches_three(strict, relaxed, vector, 2);
assert_eq!(merged.len(), 2);
assert_eq!(merged[0].node.logical_id, "a");
assert_eq!(merged[1].node.logical_id, "b");
assert!(
merged
.iter()
.all(|h| matches!(h.match_mode, Some(SearchMatchMode::Strict))),
"strict block must win limit contention against higher-scored relaxed/vector hits"
);
assert!(
merged
.iter()
.all(|h| matches!(h.source, SearchHitSource::Chunk)),
"no vector source hits should leak past the limit"
);
}
#[test]
fn is_vec_table_absent_matches_known_error_messages() {
use rusqlite::ffi;
fn make_err(msg: &str) -> rusqlite::Error {
rusqlite::Error::SqliteFailure(
ffi::Error {
code: ffi::ErrorCode::Unknown,
extended_code: 1,
},
Some(msg.to_owned()),
)
}
assert!(is_vec_table_absent(&make_err(
"no such table: vec_nodes_active"
)));
assert!(is_vec_table_absent(&make_err("no such module: vec0")));
assert!(!is_vec_table_absent(&make_err("vec0 constraint violated")));
assert!(!is_vec_table_absent(&make_err("no such table: nodes")));
assert!(!is_vec_table_absent(&rusqlite::Error::QueryReturnedNoRows));
}
#[test]
fn vector_search_uses_per_kind_table_and_degrades_when_table_absent() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("MyKind")
.vector_search("some query", 5)
.compile()
.expect("vector query compiles");
let rows = coordinator
.execute_compiled_read(&compiled)
.expect("degraded read must succeed");
assert!(
rows.was_degraded,
"must degrade when vec_mykind table does not exist"
);
assert!(
rows.nodes.is_empty(),
"degraded result must return empty nodes"
);
}
#[test]
fn bind_value_text_maps_to_sql_text() {
let val = bind_value_to_sql(&BindValue::Text("hello".to_owned()));
assert_eq!(val, Value::Text("hello".to_owned()));
}
#[test]
fn bind_value_integer_maps_to_sql_integer() {
let val = bind_value_to_sql(&BindValue::Integer(42));
assert_eq!(val, Value::Integer(42));
}
#[test]
fn bind_value_bool_true_maps_to_integer_one() {
let val = bind_value_to_sql(&BindValue::Bool(true));
assert_eq!(val, Value::Integer(1));
}
#[test]
fn bind_value_bool_false_maps_to_integer_zero() {
let val = bind_value_to_sql(&BindValue::Bool(false));
assert_eq!(val, Value::Integer(0));
}
#[test]
fn same_shape_queries_share_one_cache_entry() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled_a = QueryBuilder::nodes("Meeting")
.text_search("budget", 5)
.limit(10)
.compile()
.expect("compiled a");
let compiled_b = QueryBuilder::nodes("Meeting")
.text_search("standup", 5)
.limit(10)
.compile()
.expect("compiled b");
coordinator
.execute_compiled_read(&compiled_a)
.expect("read a");
coordinator
.execute_compiled_read(&compiled_b)
.expect("read b");
assert_eq!(
compiled_a.shape_hash, compiled_b.shape_hash,
"different bind values, same structural shape → same hash"
);
assert_eq!(coordinator.shape_sql_count(), 1);
}
#[test]
fn vector_read_degrades_gracefully_when_vec_table_absent() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("Meeting")
.vector_search("budget embeddings", 5)
.compile()
.expect("vector query compiles");
let result = coordinator.execute_compiled_read(&compiled);
let rows = result.expect("degraded read must succeed, not error");
assert!(
rows.was_degraded,
"result must be flagged as degraded when vec_nodes_active is absent"
);
assert!(
rows.nodes.is_empty(),
"degraded result must return empty nodes"
);
}
#[test]
fn coordinator_caches_by_shape_hash() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("budget", 5)
.compile()
.expect("compiled query");
coordinator
.execute_compiled_read(&compiled)
.expect("execute compiled read");
assert_eq!(coordinator.shape_sql_count(), 1);
}
#[test]
fn explain_returns_correct_sql() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("budget", 5)
.compile()
.expect("compiled query");
let plan = coordinator.explain_compiled_read(&compiled);
assert_eq!(plan.sql, wrap_node_row_projection_sql(&compiled.sql));
}
#[test]
fn explain_returns_correct_driving_table() {
use fathomdb_query::DrivingTable;
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("budget", 5)
.compile()
.expect("compiled query");
let plan = coordinator.explain_compiled_read(&compiled);
assert_eq!(plan.driving_table, DrivingTable::FtsNodes);
}
#[test]
fn explain_reports_cache_miss_then_hit() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("budget", 5)
.compile()
.expect("compiled query");
let plan_before = coordinator.explain_compiled_read(&compiled);
assert!(
!plan_before.cache_hit,
"cache miss expected before first execute"
);
coordinator
.execute_compiled_read(&compiled)
.expect("execute read");
let plan_after = coordinator.explain_compiled_read(&compiled);
assert!(
plan_after.cache_hit,
"cache hit expected after first execute"
);
}
#[test]
fn explain_does_not_execute_query() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("anything", 5)
.compile()
.expect("compiled query");
let plan = coordinator.explain_compiled_read(&compiled);
assert!(!plan.sql.is_empty(), "plan must carry the SQL text");
assert_eq!(plan.bind_count, compiled.binds.len());
}
#[test]
fn coordinator_executes_compiled_read() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let conn = rusqlite::Connection::open(db.path()).expect("open db");
conn.execute_batch(
r#"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
VALUES ('row-1', 'meeting-1', 'Meeting', '{"status":"active"}', unixepoch());
INSERT INTO chunks (id, node_logical_id, text_content, created_at)
VALUES ('chunk-1', 'meeting-1', 'budget discussion', unixepoch());
INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
VALUES ('chunk-1', 'meeting-1', 'Meeting', 'budget discussion');
"#,
)
.expect("seed data");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("budget", 5)
.limit(5)
.compile()
.expect("compiled query");
let rows = coordinator
.execute_compiled_read(&compiled)
.expect("execute read");
assert_eq!(rows.nodes.len(), 1);
assert_eq!(rows.nodes[0].logical_id, "meeting-1");
}
#[test]
fn text_search_finds_structured_only_node_via_property_fts() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let conn = rusqlite::Connection::open(db.path()).expect("open db");
conn.execute_batch(
"CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
node_logical_id UNINDEXED, text_content, \
tokenize = 'porter unicode61 remove_diacritics 2'\
)",
)
.expect("create per-kind fts table");
conn.execute_batch(
r#"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
VALUES ('row-1', 'goal-1', 'Goal', '{"name":"Ship v2"}', 100, 'seed');
INSERT INTO fts_props_goal (node_logical_id, text_content)
VALUES ('goal-1', 'Ship v2');
"#,
)
.expect("seed data");
let compiled = QueryBuilder::nodes("Goal")
.text_search("Ship", 5)
.limit(5)
.compile()
.expect("compiled query");
let rows = coordinator
.execute_compiled_read(&compiled)
.expect("execute read");
assert_eq!(rows.nodes.len(), 1);
assert_eq!(rows.nodes[0].logical_id, "goal-1");
}
#[test]
fn text_search_returns_both_chunk_and_property_backed_hits() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let conn = rusqlite::Connection::open(db.path()).expect("open db");
conn.execute_batch(
r"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
INSERT INTO chunks (id, node_logical_id, text_content, created_at)
VALUES ('chunk-1', 'meeting-1', 'quarterly budget review', 100);
INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
VALUES ('chunk-1', 'meeting-1', 'Meeting', 'quarterly budget review');
",
)
.expect("seed chunk-backed node");
conn.execute_batch(
"CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_meeting USING fts5(\
node_logical_id UNINDEXED, text_content, \
tokenize = 'porter unicode61 remove_diacritics 2'\
)",
)
.expect("create per-kind fts table");
conn.execute_batch(
r#"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
VALUES ('row-2', 'meeting-2', 'Meeting', '{"title":"quarterly sync"}', 100, 'seed');
INSERT INTO fts_props_meeting (node_logical_id, text_content)
VALUES ('meeting-2', 'quarterly sync');
"#,
)
.expect("seed property-backed node");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("quarterly", 10)
.limit(10)
.compile()
.expect("compiled query");
let rows = coordinator
.execute_compiled_read(&compiled)
.expect("execute read");
let mut ids: Vec<&str> = rows.nodes.iter().map(|r| r.logical_id.as_str()).collect();
ids.sort_unstable();
assert_eq!(ids, vec!["meeting-1", "meeting-2"]);
}
#[test]
fn text_search_finds_literal_lowercase_not_text_in_chunk_content() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let conn = rusqlite::Connection::open(db.path()).expect("open db");
conn.execute_batch(
r"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref)
VALUES ('row-1', 'meeting-1', 'Meeting', '{}', 100, 'seed');
INSERT INTO chunks (id, node_logical_id, text_content, created_at)
VALUES ('chunk-1', 'meeting-1', 'the boat is not a ship', 100);
INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
VALUES ('chunk-1', 'meeting-1', 'Meeting', 'the boat is not a ship');
",
)
.expect("seed chunk-backed node");
let compiled = QueryBuilder::nodes("Meeting")
.text_search("not a ship", 10)
.limit(10)
.compile()
.expect("compiled query");
let rows = coordinator
.execute_compiled_read(&compiled)
.expect("execute read");
assert_eq!(rows.nodes.len(), 1);
assert_eq!(rows.nodes[0].logical_id, "meeting-1");
}
#[test]
fn capability_gate_reports_false_without_feature() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
assert!(
!coordinator.vector_enabled(),
"vector_enabled must be false when no dimension is requested"
);
}
#[cfg(feature = "sqlite-vec")]
#[test]
fn capability_gate_reports_true_when_feature_enabled() {
let db = NamedTempFile::new().expect("temporary db");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
Some(128),
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
assert!(
coordinator.vector_enabled(),
"vector_enabled must be true when sqlite-vec feature is active and dimension is set"
);
}
#[test]
fn read_run_returns_inserted_run() {
use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
let db = NamedTempFile::new().expect("temporary db");
let writer = WriterActor::start(
db.path(),
Arc::new(SchemaManager::new()),
ProvenanceMode::Warn,
Arc::new(TelemetryCounters::default()),
)
.expect("writer");
writer
.submit(WriteRequest {
label: "runtime".to_owned(),
nodes: vec![],
node_retires: vec![],
edges: vec![],
edge_retires: vec![],
chunks: vec![],
runs: vec![RunInsert {
id: "run-r1".to_owned(),
kind: "session".to_owned(),
status: "active".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
steps: vec![],
actions: vec![],
optional_backfills: vec![],
vec_inserts: vec![],
operational_writes: vec![],
})
.expect("write run");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let row = coordinator
.read_run("run-r1")
.expect("read_run")
.expect("row exists");
assert_eq!(row.id, "run-r1");
assert_eq!(row.kind, "session");
assert_eq!(row.status, "active");
}
#[test]
fn read_step_returns_inserted_step() {
use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor, writer::StepInsert};
let db = NamedTempFile::new().expect("temporary db");
let writer = WriterActor::start(
db.path(),
Arc::new(SchemaManager::new()),
ProvenanceMode::Warn,
Arc::new(TelemetryCounters::default()),
)
.expect("writer");
writer
.submit(WriteRequest {
label: "runtime".to_owned(),
nodes: vec![],
node_retires: vec![],
edges: vec![],
edge_retires: vec![],
chunks: vec![],
runs: vec![RunInsert {
id: "run-s1".to_owned(),
kind: "session".to_owned(),
status: "active".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
steps: vec![StepInsert {
id: "step-s1".to_owned(),
run_id: "run-s1".to_owned(),
kind: "llm".to_owned(),
status: "completed".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
actions: vec![],
optional_backfills: vec![],
vec_inserts: vec![],
operational_writes: vec![],
})
.expect("write step");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let row = coordinator
.read_step("step-s1")
.expect("read_step")
.expect("row exists");
assert_eq!(row.id, "step-s1");
assert_eq!(row.run_id, "run-s1");
assert_eq!(row.kind, "llm");
}
#[test]
fn read_action_returns_inserted_action() {
use crate::{
ProvenanceMode, RunInsert, WriteRequest, WriterActor,
writer::{ActionInsert, StepInsert},
};
let db = NamedTempFile::new().expect("temporary db");
let writer = WriterActor::start(
db.path(),
Arc::new(SchemaManager::new()),
ProvenanceMode::Warn,
Arc::new(TelemetryCounters::default()),
)
.expect("writer");
writer
.submit(WriteRequest {
label: "runtime".to_owned(),
nodes: vec![],
node_retires: vec![],
edges: vec![],
edge_retires: vec![],
chunks: vec![],
runs: vec![RunInsert {
id: "run-a1".to_owned(),
kind: "session".to_owned(),
status: "active".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
steps: vec![StepInsert {
id: "step-a1".to_owned(),
run_id: "run-a1".to_owned(),
kind: "llm".to_owned(),
status: "completed".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
actions: vec![ActionInsert {
id: "action-a1".to_owned(),
step_id: "step-a1".to_owned(),
kind: "emit".to_owned(),
status: "completed".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
optional_backfills: vec![],
vec_inserts: vec![],
operational_writes: vec![],
})
.expect("write action");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let row = coordinator
.read_action("action-a1")
.expect("read_action")
.expect("row exists");
assert_eq!(row.id, "action-a1");
assert_eq!(row.step_id, "step-a1");
assert_eq!(row.kind, "emit");
}
#[test]
fn read_active_runs_excludes_superseded() {
use crate::{ProvenanceMode, RunInsert, WriteRequest, WriterActor};
let db = NamedTempFile::new().expect("temporary db");
let writer = WriterActor::start(
db.path(),
Arc::new(SchemaManager::new()),
ProvenanceMode::Warn,
Arc::new(TelemetryCounters::default()),
)
.expect("writer");
writer
.submit(WriteRequest {
label: "v1".to_owned(),
nodes: vec![],
node_retires: vec![],
edges: vec![],
edge_retires: vec![],
chunks: vec![],
runs: vec![RunInsert {
id: "run-v1".to_owned(),
kind: "session".to_owned(),
status: "active".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-1".to_owned()),
upsert: false,
supersedes_id: None,
}],
steps: vec![],
actions: vec![],
optional_backfills: vec![],
vec_inserts: vec![],
operational_writes: vec![],
})
.expect("v1 write");
writer
.submit(WriteRequest {
label: "v2".to_owned(),
nodes: vec![],
node_retires: vec![],
edges: vec![],
edge_retires: vec![],
chunks: vec![],
runs: vec![RunInsert {
id: "run-v2".to_owned(),
kind: "session".to_owned(),
status: "completed".to_owned(),
properties: "{}".to_owned(),
source_ref: Some("src-2".to_owned()),
upsert: true,
supersedes_id: Some("run-v1".to_owned()),
}],
steps: vec![],
actions: vec![],
optional_backfills: vec![],
vec_inserts: vec![],
operational_writes: vec![],
})
.expect("v2 write");
let coordinator = ExecutionCoordinator::open(
db.path(),
Arc::new(SchemaManager::new()),
None,
1,
Arc::new(TelemetryCounters::default()),
None,
)
.expect("coordinator");
let active = coordinator.read_active_runs().expect("read_active_runs");
assert_eq!(active.len(), 1, "only the non-superseded run should appear");
assert_eq!(active[0].id, "run-v2");
}
    // Poisons the first pool connection's mutex by panicking while holding
    // its guard inside catch_unwind.
    #[allow(clippy::panic)]
    fn poison_connection(coordinator: &ExecutionCoordinator) {
        let result = catch_unwind(AssertUnwindSafe(|| {
            let _guard = coordinator.pool.connections[0]
                .lock()
                .expect("poison test lock");
            panic!("poison coordinator connection mutex");
        }));
        assert!(
            result.is_err(),
            "poison test must unwind while holding the connection mutex"
        );
    }
    // Asserts that `op` fails with the specific Bridge("connection mutex
    // poisoned") error rather than succeeding or failing differently.
    #[allow(clippy::panic)]
    fn assert_poisoned_connection_error<T, F>(coordinator: &ExecutionCoordinator, op: F)
    where
        F: FnOnce(&ExecutionCoordinator) -> Result<T, EngineError>,
    {
        match op(coordinator) {
            Err(EngineError::Bridge(message)) => {
                assert_eq!(message, "connection mutex poisoned");
            }
            Ok(_) => panic!("expected poisoned connection error, got Ok(_)"),
            Err(error) => panic!("expected poisoned connection error, got {error:?}"),
        }
    }
    // Every read helper must surface a poisoned connection as a Bridge error
    // instead of panicking.
    #[test]
    fn poisoned_connection_returns_bridge_error_for_read_helpers() {
        let db = NamedTempFile::new().expect("temporary db");
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        poison_connection(&coordinator);
        assert_poisoned_connection_error(&coordinator, |c| c.read_run("run-r1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_step("step-s1"));
        assert_poisoned_connection_error(&coordinator, |c| c.read_action("action-a1"));
        assert_poisoned_connection_error(
            &coordinator,
            super::ExecutionCoordinator::read_active_runs,
        );
        assert_poisoned_connection_error(&coordinator, |c| c.raw_pragma("journal_mode"));
        assert_poisoned_connection_error(&coordinator, |c| c.query_provenance_events("source-1"));
    }
#[test]
fn shape_cache_stays_bounded() {
    use fathomdb_query::ShapeHash;
    let tmp = NamedTempFile::new().expect("temporary db");
    let coordinator = ExecutionCoordinator::open(
        tmp.path(),
        Arc::new(SchemaManager::new()),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    // Overfill the shape->SQL cache to one entry past its limit, holding the
    // lock only for the duration of this block.
    {
        let mut cache = coordinator.shape_sql_map.lock().expect("lock shape cache");
        for i in 0..=super::MAX_SHAPE_CACHE_SIZE {
            cache.insert(ShapeHash(i as u64), format!("SELECT {i}"));
        }
    }
    // Any compiled read should trigger the cache's bounding behavior.
    let query = QueryBuilder::nodes("Meeting")
        .text_search("budget", 5)
        .limit(10)
        .compile()
        .expect("compiled query");
    coordinator
        .execute_compiled_read(&query)
        .expect("execute read");
    let entries = coordinator.shape_sql_count();
    assert!(
        entries <= super::MAX_SHAPE_CACHE_SIZE,
        "shape cache must stay bounded: got {} entries, max {}",
        entries,
        super::MAX_SHAPE_CACHE_SIZE
    );
}
#[test]
fn read_pool_size_configurable() {
    // Opening with pool_size=2 must produce a two-connection read pool that
    // still serves compiled reads.
    let tmp = NamedTempFile::new().expect("temporary db");
    let coordinator = ExecutionCoordinator::open(
        tmp.path(),
        Arc::new(SchemaManager::new()),
        None,
        2,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator with pool_size=2");
    assert_eq!(coordinator.pool.size(), 2);
    let query = QueryBuilder::nodes("Meeting")
        .text_search("budget", 5)
        .limit(10)
        .compile()
        .expect("compiled query");
    assert!(
        coordinator.execute_compiled_read(&query).is_ok(),
        "read through pool must succeed"
    );
}
#[test]
fn grouped_read_results_match_baseline() {
    use fathomdb_query::TraverseDirection;
    let db = NamedTempFile::new().expect("temporary db");
    let coordinator = ExecutionCoordinator::open(
        db.path(),
        Arc::new(SchemaManager::new()),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    // Seed 10 FTS-indexed Meeting nodes, each linked to exactly two Task
    // nodes via HAS_TASK edges, through a side connection to the same file.
    {
        let conn = rusqlite::Connection::open(db.path()).expect("open db for seeding");
        for i in 0..10 {
            conn.execute_batch(&format!(
                r#"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
VALUES ('row-meeting-{i}', 'meeting-{i}', 'Meeting', '{{"n":{i}}}', unixepoch());
INSERT INTO chunks (id, node_logical_id, text_content, created_at)
VALUES ('chunk-m-{i}', 'meeting-{i}', 'meeting search text {i}', unixepoch());
INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
VALUES ('chunk-m-{i}', 'meeting-{i}', 'Meeting', 'meeting search text {i}');
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
VALUES ('row-task-{i}-a', 'task-{i}-a', 'Task', '{{"parent":{i},"sub":"a"}}', unixepoch());
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
VALUES ('row-task-{i}-b', 'task-{i}-b', 'Task', '{{"parent":{i},"sub":"b"}}', unixepoch());
INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
VALUES ('edge-{i}-a', 'edge-lid-{i}-a', 'meeting-{i}', 'task-{i}-a', 'HAS_TASK', '{{}}', unixepoch());
INSERT INTO edges (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at)
VALUES ('edge-{i}-b', 'edge-lid-{i}-b', 'meeting-{i}', 'task-{i}-b', 'HAS_TASK', '{{}}', unixepoch());
"#,
            )).expect("seed data");
        }
    }
    // Text-search the meetings and expand each root's outgoing HAS_TASK
    // edges into a grouped expansion slot named "tasks".
    let compiled = QueryBuilder::nodes("Meeting")
        .text_search("meeting", 10)
        .expand("tasks", TraverseDirection::Out, "HAS_TASK", 1, None, None)
        .limit(10)
        .compile_grouped()
        .expect("compiled grouped query");
    let result = coordinator
        .execute_compiled_grouped_read(&compiled)
        .expect("grouped read");
    assert!(!result.was_degraded, "grouped read should not be degraded");
    assert_eq!(result.roots.len(), 10, "expected 10 root nodes");
    assert_eq!(result.expansions.len(), 1, "expected 1 expansion slot");
    assert_eq!(result.expansions[0].slot, "tasks");
    assert_eq!(
        result.expansions[0].roots.len(),
        10,
        "each expansion slot should have entries for all 10 roots"
    );
    // Every meeting was seeded with exactly two tasks; the grouped expansion
    // must surface both for every root.
    for root_expansion in &result.expansions[0].roots {
        assert_eq!(
            root_expansion.nodes.len(),
            2,
            "root {} should have 2 expansion nodes, got {}",
            root_expansion.root_logical_id,
            root_expansion.nodes.len()
        );
    }
}
#[test]
fn build_bm25_expr_no_weights() {
    // A schema listing bare paths (no weight objects) must fall back to the
    // unweighted bm25() form.
    let schema = r#"["$.title","$.body"]"#;
    let expr = super::build_bm25_expr("fts_props_testkind", schema, " ");
    assert_eq!(expr, "bm25(fts_props_testkind)");
}
#[test]
fn build_bm25_expr_with_weights() {
    // Weighted entries must emit per-column weights; the leading 0.0 covers
    // the table's non-scored first column.
    let schema = r#"[{"path":"$.title","mode":"scalar","weight":10.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
    let expr = super::build_bm25_expr("fts_props_testkind", schema, " ");
    assert_eq!(expr, "bm25(fts_props_testkind, 0.0, 10.0, 1.0)");
}
#[test]
#[allow(clippy::too_many_lines)]
fn weighted_schema_bm25_orders_title_match_above_body_match() {
    use crate::{
        AdminService, FtsPropertyPathSpec, NodeInsert, ProvenanceMode, WriteRequest,
        WriterActor, writer::ChunkPolicy,
    };
    use fathomdb_schema::fts_column_name;
    let db = NamedTempFile::new().expect("temporary db");
    let schema_manager = Arc::new(SchemaManager::new());
    // Register a per-kind FTS schema where $.title outweighs $.body 10:1,
    // so bm25 ranking should prefer title matches over body matches.
    {
        let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
        admin
            .register_fts_property_schema_with_entries(
                "Article",
                &[
                    FtsPropertyPathSpec::scalar("$.title").with_weight(10.0),
                    FtsPropertyPathSpec::scalar("$.body").with_weight(1.0),
                ],
                None,
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register schema with weights");
    }
    let writer = WriterActor::start(
        db.path(),
        Arc::clone(&schema_manager),
        ProvenanceMode::Warn,
        Arc::new(TelemetryCounters::default()),
    )
    .expect("writer");
    // article-a: "rust" appears in the heavily weighted title field.
    writer
        .submit(WriteRequest {
            label: "insert-a".to_owned(),
            nodes: vec![NodeInsert {
                row_id: "row-a".to_owned(),
                logical_id: "article-a".to_owned(),
                kind: "Article".to_owned(),
                properties: r#"{"title":"rust","body":"other"}"#.to_owned(),
                source_ref: Some("src-a".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        })
        .expect("write node A");
    // article-b: "rust" appears only in the lightly weighted body field.
    writer
        .submit(WriteRequest {
            label: "insert-b".to_owned(),
            nodes: vec![NodeInsert {
                row_id: "row-b".to_owned(),
                logical_id: "article-b".to_owned(),
                kind: "Article".to_owned(),
                properties: r#"{"title":"other","body":"rust"}"#.to_owned(),
                source_ref: Some("src-b".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        })
        .expect("write node B");
    // Dropping the writer shuts down the writer actor before reads begin.
    drop(writer);
    // Sanity-check that both nodes landed in the per-kind FTS table and that
    // the projected column contents are what the ranking assertion relies on.
    {
        let title_col = fts_column_name("$.title", false);
        let body_col = fts_column_name("$.body", false);
        let conn = rusqlite::Connection::open(db.path()).expect("open db");
        let count: i64 = conn
            .query_row("SELECT count(*) FROM fts_props_article", [], |r| r.get(0))
            .expect("count fts rows");
        assert_eq!(count, 2, "both nodes must have FTS rows in per-kind table");
        let (title_a, body_a): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM fts_props_article \
                     WHERE node_logical_id = 'article-a'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select article-a");
        assert_eq!(
            title_a, "rust",
            "article-a must have 'rust' in title column"
        );
        assert_eq!(
            body_a, "other",
            "article-a must have 'other' in body column"
        );
    }
    let coordinator = ExecutionCoordinator::open(
        db.path(),
        Arc::clone(&schema_manager),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    let compiled = fathomdb_query::QueryBuilder::nodes("Article")
        .text_search("rust", 5)
        .limit(10)
        .compile()
        .expect("compiled query");
    let rows = coordinator
        .execute_compiled_read(&compiled)
        .expect("execute read");
    assert_eq!(rows.nodes.len(), 2, "both nodes must be returned");
    // With the title column weighted 10x, the title match must sort first.
    assert_eq!(
        rows.nodes[0].logical_id, "article-a",
        "article-a (title match, weight 10) must rank above article-b (body match, weight 1)"
    );
}
#[test]
fn property_fts_hit_matched_paths_from_positions() {
    use crate::{AdminService, rebuild_actor::RebuildMode};
    use fathomdb_query::compile_search;
    let db = NamedTempFile::new().expect("temporary db");
    let schema_manager = Arc::new(SchemaManager::new());
    // Register an Item schema that projects $.body and $.title into the
    // per-kind property FTS table.
    {
        let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
        admin
            .register_fts_property_schema_with_entries(
                "Item",
                &[
                    crate::FtsPropertyPathSpec::scalar("$.body"),
                    crate::FtsPropertyPathSpec::scalar("$.title"),
                ],
                None,
                &[],
                RebuildMode::Eager,
            )
            .expect("register Item FTS schema");
    }
    let coordinator = ExecutionCoordinator::open(
        db.path(),
        Arc::clone(&schema_manager),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    let conn = rusqlite::Connection::open(db.path()).expect("open db");
    // Hand-build the FTS blob "other<SEP>searchterm": with a 29-byte
    // separator, 'searchterm' occupies byte offsets [34, 44).
    let blob = format!("other{}searchterm", crate::writer::LEAF_SEPARATOR);
    // Guard: the position offsets inserted below are hard-coded against the
    // current separator length.
    assert_eq!(
        crate::writer::LEAF_SEPARATOR.len(),
        29,
        "LEAF_SEPARATOR length changed; update position offsets"
    );
    conn.execute(
        "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
         VALUES ('r1', 'item-1', 'Item', '{\"title\":\"searchterm\",\"body\":\"other\"}', 100)",
        [],
    )
    .expect("insert node");
    conn.execute(
        "INSERT INTO fts_props_item (node_logical_id, text_content) \
         VALUES ('item-1', ?1)",
        rusqlite::params![blob],
    )
    .expect("insert fts row");
    // $.body covers bytes [0, 5) ("other") — must NOT match 'searchterm'.
    conn.execute(
        "INSERT INTO fts_node_property_positions \
         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
         VALUES ('item-1', 'Item', 0, 5, '$.body')",
        [],
    )
    .expect("insert body position");
    // $.title covers bytes [34, 44) ("searchterm") — must match.
    conn.execute(
        "INSERT INTO fts_node_property_positions \
         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
         VALUES ('item-1', 'Item', 34, 44, '$.title')",
        [],
    )
    .expect("insert title position");
    let ast = QueryBuilder::nodes("Item").text_search("searchterm", 10);
    let mut compiled = compile_search(ast.ast()).expect("compile search");
    // Attribution is opt-in; request it so matched_paths is populated.
    compiled.attribution_requested = true;
    let rows = coordinator
        .execute_compiled_search(&compiled)
        .expect("search");
    assert!(!rows.hits.is_empty(), "expected at least one hit");
    let hit = rows
        .hits
        .iter()
        .find(|h| h.node.logical_id == "item-1")
        .expect("item-1 must be in hits");
    let att = hit
        .attribution
        .as_ref()
        .expect("attribution must be Some when attribution_requested");
    // Only the leaf path whose byte range overlaps the match should appear.
    assert!(
        att.matched_paths.contains(&"$.title".to_owned()),
        "matched_paths must contain '$.title', got {:?}",
        att.matched_paths,
    );
    assert!(
        !att.matched_paths.contains(&"$.body".to_owned()),
        "matched_paths must NOT contain '$.body', got {:?}",
        att.matched_paths,
    );
}
#[test]
fn vector_hit_has_no_attribution() {
    use fathomdb_query::compile_vector_search;
    // Vector hits carry no per-path match information, so attribution stays
    // None even when the caller explicitly requests attribution.
    let tmp = NamedTempFile::new().expect("temporary db");
    let coordinator = ExecutionCoordinator::open(
        tmp.path(),
        Arc::new(SchemaManager::new()),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    let builder = QueryBuilder::nodes("Document").vector_search("[1.0, 0.0]", 5);
    let mut compiled = compile_vector_search(builder.ast()).expect("compile vector search");
    compiled.attribution_requested = true;
    let rows = coordinator
        .execute_compiled_vector_search(&compiled)
        .expect("vector search must not error");
    // No vec table exists in a fresh db, so the search must degrade, not fail.
    assert!(
        rows.was_degraded,
        "vector search without vec table must degrade"
    );
    for hit in &rows.hits {
        assert!(
            hit.attribution.is_none(),
            "vector hits must carry attribution = None, got {:?}",
            hit.attribution
        );
    }
}
#[test]
fn chunk_hit_has_text_content_attribution() {
    use fathomdb_query::compile_search;
    let db = NamedTempFile::new().expect("temporary db");
    let coordinator = ExecutionCoordinator::open(
        db.path(),
        Arc::new(SchemaManager::new()),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    // Seed a node whose searchable text lives in the chunk tables (chunks +
    // fts_nodes), not in a per-kind property FTS table.
    let conn = rusqlite::Connection::open(db.path()).expect("open db");
    conn.execute_batch(
        r"
INSERT INTO nodes (row_id, logical_id, kind, properties, created_at)
VALUES ('r1', 'chunk-node', 'Goal', '{}', 100);
INSERT INTO chunks (id, node_logical_id, text_content, created_at)
VALUES ('c1', 'chunk-node', 'uniquesentinelterm', 100);
INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
VALUES ('c1', 'chunk-node', 'Goal', 'uniquesentinelterm');
",
    )
    .expect("seed chunk node");
    let ast = QueryBuilder::nodes("Goal").text_search("uniquesentinelterm", 10);
    let mut compiled = compile_search(ast.ast()).expect("compile search");
    // Attribution is opt-in; request it so matched_paths is populated.
    compiled.attribution_requested = true;
    let rows = coordinator
        .execute_compiled_search(&compiled)
        .expect("search");
    assert!(!rows.hits.is_empty(), "expected chunk hit");
    let hit = rows
        .hits
        .iter()
        .find(|h| matches!(h.source, SearchHitSource::Chunk))
        .expect("must have a Chunk hit");
    let att = hit
        .attribution
        .as_ref()
        .expect("attribution must be Some when attribution_requested");
    // Chunk-sourced hits have no JSON leaf path; they attribute to the
    // synthetic "text_content" path instead.
    assert_eq!(
        att.matched_paths,
        vec!["text_content".to_owned()],
        "chunk matched_paths must be [\"text_content\"], got {:?}",
        att.matched_paths,
    );
}
#[test]
#[allow(clippy::too_many_lines)]
fn mixed_kind_results_get_per_kind_matched_paths() {
    use crate::{AdminService, rebuild_actor::RebuildMode};
    use fathomdb_query::compile_search;
    let db = NamedTempFile::new().expect("temporary db");
    let schema_manager = Arc::new(SchemaManager::new());
    // Register two kinds with disjoint FTS property schemas: KindA projects
    // $.alpha, KindB projects $.beta.
    {
        let admin = AdminService::new(db.path(), Arc::clone(&schema_manager));
        admin
            .register_fts_property_schema_with_entries(
                "KindA",
                &[crate::FtsPropertyPathSpec::scalar("$.alpha")],
                None,
                &[],
                RebuildMode::Eager,
            )
            .expect("register KindA FTS schema");
        admin
            .register_fts_property_schema_with_entries(
                "KindB",
                &[crate::FtsPropertyPathSpec::scalar("$.beta")],
                None,
                &[],
                RebuildMode::Eager,
            )
            .expect("register KindB FTS schema");
    }
    let coordinator = ExecutionCoordinator::open(
        db.path(),
        Arc::clone(&schema_manager),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator");
    let conn = rusqlite::Connection::open(db.path()).expect("open db");
    // Seed one node per kind, both containing the same term 'xenoterm', each
    // with a position row mapping the match back to its own leaf path.
    conn.execute(
        "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
         VALUES ('rA', 'node-a', 'KindA', '{\"alpha\":\"xenoterm\"}', 100)",
        [],
    )
    .expect("insert KindA node");
    conn.execute(
        "INSERT INTO fts_props_kinda (node_logical_id, text_content) \
         VALUES ('node-a', 'xenoterm')",
        [],
    )
    .expect("insert KindA fts row");
    conn.execute(
        "INSERT INTO fts_node_property_positions \
         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
         VALUES ('node-a', 'KindA', 0, 8, '$.alpha')",
        [],
    )
    .expect("insert KindA position");
    conn.execute(
        "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
         VALUES ('rB', 'node-b', 'KindB', '{\"beta\":\"xenoterm\"}', 100)",
        [],
    )
    .expect("insert KindB node");
    conn.execute(
        "INSERT INTO fts_props_kindb (node_logical_id, text_content) \
         VALUES ('node-b', 'xenoterm')",
        [],
    )
    .expect("insert KindB fts row");
    conn.execute(
        "INSERT INTO fts_node_property_positions \
         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
         VALUES ('node-b', 'KindB', 0, 8, '$.beta')",
        [],
    )
    .expect("insert KindB position");
    // An empty kind filter searches across all kinds at once.
    let ast = QueryBuilder::nodes("").text_search("xenoterm", 10);
    let mut compiled = compile_search(ast.ast()).expect("compile search");
    compiled.attribution_requested = true;
    let rows = coordinator
        .execute_compiled_search(&compiled)
        .expect("search");
    assert!(
        rows.hits.len() >= 2,
        "expected hits for both kinds, got {}",
        rows.hits.len()
    );
    // Each hit's attribution must resolve through its OWN kind's schema.
    for hit in &rows.hits {
        let att = hit
            .attribution
            .as_ref()
            .expect("attribution must be Some when attribution_requested");
        match hit.node.kind.as_str() {
            "KindA" => {
                assert_eq!(
                    att.matched_paths,
                    vec!["$.alpha".to_owned()],
                    "KindA hit must have matched_paths=['$.alpha'], got {:?}",
                    att.matched_paths,
                );
            }
            "KindB" => {
                assert_eq!(
                    att.matched_paths,
                    vec!["$.beta".to_owned()],
                    "KindB hit must have matched_paths=['$.beta'], got {:?}",
                    att.matched_paths,
                );
            }
            other => {
                // Intentionally always-false assertion: fails with the
                // offending kind in the message while avoiding a bare panic!
                // (this arm is unreachable when only KindA/KindB are seeded).
                assert_eq!(other, "KindA", "unexpected kind in result: {other}");
            }
        }
    }
}
#[test]
fn tokenizer_strategy_from_str() {
    use super::TokenizerStrategy;
    // Each recognised FTS5 tokenizer spec string maps onto a named strategy;
    // anything unrecognised round-trips as Custom.
    let cases = [
        (
            "porter unicode61 remove_diacritics 2",
            TokenizerStrategy::RecallOptimizedEnglish,
        ),
        (
            "unicode61 remove_diacritics 2",
            TokenizerStrategy::PrecisionOptimized,
        ),
        ("trigram", TokenizerStrategy::SubstringTrigram),
        ("icu", TokenizerStrategy::GlobalCjk),
        ("unicode61 tokenchars '.+-'", TokenizerStrategy::SourceCode),
        ("unicode61 tokenchars '._-$@'", TokenizerStrategy::SourceCode),
        (
            "my_custom_tokenizer",
            TokenizerStrategy::Custom("my_custom_tokenizer".to_owned()),
        ),
    ];
    for (spec, expected) in cases {
        assert_eq!(TokenizerStrategy::from_str(spec), expected);
    }
}
#[test]
fn trigram_short_query_returns_empty() {
    use fathomdb_query::compile_search;
    // A trigram tokenizer cannot match anything shorter than three
    // characters, so a 2-char query must short-circuit to an empty result.
    let tmp = NamedTempFile::new().expect("temporary db");
    let schema_manager = Arc::new(SchemaManager::new());
    // First open only bootstraps the schema on disk; drop it immediately.
    {
        let bootstrap = ExecutionCoordinator::open(
            tmp.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("bootstrap coordinator");
        drop(bootstrap);
    }
    // Configure the Snippet kind to use the trigram tokenizer out-of-band.
    {
        let conn = rusqlite::Connection::open(tmp.path()).expect("open db");
        conn.execute_batch(
            "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
             VALUES ('Snippet', 'fts', '{\"tokenizer\":\"trigram\"}');",
        )
        .expect("insert profile");
    }
    // Reopen so the coordinator picks up the new projection profile.
    let coordinator = ExecutionCoordinator::open(
        tmp.path(),
        Arc::clone(&schema_manager),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator reopen");
    let builder = QueryBuilder::nodes("Snippet").text_search("ab", 10);
    let compiled = compile_search(builder.ast()).expect("compile search");
    let rows = coordinator
        .execute_compiled_search(&compiled)
        .expect("short trigram query must not error");
    assert!(
        rows.hits.is_empty(),
        "2-char trigram query must return empty"
    );
}
#[test]
fn source_code_strategy_does_not_corrupt_fts5_syntax() {
    use fathomdb_query::compile_search;
    let db = NamedTempFile::new().expect("temporary db");
    let schema_manager = Arc::new(SchemaManager::new());
    // First open only bootstraps the schema on disk.
    {
        let bootstrap = ExecutionCoordinator::open(
            db.path(),
            Arc::clone(&schema_manager),
            None,
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("bootstrap coordinator");
        drop(bootstrap);
    }
    {
        let conn = rusqlite::Connection::open(db.path()).expect("open db");
        // Configure the Symbol kind with the SourceCode tokenizer spec
        // (unicode61 with '._-$@' as extra token characters). The doubled
        // single quotes are SQL escaping for the embedded quoted string.
        conn.execute(
            "INSERT OR REPLACE INTO projection_profiles (kind, facet, config_json) \
             VALUES ('Symbol', 'fts', json_object('tokenizer', 'unicode61 tokenchars ''._-$@'''))",
            [],
        )
        .expect("insert profile");
        conn.execute_batch(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
             VALUES ('row-sym-1', 'logical-sym-1', 'Symbol', '{}', 1); \
             INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
             VALUES ('chunk-sym-1', 'logical-sym-1', 'std.io is a rust crate', 1); \
             INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES ('chunk-sym-1', 'logical-sym-1', 'Symbol', 'std.io is a rust crate');",
        )
        .expect("insert node and fts row");
    }
    // Reopen so the coordinator picks up the projection profile.
    let coordinator = ExecutionCoordinator::open(
        db.path(),
        Arc::clone(&schema_manager),
        None,
        1,
        Arc::new(TelemetryCounters::default()),
        None,
    )
    .expect("coordinator reopen");
    // 'std.io' contains a '.' that this tokenizer treats as a token
    // character; the rendered FTS5 expression must survive without being
    // re-escaped into invalid syntax.
    let ast = QueryBuilder::nodes("Symbol").text_search("std.io", 10);
    let compiled = compile_search(ast.ast()).expect("compile search");
    let rows = coordinator
        .execute_compiled_search(&compiled)
        .expect("source code search must not error");
    assert!(
        !rows.hits.is_empty(),
        "SourceCode strategy search for 'std.io' must return the document; \
         got empty — FTS5 expression was likely corrupted by post-render escaping"
    );
}
/// Test double for the query embedder: reports a configurable identity and
/// "embeds" every query to an all-zero vector (see the trait impl below).
#[derive(Debug)]
struct StubEmbedder {
    // Reported as the embedder's model identity by `identity()`.
    model_identity: String,
    // Length of the zero vector returned by `embed_query`.
    dimension: usize,
}
impl StubEmbedder {
fn new(model_identity: &str, dimension: usize) -> Self {
Self {
model_identity: model_identity.to_owned(),
dimension,
}
}
}
impl crate::embedder::QueryEmbedder for StubEmbedder {
    /// Always "embeds" to an all-zero vector of the configured dimension.
    fn embed_query(&self, _text: &str) -> Result<Vec<f32>, crate::embedder::EmbedderError> {
        let zeros = vec![0.0; self.dimension];
        Ok(zeros)
    }
    /// Reports a fixed identity built from the stub's configuration.
    fn identity(&self) -> crate::embedder::QueryEmbedderIdentity {
        crate::embedder::QueryEmbedderIdentity {
            model_identity: self.model_identity.clone(),
            model_version: String::from("1.0"),
            dimension: self.dimension,
            normalization_policy: String::from("l2"),
        }
    }
    /// Fixed token budget, large enough for every query used in these tests.
    fn max_tokens(&self) -> usize {
        512
    }
}
/// Opens an in-memory SQLite database containing only the
/// `projection_profiles` table that `check_vec_identity_at_open` reads.
fn make_in_memory_db_with_projection_profiles() -> rusqlite::Connection {
    let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS projection_profiles (
            kind TEXT NOT NULL,
            facet TEXT NOT NULL,
            config_json TEXT NOT NULL,
            active_at INTEGER,
            created_at INTEGER,
            PRIMARY KEY (kind, facet)
        );",
    )
    .expect("create projection_profiles");
    conn
}
#[test]
fn check_vec_identity_no_profile_no_panic() {
    // With no 'vec' profile row present, the identity check is a no-op.
    let db = make_in_memory_db_with_projection_profiles();
    let stub = StubEmbedder::new("bge-small", 384);
    assert!(
        super::check_vec_identity_at_open(&db, &stub).is_ok(),
        "no profile row must return Ok(())"
    );
}
#[test]
fn check_vec_identity_matching_identity_ok() {
    // A stored profile that matches the embedder exactly passes the check.
    let db = make_in_memory_db_with_projection_profiles();
    db.execute(
        "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
         VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
        [],
    )
    .expect("insert profile");
    let stub = StubEmbedder::new("bge-small", 384);
    assert!(
        super::check_vec_identity_at_open(&db, &stub).is_ok(),
        "matching profile must return Ok(())"
    );
}
#[test]
fn check_vec_identity_mismatched_dimensions_ok() {
    // A dimension mismatch is non-fatal: the check warns and still succeeds.
    let db = make_in_memory_db_with_projection_profiles();
    db.execute(
        "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
         VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
        [],
    )
    .expect("insert profile");
    // Same model, but 768 dimensions instead of the stored 384.
    let stub = StubEmbedder::new("bge-small", 768);
    assert!(
        super::check_vec_identity_at_open(&db, &stub).is_ok(),
        "dimension mismatch must warn and return Ok(())"
    );
}
#[test]
fn custom_tokenizer_passthrough() {
    use super::TokenizerStrategy;
    // An unknown tokenizer spec is preserved verbatim as Custom and compares
    // equal only to an identical Custom value.
    let custom = TokenizerStrategy::Custom("my_tok".to_owned());
    assert_eq!(custom, TokenizerStrategy::Custom(String::from("my_tok")));
    assert_ne!(custom, TokenizerStrategy::SubstringTrigram);
    assert_ne!(custom, TokenizerStrategy::SourceCode);
}
#[test]
fn check_vec_identity_mismatched_model_ok() {
    // A model-identity mismatch is non-fatal: the check warns and succeeds.
    let db = make_in_memory_db_with_projection_profiles();
    db.execute(
        "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
         VALUES ('*', 'vec', '{\"model_identity\":\"bge-small\",\"dimensions\":384}', 0, 0)",
        [],
    )
    .expect("insert profile");
    // Same dimension, but a different model than the stored profile.
    let stub = StubEmbedder::new("bge-large", 384);
    assert!(
        super::check_vec_identity_at_open(&db, &stub).is_ok(),
        "model_identity mismatch must warn and return Ok(())"
    );
}
}