Skip to main content

fathomdb_engine/admin/
fts.rs

1use rusqlite::{OptionalExtension, TransactionBehavior};
2
3use super::{
4    AdminService, EngineError, FtsProfile, FtsPropertyPathMode, FtsPropertyPathSpec,
5    FtsPropertySchemaRecord, RebuildMode, RebuildRequest, resolve_tokenizer_preset,
6};
7
8impl AdminService {
9    /// Persist or update the FTS tokenizer profile for a node kind.
10    ///
11    /// `tokenizer_str` may be a preset name (see [`TOKENIZER_PRESETS`]) or a
12    /// raw FTS5 tokenizer string.  The resolved string is validated before
13    /// being written to `projection_profiles`.
14    ///
15    /// # Errors
16    /// Returns [`EngineError`] if the tokenizer string contains disallowed
17    /// characters, or if the database write fails.
18    pub fn set_fts_profile(
19        &self,
20        kind: &str,
21        tokenizer_str: &str,
22    ) -> Result<FtsProfile, EngineError> {
23        let resolved = resolve_tokenizer_preset(tokenizer_str);
24        // Allowed chars: alphanumeric, space, apostrophe, dot, underscore, hyphen, dollar, at
25        if !resolved
26            .chars()
27            .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
28        {
29            return Err(EngineError::Bridge(format!(
30                "invalid tokenizer string: {resolved:?}"
31            )));
32        }
33        let conn = self.connect()?;
34        conn.execute(
35            r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
36              VALUES (?1, 'fts', json_object('tokenizer', ?2), unixepoch(), unixepoch())
37              ON CONFLICT(kind, facet) DO UPDATE SET
38                  config_json = json_object('tokenizer', ?2),
39                  active_at   = unixepoch()",
40            rusqlite::params![kind, resolved],
41        )?;
42        let row = conn.query_row(
43            "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
44             FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
45            rusqlite::params![kind],
46            |row| {
47                Ok(FtsProfile {
48                    kind: row.get(0)?,
49                    tokenizer: row.get(1)?,
50                    active_at: row.get(2)?,
51                    created_at: row.get(3)?,
52                })
53            },
54        )?;
55        Ok(row)
56    }
57
58    /// Retrieve the FTS tokenizer profile for a node kind.
59    ///
60    /// Returns `None` if no profile has been set for `kind`.
61    ///
62    /// # Errors
63    /// Returns [`EngineError`] if the database query fails.
64    pub fn get_fts_profile(&self, kind: &str) -> Result<Option<FtsProfile>, EngineError> {
65        let conn = self.connect()?;
66        let result = conn
67            .query_row(
68                "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
69                 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
70                rusqlite::params![kind],
71                |row| {
72                    Ok(FtsProfile {
73                        kind: row.get(0)?,
74                        tokenizer: row.get(1)?,
75                        active_at: row.get(2)?,
76                        created_at: row.get(3)?,
77                    })
78                },
79            )
80            .optional()?;
81        Ok(result)
82    }
83
84    /// Register (or update) an FTS property projection schema for the given node kind.
85    ///
86    /// After registration, any node of this kind will have the declared JSON property
87    /// paths extracted, concatenated, and indexed in the per-kind `fts_props_<kind>` FTS5 table.
88    ///
89    /// # Errors
90    /// Returns [`EngineError`] if `property_paths` is empty, contains duplicates,
91    /// or if the database write fails.
92    pub fn register_fts_property_schema(
93        &self,
94        kind: &str,
95        property_paths: &[String],
96        separator: Option<&str>,
97    ) -> Result<FtsPropertySchemaRecord, EngineError> {
98        let specs: Vec<FtsPropertyPathSpec> = property_paths
99            .iter()
100            .map(|p| FtsPropertyPathSpec::scalar(p.clone()))
101            .collect();
102        self.register_fts_property_schema_with_entries(
103            kind,
104            &specs,
105            separator,
106            &[],
107            RebuildMode::Eager,
108        )
109    }
110
111    /// Register (or update) an FTS property projection schema with
112    /// per-path modes and optional exclude paths.
113    ///
114    /// Under `RebuildMode::Eager` (the legacy mode), the full rebuild runs
115    /// inside the registration transaction — same behavior as before Pack 7.
116    ///
117    /// Under `RebuildMode::Async` (the 0.4.1 default), the schema row is
118    /// persisted in a short IMMEDIATE transaction, a rebuild-state row is
119    /// upserted, and the actual rebuild is handed off to the background
120    /// `RebuildActor`.  The register call returns in <100ms even for large
121    /// kinds.
122    ///
123    /// # Errors
124    /// Returns [`EngineError`] if the paths are invalid, the JSON
125    /// serialization fails, or the (schema-persist / rebuild) transaction fails.
126    pub fn register_fts_property_schema_with_entries(
127        &self,
128        kind: &str,
129        entries: &[FtsPropertyPathSpec],
130        separator: Option<&str>,
131        exclude_paths: &[String],
132        mode: RebuildMode,
133    ) -> Result<FtsPropertySchemaRecord, EngineError> {
134        let paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
135        validate_fts_property_paths(&paths)?;
136        for p in exclude_paths {
137            if !p.starts_with("$.") {
138                return Err(EngineError::InvalidWrite(format!(
139                    "exclude_paths entries must start with '$.' but got: {p}"
140                )));
141            }
142        }
143        for e in entries {
144            if let Some(w) = e.weight
145                && !(w > 0.0 && w <= 1000.0)
146            {
147                return Err(EngineError::Bridge(format!(
148                    "weight out of range: {w} (must satisfy 0.0 < weight <= 1000.0)"
149                )));
150            }
151        }
152        let separator = separator.unwrap_or(" ");
153        let paths_json = serialize_property_paths_json(entries, exclude_paths)?;
154
155        match mode {
156            RebuildMode::Eager => self.register_fts_property_schema_eager(
157                kind,
158                entries,
159                separator,
160                exclude_paths,
161                &paths,
162                &paths_json,
163            ),
164            RebuildMode::Async => self.register_fts_property_schema_async(
165                kind,
166                entries,
167                separator,
168                &paths,
169                &paths_json,
170            ),
171        }
172    }
173
174    /// Eager path: existing transactional behavior unchanged.
175    fn register_fts_property_schema_eager(
176        &self,
177        kind: &str,
178        entries: &[FtsPropertyPathSpec],
179        separator: &str,
180        exclude_paths: &[String],
181        paths: &[String],
182        paths_json: &str,
183    ) -> Result<FtsPropertySchemaRecord, EngineError> {
184        let mut conn = self.connect()?;
185        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
186
187        // Determine whether the registration introduces a recursive path
188        // that was not present in the previously-registered schema for
189        // this kind. If so, we must eagerly rebuild property FTS rows and
190        // position map for every active node of this kind within the same
191        // transaction.
192        let previous_row: Option<(String, String)> = tx
193            .query_row(
194                "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
195                [kind],
196                |row| {
197                    let json: String = row.get(0)?;
198                    let sep: String = row.get(1)?;
199                    Ok((json, sep))
200                },
201            )
202            .optional()?;
203        let had_previous_schema = previous_row.is_some();
204        let previous_recursive_paths: Vec<String> = previous_row
205            .map(|(json, sep)| crate::writer::parse_property_schema_json(&json, &sep))
206            .map_or(Vec::new(), |schema| {
207                schema
208                    .paths
209                    .into_iter()
210                    .filter(|p| p.mode == crate::writer::PropertyPathMode::Recursive)
211                    .map(|p| p.path)
212                    .collect()
213            });
214        let new_recursive_paths: Vec<&str> = entries
215            .iter()
216            .filter(|e| e.mode == FtsPropertyPathMode::Recursive)
217            .map(|e| e.path.as_str())
218            .collect();
219        let introduces_new_recursive = new_recursive_paths
220            .iter()
221            .any(|p| !previous_recursive_paths.iter().any(|prev| prev == p));
222
223        tx.execute(
224            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
225             VALUES (?1, ?2, ?3) \
226             ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
227            rusqlite::params![kind, paths_json, separator],
228        )?;
229
230        // Eager transactional rebuild: always fire on any registration or update.
231        // First-time registrations must populate the per-kind FTS table from any
232        // existing nodes; updates must clear and re-populate so stale rows don't
233        // linger. This covers recursive-path additions AND scalar-only
234        // re-registrations where only the path or separator changed. (P4-P2-1)
235        let _ = (introduces_new_recursive, had_previous_schema);
236        let needs_rebuild = true;
237        if needs_rebuild {
238            let any_weight = entries.iter().any(|e| e.weight.is_some());
239            let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
240                .map_err(|e| EngineError::Bridge(e.to_string()))?;
241            if any_weight {
242                // Per-spec column mode: drop and recreate the table with one column
243                // per spec. Data population into per-spec columns is future work;
244                // the table is left empty after recreation.
245                create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
246                tx.execute(
247                    "DELETE FROM fts_node_property_positions WHERE kind = ?1",
248                    [kind],
249                )?;
250                // Skip insert_property_fts_rows_for_kind — it uses text_content
251                // which is not present in the per-spec column layout.
252            } else {
253                // Legacy text_content mode: drop and recreate the table to ensure
254                // the correct single-column layout (handles weighted-to-unweighted
255                // downgrade where a stale per-spec table might otherwise remain).
256                create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
257                tx.execute(
258                    "DELETE FROM fts_node_property_positions WHERE kind = ?1",
259                    [kind],
260                )?;
261                // Scope the rebuild to `kind` only. The multi-kind
262                // `insert_property_fts_rows` iterates over every registered
263                // schema and would re-insert rows for siblings that were not
264                // deleted above, duplicating their FTS entries.
265                crate::projection::insert_property_fts_rows_for_kind(&tx, kind)?;
266            }
267        }
268
269        super::persist_simple_provenance_event(
270            &tx,
271            "fts_property_schema_registered",
272            kind,
273            Some(serde_json::json!({
274                "property_paths": paths,
275                "separator": separator,
276                "exclude_paths": exclude_paths,
277                "eager_rebuild": needs_rebuild,
278            })),
279        )?;
280        tx.commit()?;
281
282        self.describe_fts_property_schema(kind)?.ok_or_else(|| {
283            EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
284        })
285    }
286
287    /// Async path: schema persisted in a short tx; rebuild handed to actor.
288    fn register_fts_property_schema_async(
289        &self,
290        kind: &str,
291        entries: &[FtsPropertyPathSpec],
292        separator: &str,
293        paths: &[String],
294        paths_json: &str,
295    ) -> Result<FtsPropertySchemaRecord, EngineError> {
296        let mut conn = self.connect()?;
297        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299        // Detect first-registration vs re-registration.
300        let had_previous_schema: bool = tx
301            .query_row(
302                "SELECT count(*) FROM fts_property_schemas WHERE kind = ?1",
303                rusqlite::params![kind],
304                |r| r.get::<_, i64>(0),
305            )
306            .unwrap_or(0)
307            > 0;
308
309        // Upsert schema row (fast — just a metadata write).
310        tx.execute(
311            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
312             VALUES (?1, ?2, ?3) \
313             ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
314            rusqlite::params![kind, paths_json, separator],
315        )?;
316
317        // Preserve the live per-kind FTS table when the new schema is
318        // shape-compatible with the existing one. Readers arriving during
319        // PENDING/BUILDING then continue to see the pre-registration rows
320        // until the rebuild actor's step 5 atomic swap commits. Only drop
321        // when the new schema is shape-incompatible (column set or
322        // tokenizer change) — the live table's columns cannot service the
323        // new schema in that case. First registration (existing = None)
324        // leaves the table alone; the actor's defensive CREATE IF NOT
325        // EXISTS in step 5 creates it.
326        let any_weight = entries.iter().any(|e| e.weight.is_some());
327        let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
328            .map_err(|e| EngineError::Bridge(e.to_string()))?;
329        let desired = desired_fts_shape(entries, &tok);
330        let existing = fts_kind_table_shape(&tx, kind)?;
331        let must_drop = match &existing {
332            None => false,
333            Some(existing) => !shape_compatible(existing, &desired),
334        };
335        if must_drop {
336            if any_weight {
337                create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
338            } else {
339                // Legacy text_content layout — pass empty specs so
340                // create_or_replace_fts_kind_table uses the single text_content column.
341                create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
342            }
343        }
344
345        // Retrieve the rowid of the schema row as schema_id.
346        let schema_id: i64 = tx.query_row(
347            "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
348            rusqlite::params![kind],
349            |r| r.get(0),
350        )?;
351
352        let now_ms = crate::rebuild_actor::now_unix_ms_pub();
353        let is_first = i64::from(!had_previous_schema);
354
355        // Upsert rebuild state row.
356        tx.execute(
357            "INSERT INTO fts_property_rebuild_state \
358             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
359             VALUES (?1, ?2, 'PENDING', 0, ?3, ?4) \
360             ON CONFLICT(kind) DO UPDATE SET \
361                 schema_id = excluded.schema_id, \
362                 state = 'PENDING', \
363                 rows_total = NULL, \
364                 rows_done = 0, \
365                 started_at = excluded.started_at, \
366                 last_progress_at = NULL, \
367                 error_message = NULL, \
368                 is_first_registration = excluded.is_first_registration",
369            rusqlite::params![kind, schema_id, now_ms, is_first],
370        )?;
371
372        super::persist_simple_provenance_event(
373            &tx,
374            "fts_property_schema_registered",
375            kind,
376            Some(serde_json::json!({
377                "property_paths": paths,
378                "separator": separator,
379                "mode": "async",
380            })),
381        )?;
382        tx.commit()?;
383
384        // Enqueue the rebuild request if the actor is available.
385        // try_send is non-blocking: if the channel is full (capacity 64), the
386        // request is dropped. The state row stays PENDING and the caller can
387        // observe this via get_property_fts_rebuild_state. No automatic retry
388        // in 0.4.1 — caller must re-invoke register to re-enqueue.
389        if let Some(sender) = &self.rebuild_sender
390            && sender
391                .try_send(RebuildRequest {
392                    kind: kind.to_owned(),
393                    schema_id,
394                })
395                .is_err()
396        {
397            trace_warn!(
398                kind = %kind,
399                "rebuild channel full; rebuild request dropped — state remains PENDING"
400            );
401        }
402
403        self.describe_fts_property_schema(kind)?.ok_or_else(|| {
404            EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
405        })
406    }
407
408    /// Return the rebuild state row for a kind, if one exists.
409    ///
410    /// # Errors
411    /// Returns [`EngineError`] if the database query fails.
412    pub fn get_property_fts_rebuild_state(
413        &self,
414        kind: &str,
415    ) -> Result<Option<crate::rebuild_actor::RebuildStateRow>, EngineError> {
416        let conn = self.connect()?;
417        let row = conn
418            .query_row(
419                "SELECT kind, schema_id, state, rows_total, rows_done, \
420                 started_at, is_first_registration, error_message \
421                 FROM fts_property_rebuild_state WHERE kind = ?1",
422                rusqlite::params![kind],
423                |r| {
424                    Ok(crate::rebuild_actor::RebuildStateRow {
425                        kind: r.get(0)?,
426                        schema_id: r.get(1)?,
427                        state: r.get(2)?,
428                        rows_total: r.get(3)?,
429                        rows_done: r.get(4)?,
430                        started_at: r.get(5)?,
431                        is_first_registration: r.get::<_, i64>(6)? != 0,
432                        error_message: r.get(7)?,
433                    })
434                },
435            )
436            .optional()?;
437        Ok(row)
438    }
439
440    /// Return the count of rows in `fts_property_rebuild_staging` for a kind.
441    /// Used by tests to verify the staging table was populated.
442    ///
443    /// # Errors
444    /// Returns [`EngineError`] if the database query fails.
445    pub fn count_staging_rows(&self, kind: &str) -> Result<i64, EngineError> {
446        let conn = self.connect()?;
447        let count: i64 = conn.query_row(
448            "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1",
449            rusqlite::params![kind],
450            |r| r.get(0),
451        )?;
452        Ok(count)
453    }
454
455    /// Return whether a specific node is present in `fts_property_rebuild_staging`.
456    /// Used by tests to verify the double-write path.
457    ///
458    /// # Errors
459    /// Returns [`EngineError`] if the database query fails.
460    pub fn staging_row_exists(
461        &self,
462        kind: &str,
463        node_logical_id: &str,
464    ) -> Result<bool, EngineError> {
465        let conn = self.connect()?;
466        let count: i64 = conn.query_row(
467            "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1 AND node_logical_id = ?2",
468            rusqlite::params![kind, node_logical_id],
469            |r| r.get(0),
470        )?;
471        Ok(count > 0)
472    }
473
474    /// Return the FTS property schema for a single node kind, if registered.
475    ///
476    /// # Errors
477    /// Returns [`EngineError`] if the database query fails.
478    pub fn describe_fts_property_schema(
479        &self,
480        kind: &str,
481    ) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
482        let conn = self.connect()?;
483        load_fts_property_schema_record(&conn, kind)
484    }
485
486    /// Return all registered FTS property schemas.
487    ///
488    /// # Errors
489    /// Returns [`EngineError`] if the database query fails.
490    pub fn list_fts_property_schemas(&self) -> Result<Vec<FtsPropertySchemaRecord>, EngineError> {
491        let conn = self.connect()?;
492        let mut stmt = conn.prepare(
493            "SELECT kind, property_paths_json, separator, format_version \
494             FROM fts_property_schemas ORDER BY kind",
495        )?;
496        let records = stmt
497            .query_map([], |row| {
498                let kind: String = row.get(0)?;
499                let paths_json: String = row.get(1)?;
500                let separator: String = row.get(2)?;
501                let format_version: i64 = row.get(3)?;
502                Ok(build_fts_property_schema_record(
503                    kind,
504                    &paths_json,
505                    separator,
506                    format_version,
507                ))
508            })?
509            .collect::<Result<Vec<_>, _>>()?;
510        Ok(records)
511    }
512
513    /// Remove the FTS property schema for a node kind.
514    ///
515    /// This does **not** delete existing FTS rows for this kind;
516    /// call `rebuild_projections(Fts)` to clean up stale rows.
517    ///
518    /// # Errors
519    /// Returns [`EngineError`] if the kind is not registered or the delete fails.
520    pub fn remove_fts_property_schema(&self, kind: &str) -> Result<(), EngineError> {
521        let mut conn = self.connect()?;
522        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
523        let deleted = tx.execute("DELETE FROM fts_property_schemas WHERE kind = ?1", [kind])?;
524        if deleted == 0 {
525            return Err(EngineError::InvalidWrite(format!(
526                "FTS property schema for kind '{kind}' is not registered"
527            )));
528        }
529        // Delete all FTS rows from the per-kind table (if it exists).
530        let table = fathomdb_schema::fts_kind_table_name(kind);
531        let table_exists: bool = tx
532            .query_row(
533                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
534                 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
535                rusqlite::params![table],
536                |r| r.get::<_, i64>(0),
537            )
538            .unwrap_or(0)
539            > 0;
540        if table_exists {
541            tx.execute_batch(&format!("DELETE FROM {table}"))?;
542        }
543        super::persist_simple_provenance_event(&tx, "fts_property_schema_removed", kind, None)?;
544        tx.commit()?;
545        Ok(())
546    }
547}
548
549pub(super) fn serialize_property_paths_json(
550    entries: &[FtsPropertyPathSpec],
551    exclude_paths: &[String],
552) -> Result<String, EngineError> {
553    // Scalar-only schemas with no exclude_paths and no weights are
554    // serialised in the legacy shape (bare array of strings) for full
555    // backwards compatibility with earlier schema versions.
556    let all_scalar = entries
557        .iter()
558        .all(|e| e.mode == FtsPropertyPathMode::Scalar);
559    let any_weight = entries.iter().any(|e| e.weight.is_some());
560    if all_scalar && exclude_paths.is_empty() && !any_weight {
561        let paths: Vec<&str> = entries.iter().map(|e| e.path.as_str()).collect();
562        return serde_json::to_string(&paths).map_err(|e| {
563            EngineError::InvalidWrite(format!("failed to serialize property paths: {e}"))
564        });
565    }
566
567    let mut obj = serde_json::Map::new();
568    let paths_json: Vec<serde_json::Value> = entries
569        .iter()
570        .map(|e| {
571            let mode_str = match e.mode {
572                FtsPropertyPathMode::Scalar => "scalar",
573                FtsPropertyPathMode::Recursive => "recursive",
574            };
575            let mut entry = serde_json::json!({ "path": e.path, "mode": mode_str });
576            if let Some(w) = e.weight {
577                entry["weight"] = serde_json::json!(w);
578            }
579            entry
580        })
581        .collect();
582    obj.insert("paths".to_owned(), serde_json::Value::Array(paths_json));
583    if !exclude_paths.is_empty() {
584        obj.insert("exclude_paths".to_owned(), serde_json::json!(exclude_paths));
585    }
586    serde_json::to_string(&serde_json::Value::Object(obj))
587        .map_err(|e| EngineError::InvalidWrite(format!("failed to serialize property paths: {e}")))
588}
589
/// Shape of the per-kind FTS5 virtual table — tokenizer string and the
/// sorted set of non-metadata indexed column names.
///
/// Used by `register_fts_property_schema_async` to decide whether a
/// re-registration can preserve the existing live table (shape-compatible)
/// or must drop and recreate (shape-incompatible).
///
/// Instances come from two places: `fts_kind_table_shape` reads the shape
/// of the table that currently exists, and `desired_fts_shape` computes the
/// shape a new schema would produce. `shape_compatible` compares the two.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct FtsTableShape {
    /// Tokenizer string: either the value extracted from the table's
    /// `tokenize='...'` clause (empty when no clause was found) or the
    /// tokenizer resolved for the new schema.
    pub tokenizer: String,
    /// Column names excluding `node_logical_id`, sorted ascending so that
    /// two shapes can be compared with plain equality.
    pub columns: Vec<String>,
}
602
603/// Read the current shape of the per-kind FTS5 virtual table, if it exists.
604///
605/// Returns `None` when the table is absent. Parses columns via
606/// `PRAGMA table_info` and the tokenizer clause from the
607/// `CREATE VIRTUAL TABLE` SQL stored in `sqlite_master`.
608pub(super) fn fts_kind_table_shape(
609    conn: &rusqlite::Connection,
610    kind: &str,
611) -> Result<Option<FtsTableShape>, EngineError> {
612    let table = fathomdb_schema::fts_kind_table_name(kind);
613    let create_sql: Option<String> = conn
614        .query_row(
615            "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?1 \
616             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
617            rusqlite::params![table],
618            |r| r.get::<_, String>(0),
619        )
620        .optional()?;
621    let Some(create_sql) = create_sql else {
622        return Ok(None);
623    };
624
625    // Extract the tokenizer= clause: tokenize='...'
626    let tokenizer = extract_tokenizer_clause(&create_sql).unwrap_or_default();
627
628    // Read columns via PRAGMA table_info.
629    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
630    let rows = stmt.query_map([], |r| r.get::<_, String>(1))?;
631    let mut columns: Vec<String> = rows
632        .collect::<Result<Vec<_>, _>>()?
633        .into_iter()
634        .filter(|c| c != "node_logical_id")
635        .collect();
636    columns.sort();
637
638    Ok(Some(FtsTableShape { tokenizer, columns }))
639}
640
641/// Compute the shape that `create_or_replace_fts_kind_table` would
642/// produce for the given specs and tokenizer.
643pub(super) fn desired_fts_shape(specs: &[FtsPropertyPathSpec], tokenizer: &str) -> FtsTableShape {
644    // Mirror the branch in `register_fts_property_schema_async`:
645    // if any spec carries a weight the table uses per-spec columns; otherwise
646    // it uses the single legacy `text_content` column.
647    let any_weight = specs.iter().any(|s| s.weight.is_some());
648    let mut columns: Vec<String> = if any_weight {
649        specs
650            .iter()
651            .map(|s| {
652                let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
653                fathomdb_schema::fts_column_name(&s.path, is_recursive)
654            })
655            .collect()
656    } else {
657        vec!["text_content".to_owned()]
658    };
659    columns.sort();
660    FtsTableShape {
661        tokenizer: tokenizer.to_owned(),
662        columns,
663    }
664}
665
666/// Return true iff two FTS table shapes have identical tokenizer and
667/// identical (sorted) column sets. The `tokenizer` comparison is a
668/// plain string equality after extracting the value from the
669/// `tokenize='...'` clause.
670pub(super) fn shape_compatible(existing: &FtsTableShape, desired: &FtsTableShape) -> bool {
671    existing.tokenizer == desired.tokenizer && existing.columns == desired.columns
672}
673
/// Parse the value of a `tokenize='...'` clause from a CREATE VIRTUAL
/// TABLE SQL statement.
///
/// The keyword is matched ASCII-case-insensitively. Whitespace around the
/// `=` is tolerated, and a doubled single quote (`''`) inside the value is
/// unescaped to one `'`. Non-ASCII characters in the value are preserved.
///
/// Returns `None` if there is no `tokenize` keyword, no `=` after it, no
/// opening quote, or no closing quote.
fn extract_tokenizer_clause(sql: &str) -> Option<String> {
    // ASCII-only lowercasing keeps every byte offset identical to `sql`.
    // Full Unicode lowercasing can change the byte length of some
    // characters, which would make the index found below point at the
    // wrong place (or inside a multi-byte character) in `sql`.
    let lower = sql.to_ascii_lowercase();
    let key_idx = lower.find("tokenize")?;
    let after_key = &sql[key_idx..];

    // Advance past "tokenize", optional spaces, '=', optional spaces.
    let eq_rel = after_key.find('=')?;
    let rest = after_key[eq_rel + 1..].trim_start().strip_prefix('\'')?;

    // Walk by `char` (not by byte) so multi-byte UTF-8 survives intact,
    // stopping at the first single quote that is not doubled.
    let mut out = String::new();
    let mut chars = rest.chars().peekable();
    while let Some(c) = chars.next() {
        if c != '\'' {
            out.push(c);
            continue;
        }
        if chars.peek() == Some(&'\'') {
            // `''` is an escaped apostrophe inside the literal.
            chars.next();
            out.push('\'');
        } else {
            return Some(out);
        }
    }
    // Ran off the end without a closing quote.
    None
}
704
705/// Drop and recreate the per-kind FTS5 virtual table with one column per spec.
706///
707/// The tokenizer string is validated before interpolation into DDL to
708/// prevent SQL injection.  If `specs` is empty a single `text_content`
709/// column is used (matching the migration-21 baseline shape).
710pub(super) fn create_or_replace_fts_kind_table(
711    conn: &rusqlite::Connection,
712    kind: &str,
713    specs: &[FtsPropertyPathSpec],
714    tokenizer: &str,
715) -> Result<(), EngineError> {
716    let table = fathomdb_schema::fts_kind_table_name(kind);
717
718    // Validate tokenizer string: alphanumeric plus the set used by all known presets.
719    // Must match the allowlist in `set_fts_profile` so that profiles written by one
720    // function are accepted by the other.  The source-code preset
721    // (`"unicode61 tokenchars '._-$@'"`) requires `.`, `-`, `$`, `@`.
722    if !tokenizer
723        .chars()
724        .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
725    {
726        return Err(EngineError::Bridge(format!(
727            "invalid tokenizer string: {tokenizer:?}"
728        )));
729    }
730
731    let cols: Vec<String> = if specs.is_empty() {
732        vec![
733            "node_logical_id UNINDEXED".to_owned(),
734            "text_content".to_owned(),
735        ]
736    } else {
737        std::iter::once("node_logical_id UNINDEXED".to_owned())
738            .chain(specs.iter().map(|s| {
739                let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
740                fathomdb_schema::fts_column_name(&s.path, is_recursive)
741            }))
742            .collect()
743    };
744
745    // Escape inner apostrophes so the SQL single-quoted tokenize= clause is valid.
746    // "unicode61 tokenchars '._-$@'" → "unicode61 tokenchars ''._-$@''"
747    let tokenizer_sql = tokenizer.replace('\'', "''");
748    conn.execute_batch(&format!(
749        "DROP TABLE IF EXISTS {table}; \
750         CREATE VIRTUAL TABLE {table} USING fts5({cols}, tokenize='{tokenizer_sql}');",
751        cols = cols.join(", "),
752    ))?;
753
754    Ok(())
755}
756
757pub(super) fn validate_fts_property_paths(paths: &[String]) -> Result<(), EngineError> {
758    if paths.is_empty() {
759        return Err(EngineError::InvalidWrite(
760            "FTS property paths must not be empty".to_owned(),
761        ));
762    }
763    let mut seen = std::collections::HashSet::new();
764    for path in paths {
765        if !path.starts_with("$.") {
766            return Err(EngineError::InvalidWrite(format!(
767                "FTS property path must start with '$.' but got: {path}"
768            )));
769        }
770        let after_prefix = &path[2..]; // safe: already validated "$." prefix
771        let segments: Vec<&str> = after_prefix.split('.').collect();
772        if segments.is_empty() || segments.iter().any(|s| s.is_empty()) {
773            return Err(EngineError::InvalidWrite(format!(
774                "FTS property path has empty segment(s): {path}"
775            )));
776        }
777        for seg in &segments {
778            if !seg.chars().all(|c| c.is_alphanumeric() || c == '_') {
779                return Err(EngineError::InvalidWrite(format!(
780                    "FTS property path segment contains invalid characters: {path}"
781                )));
782            }
783        }
784        if !seen.insert(path) {
785            return Err(EngineError::InvalidWrite(format!(
786                "duplicate FTS property path: {path}"
787            )));
788        }
789    }
790    Ok(())
791}
792
793pub(super) fn load_fts_property_schema_record(
794    conn: &rusqlite::Connection,
795    kind: &str,
796) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
797    let row = conn
798        .query_row(
799            "SELECT kind, property_paths_json, separator, format_version \
800             FROM fts_property_schemas WHERE kind = ?1",
801            [kind],
802            |row| {
803                let kind: String = row.get(0)?;
804                let paths_json: String = row.get(1)?;
805                let separator: String = row.get(2)?;
806                let format_version: i64 = row.get(3)?;
807                Ok(build_fts_property_schema_record(
808                    kind,
809                    &paths_json,
810                    separator,
811                    format_version,
812                ))
813            },
814        )
815        .optional()?;
816    Ok(row)
817}
818
819/// Build an [`FtsPropertySchemaRecord`] from a raw
820/// `fts_property_schemas` row. Delegates JSON parsing to
821/// [`crate::writer::parse_property_schema_json`] — the same parser the
822/// recursive walker uses at rebuild time — so both the legacy bare-array
823/// shape and the Phase 4 object-shaped envelope round-trip correctly.
824pub(super) fn build_fts_property_schema_record(
825    kind: String,
826    paths_json: &str,
827    separator: String,
828    format_version: i64,
829) -> FtsPropertySchemaRecord {
830    let schema = crate::writer::parse_property_schema_json(paths_json, &separator);
831    let entries: Vec<FtsPropertyPathSpec> = schema
832        .paths
833        .into_iter()
834        .map(|entry| FtsPropertyPathSpec {
835            path: entry.path,
836            mode: match entry.mode {
837                crate::writer::PropertyPathMode::Scalar => FtsPropertyPathMode::Scalar,
838                crate::writer::PropertyPathMode::Recursive => FtsPropertyPathMode::Recursive,
839            },
840            weight: entry.weight,
841        })
842        .collect();
843    let property_paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
844    FtsPropertySchemaRecord {
845        kind,
846        property_paths,
847        entries,
848        exclude_paths: schema.exclude_paths,
849        separator,
850        format_version,
851    }
852}