1use rusqlite::{OptionalExtension, TransactionBehavior};
2
3use super::{
4 AdminService, EngineError, FtsProfile, FtsPropertyPathMode, FtsPropertyPathSpec,
5 FtsPropertySchemaRecord, RebuildMode, RebuildRequest, resolve_tokenizer_preset,
6};
7
8impl AdminService {
9 pub fn set_fts_profile(
19 &self,
20 kind: &str,
21 tokenizer_str: &str,
22 ) -> Result<FtsProfile, EngineError> {
23 let resolved = resolve_tokenizer_preset(tokenizer_str);
24 if !resolved
26 .chars()
27 .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
28 {
29 return Err(EngineError::Bridge(format!(
30 "invalid tokenizer string: {resolved:?}"
31 )));
32 }
33 let conn = self.connect()?;
34 conn.execute(
35 r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
36 VALUES (?1, 'fts', json_object('tokenizer', ?2), unixepoch(), unixepoch())
37 ON CONFLICT(kind, facet) DO UPDATE SET
38 config_json = json_object('tokenizer', ?2),
39 active_at = unixepoch()",
40 rusqlite::params![kind, resolved],
41 )?;
42 let row = conn.query_row(
43 "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
44 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
45 rusqlite::params![kind],
46 |row| {
47 Ok(FtsProfile {
48 kind: row.get(0)?,
49 tokenizer: row.get(1)?,
50 active_at: row.get(2)?,
51 created_at: row.get(3)?,
52 })
53 },
54 )?;
55 Ok(row)
56 }
57
58 pub fn get_fts_profile(&self, kind: &str) -> Result<Option<FtsProfile>, EngineError> {
65 let conn = self.connect()?;
66 let result = conn
67 .query_row(
68 "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
69 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
70 rusqlite::params![kind],
71 |row| {
72 Ok(FtsProfile {
73 kind: row.get(0)?,
74 tokenizer: row.get(1)?,
75 active_at: row.get(2)?,
76 created_at: row.get(3)?,
77 })
78 },
79 )
80 .optional()?;
81 Ok(result)
82 }
83
84 pub fn register_fts_property_schema(
93 &self,
94 kind: &str,
95 property_paths: &[String],
96 separator: Option<&str>,
97 ) -> Result<FtsPropertySchemaRecord, EngineError> {
98 let specs: Vec<FtsPropertyPathSpec> = property_paths
99 .iter()
100 .map(|p| FtsPropertyPathSpec::scalar(p.clone()))
101 .collect();
102 self.register_fts_property_schema_with_entries(
103 kind,
104 &specs,
105 separator,
106 &[],
107 RebuildMode::Eager,
108 )
109 }
110
111 pub fn register_fts_property_schema_with_entries(
127 &self,
128 kind: &str,
129 entries: &[FtsPropertyPathSpec],
130 separator: Option<&str>,
131 exclude_paths: &[String],
132 mode: RebuildMode,
133 ) -> Result<FtsPropertySchemaRecord, EngineError> {
134 let paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
135 validate_fts_property_paths(&paths)?;
136 for p in exclude_paths {
137 if !p.starts_with("$.") {
138 return Err(EngineError::InvalidWrite(format!(
139 "exclude_paths entries must start with '$.' but got: {p}"
140 )));
141 }
142 }
143 for e in entries {
144 if let Some(w) = e.weight
145 && !(w > 0.0 && w <= 1000.0)
146 {
147 return Err(EngineError::Bridge(format!(
148 "weight out of range: {w} (must satisfy 0.0 < weight <= 1000.0)"
149 )));
150 }
151 }
152 let separator = separator.unwrap_or(" ");
153 let paths_json = serialize_property_paths_json(entries, exclude_paths)?;
154
155 match mode {
156 RebuildMode::Eager => self.register_fts_property_schema_eager(
157 kind,
158 entries,
159 separator,
160 exclude_paths,
161 &paths,
162 &paths_json,
163 ),
164 RebuildMode::Async => self.register_fts_property_schema_async(
165 kind,
166 entries,
167 separator,
168 &paths,
169 &paths_json,
170 ),
171 }
172 }
173
174 fn register_fts_property_schema_eager(
176 &self,
177 kind: &str,
178 entries: &[FtsPropertyPathSpec],
179 separator: &str,
180 exclude_paths: &[String],
181 paths: &[String],
182 paths_json: &str,
183 ) -> Result<FtsPropertySchemaRecord, EngineError> {
184 let mut conn = self.connect()?;
185 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
186
187 let previous_row: Option<(String, String)> = tx
193 .query_row(
194 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
195 [kind],
196 |row| {
197 let json: String = row.get(0)?;
198 let sep: String = row.get(1)?;
199 Ok((json, sep))
200 },
201 )
202 .optional()?;
203 let had_previous_schema = previous_row.is_some();
204 let previous_recursive_paths: Vec<String> = previous_row
205 .map(|(json, sep)| crate::writer::parse_property_schema_json(&json, &sep))
206 .map_or(Vec::new(), |schema| {
207 schema
208 .paths
209 .into_iter()
210 .filter(|p| p.mode == crate::writer::PropertyPathMode::Recursive)
211 .map(|p| p.path)
212 .collect()
213 });
214 let new_recursive_paths: Vec<&str> = entries
215 .iter()
216 .filter(|e| e.mode == FtsPropertyPathMode::Recursive)
217 .map(|e| e.path.as_str())
218 .collect();
219 let introduces_new_recursive = new_recursive_paths
220 .iter()
221 .any(|p| !previous_recursive_paths.iter().any(|prev| prev == p));
222
223 tx.execute(
224 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
225 VALUES (?1, ?2, ?3) \
226 ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
227 rusqlite::params![kind, paths_json, separator],
228 )?;
229
230 let _ = (introduces_new_recursive, had_previous_schema);
236 let needs_rebuild = true;
237 if needs_rebuild {
238 let any_weight = entries.iter().any(|e| e.weight.is_some());
239 let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
240 .map_err(|e| EngineError::Bridge(e.to_string()))?;
241 if any_weight {
242 create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
246 tx.execute(
247 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
248 [kind],
249 )?;
250 } else {
253 create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
257 tx.execute(
258 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
259 [kind],
260 )?;
261 crate::projection::insert_property_fts_rows_for_kind(&tx, kind)?;
266 }
267 }
268
269 super::persist_simple_provenance_event(
270 &tx,
271 "fts_property_schema_registered",
272 kind,
273 Some(serde_json::json!({
274 "property_paths": paths,
275 "separator": separator,
276 "exclude_paths": exclude_paths,
277 "eager_rebuild": needs_rebuild,
278 })),
279 )?;
280 tx.commit()?;
281
282 self.describe_fts_property_schema(kind)?.ok_or_else(|| {
283 EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
284 })
285 }
286
287 fn register_fts_property_schema_async(
289 &self,
290 kind: &str,
291 entries: &[FtsPropertyPathSpec],
292 separator: &str,
293 paths: &[String],
294 paths_json: &str,
295 ) -> Result<FtsPropertySchemaRecord, EngineError> {
296 let mut conn = self.connect()?;
297 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299 let had_previous_schema: bool = tx
301 .query_row(
302 "SELECT count(*) FROM fts_property_schemas WHERE kind = ?1",
303 rusqlite::params![kind],
304 |r| r.get::<_, i64>(0),
305 )
306 .unwrap_or(0)
307 > 0;
308
309 tx.execute(
311 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
312 VALUES (?1, ?2, ?3) \
313 ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
314 rusqlite::params![kind, paths_json, separator],
315 )?;
316
317 let any_weight = entries.iter().any(|e| e.weight.is_some());
327 let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
328 .map_err(|e| EngineError::Bridge(e.to_string()))?;
329 let desired = desired_fts_shape(entries, &tok);
330 let existing = fts_kind_table_shape(&tx, kind)?;
331 let must_drop = match &existing {
332 None => false,
333 Some(existing) => !shape_compatible(existing, &desired),
334 };
335 if must_drop {
336 if any_weight {
337 create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
338 } else {
339 create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
342 }
343 }
344
345 let schema_id: i64 = tx.query_row(
347 "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
348 rusqlite::params![kind],
349 |r| r.get(0),
350 )?;
351
352 let now_ms = crate::rebuild_actor::now_unix_ms_pub();
353 let is_first = i64::from(!had_previous_schema);
354
355 tx.execute(
357 "INSERT INTO fts_property_rebuild_state \
358 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
359 VALUES (?1, ?2, 'PENDING', 0, ?3, ?4) \
360 ON CONFLICT(kind) DO UPDATE SET \
361 schema_id = excluded.schema_id, \
362 state = 'PENDING', \
363 rows_total = NULL, \
364 rows_done = 0, \
365 started_at = excluded.started_at, \
366 last_progress_at = NULL, \
367 error_message = NULL, \
368 is_first_registration = excluded.is_first_registration",
369 rusqlite::params![kind, schema_id, now_ms, is_first],
370 )?;
371
372 super::persist_simple_provenance_event(
373 &tx,
374 "fts_property_schema_registered",
375 kind,
376 Some(serde_json::json!({
377 "property_paths": paths,
378 "separator": separator,
379 "mode": "async",
380 })),
381 )?;
382 tx.commit()?;
383
384 if let Some(sender) = &self.rebuild_sender
390 && sender
391 .try_send(RebuildRequest {
392 kind: kind.to_owned(),
393 schema_id,
394 })
395 .is_err()
396 {
397 trace_warn!(
398 kind = %kind,
399 "rebuild channel full; rebuild request dropped — state remains PENDING"
400 );
401 }
402
403 self.describe_fts_property_schema(kind)?.ok_or_else(|| {
404 EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
405 })
406 }
407
408 pub fn get_property_fts_rebuild_state(
413 &self,
414 kind: &str,
415 ) -> Result<Option<crate::rebuild_actor::RebuildStateRow>, EngineError> {
416 let conn = self.connect()?;
417 let row = conn
418 .query_row(
419 "SELECT kind, schema_id, state, rows_total, rows_done, \
420 started_at, is_first_registration, error_message \
421 FROM fts_property_rebuild_state WHERE kind = ?1",
422 rusqlite::params![kind],
423 |r| {
424 Ok(crate::rebuild_actor::RebuildStateRow {
425 kind: r.get(0)?,
426 schema_id: r.get(1)?,
427 state: r.get(2)?,
428 rows_total: r.get(3)?,
429 rows_done: r.get(4)?,
430 started_at: r.get(5)?,
431 is_first_registration: r.get::<_, i64>(6)? != 0,
432 error_message: r.get(7)?,
433 })
434 },
435 )
436 .optional()?;
437 Ok(row)
438 }
439
440 pub fn count_staging_rows(&self, kind: &str) -> Result<i64, EngineError> {
446 let conn = self.connect()?;
447 let count: i64 = conn.query_row(
448 "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1",
449 rusqlite::params![kind],
450 |r| r.get(0),
451 )?;
452 Ok(count)
453 }
454
455 pub fn staging_row_exists(
461 &self,
462 kind: &str,
463 node_logical_id: &str,
464 ) -> Result<bool, EngineError> {
465 let conn = self.connect()?;
466 let count: i64 = conn.query_row(
467 "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1 AND node_logical_id = ?2",
468 rusqlite::params![kind, node_logical_id],
469 |r| r.get(0),
470 )?;
471 Ok(count > 0)
472 }
473
474 pub fn describe_fts_property_schema(
479 &self,
480 kind: &str,
481 ) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
482 let conn = self.connect()?;
483 load_fts_property_schema_record(&conn, kind)
484 }
485
486 pub fn list_fts_property_schemas(&self) -> Result<Vec<FtsPropertySchemaRecord>, EngineError> {
491 let conn = self.connect()?;
492 let mut stmt = conn.prepare(
493 "SELECT kind, property_paths_json, separator, format_version \
494 FROM fts_property_schemas ORDER BY kind",
495 )?;
496 let records = stmt
497 .query_map([], |row| {
498 let kind: String = row.get(0)?;
499 let paths_json: String = row.get(1)?;
500 let separator: String = row.get(2)?;
501 let format_version: i64 = row.get(3)?;
502 Ok(build_fts_property_schema_record(
503 kind,
504 &paths_json,
505 separator,
506 format_version,
507 ))
508 })?
509 .collect::<Result<Vec<_>, _>>()?;
510 Ok(records)
511 }
512
513 pub fn remove_fts_property_schema(&self, kind: &str) -> Result<(), EngineError> {
521 let mut conn = self.connect()?;
522 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
523 let deleted = tx.execute("DELETE FROM fts_property_schemas WHERE kind = ?1", [kind])?;
524 if deleted == 0 {
525 return Err(EngineError::InvalidWrite(format!(
526 "FTS property schema for kind '{kind}' is not registered"
527 )));
528 }
529 let table = fathomdb_schema::fts_kind_table_name(kind);
531 let table_exists: bool = tx
532 .query_row(
533 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
534 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
535 rusqlite::params![table],
536 |r| r.get::<_, i64>(0),
537 )
538 .unwrap_or(0)
539 > 0;
540 if table_exists {
541 tx.execute_batch(&format!("DELETE FROM {table}"))?;
542 }
543 super::persist_simple_provenance_event(&tx, "fts_property_schema_removed", kind, None)?;
544 tx.commit()?;
545 Ok(())
546 }
547}
548
549pub(super) fn serialize_property_paths_json(
550 entries: &[FtsPropertyPathSpec],
551 exclude_paths: &[String],
552) -> Result<String, EngineError> {
553 let all_scalar = entries
557 .iter()
558 .all(|e| e.mode == FtsPropertyPathMode::Scalar);
559 let any_weight = entries.iter().any(|e| e.weight.is_some());
560 if all_scalar && exclude_paths.is_empty() && !any_weight {
561 let paths: Vec<&str> = entries.iter().map(|e| e.path.as_str()).collect();
562 return serde_json::to_string(&paths).map_err(|e| {
563 EngineError::InvalidWrite(format!("failed to serialize property paths: {e}"))
564 });
565 }
566
567 let mut obj = serde_json::Map::new();
568 let paths_json: Vec<serde_json::Value> = entries
569 .iter()
570 .map(|e| {
571 let mode_str = match e.mode {
572 FtsPropertyPathMode::Scalar => "scalar",
573 FtsPropertyPathMode::Recursive => "recursive",
574 };
575 let mut entry = serde_json::json!({ "path": e.path, "mode": mode_str });
576 if let Some(w) = e.weight {
577 entry["weight"] = serde_json::json!(w);
578 }
579 entry
580 })
581 .collect();
582 obj.insert("paths".to_owned(), serde_json::Value::Array(paths_json));
583 if !exclude_paths.is_empty() {
584 obj.insert("exclude_paths".to_owned(), serde_json::json!(exclude_paths));
585 }
586 serde_json::to_string(&serde_json::Value::Object(obj))
587 .map_err(|e| EngineError::InvalidWrite(format!("failed to serialize property paths: {e}")))
588}
589
/// Summary of a per-kind FTS table used to decide whether an existing table
/// can be kept or must be recreated.
///
/// Produced by `fts_kind_table_shape` (from the live database) and
/// `desired_fts_shape` (from the requested schema), and compared with
/// `shape_compatible`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct FtsTableShape {
    /// Tokenizer string of the table's `tokenize='...'` clause.
    pub tokenizer: String,
    /// Column names in sorted order. Both producers sort this list, and the
    /// database-derived one leaves out `node_logical_id`.
    pub columns: Vec<String>,
}
602
603pub(super) fn fts_kind_table_shape(
609 conn: &rusqlite::Connection,
610 kind: &str,
611) -> Result<Option<FtsTableShape>, EngineError> {
612 let table = fathomdb_schema::fts_kind_table_name(kind);
613 let create_sql: Option<String> = conn
614 .query_row(
615 "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?1 \
616 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
617 rusqlite::params![table],
618 |r| r.get::<_, String>(0),
619 )
620 .optional()?;
621 let Some(create_sql) = create_sql else {
622 return Ok(None);
623 };
624
625 let tokenizer = extract_tokenizer_clause(&create_sql).unwrap_or_default();
627
628 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
630 let rows = stmt.query_map([], |r| r.get::<_, String>(1))?;
631 let mut columns: Vec<String> = rows
632 .collect::<Result<Vec<_>, _>>()?
633 .into_iter()
634 .filter(|c| c != "node_logical_id")
635 .collect();
636 columns.sort();
637
638 Ok(Some(FtsTableShape { tokenizer, columns }))
639}
640
641pub(super) fn desired_fts_shape(specs: &[FtsPropertyPathSpec], tokenizer: &str) -> FtsTableShape {
644 let any_weight = specs.iter().any(|s| s.weight.is_some());
648 let mut columns: Vec<String> = if any_weight {
649 specs
650 .iter()
651 .map(|s| {
652 let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
653 fathomdb_schema::fts_column_name(&s.path, is_recursive)
654 })
655 .collect()
656 } else {
657 vec!["text_content".to_owned()]
658 };
659 columns.sort();
660 FtsTableShape {
661 tokenizer: tokenizer.to_owned(),
662 columns,
663 }
664}
665
666pub(super) fn shape_compatible(existing: &FtsTableShape, desired: &FtsTableShape) -> bool {
671 existing.tokenizer == desired.tokenizer && existing.columns == desired.columns
672}
673
/// Extracts the value of the first `tokenize = '...'` clause found in a
/// `CREATE VIRTUAL TABLE` statement.
///
/// The keyword is matched ASCII-case-insensitively. Inside the quoted value a
/// doubled single quote (`''`) is unescaped to one `'`.
///
/// Returns `None` when there is no `tokenize` keyword, no `=` after it, no
/// opening quote after the `=`, or the quoted value is never closed.
fn extract_tokenizer_clause(sql: &str) -> Option<String> {
    // ASCII-only folding keeps byte offsets identical to `sql`; full Unicode
    // lowercasing can change the byte length and shift the offset.
    let lower = sql.to_ascii_lowercase();
    let key_idx = lower.find("tokenize")?;
    let after_key = &sql[key_idx..];
    let eq_rel = after_key.find('=')?;
    let rest = after_key[eq_rel + 1..].trim_start().strip_prefix('\'')?;

    // Walk by `char`, not by byte, so multi-byte characters stay intact.
    let mut out = String::new();
    let mut chars = rest.chars().peekable();
    while let Some(c) = chars.next() {
        if c != '\'' {
            out.push(c);
            continue;
        }
        if chars.peek() == Some(&'\'') {
            // `''` is an escaped quote inside the literal.
            chars.next();
            out.push('\'');
        } else {
            // A lone quote closes the literal.
            return Some(out);
        }
    }
    // Ran out of input before the closing quote.
    None
}
704
705pub(super) fn create_or_replace_fts_kind_table(
711 conn: &rusqlite::Connection,
712 kind: &str,
713 specs: &[FtsPropertyPathSpec],
714 tokenizer: &str,
715) -> Result<(), EngineError> {
716 let table = fathomdb_schema::fts_kind_table_name(kind);
717
718 if !tokenizer
723 .chars()
724 .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
725 {
726 return Err(EngineError::Bridge(format!(
727 "invalid tokenizer string: {tokenizer:?}"
728 )));
729 }
730
731 let cols: Vec<String> = if specs.is_empty() {
732 vec![
733 "node_logical_id UNINDEXED".to_owned(),
734 "text_content".to_owned(),
735 ]
736 } else {
737 std::iter::once("node_logical_id UNINDEXED".to_owned())
738 .chain(specs.iter().map(|s| {
739 let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
740 fathomdb_schema::fts_column_name(&s.path, is_recursive)
741 }))
742 .collect()
743 };
744
745 let tokenizer_sql = tokenizer.replace('\'', "''");
748 conn.execute_batch(&format!(
749 "DROP TABLE IF EXISTS {table}; \
750 CREATE VIRTUAL TABLE {table} USING fts5({cols}, tokenize='{tokenizer_sql}');",
751 cols = cols.join(", "),
752 ))?;
753
754 Ok(())
755}
756
757pub(super) fn validate_fts_property_paths(paths: &[String]) -> Result<(), EngineError> {
758 if paths.is_empty() {
759 return Err(EngineError::InvalidWrite(
760 "FTS property paths must not be empty".to_owned(),
761 ));
762 }
763 let mut seen = std::collections::HashSet::new();
764 for path in paths {
765 if !path.starts_with("$.") {
766 return Err(EngineError::InvalidWrite(format!(
767 "FTS property path must start with '$.' but got: {path}"
768 )));
769 }
770 let after_prefix = &path[2..]; let segments: Vec<&str> = after_prefix.split('.').collect();
772 if segments.is_empty() || segments.iter().any(|s| s.is_empty()) {
773 return Err(EngineError::InvalidWrite(format!(
774 "FTS property path has empty segment(s): {path}"
775 )));
776 }
777 for seg in &segments {
778 if !seg.chars().all(|c| c.is_alphanumeric() || c == '_') {
779 return Err(EngineError::InvalidWrite(format!(
780 "FTS property path segment contains invalid characters: {path}"
781 )));
782 }
783 }
784 if !seen.insert(path) {
785 return Err(EngineError::InvalidWrite(format!(
786 "duplicate FTS property path: {path}"
787 )));
788 }
789 }
790 Ok(())
791}
792
793pub(super) fn load_fts_property_schema_record(
794 conn: &rusqlite::Connection,
795 kind: &str,
796) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
797 let row = conn
798 .query_row(
799 "SELECT kind, property_paths_json, separator, format_version \
800 FROM fts_property_schemas WHERE kind = ?1",
801 [kind],
802 |row| {
803 let kind: String = row.get(0)?;
804 let paths_json: String = row.get(1)?;
805 let separator: String = row.get(2)?;
806 let format_version: i64 = row.get(3)?;
807 Ok(build_fts_property_schema_record(
808 kind,
809 &paths_json,
810 separator,
811 format_version,
812 ))
813 },
814 )
815 .optional()?;
816 Ok(row)
817}
818
819pub(super) fn build_fts_property_schema_record(
825 kind: String,
826 paths_json: &str,
827 separator: String,
828 format_version: i64,
829) -> FtsPropertySchemaRecord {
830 let schema = crate::writer::parse_property_schema_json(paths_json, &separator);
831 let entries: Vec<FtsPropertyPathSpec> = schema
832 .paths
833 .into_iter()
834 .map(|entry| FtsPropertyPathSpec {
835 path: entry.path,
836 mode: match entry.mode {
837 crate::writer::PropertyPathMode::Scalar => FtsPropertyPathMode::Scalar,
838 crate::writer::PropertyPathMode::Recursive => FtsPropertyPathMode::Recursive,
839 },
840 weight: entry.weight,
841 })
842 .collect();
843 let property_paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
844 FtsPropertySchemaRecord {
845 kind,
846 property_paths,
847 entries,
848 exclude_paths: schema.exclude_paths,
849 separator,
850 format_version,
851 }
852}