/// Maximum container nesting depth the recursive walker descends into;
/// containers at this depth are skipped (counted in `ExtractStats::depth_cap_hit`).
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;
/// Hard cap on the size of the extracted recursive text blob, in bytes.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;
/// Separator inserted between leaf values in the recursive blob. The unusual
/// sentinel token is presumably chosen so real content never matches it and
/// adjacent leaves cannot form accidental phrase matches — TODO confirm the
/// FTS tokenizer treats it as a boundary.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
/// How a configured property path is extracted for full-text search.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Extract only the scalar value (or array of scalars) at the exact path.
    #[default]
    Scalar,
    /// Walk the entire subtree under the path, emitting every scalar leaf.
    Recursive,
}
/// One path entry from an FTS property schema: where to read, how to read it,
/// and an optional ranking weight.
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct PropertyPathEntry {
    // JSON path this entry extracts from (e.g. `$.title`).
    pub path: String,
    // Scalar vs. recursive extraction mode.
    pub mode: PropertyPathMode,
    // Optional per-path ranking weight; `None` means unweighted.
    pub weight: Option<f32>,
}
// `Eq` cannot be derived because `weight` is an `Option<f32>` and `f32` is
// only `PartialEq`. This manual impl asserts total equality, which holds as
// long as `weight` is never NaN — NOTE(review): confirm weights are validated
// upstream, since a NaN weight would make `entry != entry`.
impl Eq for PropertyPathEntry {}
impl PropertyPathEntry {
pub(crate) fn scalar(path: impl Into<String>) -> Self {
Self {
path: path.into(),
mode: PropertyPathMode::Scalar,
weight: None,
}
}
#[cfg(test)]
pub(crate) fn recursive(path: impl Into<String>) -> Self {
Self {
path: path.into(),
mode: PropertyPathMode::Recursive,
weight: None,
}
}
}
/// Parsed FTS extraction schema for one entity kind: which property paths to
/// index, how to join scalar values, and which subtrees to skip.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    // Paths to extract, in schema order.
    pub paths: Vec<PropertyPathEntry>,
    // Joiner placed between multiple scalar values from one extraction.
    pub separator: String,
    // Exact paths whose subtrees the recursive walker must skip.
    pub exclude_paths: Vec<String>,
}
impl PropertyFtsSchema {
    /// Returns true when at least one configured path carries a ranking weight.
    pub(crate) fn is_weighted(&self) -> bool {
        for entry in &self.paths {
            if entry.weight.is_some() {
                return true;
            }
        }
        false
    }
}
/// Byte range of one extracted leaf inside the combined FTS blob, plus the
/// JSON path the leaf came from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    // Byte offset where the leaf's text starts in the blob (inclusive).
    pub start_offset: usize,
    // Byte offset just past the leaf's text (exclusive).
    pub end_offset: usize,
    // JSON path of the leaf, e.g. `$.a.b[0]`.
    pub leaf_path: String,
}
/// Counters describing what the extraction had to truncate or skip.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    // Number of containers not descended into because `MAX_RECURSIVE_DEPTH`
    // was reached.
    pub depth_cap_hit: usize,
    // True if extraction stopped early because the blob hit
    // `MAX_EXTRACTED_BYTES`.
    pub byte_cap_reached: bool,
    // Number of subtrees skipped because their path was in `exclude_paths`.
    pub excluded_subtree: usize,
}
impl ExtractStats {
pub(super) fn merge(&mut self, other: ExtractStats) {
self.depth_cap_hit += other.depth_cap_hit;
self.byte_cap_reached |= other.byte_cap_reached;
self.excluded_subtree += other.excluded_subtree;
}
}
/// Depth-first walker that flattens a JSON subtree into a single text blob,
/// recording the byte range of each emitted leaf.
pub(super) struct RecursiveWalker {
    // Accumulated text; leaves are joined with `LEAF_SEPARATOR`.
    pub(super) blob: String,
    // Byte ranges of every emitted leaf, in emission order.
    pub(super) positions: Vec<PositionEntry>,
    // Truncation/skip counters accumulated during the walk.
    pub(super) stats: ExtractStats,
    // Exact paths whose subtrees are skipped entirely.
    pub(super) exclude_paths: Vec<String>,
    // Set once the byte cap is hit; all further work becomes a no-op.
    pub(super) stopped: bool,
}
impl RecursiveWalker {
    /// Depth-first walk of `value`, emitting every scalar leaf into `blob`.
    ///
    /// `current_path` names `value` itself in JSON-path style; children are
    /// suffixed with `.key` or `[idx]`. `depth` counts container levels
    /// already entered: containers encountered at `MAX_RECURSIVE_DEPTH` are
    /// not descended into (counted in `stats.depth_cap_hit`), while scalar
    /// leaves are emitted regardless of depth.
    pub(super) fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        // Byte cap already hit: everything else is a no-op.
        if self.stopped {
            return;
        }
        // An exact path match prunes the whole subtree rooted here.
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            // Nulls contribute no text and no position entry.
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sort keys so the blob is deterministic regardless of the
                // map's iteration order.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    // Re-check between children: a child may hit the cap.
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Appends one leaf's text to `blob` (separator-prefixed unless it is the
    /// first leaf) and records its byte range under `leaf_path`.
    ///
    /// If appending would push `blob` past `MAX_EXTRACTED_BYTES`, nothing is
    /// appended and the walker stops permanently — smaller later leaves are
    /// not back-filled, so the blob stays a prefix of the full traversal.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        // Empty strings would create separator-only noise; skip them.
        if value.is_empty() {
            return;
        }
        // The separator is only inserted between leaves, never leading.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        // Check the cap before mutating so the blob never exceeds it.
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // Offsets cover only the leaf text itself, not the separator.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
/// Extracts the scalar value(s) at `path` (a `$.a.b`-style dotted path) from
/// `value`, rendered as strings.
///
/// Returns an empty vector when the path does not start with `$.`, when any
/// segment is missing, or when the target is `null` or an object. A scalar
/// target yields one string; an array target yields one string per scalar
/// element (non-scalar elements are skipped). NOTE(review): array indices in
/// the path itself (e.g. `$.a[0]`) are not supported — only object keys.
pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
    // Renders a scalar JSON value as text; `None` for null/object/array.
    // Shared by the scalar and array arms below so the String/Number/Bool
    // rendering is defined once.
    fn leaf_text(v: &serde_json::Value) -> Option<String> {
        match v {
            serde_json::Value::String(s) => Some(s.clone()),
            serde_json::Value::Number(n) => Some(n.to_string()),
            serde_json::Value::Bool(b) => Some(b.to_string()),
            _ => None,
        }
    }

    let Some(path) = path.strip_prefix("$.") else {
        return Vec::new();
    };
    // Descend one object key per dotted segment; bail out on the first miss.
    let mut current = value;
    for segment in path.split('.') {
        match current.get(segment) {
            Some(v) => current = v,
            None => return Vec::new(),
        }
    }
    match current {
        // Arrays flatten to their scalar elements only.
        serde_json::Value::Array(arr) => arr.iter().filter_map(leaf_text).collect(),
        // Scalars yield a single string; null/object yield nothing.
        other => leaf_text(other).into_iter().collect(),
    }
}
/// Extracts the combined FTS text for `props` according to `schema`.
///
/// Scalar paths are gathered first and joined with `schema.separator`;
/// recursive paths are flattened by a shared `RecursiveWalker` into a leaf
/// blob with per-leaf byte positions (the byte cap therefore applies to the
/// recursive text as a whole, not per path). When both parts are present
/// they are joined with `LEAF_SEPARATOR` and the leaf positions are shifted
/// past the scalar prefix.
///
/// Returns the combined text (`None` when nothing was extracted), the
/// position entries for recursive leaves, and the walker's truncation stats.
pub(crate) fn extract_property_fts(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
    let mut walker = RecursiveWalker {
        blob: String::new(),
        positions: Vec::new(),
        stats: ExtractStats::default(),
        exclude_paths: schema.exclude_paths.clone(),
        stopped: false,
    };
    let mut scalar_parts: Vec<String> = Vec::new();
    for entry in &schema.paths {
        match entry.mode {
            PropertyPathMode::Scalar => {
                scalar_parts.extend(extract_json_path(props, &entry.path));
            }
            PropertyPathMode::Recursive => {
                if let Some(root) = resolve_path_root(props, &entry.path) {
                    walker.walk(&entry.path, root, 0);
                }
            }
        }
    }
    let scalar_text = if scalar_parts.is_empty() {
        None
    } else {
        Some(scalar_parts.join(&schema.separator))
    };
    let combined = match (scalar_text, walker.blob.is_empty()) {
        (None, true) => None,
        // Move the blob out instead of cloning it (it can be up to 64 KiB);
        // only `positions` and `stats` are read from the walker afterwards.
        (None, false) => Some(std::mem::take(&mut walker.blob)),
        (Some(s), true) => Some(s),
        (Some(mut s), false) => {
            // Leaf offsets were relative to the walker blob; shift them past
            // the scalar prefix plus the separator inserted below.
            let offset = s.len() + LEAF_SEPARATOR.len();
            for pos in &mut walker.positions {
                pos.start_offset += offset;
                pos.end_offset += offset;
            }
            s.push_str(LEAF_SEPARATOR);
            s.push_str(&walker.blob);
            Some(s)
        }
    };
    (combined, walker.positions, walker.stats)
}
/// Extracts FTS text for `props` as one `(column_name, text)` pair per
/// configured path, for schemas that index each path into its own column.
///
/// Scalar paths join their values with `schema.separator`; recursive paths
/// each get a fresh walker (so the byte cap applies per column). Position
/// entries and truncation stats from the walkers are not surfaced here.
pub(crate) fn extract_property_fts_columns(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> Vec<(String, String)> {
    schema
        .paths
        .iter()
        .map(|entry| {
            let recursive = matches!(entry.mode, PropertyPathMode::Recursive);
            let column_name = fathomdb_schema::fts_column_name(&entry.path, recursive);
            let text = if recursive {
                let mut walker = RecursiveWalker {
                    blob: String::new(),
                    positions: Vec::new(),
                    stats: ExtractStats::default(),
                    exclude_paths: schema.exclude_paths.clone(),
                    stopped: false,
                };
                if let Some(root) = resolve_path_root(props, &entry.path) {
                    walker.walk(&entry.path, root, 0);
                }
                walker.blob
            } else {
                extract_json_path(props, &entry.path).join(&schema.separator)
            };
            (column_name, text)
        })
        .collect()
}
/// Resolves a `$.a.b`-style dotted path to the JSON node it names, borrowing
/// from `value`. Returns `None` when the path lacks the `$.` prefix or any
/// segment is missing; only object-key segments are understood.
pub(super) fn resolve_path_root<'a>(
    value: &'a serde_json::Value,
    path: &str,
) -> Option<&'a serde_json::Value> {
    path.strip_prefix("$.")?
        .split('.')
        .try_fold(value, |node, segment| node.get(segment))
}
/// Loads every per-kind FTS property schema from the `fts_property_schemas`
/// table, returning `(kind, schema)` pairs.
///
/// Malformed `property_paths_json` is not an error: parsing degrades to an
/// empty schema (see `parse_property_schema_json`). Any SQLite error aborts
/// the whole load.
pub(crate) fn load_fts_property_schemas(
    conn: &rusqlite::Connection,
) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
    let mut stmt =
        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
    stmt.query_map([], |row| {
        let kind: String = row.get(0)?;
        let paths_json: String = row.get(1)?;
        let separator: String = row.get(2)?;
        // Infallible by design; bad JSON yields an empty schema.
        let schema = parse_property_schema_json(&paths_json, &separator);
        Ok((kind, schema))
    })?
    // Collapse the per-row Results into one Result over the full vector.
    .collect::<Result<Vec<_>, _>>()
}
pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
let mut paths = Vec::new();
let mut exclude_paths: Vec<String> = Vec::new();
let path_values: Vec<serde_json::Value> = match value {
serde_json::Value::Array(arr) => arr,
serde_json::Value::Object(map) => {
if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
exclude_paths = excl
.iter()
.filter_map(|v| v.as_str().map(str::to_owned))
.collect();
}
match map.get("paths") {
Some(serde_json::Value::Array(arr)) => arr.clone(),
_ => Vec::new(),
}
}
_ => Vec::new(),
};
for entry in path_values {
match entry {
serde_json::Value::String(path) => {
paths.push(PropertyPathEntry::scalar(path));
}
serde_json::Value::Object(map) => {
let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
continue;
};
let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
PropertyPathMode::Scalar,
|m| match m {
"recursive" => PropertyPathMode::Recursive,
_ => PropertyPathMode::Scalar,
},
);
#[allow(clippy::cast_possible_truncation)]
let weight = map
.get("weight")
.and_then(serde_json::Value::as_f64)
.map(|w| w as f32);
paths.push(PropertyPathEntry {
path: path.to_owned(),
mode,
weight,
});
if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
for p in excl {
if let Some(s) = p.as_str() {
exclude_paths.push(s.to_owned());
}
}
}
}
_ => {}
}
}
PropertyFtsSchema {
paths,
separator: separator.to_owned(),
exclude_paths,
}
}