Skip to main content

fathomdb_engine/admin/
fts.rs

1use rusqlite::{OptionalExtension, TransactionBehavior};
2
3use super::{
4    AdminService, EngineError, FtsProfile, FtsPropertyPathMode, FtsPropertyPathSpec,
5    FtsPropertySchemaRecord, RebuildMode, RebuildRequest, resolve_tokenizer_preset,
6};
7
8impl AdminService {
9    /// Persist or update the FTS tokenizer profile for a node kind.
10    ///
11    /// `tokenizer_str` may be a preset name (see [`TOKENIZER_PRESETS`]) or a
12    /// raw FTS5 tokenizer string.  The resolved string is validated before
13    /// being written to `projection_profiles`.
14    ///
15    /// # Errors
16    /// Returns [`EngineError`] if the tokenizer string contains disallowed
17    /// characters, or if the database write fails.
18    pub fn set_fts_profile(
19        &self,
20        kind: &str,
21        tokenizer_str: &str,
22    ) -> Result<FtsProfile, EngineError> {
23        let resolved = resolve_tokenizer_preset(tokenizer_str);
24        // Allowed chars: alphanumeric, space, apostrophe, dot, underscore, hyphen, dollar, at
25        if !resolved
26            .chars()
27            .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
28        {
29            return Err(EngineError::Bridge(format!(
30                "invalid tokenizer string: {resolved:?}"
31            )));
32        }
33        let conn = self.connect()?;
34        conn.execute(
35            r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
36              VALUES (?1, 'fts', json_object('tokenizer', ?2), unixepoch(), unixepoch())
37              ON CONFLICT(kind, facet) DO UPDATE SET
38                  config_json = json_object('tokenizer', ?2),
39                  active_at   = unixepoch()",
40            rusqlite::params![kind, resolved],
41        )?;
42        let row = conn.query_row(
43            "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
44             FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
45            rusqlite::params![kind],
46            |row| {
47                Ok(FtsProfile {
48                    kind: row.get(0)?,
49                    tokenizer: row.get(1)?,
50                    active_at: row.get(2)?,
51                    created_at: row.get(3)?,
52                })
53            },
54        )?;
55        Ok(row)
56    }
57
58    /// Retrieve the FTS tokenizer profile for a node kind.
59    ///
60    /// Returns `None` if no profile has been set for `kind`.
61    ///
62    /// # Errors
63    /// Returns [`EngineError`] if the database query fails.
64    pub fn get_fts_profile(&self, kind: &str) -> Result<Option<FtsProfile>, EngineError> {
65        let conn = self.connect()?;
66        let result = conn
67            .query_row(
68                "SELECT kind, json_extract(config_json, '$.tokenizer'), active_at, created_at \
69                 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
70                rusqlite::params![kind],
71                |row| {
72                    Ok(FtsProfile {
73                        kind: row.get(0)?,
74                        tokenizer: row.get(1)?,
75                        active_at: row.get(2)?,
76                        created_at: row.get(3)?,
77                    })
78                },
79            )
80            .optional()?;
81        Ok(result)
82    }
83
84    /// Register (or update) an FTS property projection schema for the given node kind.
85    ///
86    /// After registration, any node of this kind will have the declared JSON property
87    /// paths extracted, concatenated, and indexed in the per-kind `fts_props_<kind>` FTS5 table.
88    ///
89    /// # Errors
90    /// Returns [`EngineError`] if `property_paths` is empty, contains duplicates,
91    /// or if the database write fails.
92    pub fn register_fts_property_schema(
93        &self,
94        kind: &str,
95        property_paths: &[String],
96        separator: Option<&str>,
97    ) -> Result<FtsPropertySchemaRecord, EngineError> {
98        let specs: Vec<FtsPropertyPathSpec> = property_paths
99            .iter()
100            .map(|p| FtsPropertyPathSpec::scalar(p.clone()))
101            .collect();
102        self.register_fts_property_schema_with_entries(
103            kind,
104            &specs,
105            separator,
106            &[],
107            RebuildMode::Eager,
108        )
109    }
110
111    /// Register (or update) an FTS property projection schema with
112    /// per-path modes and optional exclude paths.
113    ///
114    /// Under `RebuildMode::Eager` (the legacy mode), the full rebuild runs
115    /// inside the registration transaction — same behavior as before Pack 7.
116    ///
117    /// Under `RebuildMode::Async` (the 0.4.1 default), the schema row is
118    /// persisted in a short IMMEDIATE transaction, a rebuild-state row is
119    /// upserted, and the actual rebuild is handed off to the background
120    /// `RebuildActor`.  The register call returns in <100ms even for large
121    /// kinds.
122    ///
123    /// # Errors
124    /// Returns [`EngineError`] if the paths are invalid, the JSON
125    /// serialization fails, or the (schema-persist / rebuild) transaction fails.
126    pub fn register_fts_property_schema_with_entries(
127        &self,
128        kind: &str,
129        entries: &[FtsPropertyPathSpec],
130        separator: Option<&str>,
131        exclude_paths: &[String],
132        mode: RebuildMode,
133    ) -> Result<FtsPropertySchemaRecord, EngineError> {
134        let paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
135        validate_fts_property_paths(&paths)?;
136        for p in exclude_paths {
137            if !p.starts_with("$.") {
138                return Err(EngineError::InvalidWrite(format!(
139                    "exclude_paths entries must start with '$.' but got: {p}"
140                )));
141            }
142        }
143        for e in entries {
144            if let Some(w) = e.weight
145                && !(w > 0.0 && w <= 1000.0)
146            {
147                return Err(EngineError::Bridge(format!(
148                    "weight out of range: {w} (must satisfy 0.0 < weight <= 1000.0)"
149                )));
150            }
151        }
152        let separator = separator.unwrap_or(" ");
153        let paths_json = serialize_property_paths_json(entries, exclude_paths)?;
154
155        match mode {
156            RebuildMode::Eager => self.register_fts_property_schema_eager(
157                kind,
158                entries,
159                separator,
160                exclude_paths,
161                &paths,
162                &paths_json,
163            ),
164            RebuildMode::Async => self.register_fts_property_schema_async(
165                kind,
166                entries,
167                separator,
168                &paths,
169                &paths_json,
170            ),
171        }
172    }
173
174    /// Eager path: existing transactional behavior unchanged.
175    fn register_fts_property_schema_eager(
176        &self,
177        kind: &str,
178        entries: &[FtsPropertyPathSpec],
179        separator: &str,
180        exclude_paths: &[String],
181        paths: &[String],
182        paths_json: &str,
183    ) -> Result<FtsPropertySchemaRecord, EngineError> {
184        let mut conn = self.connect()?;
185        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
186
187        // Determine whether the registration introduces a recursive path
188        // that was not present in the previously-registered schema for
189        // this kind. If so, we must eagerly rebuild property FTS rows and
190        // position map for every active node of this kind within the same
191        // transaction.
192        let previous_row: Option<(String, String)> = tx
193            .query_row(
194                "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
195                [kind],
196                |row| {
197                    let json: String = row.get(0)?;
198                    let sep: String = row.get(1)?;
199                    Ok((json, sep))
200                },
201            )
202            .optional()?;
203        let had_previous_schema = previous_row.is_some();
204        let previous_recursive_paths: Vec<String> = previous_row
205            .map(|(json, sep)| crate::writer::parse_property_schema_json(&json, &sep))
206            .map_or(Vec::new(), |schema| {
207                schema
208                    .paths
209                    .into_iter()
210                    .filter(|p| p.mode == crate::writer::PropertyPathMode::Recursive)
211                    .map(|p| p.path)
212                    .collect()
213            });
214        let new_recursive_paths: Vec<&str> = entries
215            .iter()
216            .filter(|e| e.mode == FtsPropertyPathMode::Recursive)
217            .map(|e| e.path.as_str())
218            .collect();
219        let introduces_new_recursive = new_recursive_paths
220            .iter()
221            .any(|p| !previous_recursive_paths.iter().any(|prev| prev == p));
222
223        tx.execute(
224            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
225             VALUES (?1, ?2, ?3) \
226             ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
227            rusqlite::params![kind, paths_json, separator],
228        )?;
229
230        // Eager transactional rebuild: always fire on any registration or update.
231        // First-time registrations must populate the per-kind FTS table from any
232        // existing nodes; updates must clear and re-populate so stale rows don't
233        // linger. This covers recursive-path additions AND scalar-only
234        // re-registrations where only the path or separator changed. (P4-P2-1)
235        let _ = (introduces_new_recursive, had_previous_schema);
236        let needs_rebuild = true;
237        if needs_rebuild {
238            let any_weight = entries.iter().any(|e| e.weight.is_some());
239            let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
240                .map_err(|e| EngineError::Bridge(e.to_string()))?;
241            if any_weight {
242                // Per-spec column mode: drop and recreate the table with one column
243                // per spec. Data population into per-spec columns is future work;
244                // the table is left empty after recreation.
245                create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
246                tx.execute(
247                    "DELETE FROM fts_node_property_positions WHERE kind = ?1",
248                    [kind],
249                )?;
250                // Skip insert_property_fts_rows_for_kind — it uses text_content
251                // which is not present in the per-spec column layout.
252            } else {
253                // Legacy text_content mode: drop and recreate the table to ensure
254                // the correct single-column layout (handles weighted-to-unweighted
255                // downgrade where a stale per-spec table might otherwise remain).
256                create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
257                tx.execute(
258                    "DELETE FROM fts_node_property_positions WHERE kind = ?1",
259                    [kind],
260                )?;
261                // Scope the rebuild to `kind` only. The multi-kind
262                // `insert_property_fts_rows` iterates over every registered
263                // schema and would re-insert rows for siblings that were not
264                // deleted above, duplicating their FTS entries.
265                crate::projection::insert_property_fts_rows_for_kind(&tx, kind)?;
266            }
267        }
268
269        super::persist_simple_provenance_event(
270            &tx,
271            "fts_property_schema_registered",
272            kind,
273            Some(serde_json::json!({
274                "property_paths": paths,
275                "separator": separator,
276                "exclude_paths": exclude_paths,
277                "eager_rebuild": needs_rebuild,
278            })),
279        )?;
280        tx.commit()?;
281
282        self.describe_fts_property_schema(kind)?.ok_or_else(|| {
283            EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
284        })
285    }
286
287    /// Async path: schema persisted in a short tx; rebuild handed to actor.
288    fn register_fts_property_schema_async(
289        &self,
290        kind: &str,
291        entries: &[FtsPropertyPathSpec],
292        separator: &str,
293        paths: &[String],
294        paths_json: &str,
295    ) -> Result<FtsPropertySchemaRecord, EngineError> {
296        let mut conn = self.connect()?;
297        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299        // Detect first-registration vs re-registration.
300        let had_previous_schema: bool = tx
301            .query_row(
302                "SELECT count(*) FROM fts_property_schemas WHERE kind = ?1",
303                rusqlite::params![kind],
304                |r| r.get::<_, i64>(0),
305            )
306            .unwrap_or(0)
307            > 0;
308
309        // Upsert schema row (fast — just a metadata write).
310        tx.execute(
311            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
312             VALUES (?1, ?2, ?3) \
313             ON CONFLICT(kind) DO UPDATE SET property_paths_json = ?2, separator = ?3",
314            rusqlite::params![kind, paths_json, separator],
315        )?;
316
317        // Always drop and recreate the per-kind FTS table to ensure the schema
318        // matches the registered spec layout. This handles weighted-to-unweighted
319        // downgrade where a stale per-spec table would otherwise remain.
320        let any_weight = entries.iter().any(|e| e.weight.is_some());
321        let tok = fathomdb_schema::resolve_fts_tokenizer(&tx, kind)
322            .map_err(|e| EngineError::Bridge(e.to_string()))?;
323        if any_weight {
324            create_or_replace_fts_kind_table(&tx, kind, entries, &tok)?;
325        } else {
326            // Legacy text_content layout — pass empty specs so
327            // create_or_replace_fts_kind_table uses the single text_content column.
328            create_or_replace_fts_kind_table(&tx, kind, &[], &tok)?;
329        }
330
331        // Retrieve the rowid of the schema row as schema_id.
332        let schema_id: i64 = tx.query_row(
333            "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
334            rusqlite::params![kind],
335            |r| r.get(0),
336        )?;
337
338        let now_ms = crate::rebuild_actor::now_unix_ms_pub();
339        let is_first = i64::from(!had_previous_schema);
340
341        // Upsert rebuild state row.
342        tx.execute(
343            "INSERT INTO fts_property_rebuild_state \
344             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
345             VALUES (?1, ?2, 'PENDING', 0, ?3, ?4) \
346             ON CONFLICT(kind) DO UPDATE SET \
347                 schema_id = excluded.schema_id, \
348                 state = 'PENDING', \
349                 rows_total = NULL, \
350                 rows_done = 0, \
351                 started_at = excluded.started_at, \
352                 last_progress_at = NULL, \
353                 error_message = NULL, \
354                 is_first_registration = excluded.is_first_registration",
355            rusqlite::params![kind, schema_id, now_ms, is_first],
356        )?;
357
358        super::persist_simple_provenance_event(
359            &tx,
360            "fts_property_schema_registered",
361            kind,
362            Some(serde_json::json!({
363                "property_paths": paths,
364                "separator": separator,
365                "mode": "async",
366            })),
367        )?;
368        tx.commit()?;
369
370        // Enqueue the rebuild request if the actor is available.
371        // try_send is non-blocking: if the channel is full (capacity 64), the
372        // request is dropped. The state row stays PENDING and the caller can
373        // observe this via get_property_fts_rebuild_state. No automatic retry
374        // in 0.4.1 — caller must re-invoke register to re-enqueue.
375        if let Some(sender) = &self.rebuild_sender
376            && sender
377                .try_send(RebuildRequest {
378                    kind: kind.to_owned(),
379                    schema_id,
380                })
381                .is_err()
382        {
383            trace_warn!(
384                kind = %kind,
385                "rebuild channel full; rebuild request dropped — state remains PENDING"
386            );
387        }
388
389        self.describe_fts_property_schema(kind)?.ok_or_else(|| {
390            EngineError::Bridge("registered FTS property schema missing after commit".to_owned())
391        })
392    }
393
394    /// Return the rebuild state row for a kind, if one exists.
395    ///
396    /// # Errors
397    /// Returns [`EngineError`] if the database query fails.
398    pub fn get_property_fts_rebuild_state(
399        &self,
400        kind: &str,
401    ) -> Result<Option<crate::rebuild_actor::RebuildStateRow>, EngineError> {
402        let conn = self.connect()?;
403        let row = conn
404            .query_row(
405                "SELECT kind, schema_id, state, rows_total, rows_done, \
406                 started_at, is_first_registration, error_message \
407                 FROM fts_property_rebuild_state WHERE kind = ?1",
408                rusqlite::params![kind],
409                |r| {
410                    Ok(crate::rebuild_actor::RebuildStateRow {
411                        kind: r.get(0)?,
412                        schema_id: r.get(1)?,
413                        state: r.get(2)?,
414                        rows_total: r.get(3)?,
415                        rows_done: r.get(4)?,
416                        started_at: r.get(5)?,
417                        is_first_registration: r.get::<_, i64>(6)? != 0,
418                        error_message: r.get(7)?,
419                    })
420                },
421            )
422            .optional()?;
423        Ok(row)
424    }
425
426    /// Return the count of rows in `fts_property_rebuild_staging` for a kind.
427    /// Used by tests to verify the staging table was populated.
428    ///
429    /// # Errors
430    /// Returns [`EngineError`] if the database query fails.
431    pub fn count_staging_rows(&self, kind: &str) -> Result<i64, EngineError> {
432        let conn = self.connect()?;
433        let count: i64 = conn.query_row(
434            "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1",
435            rusqlite::params![kind],
436            |r| r.get(0),
437        )?;
438        Ok(count)
439    }
440
441    /// Return whether a specific node is present in `fts_property_rebuild_staging`.
442    /// Used by tests to verify the double-write path.
443    ///
444    /// # Errors
445    /// Returns [`EngineError`] if the database query fails.
446    pub fn staging_row_exists(
447        &self,
448        kind: &str,
449        node_logical_id: &str,
450    ) -> Result<bool, EngineError> {
451        let conn = self.connect()?;
452        let count: i64 = conn.query_row(
453            "SELECT count(*) FROM fts_property_rebuild_staging WHERE kind = ?1 AND node_logical_id = ?2",
454            rusqlite::params![kind, node_logical_id],
455            |r| r.get(0),
456        )?;
457        Ok(count > 0)
458    }
459
460    /// Return the FTS property schema for a single node kind, if registered.
461    ///
462    /// # Errors
463    /// Returns [`EngineError`] if the database query fails.
464    pub fn describe_fts_property_schema(
465        &self,
466        kind: &str,
467    ) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
468        let conn = self.connect()?;
469        load_fts_property_schema_record(&conn, kind)
470    }
471
472    /// Return all registered FTS property schemas.
473    ///
474    /// # Errors
475    /// Returns [`EngineError`] if the database query fails.
476    pub fn list_fts_property_schemas(&self) -> Result<Vec<FtsPropertySchemaRecord>, EngineError> {
477        let conn = self.connect()?;
478        let mut stmt = conn.prepare(
479            "SELECT kind, property_paths_json, separator, format_version \
480             FROM fts_property_schemas ORDER BY kind",
481        )?;
482        let records = stmt
483            .query_map([], |row| {
484                let kind: String = row.get(0)?;
485                let paths_json: String = row.get(1)?;
486                let separator: String = row.get(2)?;
487                let format_version: i64 = row.get(3)?;
488                Ok(build_fts_property_schema_record(
489                    kind,
490                    &paths_json,
491                    separator,
492                    format_version,
493                ))
494            })?
495            .collect::<Result<Vec<_>, _>>()?;
496        Ok(records)
497    }
498
499    /// Remove the FTS property schema for a node kind.
500    ///
501    /// This does **not** delete existing FTS rows for this kind;
502    /// call `rebuild_projections(Fts)` to clean up stale rows.
503    ///
504    /// # Errors
505    /// Returns [`EngineError`] if the kind is not registered or the delete fails.
506    pub fn remove_fts_property_schema(&self, kind: &str) -> Result<(), EngineError> {
507        let mut conn = self.connect()?;
508        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
509        let deleted = tx.execute("DELETE FROM fts_property_schemas WHERE kind = ?1", [kind])?;
510        if deleted == 0 {
511            return Err(EngineError::InvalidWrite(format!(
512                "FTS property schema for kind '{kind}' is not registered"
513            )));
514        }
515        // Delete all FTS rows from the per-kind table (if it exists).
516        let table = fathomdb_schema::fts_kind_table_name(kind);
517        let table_exists: bool = tx
518            .query_row(
519                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
520                 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
521                rusqlite::params![table],
522                |r| r.get::<_, i64>(0),
523            )
524            .unwrap_or(0)
525            > 0;
526        if table_exists {
527            tx.execute_batch(&format!("DELETE FROM {table}"))?;
528        }
529        super::persist_simple_provenance_event(&tx, "fts_property_schema_removed", kind, None)?;
530        tx.commit()?;
531        Ok(())
532    }
533}
534
535pub(super) fn serialize_property_paths_json(
536    entries: &[FtsPropertyPathSpec],
537    exclude_paths: &[String],
538) -> Result<String, EngineError> {
539    // Scalar-only schemas with no exclude_paths and no weights are
540    // serialised in the legacy shape (bare array of strings) for full
541    // backwards compatibility with earlier schema versions.
542    let all_scalar = entries
543        .iter()
544        .all(|e| e.mode == FtsPropertyPathMode::Scalar);
545    let any_weight = entries.iter().any(|e| e.weight.is_some());
546    if all_scalar && exclude_paths.is_empty() && !any_weight {
547        let paths: Vec<&str> = entries.iter().map(|e| e.path.as_str()).collect();
548        return serde_json::to_string(&paths).map_err(|e| {
549            EngineError::InvalidWrite(format!("failed to serialize property paths: {e}"))
550        });
551    }
552
553    let mut obj = serde_json::Map::new();
554    let paths_json: Vec<serde_json::Value> = entries
555        .iter()
556        .map(|e| {
557            let mode_str = match e.mode {
558                FtsPropertyPathMode::Scalar => "scalar",
559                FtsPropertyPathMode::Recursive => "recursive",
560            };
561            let mut entry = serde_json::json!({ "path": e.path, "mode": mode_str });
562            if let Some(w) = e.weight {
563                entry["weight"] = serde_json::json!(w);
564            }
565            entry
566        })
567        .collect();
568    obj.insert("paths".to_owned(), serde_json::Value::Array(paths_json));
569    if !exclude_paths.is_empty() {
570        obj.insert("exclude_paths".to_owned(), serde_json::json!(exclude_paths));
571    }
572    serde_json::to_string(&serde_json::Value::Object(obj))
573        .map_err(|e| EngineError::InvalidWrite(format!("failed to serialize property paths: {e}")))
574}
575
576/// Drop and recreate the per-kind FTS5 virtual table with one column per spec.
577///
578/// The tokenizer string is validated before interpolation into DDL to
579/// prevent SQL injection.  If `specs` is empty a single `text_content`
580/// column is used (matching the migration-21 baseline shape).
581pub(super) fn create_or_replace_fts_kind_table(
582    conn: &rusqlite::Connection,
583    kind: &str,
584    specs: &[FtsPropertyPathSpec],
585    tokenizer: &str,
586) -> Result<(), EngineError> {
587    let table = fathomdb_schema::fts_kind_table_name(kind);
588
589    // Validate tokenizer string: alphanumeric plus the set used by all known presets.
590    // Must match the allowlist in `set_fts_profile` so that profiles written by one
591    // function are accepted by the other.  The source-code preset
592    // (`"unicode61 tokenchars '._-$@'"`) requires `.`, `-`, `$`, `@`.
593    if !tokenizer
594        .chars()
595        .all(|c| c.is_alphanumeric() || "'._-$@ ".contains(c))
596    {
597        return Err(EngineError::Bridge(format!(
598            "invalid tokenizer string: {tokenizer:?}"
599        )));
600    }
601
602    let cols: Vec<String> = if specs.is_empty() {
603        vec![
604            "node_logical_id UNINDEXED".to_owned(),
605            "text_content".to_owned(),
606        ]
607    } else {
608        std::iter::once("node_logical_id UNINDEXED".to_owned())
609            .chain(specs.iter().map(|s| {
610                let is_recursive = matches!(s.mode, FtsPropertyPathMode::Recursive);
611                fathomdb_schema::fts_column_name(&s.path, is_recursive)
612            }))
613            .collect()
614    };
615
616    // Escape inner apostrophes so the SQL single-quoted tokenize= clause is valid.
617    // "unicode61 tokenchars '._-$@'" → "unicode61 tokenchars ''._-$@''"
618    let tokenizer_sql = tokenizer.replace('\'', "''");
619    conn.execute_batch(&format!(
620        "DROP TABLE IF EXISTS {table}; \
621         CREATE VIRTUAL TABLE {table} USING fts5({cols}, tokenize='{tokenizer_sql}');",
622        cols = cols.join(", "),
623    ))?;
624
625    Ok(())
626}
627
628pub(super) fn validate_fts_property_paths(paths: &[String]) -> Result<(), EngineError> {
629    if paths.is_empty() {
630        return Err(EngineError::InvalidWrite(
631            "FTS property paths must not be empty".to_owned(),
632        ));
633    }
634    let mut seen = std::collections::HashSet::new();
635    for path in paths {
636        if !path.starts_with("$.") {
637            return Err(EngineError::InvalidWrite(format!(
638                "FTS property path must start with '$.' but got: {path}"
639            )));
640        }
641        let after_prefix = &path[2..]; // safe: already validated "$." prefix
642        let segments: Vec<&str> = after_prefix.split('.').collect();
643        if segments.is_empty() || segments.iter().any(|s| s.is_empty()) {
644            return Err(EngineError::InvalidWrite(format!(
645                "FTS property path has empty segment(s): {path}"
646            )));
647        }
648        for seg in &segments {
649            if !seg.chars().all(|c| c.is_alphanumeric() || c == '_') {
650                return Err(EngineError::InvalidWrite(format!(
651                    "FTS property path segment contains invalid characters: {path}"
652                )));
653            }
654        }
655        if !seen.insert(path) {
656            return Err(EngineError::InvalidWrite(format!(
657                "duplicate FTS property path: {path}"
658            )));
659        }
660    }
661    Ok(())
662}
663
664pub(super) fn load_fts_property_schema_record(
665    conn: &rusqlite::Connection,
666    kind: &str,
667) -> Result<Option<FtsPropertySchemaRecord>, EngineError> {
668    let row = conn
669        .query_row(
670            "SELECT kind, property_paths_json, separator, format_version \
671             FROM fts_property_schemas WHERE kind = ?1",
672            [kind],
673            |row| {
674                let kind: String = row.get(0)?;
675                let paths_json: String = row.get(1)?;
676                let separator: String = row.get(2)?;
677                let format_version: i64 = row.get(3)?;
678                Ok(build_fts_property_schema_record(
679                    kind,
680                    &paths_json,
681                    separator,
682                    format_version,
683                ))
684            },
685        )
686        .optional()?;
687    Ok(row)
688}
689
690/// Build an [`FtsPropertySchemaRecord`] from a raw
691/// `fts_property_schemas` row. Delegates JSON parsing to
692/// [`crate::writer::parse_property_schema_json`] — the same parser the
693/// recursive walker uses at rebuild time — so both the legacy bare-array
694/// shape and the Phase 4 object-shaped envelope round-trip correctly.
695pub(super) fn build_fts_property_schema_record(
696    kind: String,
697    paths_json: &str,
698    separator: String,
699    format_version: i64,
700) -> FtsPropertySchemaRecord {
701    let schema = crate::writer::parse_property_schema_json(paths_json, &separator);
702    let entries: Vec<FtsPropertyPathSpec> = schema
703        .paths
704        .into_iter()
705        .map(|entry| FtsPropertyPathSpec {
706            path: entry.path,
707            mode: match entry.mode {
708                crate::writer::PropertyPathMode::Scalar => FtsPropertyPathMode::Scalar,
709                crate::writer::PropertyPathMode::Recursive => FtsPropertyPathMode::Recursive,
710            },
711            weight: entry.weight,
712        })
713        .collect();
714    let property_paths: Vec<String> = entries.iter().map(|e| e.path.clone()).collect();
715    FtsPropertySchemaRecord {
716        kind,
717        property_paths,
718        entries,
719        exclude_paths: schema.exclude_paths,
720        separator,
721        format_version,
722    }
723}