Skip to main content

fathomdb_engine/admin/
fts.rs

1use rusqlite::{OptionalExtension, TransactionBehavior};
2
3use super::{
4    AdminService, EngineError, FtsProfile, FtsPropertyPathMode, FtsPropertyPathSpec,
5    FtsPropertySchemaRecord, RebuildMode, RebuildRequest, RebuildSubmit, resolve_tokenizer_preset,
6};
7
8impl AdminService {
9    /// Persist or update the FTS tokenizer profile for a node kind.
10    ///
11    /// `tokenizer_str` may be a preset name (see [`TOKENIZER_PRESETS`]) or a
12    /// raw FTS5 tokenizer string.  The resolved string is validated before
13    /// being written to `projection_profiles`.
14    ///
15    /// # Errors
16    /// Returns [`EngineError`] if the tokenizer string contains disallowed
17    /// characters, or if the database write fails.
18    pub fn set_fts_profile(
19        &self,
20        kind: &str,
21        tokenizer_str: &str,
22    ) -> Result<FtsProfile, EngineError> {
23        let resolved = resolve_tokenizer_preset(tokenizer_str);
24        // Allowed chars: alphanumeric, space, apostrophe, dot, underscore, hyphen, dollar, at
25        if !resolved
26            .chars()
27            .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
28        {
29            return Err(EngineError::Bridge(format!(
30                "invalid tokenizer string: {resolved:?}"
31            )));
32        }
33        let conn = self.connect()?;
34        conn.execute(
35            r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
36              VALUES (?1, 'fts', json_object('tokenizer', ?2), unixepoch(), unixepoch())
37              ON CONFLICT(kind, facet) DO UPDATE SET
38                  config_json = json_object('tokenizer', ?2),
39                  active_at   = unixepoch()",
40            rusqlite::params![kind, resolved],
41        )?;
42        let row = conn.query_row(
43            "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
44             FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
45            rusqlite::params![kind],
46            |row| {
47                Ok(FtsProfile {
48                    kind: row.get(0)?,
49                    tokenizer: row.get(1)?,
50                    active_at: row.get(2)?,
51                    created_at: row.get(3)?,
52                })
53            },
54        )?;
55        Ok(row)
56    }
57
58    /// Retrieve the FTS tokenizer profile for a node kind.
59    ///
60    /// Returns `None` if no profile has been set for `kind`.
61    ///
62    /// # Errors
63    /// Returns [`EngineError`] if the database query fails.
64    pub fn get_fts_profile(&self, kind: &str) -> Result<Option<FtsProfile>, EngineError> {
65        let conn = self.connect()?;
66        let result = conn
67            .query_row(
68                "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
69                 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
70                rusqlite::params![kind],
71                |row| {
72                    Ok(FtsProfile {
73                        kind: row.get(0)?,
74                        tokenizer: row.get(1)?,
75                        active_at: row.get(2)?,
76                        created_at: row.get(3)?,
77                    })
78                },
79            )
80            .optional()?;
81        Ok(result)
82    }
83
84    /// Register (or update) an FTS property projection schema for the given node kind.
85    ///
86    /// After registration, any node of this kind will have the declared JSON property
87    /// paths extracted, concatenated, and indexed in the per-kind `fts_props_<kind>` FTS5 table.
88    ///
89    /// # Errors
90    /// Returns [`EngineError`] if `property_paths` is empty, contains duplicates,
91    /// or if the database write fails.
92    pub fn register_fts_property_schema(
93        &self,
94        kind: &str,
95        property_paths: &[String],
96        separator: Option<&str>,
97    ) -> Result<FtsPropertySchemaRecord, EngineError> {
98        let specs: Vec<FtsPropertyPathSpec> = property_paths
99            .iter()
100            .map(|p| FtsPropertyPathSpec::scalar(p.clone()))
101            .collect();
102        self.register_fts_property_schema_with_entries(
103            kind,
104            &specs,
105            separator,
106            &[],
107            RebuildMode::Eager,
108        )
109    }
110
111    /// Register (or update) an FTS property projection schema with
112    /// per-path modes and optional exclude paths.
113    ///
114    /// Under `RebuildMode::Eager` (the legacy mode), the full rebuild runs
115    /// inside the registration transaction — same behavior as before Pack 7.
116    ///
117    /// Under `RebuildMode::Async` (the 0.4.1 default), the schema row is
118    /// persisted in a short IMMEDIATE transaction, a rebuild-state row is
119    /// upserted, and the actual rebuild is handed off to the background
120    /// `RebuildActor`.  The register call returns in <100ms even for large
121    /// kinds.
122    ///
123    /// # Errors
124    /// Returns [`EngineError`] if the paths are invalid, the JSON
125    /// serialization fails, or the (schema-persist / rebuild) transaction fails.
126    pub fn register_fts_property_schema_with_entries(
127        &self,
128        kind: &str,
129        entries: &[FtsPropertyPathSpec],
130        separator: Option<&str>,
131        exclude_paths: &[String],
132        mode: RebuildMode,
133    ) -> Result<FtsPropertySchemaRecord, EngineError> {
134        let paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
135        validate_fts_property_paths(&paths)?;
136        for p in exclude_paths {
137            if !p.starts_with("$.") {
138                return Err(EngineError::InvalidWrite(format!(
139                    "exclude_paths entries must start with '$.' but got: {p}"
140                )));
141            }
142        }
143        for e in entries {
144            if let Some(w) = e.weight
145                && !(w > 0.0 && w <= 1000.0)
146            {
147                return Err(EngineError::Bridge(format!(
148                    "weight out of range: {w} (must satisfy 0.0 < weight <= 1000.0)"
149                )));
150            }
151        }
152        let separator = separator.unwrap_or(" ");
153        let paths_json = serialize_property_paths_json(entries, exclude_paths)?;
154
155        match mode {
156            RebuildMode::Eager => self.register_fts_property_schema_eager(
157                kind,
158                entries,
159                separator,
160                exclude_paths,
161                &paths,
162                &paths_json,
163            ),
164            RebuildMode::Async => self.register_fts_property_schema_async(
165                kind,
166                entries,
167                separator,
168                &paths,
169                &paths_json,
170            ),
171        }
172    }
173
174    /// Eager path: existing transactional behavior unchanged.
175    fn register_fts_property_schema_eager(
176        &self,
177        kind: &str,
178        entries: &[FtsPropertyPathSpec],
179        separator: &str,
180        exclude_paths: &[String],
181        paths: &[String],
182        paths_json: &str,
183    ) -> Result<FtsPropertySchemaRecord, EngineError> {
184        let mut conn = self.connect()?;
185        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
186
187        // Determine whether the registration introduces a recursive path
188        // that was not present in the previously-registered schema for
189        // this kind. If so, we must eagerly rebuild property FTS rows and
190        // position map for every active node of this kind within the same
191        // transaction.
192        let previous_row: Option<(String, String)> = tx
193            .query_row(
194                "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
195                [kind],
196                |row| {
197                    let json: String = row.get(0)?;
198                    let sep: String = row.get(1)?;
199                    Ok((json, sep))
200                },
201            )
202            .optional()?;
203        let had_previous_schema = previous_row.is_some();
204        let previous_recursive_paths: Vec<String> = previous_row
205            .map(|(json, sep)| crate::writer::parse_property_schema_json(&json, &sep))
206            .map_or(Vec::new(), |schema| {
207                schema
208                    .paths
209                    .into_iter()
210                    .filter(|p| p.mode == crate::writer::PropertyPathMode::Recursive)
211                    .map(|p| p.path)
212                    .collect()
213            });
214        let new_recursive_paths: Vec<&str> = entries
215            .iter()
216            .filter(|e| e.mode == FtsPropertyPathMode::Recursive)
217            .map(|e| e.path.as_str())
218            .collect();
219        let introduces_new_recursive = new_recursive_paths
220            .iter()
221            .any(|p| !previous_recursive_paths.iter().any(|prev| prev == p));
222
223        tx.execute(
224            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
225             VALUES (?1, ?2, ?3) \
226             ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
227            rusqlite::params![kind, paths_json, separator],
228        )?;
229
230        // Eager transactional rebuild: always fire on any registration or update.
231        // First-time registrations must populate the per-kind FTS table from any
232        // existing nodes; updates must clear and re-populate so stale rows don't
233        // linger. This covers recursive-path additions AND scalar-only
234        // re-registrations where only the path or separator changed. (P4-P2-1)
235        let _ = (introduces_new_recursive, had_previous_schema);
236        let needs_rebuild = true;
237        if needs_rebuild {
238            let any_weight = entries.iter().any(|e| e.weight.is_some());
239            let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
240                .map_err(|e| EngineError::Bridge(e.to_string()))?;
241            if any_weight {
242                // Per-spec column mode: drop and recreate the table with one column
243                // per spec. Data population into per-spec columns is future work;
244                // the table is left empty after recreation.
245                create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
246                tx.execute(
247                    "DELETE FROM fts_node_property_positions WHERE kind = ?1",
248                    [kind],
249                )?;
250                // Skip insert_property_fts_rows_for_kind — it uses text_content
251                // which is not present in the per-spec column layout.
252            } else {
253                // Legacy text_content mode: drop and recreate the table to ensure
254                // the correct single-column layout (handles weighted-to-unweighted
255                // downgrade where a stale per-spec table might otherwise remain).
256                create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
257                tx.execute(
258                    "DELETE FROM fts_node_property_positions WHERE kind = ?1",
259                    [kind],
260                )?;
261                // Scope the rebuild to `kind` only. The multi-kind
262                // `insert_property_fts_rows` iterates over every registered
263                // schema and would re-insert rows for siblings that were not
264                // deleted above, duplicating their FTS entries.
265                crate::projection::insert_property_fts_rows_for_kind(&tx, kind)?;
266            }
267        }
268
269        super::persist_simple_provenance_event(
270            &tx,
271            "fts_property_schema_registered",
272            kind,
273            Some(serde_json::json!({
274                "property_paths": paths,
275                "separator": separator,
276                "exclude_paths": exclude_paths,
277                "eager_rebuild": needs_rebuild,
278            })),
279        )?;
280        tx.commit()?;
281
282        self.describe_fts_property_schema(kind)?.ok_or_else(|| {
283            EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
284        })
285    }
286
287    /// Async path: schema persisted in a short tx; rebuild handed to actor.
288    fn register_fts_property_schema_async(
289        &self,
290        kind: &str,
291        entries: &[FtsPropertyPathSpec],
292        separator: &str,
293        paths: &[String],
294        paths_json: &str,
295    ) -> Result<FtsPropertySchemaRecord, EngineError> {
296        let mut conn = self.connect()?;
297        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299        // Detect first-registration vs re-registration.
300        let had_previous_schema: bool = tx
301            .query_row(
302                "SELECT count(*) FROM fts_property_schemas WHERE kind = ?1",
303                rusqlite::params![kind],
304                |r| r.get::<_, i64>(0),
305            )
306            .unwrap_or(0)
307            > 0;
308
309        // Upsert schema row (fast — just a metadata write).
310        tx.execute(
311            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
312             VALUES (?1, ?2, ?3) \
313             ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
314            rusqlite::params![kind, paths_json, separator],
315        )?;
316
317        // Preserve the live per-kind FTS table when the new schema is
318        // shape-compatible with the existing one. Readers arriving during
319        // PENDING/BUILDING then continue to see the pre-registration rows
320        // until the rebuild actor's step 5 atomic swap commits. Only drop
321        // when the new schema is shape-incompatible (column set or
322        // tokenizer change) — the live table's columns cannot service the
323        // new schema in that case. First registration (existing = None)
324        // leaves the table alone; the actor's defensive CREATE IF NOT
325        // EXISTS in step 5 creates it.
326        let any_weight = entries.iter().any(|e| e.weight.is_some());
327        let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
328            .map_err(|e| EngineError::Bridge(e.to_string()))?;
329        let desired = desired_fts_shape(entries, &tok);
330        let existing = fts_kind_table_shape(&tx, kind)?;
331        let must_drop = match &existing {
332            None => false,
333            Some(existing) => !shape_compatible(existing, &desired),
334        };
335        if must_drop {
336            if any_weight {
337                create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
338            } else {
339                // Legacy text_content layout — pass empty specs so
340                // create_or_replace_fts_kind_table uses the single text_content column.
341                create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
342            }
343        }
344
345        // Retrieve the rowid of the schema row as schema_id.
346        let schema_id: i64 = tx.query_row(
347            "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
348            rusqlite::params![kind],
349            |r| r.get(0),
350        )?;
351
352        let now_ms = crate::rebuild_actor::now_unix_ms_pub();
353        let is_first = i64::from(!had_previous_schema);
354
355        // Upsert rebuild state row.
356        tx.execute(
357            "INSERT INTO fts_property_rebuild_state \
358             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
359             VALUES (?1, ?2, 'PENDING', 0, ?3, ?4) \
360             ON CONFLICT(kind) DO UPDATE SET \
361                 schema_id = excluded.schema_id, \
362                 state = 'PENDING', \
363                 rows_total = NULL, \
364                 rows_done = 0, \
365                 started_at = excluded.started_at, \
366                 last_progress_at = NULL, \
367                 error_message = NULL, \
368                 is_first_registration = excluded.is_first_registration",
369            rusqlite::params![kind, schema_id, now_ms, is_first],
370        )?;
371
372        super::persist_simple_provenance_event(
373            &tx,
374            "fts_property_schema_registered",
375            kind,
376            Some(serde_json::json!({
377                "property_paths": paths,
378                "separator": separator,
379                "mode": "async",
380            })),
381        )?;
382        tx.commit()?;
383
384        // Enqueue a wakeup if the actor is available. Correctness is durable:
385        // the state row remains PENDING and the actor also polls the database.
386        if let Some(client) = &self.rebuild_client {
387            match client.try_submit(RebuildRequest {
388                kind: kind.to_owned(),
389                schema_id,
390            })? {
391                RebuildSubmit::Submitted => {}
392                RebuildSubmit::PersistedPending => {
393                    trace_warn!(
394                        kind = %kind,
395                        "rebuild wakeup not enqueued; durable PENDING row will be polled"
396                    );
397                }
398            }
399        }
400
401        self.describe_fts_property_schema(kind)?.ok_or_else(|| {
402            EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
403        })
404    }
405
406    /// Return the rebuild state row for a kind, if one exists.
407    ///
408    /// # Errors
409    /// Returns [`EngineError`] if the database query fails.
410    pub fn get_property_fts_rebuild_state(
411        &self,
412        kind: &str,
413    ) -> Result<Option<crate::rebuild_actor::RebuildStateRow>, EngineError> {
414        let conn = self.connect()?;
415        let row = conn
416            .query_row(
417                "SELECT kind, schema_id, state, rows_total, rows_done, \
418                 started_at, is_first_registration, error_message \
419                 FROM fts_property_rebuild_state WHERE kind = ?1",
420                rusqlite::params![kind],
421                |r| {
422                    Ok(crate::rebuild_actor::RebuildStateRow {
423                        kind: r.get(0)?,
424                        schema_id: r.get(1)?,
425                        state: r.get(2)?,
426                        rows_total: r.get(3)?,
427                        rows_done: r.get(4)?,
428                        started_at: r.get(5)?,
429                        is_first_registration: r.get::<_, i64>(6)? != 0,
430                        error_message: r.get(7)?,
431                    })
432                },
433            )
434            .optional()?;
435        Ok(row)
436    }
437
438    /// Return the count of rows in `fts_property_rebuild_staging` for a kind.
439    /// Used by tests to verify the staging table was populated.
440    ///
441    /// # Errors
442    /// Returns [`EngineError`] if the database query fails.
443    pub fn count_staging_rows(&self, kind: &str) -> Result<i64, EngineError> {
444        let conn = self.connect()?;
445        let count: i64 = conn.query_row(
446            "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1",
447            rusqlite::params![kind],
448            |r| r.get(0),
449        )?;
450        Ok(count)
451    }
452
453    /// Return whether a specific node is present in `fts_property_rebuild_staging`.
454    /// Used by tests to verify the double-write path.
455    ///
456    /// # Errors
457    /// Returns [`EngineError`] if the database query fails.
458    pub fn staging_row_exists(
459        &self,
460        kind: &str,
461        node_logical_id: &str,
462    ) -> Result<bool, EngineError> {
463        let conn = self.connect()?;
464        let count: i64 = conn.query_row(
465            "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1 AND node_logical_id = ?2",
466            rusqlite::params![kind, node_logical_id],
467            |r| r.get(0),
468        )?;
469        Ok(count > 0)
470    }
471
472    /// Return the FTS property schema for a single node kind, if registered.
473    ///
474    /// # Errors
475    /// Returns [`EngineError`] if the database query fails.
476    pub fn describe_fts_property_schema(
477        &self,
478        kind: &str,
479    ) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
480        let conn = self.connect()?;
481        load_fts_property_schema_record(&conn, kind)
482    }
483
484    /// Return all registered FTS property schemas.
485    ///
486    /// # Errors
487    /// Returns [`EngineError`] if the database query fails.
488    pub fn list_fts_property_schemas(&self) -> Result<Vec<FtsPropertySchemaRecord>, EngineError> {
489        let conn = self.connect()?;
490        let mut stmt = conn.prepare(
491            "SELECT kind, property_paths_json, separator, format_version \
492             FROM fts_property_schemas ORDER BY kind",
493        )?;
494        let records = stmt
495            .query_map([], |row| {
496                let kind: String = row.get(0)?;
497                let paths_json: String = row.get(1)?;
498                let separator: String = row.get(2)?;
499                let format_version: i64 = row.get(3)?;
500                Ok(build_fts_property_schema_record(
501                    kind,
502                    &paths_json,
503                    separator,
504                    format_version,
505                ))
506            })?
507            .collect::<Result<Vec<_>, _>>()?;
508        Ok(records)
509    }
510
511    /// Remove the FTS property schema for a node kind.
512    ///
513    /// This does **not** delete existing FTS rows for this kind;
514    /// call `rebuild_projections(Fts)` to clean up stale rows.
515    ///
516    /// # Errors
517    /// Returns [`EngineError`] if the kind is not registered or the delete fails.
518    pub fn remove_fts_property_schema(&self, kind: &str) -> Result<(), EngineError> {
519        let mut conn = self.connect()?;
520        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
521        let deleted = tx.execute("DELETE FROM fts_property_schemas WHERE kind = ?1", [kind])?;
522        if deleted == 0 {
523            return Err(EngineError::InvalidWrite(format!(
524                "FTS property schema for kind '{kind}' is not registered"
525            )));
526        }
527        // Delete all FTS rows from the per-kind table (if it exists).
528        let table = fathomdb_schema::fts_kind_table_name(kind);
529        let table_exists: bool = tx
530            .query_row(
531                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
532                 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
533                rusqlite::params![table],
534                |r| r.get::<_, i64>(0),
535            )
536            .unwrap_or(0)
537            > 0;
538        if table_exists {
539            tx.execute_batch(&format!("DELETE FROM {table}"))?;
540        }
541        super::persist_simple_provenance_event(&tx, "fts_property_schema_removed", kind, None)?;
542        tx.commit()?;
543        Ok(())
544    }
545}
546
547pub(super) fn serialize_property_paths_json(
548    entries: &[FtsPropertyPathSpec],
549    exclude_paths: &[String],
550) -> Result<String, EngineError> {
551    // Scalar-only schemas with no exclude_paths and no weights are
552    // serialised in the legacy shape (bare array of strings) for full
553    // backwards compatibility with earlier schema versions.
554    let all_scalar = entries
555        .iter()
556        .all(|e| e.mode == FtsPropertyPathMode::Scalar);
557    let any_weight = entries.iter().any(|e| e.weight.is_some());
558    if all_scalar && exclude_paths.is_empty() && !any_weight {
559        let paths: Vec<&str> = entries.iter().map(|e| e.path.as_str()).collect();
560        return serde_json::to_string(&paths).map_err(|e| {
561            EngineError::InvalidWrite(format!("failed to serialize property paths: {e}"))
562        });
563    }
564
565    let mut obj = serde_json::Map::new();
566    let paths_json: Vec<serde_json::Value> = entries
567        .iter()
568        .map(|e| {
569            let mode_str = match e.mode {
570                FtsPropertyPathMode::Scalar => "scalar",
571                FtsPropertyPathMode::Recursive => "recursive",
572            };
573            let mut entry = serde_json::json!({ "path": e.path, "mode": mode_str });
574            if let Some(w) = e.weight {
575                entry["weight"] = serde_json::json!(w);
576            }
577            entry
578        })
579        .collect();
580    obj.insert("paths".to_owned(), serde_json::Value::Array(paths_json));
581    if !exclude_paths.is_empty() {
582        obj.insert("exclude_paths".to_owned(), serde_json::json!(exclude_paths));
583    }
584    serde_json::to_string(&serde_json::Value::Object(obj))
585        .map_err(|e| EngineError::InvalidWrite(format!("failed to serialize property paths: {e}")))
586}
587
588/// Shape of the per-kind FTS5 virtual table — tokenizer string and the
589/// sorted set of non-metadata indexed column names.
590///
591/// Used by `register_fts_property_schema_async` to decide whether a
592/// re-registration can preserve the existing live table (shape-compatible)
593/// or must drop and recreate (shape-incompatible).
594#[derive(Debug, Clone, PartialEq, Eq)]
595pub(super) struct FtsTableShape {
596    pub tokenizer: String,
597    /// Sorted list of indexed (non-`UNINDEXED`, non-`node_logical_id`) columns.
598    pub columns: Vec<String>,
599}
600
601/// Read the current shape of the per-kind FTS5 virtual table, if it exists.
602///
603/// Returns `None` when the table is absent. Parses columns via
604/// `PRAGMA table_info` and the tokenizer clause from the
605/// `CREATE VIRTUAL TABLE` SQL stored in `sqlite_master`.
606pub(super) fn fts_kind_table_shape(
607    conn: &rusqlite::Connection,
608    kind: &str,
609) -> Result<Option<FtsTableShape>, EngineError> {
610    let table = fathomdb_schema::fts_kind_table_name(kind);
611    let create_sql: Option<String> = conn
612        .query_row(
613            "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?1 \
614             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
615            rusqlite::params![table],
616            |r| r.get::<_, String>(0),
617        )
618        .optional()?;
619    let Some(create_sql) = create_sql else {
620        return Ok(None);
621    };
622
623    // Extract the tokenizer= clause: tokenize='...'
624    let tokenizer = extract_tokenizer_clause(&create_sql).unwrap_or_default();
625
626    // Read columns via PRAGMA table_info.
627    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
628    let rows = stmt.query_map([], |r| r.get::<_, String>(1))?;
629    let mut columns: Vec<String> = rows
630        .collect::<Result<Vec<_>, _>>()?
631        .into_iter()
632        .filter(|c| c != "node_logical_id")
633        .collect();
634    columns.sort();
635
636    Ok(Some(FtsTableShape { tokenizer, columns }))
637}
638
639/// Compute the shape that `create_or_replace_fts_kind_table` would
640/// produce for the given specs and tokenizer.
641pub(super) fn desired_fts_shape(specs: &[FtsPropertyPathSpec], tokenizer: &str) -> FtsTableShape {
642    // Mirror the branch in `register_fts_property_schema_async`:
643    // if any spec carries a weight the table uses per-spec columns; otherwise
644    // it uses the single legacy `text_content` column.
645    let any_weight = specs.iter().any(|s| s.weight.is_some());
646    let mut columns: Vec<String> = if any_weight {
647        specs
648            .iter()
649            .map(|s| {
650                let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
651                fathomdb_schema::fts_column_name(&s.path, is_recursive)
652            })
653            .collect()
654    } else {
655        vec!["text_content".to_owned()]
656    };
657    columns.sort();
658    FtsTableShape {
659        tokenizer: tokenizer.to_owned(),
660        columns,
661    }
662}
663
664/// Return true iff two FTS table shapes have identical tokenizer and
665/// identical (sorted) column sets. The `tokenizer` comparison is a
666/// plain string equality after extracting the value from the
667/// `tokenize='...'` clause.
668pub(super) fn shape_compatible(existing: &FtsTableShape, desired: &FtsTableShape) -> bool {
669    existing.tokenizer == desired.tokenizer && existing.columns == desired.columns
670}
671
/// Parse the value of a `tokenize='...'` clause from a CREATE VIRTUAL
/// TABLE SQL statement.
///
/// The keyword is matched ASCII-case-insensitively and must be followed
/// (after optional whitespace) by `=` and an opening single quote; an
/// occurrence of "tokenize" that is not, such as one inside a table or
/// column name, is skipped.  Inside the value a doubled quote (`''`) is
/// unescaped to a single `'`.
///
/// Returns `None` if no such clause is present or the value is unterminated.
fn extract_tokenizer_clause(sql: &str) -> Option<String> {
    const KEY: &str = "tokenize";

    // ASCII-only lowering never changes byte lengths, so offsets found in
    // `lower` are valid offsets (and char boundaries) in `sql`. A full
    // Unicode `to_lowercase` can grow or shrink the string and misalign them.
    let lower = sql.to_ascii_lowercase();
    let value = lower.match_indices(KEY).find_map(|(idx, _)| {
        sql[idx + KEY.len()..]
            .trim_start()
            .strip_prefix('=')?
            .trim_start()
            .strip_prefix('\'')
    })?;

    // Walk by `char`, not by byte, so non-ASCII tokenizer text survives intact.
    let mut out = String::new();
    let mut chars = value.chars().peekable();
    while let Some(c) = chars.next() {
        if c != '\'' {
            out.push(c);
        } else if chars.peek() == Some(&'\'') {
            // Doubled quote: an escaped literal apostrophe.
            chars.next();
            out.push('\'');
        } else {
            // Lone quote: end of the value.
            return Some(out);
        }
    }
    None
}
702
703/// Drop and recreate the per-kind FTS5 virtual table with one column per spec.
704///
705/// The tokenizer string is validated before interpolation into DDL to
706/// prevent SQL injection.  If `specs` is empty a single `text_content`
707/// column is used (matching the migration-21 baseline shape).
708pub(super) fn create_or_replace_fts_kind_table(
709    conn: &rusqlite::Connection,
710    kind: &str,
711    specs: &[FtsPropertyPathSpec],
712    tokenizer: &str,
713) -> Result<(), EngineError> {
714    let table = fathomdb_schema::fts_kind_table_name(kind);
715
716    // Validate tokenizer string: alphanumeric plus the set used by all known presets.
717    // Must match the allowlist in `set_fts_profile` so that profiles written by one
718    // function are accepted by the other.  The source-code preset
719    // (`"unicode61 tokenchars '._-$@'"`) requires `.`, `-`, `$`, `@`.
720    if !tokenizer
721        .chars()
722        .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
723    {
724        return Err(EngineError::Bridge(format!(
725            "invalid tokenizer string: {tokenizer:?}"
726        )));
727    }
728
729    let cols: Vec<String> = if specs.is_empty() {
730        vec![
731            "node_logical_id UNINDEXED".to_owned(),
732            "text_content".to_owned(),
733        ]
734    } else {
735        std::iter::once("node_logical_id UNINDEXED".to_owned())
736            .chain(specs.iter().map(|s| {
737                let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
738                fathomdb_schema::fts_column_name(&s.path, is_recursive)
739            }))
740            .collect()
741    };
742
743    // Escape inner apostrophes so the SQL single-quoted tokenize= clause is valid.
744    // "unicode61 tokenchars '._-$@'" → "unicode61 tokenchars ''._-$@''"
745    let tokenizer_sql = tokenizer.replace('\'', "''");
746    conn.execute_batch(&format!(
747        "DROP TABLE IF EXISTS {table}; \
748         CREATE VIRTUAL TABLE {table} USING fts5({cols}, tokenize='{tokenizer_sql}');",
749        cols = cols.join(", "),
750    ))?;
751
752    Ok(())
753}
754
755pub(super) fn validate_fts_property_paths(paths: &[String]) -> Result<(), EngineError> {
756    if paths.is_empty() {
757        return Err(EngineError::InvalidWrite(
758            "FTS property paths must not be empty".to_owned(),
759        ));
760    }
761    let mut seen = std::collections::HashSet::new();
762    for path in paths {
763        if !path.starts_with("$.") {
764            return Err(EngineError::InvalidWrite(format!(
765                "FTS property path must start with '$.' but got: {path}"
766            )));
767        }
768        let after_prefix = &path[2..]; // safe: already validated "$." prefix
769        let segments: Vec<&str> = after_prefix.split('.').collect();
770        if segments.is_empty() || segments.iter().any(|s| s.is_empty()) {
771            return Err(EngineError::InvalidWrite(format!(
772                "FTS property path has empty segment(s): {path}"
773            )));
774        }
775        for seg in &segments {
776            if !seg.chars().all(|c| c.is_alphanumeric() || c == '_') {
777                return Err(EngineError::InvalidWrite(format!(
778                    "FTS property path segment contains invalid characters: {path}"
779                )));
780            }
781        }
782        if !seen.insert(path) {
783            return Err(EngineError::InvalidWrite(format!(
784                "duplicate FTS property path: {path}"
785            )));
786        }
787    }
788    Ok(())
789}
790
791pub(super) fn load_fts_property_schema_record(
792    conn: &rusqlite::Connection,
793    kind: &str,
794) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
795    let row = conn
796        .query_row(
797            "SELECT kind, property_paths_json, separator, format_version \
798             FROM fts_property_schemas WHERE kind = ?1",
799            [kind],
800            |row| {
801                let kind: String = row.get(0)?;
802                let paths_json: String = row.get(1)?;
803                let separator: String = row.get(2)?;
804                let format_version: i64 = row.get(3)?;
805                Ok(build_fts_property_schema_record(
806                    kind,
807                    &paths_json,
808                    separator,
809                    format_version,
810                ))
811            },
812        )
813        .optional()?;
814    Ok(row)
815}
816
817/// Build an [`FtsPropertySchemaRecord`] from a raw
818/// `fts_property_schemas` row. Delegates JSON parsing to
819/// [`crate::writer::parse_property_schema_json`] — the same parser the
820/// recursive walker uses at rebuild time — so both the legacy bare-array
821/// shape and the Phase 4 object-shaped envelope round-trip correctly.
822pub(super) fn build_fts_property_schema_record(
823    kind: String,
824    paths_json: &str,
825    separator: String,
826    format_version: i64,
827) -> FtsPropertySchemaRecord {
828    let schema = crate::writer::parse_property_schema_json(paths_json, &separator);
829    let entries: Vec<FtsPropertyPathSpec> = schema
830        .paths
831        .into_iter()
832        .map(|entry| FtsPropertyPathSpec {
833            path: entry.path,
834            mode: match entry.mode {
835                crate::writer::PropertyPathMode::Scalar => FtsPropertyPathMode::Scalar,
836                crate::writer::PropertyPathMode::Recursive => FtsPropertyPathMode::Recursive,
837            },
838            weight: entry.weight,
839        })
840        .collect();
841    let property_paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
842    FtsPropertySchemaRecord {
843        kind,
844        property_paths,
845        entries,
846        exclude_paths: schema.exclude_paths,
847        separator,
848        format_version,
849    }
850}