1use rusqlite::{OptionalExtension, TransactionBehavior};
2
3use super::{
4 AdminService, EngineError, FtsProfile, FtsPropertyPathMode, FtsPropertyPathSpec,
5 FtsPropertySchemaRecord, RebuildMode, RebuildRequest, resolve_tokenizer_preset,
6};
7
8impl AdminService {
9 pub fn set_fts_profile(
19 &self,
20 kind: &str,
21 tokenizer_str: &str,
22 ) -> Result<FtsProfile, EngineError> {
23 let resolved = resolve_tokenizer_preset(tokenizer_str);
24 if !resolved
26 .chars()
27 .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
28 {
29 return Err(EngineError::Bridge(format!(
30 "invalid tokenizer string: {resolved:?}"
31 )));
32 }
33 let conn = self.connect()?;
34 conn.execute(
35 r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
36 VALUES (?1, 'fts', json_object('tokenizer', ?2), unixepoch(), unixepoch())
37 ON CONFLICT(kind, facet) DO UPDATE SET
38 config_json = json_object('tokenizer', ?2),
39 active_at = unixepoch()",
40 rusqlite::params![kind, resolved],
41 )?;
42 let row = conn.query_row(
43 "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
44 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
45 rusqlite::params![kind],
46 |row| {
47 Ok(FtsProfile {
48 kind: row.get(0)?,
49 tokenizer: row.get(1)?,
50 active_at: row.get(2)?,
51 created_at: row.get(3)?,
52 })
53 },
54 )?;
55 Ok(row)
56 }
57
58 pub fn get_fts_profile(&self, kind: &str) -> Result<Option<FtsProfile>, EngineError> {
65 let conn = self.connect()?;
66 let result = conn
67 .query_row(
68 "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
69 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
70 rusqlite::params![kind],
71 |row| {
72 Ok(FtsProfile {
73 kind: row.get(0)?,
74 tokenizer: row.get(1)?,
75 active_at: row.get(2)?,
76 created_at: row.get(3)?,
77 })
78 },
79 )
80 .optional()?;
81 Ok(result)
82 }
83
84 pub fn register_fts_property_schema(
93 &self,
94 kind: &str,
95 property_paths: &[String],
96 separator: Option<&str>,
97 ) -> Result<FtsPropertySchemaRecord, EngineError> {
98 let specs: Vec<FtsPropertyPathSpec> = property_paths
99 .iter()
100 .map(|p| FtsPropertyPathSpec::scalar(p.clone()))
101 .collect();
102 self.register_fts_property_schema_with_entries(
103 kind,
104 &specs,
105 separator,
106 &[],
107 RebuildMode::Eager,
108 )
109 }
110
111 pub fn register_fts_property_schema_with_entries(
127 &self,
128 kind: &str,
129 entries: &[FtsPropertyPathSpec],
130 separator: Option<&str>,
131 exclude_paths: &[String],
132 mode: RebuildMode,
133 ) -> Result<FtsPropertySchemaRecord, EngineError> {
134 let paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
135 validate_fts_property_paths(&paths)?;
136 for p in exclude_paths {
137 if !p.starts_with("$.") {
138 return Err(EngineError::InvalidWrite(format!(
139 "exclude_paths entries must start with '$.' but got: {p}"
140 )));
141 }
142 }
143 for e in entries {
144 if let Some(w) = e.weight
145 && !(w > 0.0 && w <= 1000.0)
146 {
147 return Err(EngineError::Bridge(format!(
148 "weight out of range: {w} (must satisfy 0.0 < weight <= 1000.0)"
149 )));
150 }
151 }
152 let separator = separator.unwrap_or(" ");
153 let paths_json = serialize_property_paths_json(entries, exclude_paths)?;
154
155 match mode {
156 RebuildMode::Eager => self.register_fts_property_schema_eager(
157 kind,
158 entries,
159 separator,
160 exclude_paths,
161 &paths,
162 &paths_json,
163 ),
164 RebuildMode::Async => self.register_fts_property_schema_async(
165 kind,
166 entries,
167 separator,
168 &paths,
169 &paths_json,
170 ),
171 }
172 }
173
174 fn register_fts_property_schema_eager(
176 &self,
177 kind: &str,
178 entries: &[FtsPropertyPathSpec],
179 separator: &str,
180 exclude_paths: &[String],
181 paths: &[String],
182 paths_json: &str,
183 ) -> Result<FtsPropertySchemaRecord, EngineError> {
184 let mut conn = self.connect()?;
185 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
186
187 let previous_row: Option<(String, String)> = tx
193 .query_row(
194 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
195 [kind],
196 |row| {
197 let json: String = row.get(0)?;
198 let sep: String = row.get(1)?;
199 Ok((json, sep))
200 },
201 )
202 .optional()?;
203 let had_previous_schema = previous_row.is_some();
204 let previous_recursive_paths: Vec<String> = previous_row
205 .map(|(json, sep)| crate::writer::parse_property_schema_json(&json, &sep))
206 .map_or(Vec::new(), |schema| {
207 schema
208 .paths
209 .into_iter()
210 .filter(|p| p.mode == crate::writer::PropertyPathMode::Recursive)
211 .map(|p| p.path)
212 .collect()
213 });
214 let new_recursive_paths: Vec<&str> = entries
215 .iter()
216 .filter(|e| e.mode == FtsPropertyPathMode::Recursive)
217 .map(|e| e.path.as_str())
218 .collect();
219 let introduces_new_recursive = new_recursive_paths
220 .iter()
221 .any(|p| !previous_recursive_paths.iter().any(|prev| prev == p));
222
223 tx.execute(
224 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
225 VALUES (?1, ?2, ?3) \
226 ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
227 rusqlite::params![kind, paths_json, separator],
228 )?;
229
230 let _ = (introduces_new_recursive, had_previous_schema);
236 let needs_rebuild = true;
237 if needs_rebuild {
238 let any_weight = entries.iter().any(|e| e.weight.is_some());
239 let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
240 .map_err(|e| EngineError::Bridge(e.to_string()))?;
241 if any_weight {
242 create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
246 tx.execute(
247 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
248 [kind],
249 )?;
250 } else {
253 create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
257 tx.execute(
258 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
259 [kind],
260 )?;
261 crate::projection::insert_property_fts_rows_for_kind(&tx, kind)?;
266 }
267 }
268
269 super::persist_simple_provenance_event(
270 &tx,
271 "fts_property_schema_registered",
272 kind,
273 Some(serde_json::json!({
274 "property_paths": paths,
275 "separator": separator,
276 "exclude_paths": exclude_paths,
277 "eager_rebuild": needs_rebuild,
278 })),
279 )?;
280 tx.commit()?;
281
282 self.describe_fts_property_schema(kind)?.ok_or_else(|| {
283 EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
284 })
285 }
286
287 fn register_fts_property_schema_async(
289 &self,
290 kind: &str,
291 entries: &[FtsPropertyPathSpec],
292 separator: &str,
293 paths: &[String],
294 paths_json: &str,
295 ) -> Result<FtsPropertySchemaRecord, EngineError> {
296 let mut conn = self.connect()?;
297 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299 let had_previous_schema: bool = tx
301 .query_row(
302 "SELECT count(*) FROM fts_property_schemas WHERE kind = ?1",
303 rusqlite::params![kind],
304 |r| r.get::<_, i64>(0),
305 )
306 .unwrap_or(0)
307 > 0;
308
309 tx.execute(
311 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
312 VALUES (?1, ?2, ?3) \
313 ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
314 rusqlite::params![kind, paths_json, separator],
315 )?;
316
317 let any_weight = entries.iter().any(|e| e.weight.is_some());
321 let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
322 .map_err(|e| EngineError::Bridge(e.to_string()))?;
323 if any_weight {
324 create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
325 } else {
326 create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
329 }
330
331 let schema_id: i64 = tx.query_row(
333 "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
334 rusqlite::params![kind],
335 |r| r.get(0),
336 )?;
337
338 let now_ms = crate::rebuild_actor::now_unix_ms_pub();
339 let is_first = i64::from(!had_previous_schema);
340
341 tx.execute(
343 "INSERT INTO fts_property_rebuild_state \
344 (kind, schema_id, state, rows_done, started_at, is_first_registration) \
345 VALUES (?1, ?2, 'PENDING', 0, ?3, ?4) \
346 ON CONFLICT(kind) DO UPDATE SET \
347 schema_id = excluded.schema_id, \
348 state = 'PENDING', \
349 rows_total = NULL, \
350 rows_done = 0, \
351 started_at = excluded.started_at, \
352 last_progress_at = NULL, \
353 error_message = NULL, \
354 is_first_registration = excluded.is_first_registration",
355 rusqlite::params![kind, schema_id, now_ms, is_first],
356 )?;
357
358 super::persist_simple_provenance_event(
359 &tx,
360 "fts_property_schema_registered",
361 kind,
362 Some(serde_json::json!({
363 "property_paths": paths,
364 "separator": separator,
365 "mode": "async",
366 })),
367 )?;
368 tx.commit()?;
369
370 if let Some(sender) = &self.rebuild_sender
376 && sender
377 .try_send(RebuildRequest {
378 kind: kind.to_owned(),
379 schema_id,
380 })
381 .is_err()
382 {
383 trace_warn!(
384 kind = %kind,
385 "rebuild channel full; rebuild request dropped — state remains PENDING"
386 );
387 }
388
389 self.describe_fts_property_schema(kind)?.ok_or_else(|| {
390 EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
391 })
392 }
393
394 pub fn get_property_fts_rebuild_state(
399 &self,
400 kind: &str,
401 ) -> Result<Option<crate::rebuild_actor::RebuildStateRow>, EngineError> {
402 let conn = self.connect()?;
403 let row = conn
404 .query_row(
405 "SELECT kind, schema_id, state, rows_total, rows_done, \
406 started_at, is_first_registration, error_message \
407 FROM fts_property_rebuild_state WHERE kind = ?1",
408 rusqlite::params![kind],
409 |r| {
410 Ok(crate::rebuild_actor::RebuildStateRow {
411 kind: r.get(0)?,
412 schema_id: r.get(1)?,
413 state: r.get(2)?,
414 rows_total: r.get(3)?,
415 rows_done: r.get(4)?,
416 started_at: r.get(5)?,
417 is_first_registration: r.get::<_, i64>(6)? != 0,
418 error_message: r.get(7)?,
419 })
420 },
421 )
422 .optional()?;
423 Ok(row)
424 }
425
426 pub fn count_staging_rows(&self, kind: &str) -> Result<i64, EngineError> {
432 let conn = self.connect()?;
433 let count: i64 = conn.query_row(
434 "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1",
435 rusqlite::params![kind],
436 |r| r.get(0),
437 )?;
438 Ok(count)
439 }
440
441 pub fn staging_row_exists(
447 &self,
448 kind: &str,
449 node_logical_id: &str,
450 ) -> Result<bool, EngineError> {
451 let conn = self.connect()?;
452 let count: i64 = conn.query_row(
453 "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1 AND node_logical_id = ?2",
454 rusqlite::params![kind, node_logical_id],
455 |r| r.get(0),
456 )?;
457 Ok(count > 0)
458 }
459
460 pub fn describe_fts_property_schema(
465 &self,
466 kind: &str,
467 ) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
468 let conn = self.connect()?;
469 load_fts_property_schema_record(&conn, kind)
470 }
471
472 pub fn list_fts_property_schemas(&self) -> Result<Vec<FtsPropertySchemaRecord>, EngineError> {
477 let conn = self.connect()?;
478 let mut stmt = conn.prepare(
479 "SELECT kind, property_paths_json, separator, format_version \
480 FROM fts_property_schemas ORDER BY kind",
481 )?;
482 let records = stmt
483 .query_map([], |row| {
484 let kind: String = row.get(0)?;
485 let paths_json: String = row.get(1)?;
486 let separator: String = row.get(2)?;
487 let format_version: i64 = row.get(3)?;
488 Ok(build_fts_property_schema_record(
489 kind,
490 &paths_json,
491 separator,
492 format_version,
493 ))
494 })?
495 .collect::<Result<Vec<_>, _>>()?;
496 Ok(records)
497 }
498
499 pub fn remove_fts_property_schema(&self, kind: &str) -> Result<(), EngineError> {
507 let mut conn = self.connect()?;
508 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
509 let deleted = tx.execute("DELETE FROM fts_property_schemas WHERE kind = ?1", [kind])?;
510 if deleted == 0 {
511 return Err(EngineError::InvalidWrite(format!(
512 "FTS property schema for kind '{kind}' is not registered"
513 )));
514 }
515 let table = fathomdb_schema::fts_kind_table_name(kind);
517 let table_exists: bool = tx
518 .query_row(
519 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
520 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
521 rusqlite::params![table],
522 |r| r.get::<_, i64>(0),
523 )
524 .unwrap_or(0)
525 > 0;
526 if table_exists {
527 tx.execute_batch(&format!("DELETE FROM {table}"))?;
528 }
529 super::persist_simple_provenance_event(&tx, "fts_property_schema_removed", kind, None)?;
530 tx.commit()?;
531 Ok(())
532 }
533}
534
535pub(super) fn serialize_property_paths_json(
536 entries: &[FtsPropertyPathSpec],
537 exclude_paths: &[String],
538) -> Result<String, EngineError> {
539 let all_scalar = entries
543 .iter()
544 .all(|e| e.mode == FtsPropertyPathMode::Scalar);
545 let any_weight = entries.iter().any(|e| e.weight.is_some());
546 if all_scalar && exclude_paths.is_empty() && !any_weight {
547 let paths: Vec<&str> = entries.iter().map(|e| e.path.as_str()).collect();
548 return serde_json::to_string(&paths).map_err(|e| {
549 EngineError::InvalidWrite(format!("failed to serialize property paths: {e}"))
550 });
551 }
552
553 let mut obj = serde_json::Map::new();
554 let paths_json: Vec<serde_json::Value> = entries
555 .iter()
556 .map(|e| {
557 let mode_str = match e.mode {
558 FtsPropertyPathMode::Scalar => "scalar",
559 FtsPropertyPathMode::Recursive => "recursive",
560 };
561 let mut entry = serde_json::json!({ "path": e.path, "mode": mode_str });
562 if let Some(w) = e.weight {
563 entry["weight"] = serde_json::json!(w);
564 }
565 entry
566 })
567 .collect();
568 obj.insert("paths".to_owned(), serde_json::Value::Array(paths_json));
569 if !exclude_paths.is_empty() {
570 obj.insert("exclude_paths".to_owned(), serde_json::json!(exclude_paths));
571 }
572 serde_json::to_string(&serde_json::Value::Object(obj))
573 .map_err(|e| EngineError::InvalidWrite(format!("failed to serialize property paths: {e}")))
574}
575
576pub(super) fn create_or_replace_fts_kind_table(
582 conn: &rusqlite::Connection,
583 kind: &str,
584 specs: &[FtsPropertyPathSpec],
585 tokenizer: &str,
586) -> Result<(), EngineError> {
587 let table = fathomdb_schema::fts_kind_table_name(kind);
588
589 if !tokenizer
594 .chars()
595 .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
596 {
597 return Err(EngineError::Bridge(format!(
598 "invalid tokenizer string: {tokenizer:?}"
599 )));
600 }
601
602 let cols: Vec<String> = if specs.is_empty() {
603 vec![
604 "node_logical_id UNINDEXED".to_owned(),
605 "text_content".to_owned(),
606 ]
607 } else {
608 std::iter::once("node_logical_id UNINDEXED".to_owned())
609 .chain(specs.iter().map(|s| {
610 let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
611 fathomdb_schema::fts_column_name(&s.path, is_recursive)
612 }))
613 .collect()
614 };
615
616 let tokenizer_sql = tokenizer.replace('\'', "''");
619 conn.execute_batch(&format!(
620 "DROP TABLE IF EXISTS {table}; \
621 CREATE VIRTUAL TABLE {table} USING fts5({cols}, tokenize='{tokenizer_sql}');",
622 cols = cols.join(", "),
623 ))?;
624
625 Ok(())
626}
627
628pub(super) fn validate_fts_property_paths(paths: &[String]) -> Result<(), EngineError> {
629 if paths.is_empty() {
630 return Err(EngineError::InvalidWrite(
631 "FTS property paths must not be empty".to_owned(),
632 ));
633 }
634 let mut seen = std::collections::HashSet::new();
635 for path in paths {
636 if !path.starts_with("$.") {
637 return Err(EngineError::InvalidWrite(format!(
638 "FTS property path must start with '$.' but got: {path}"
639 )));
640 }
641 let after_prefix = &path[2..]; let segments: Vec<&str> = after_prefix.split('.').collect();
643 if segments.is_empty() || segments.iter().any(|s| s.is_empty()) {
644 return Err(EngineError::InvalidWrite(format!(
645 "FTS property path has empty segment(s): {path}"
646 )));
647 }
648 for seg in &segments {
649 if !seg.chars().all(|c| c.is_alphanumeric() || c == '_') {
650 return Err(EngineError::InvalidWrite(format!(
651 "FTS property path segment contains invalid characters: {path}"
652 )));
653 }
654 }
655 if !seen.insert(path) {
656 return Err(EngineError::InvalidWrite(format!(
657 "duplicate FTS property path: {path}"
658 )));
659 }
660 }
661 Ok(())
662}
663
664pub(super) fn load_fts_property_schema_record(
665 conn: &rusqlite::Connection,
666 kind: &str,
667) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
668 let row = conn
669 .query_row(
670 "SELECT kind, property_paths_json, separator, format_version \
671 FROM fts_property_schemas WHERE kind = ?1",
672 [kind],
673 |row| {
674 let kind: String = row.get(0)?;
675 let paths_json: String = row.get(1)?;
676 let separator: String = row.get(2)?;
677 let format_version: i64 = row.get(3)?;
678 Ok(build_fts_property_schema_record(
679 kind,
680 &paths_json,
681 separator,
682 format_version,
683 ))
684 },
685 )
686 .optional()?;
687 Ok(row)
688}
689
690pub(super) fn build_fts_property_schema_record(
696 kind: String,
697 paths_json: &str,
698 separator: String,
699 format_version: i64,
700) -> FtsPropertySchemaRecord {
701 let schema = crate::writer::parse_property_schema_json(paths_json, &separator);
702 let entries: Vec<FtsPropertyPathSpec> = schema
703 .paths
704 .into_iter()
705 .map(|entry| FtsPropertyPathSpec {
706 path: entry.path,
707 mode: match entry.mode {
708 crate::writer::PropertyPathMode::Scalar => FtsPropertyPathMode::Scalar,
709 crate::writer::PropertyPathMode::Recursive => FtsPropertyPathMode::Recursive,
710 },
711 weight: entry.weight,
712 })
713 .collect();
714 let property_paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
715 FtsPropertySchemaRecord {
716 kind,
717 property_paths,
718 entries,
719 exclude_paths: schema.exclude_paths,
720 separator,
721 format_version,
722 }
723}