Skip to main content

fathomdb_engine/
rebuild_actor.rs

//! Background actor that serializes async property-FTS rebuild tasks.
//!
//! Modeled exactly on [`crate::writer::WriterActor`]: one OS thread,
//! `std::sync::mpsc`, `JoinHandle` for shutdown.  No tokio.
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::mpsc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use fathomdb_schema::SchemaManager;
12use rusqlite::OptionalExtension;
13
14use crate::{EngineError, sqlite};
15
/// Mode passed to `register_fts_property_schema_with_entries`.
///
/// Selects whether the property-FTS rebuild triggered by a schema
/// registration runs inline or in the background.  [`RebuildMode::Async`]
/// is the `Default` value.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RebuildMode {
    /// Legacy behavior: full rebuild runs inside the register transaction.
    Eager,
    /// 0.4.1+: schema is persisted synchronously; rebuild runs in background.
    #[default]
    Async,
}
25
/// A request to rebuild property-FTS for a single kind.
///
/// Requests are delivered to the rebuild thread over an `mpsc` channel and
/// processed one at a time, in arrival order.
#[derive(Debug)]
pub struct RebuildRequest {
    /// Node kind whose property-FTS rows should be rebuilt.
    pub kind: String,
    /// Schema identifier; matched against `fts_property_rebuild_state.schema_id`
    /// when the rebuild is marked `BUILDING`.
    pub schema_id: i64,
}
32
/// Single-threaded actor that processes property-FTS rebuild requests one at
/// a time.  Shutdown is cooperative: drop the sender side to close the channel,
/// then join the thread.
///
/// The `RebuildActor` owns the `JoinHandle` only. The `SyncSender` lives in
/// [`crate::admin::AdminService`] so the service can enqueue rebuild requests
/// directly without going through the runtime.  The receiving half of the
/// channel is handed to [`RebuildActor::start`], and the two halves are
/// distributed by [`crate::runtime::EngineRuntime::open`].
///
/// NOTE(review): this comment previously named `RebuildActor::create_channel`
/// as the channel factory, but no such associated function is visible in this
/// file — confirm where the channel is actually created.
#[derive(Debug)]
pub struct RebuildActor {
    /// Join handle of the rebuild thread; taken (left as `None`) when the
    /// actor is dropped and the thread is joined.
    thread_handle: Option<thread::JoinHandle<()>>,
}
46
47impl RebuildActor {
48    /// Create the mpsc channel used to communicate with the rebuild thread.
49    ///
50    /// Returns `(sender, actor)`.  The sender is given to
51    /// [`crate::admin::AdminService`]; the actor is kept in
52    /// [`crate::runtime::EngineRuntime`] for lifecycle management.
53    ///
54    /// # Errors
55    /// Returns [`EngineError::Io`] if the thread cannot be spawned.
56    pub fn start(
57        path: impl AsRef<Path>,
58        schema_manager: Arc<SchemaManager>,
59        receiver: mpsc::Receiver<RebuildRequest>,
60    ) -> Result<Self, EngineError> {
61        let database_path = path.as_ref().to_path_buf();
62
63        let handle = thread::Builder::new()
64            .name("fathomdb-rebuild".to_owned())
65            .spawn(move || {
66                rebuild_loop(&database_path, &schema_manager, receiver);
67            })
68            .map_err(EngineError::Io)?;
69
70        Ok(Self {
71            thread_handle: Some(handle),
72        })
73    }
74}
75
76impl Drop for RebuildActor {
77    fn drop(&mut self) {
78        // The sender was already closed by AdminService (or dropped when the
79        // engine closes).  Just join the thread.
80        if let Some(handle) = self.thread_handle.take() {
81            match handle.join() {
82                Ok(()) => {}
83                Err(payload) => {
84                    if std::thread::panicking() {
85                        trace_warn!(
86                            "rebuild thread panicked during shutdown (suppressed: already panicking)"
87                        );
88                    } else {
89                        std::panic::resume_unwind(payload);
90                    }
91                }
92            }
93        }
94    }
95}
96
97// ── rebuild loop ────────────────────────────────────────────────────────────
98
/// Target wall-clock time, in milliseconds, for each batch transaction.
///
/// After every batch the next batch size is scaled by
/// `BATCH_TARGET_MS / elapsed_ms` of the batch that just finished.
const BATCH_TARGET_MS: u128 = 1000;
/// Initial batch size (row limit of the first batch).  Later batch sizes are
/// recomputed from timing and clamped to `100..=50_000`.
const INITIAL_BATCH_SIZE: usize = 5000;
103
104fn rebuild_loop(
105    database_path: &Path,
106    schema_manager: &Arc<SchemaManager>,
107    receiver: mpsc::Receiver<RebuildRequest>,
108) {
109    trace_info!("rebuild thread started");
110
111    let mut conn = match sqlite::open_connection(database_path) {
112        Ok(conn) => conn,
113        #[allow(clippy::used_underscore_binding)]
114        Err(_error) => {
115            trace_error!(error = %_error, "rebuild thread: database connection failed");
116            return;
117        }
118    };
119
120    #[allow(clippy::used_underscore_binding)]
121    if let Err(_error) = schema_manager.bootstrap(&conn) {
122        trace_error!(error = %_error, "rebuild thread: schema bootstrap failed");
123        return;
124    }
125
126    for req in receiver {
127        trace_info!(kind = %req.kind, schema_id = req.schema_id, "rebuild task started");
128        match run_rebuild(&mut conn, &req) {
129            Ok(()) => {
130                trace_info!(kind = %req.kind, "rebuild task COMPLETE");
131            }
132            Err(error) => {
133                trace_error!(kind = %req.kind, error = %error, "rebuild task failed");
134                let _ = mark_failed(&conn, &req.kind, &error.to_string());
135            }
136        }
137    }
138
139    trace_info!("rebuild thread exiting");
140}
141
142#[allow(clippy::too_many_lines)]
143fn run_rebuild(conn: &mut rusqlite::Connection, req: &RebuildRequest) -> Result<(), EngineError> {
144    // Step 1: mark BUILDING.
145    {
146        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
147        tx.execute(
148            "UPDATE fts_property_rebuild_state SET state = 'BUILDING' \
149             WHERE kind = ?1 AND schema_id = ?2",
150            rusqlite::params![req.kind, req.schema_id],
151        )?;
152        tx.commit()?;
153    }
154
155    // Step 2: count nodes for this kind (plain SELECT, no tx needed).
156    let rows_total: i64 = conn.query_row(
157        "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
158        rusqlite::params![req.kind],
159        |r| r.get(0),
160    )?;
161
162    {
163        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
164        tx.execute(
165            "UPDATE fts_property_rebuild_state SET rows_total = ?1 WHERE kind = ?2",
166            rusqlite::params![rows_total, req.kind],
167        )?;
168        tx.commit()?;
169    }
170
171    // Load the schema for this kind (plain SELECT).
172    let (paths_json, separator): (String, String) = conn
173        .query_row(
174            "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
175            rusqlite::params![req.kind],
176            |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
177        )
178        .optional()?
179        .ok_or_else(|| {
180            EngineError::Bridge(format!("rebuild: schema for kind '{}' missing", req.kind))
181        })?;
182    let schema = crate::writer::parse_property_schema_json(&paths_json, &separator);
183
184    // Step 3: batch-iterate nodes, insert into staging.
185    let mut offset: i64 = 0;
186    let mut batch_size = INITIAL_BATCH_SIZE;
187    let mut rows_done: i64 = 0;
188
189    loop {
190        // Fetch a batch of node logical_ids + properties (plain SELECT — no tx needed for reads).
191        let batch: Vec<(String, String)> = {
192            let mut stmt = conn.prepare(
193                "SELECT logical_id, properties FROM nodes \
194                 WHERE kind = ?1 AND superseded_at IS NULL \
195                 ORDER BY logical_id \
196                 LIMIT ?2 OFFSET ?3",
197            )?;
198            stmt.query_map(
199                rusqlite::params![
200                    req.kind,
201                    i64::try_from(batch_size).unwrap_or(i64::MAX),
202                    offset
203                ],
204                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
205            )?
206            .collect::<Result<Vec<_>, _>>()?
207        };
208
209        if batch.is_empty() {
210            break;
211        }
212
213        let batch_len = batch.len();
214        let batch_start = Instant::now();
215
216        // Insert staging rows in a single short transaction.
217        {
218            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
219
220            let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
221
222            for (logical_id, properties_str) in &batch {
223                let props: serde_json::Value =
224                    serde_json::from_str(properties_str).unwrap_or_default();
225                let (text, positions, _stats) =
226                    crate::writer::extract_property_fts(&props, &schema);
227
228                // Serialize positions to a compact JSON blob for later use at swap time.
229                let positions_blob: Option<Vec<u8>> = if positions.is_empty() {
230                    None
231                } else {
232                    let v: Vec<(usize, usize, &str)> = positions
233                        .iter()
234                        .map(|p| (p.start_offset, p.end_offset, p.leaf_path.as_str()))
235                        .collect();
236                    serde_json::to_vec(&v).ok()
237                };
238
239                let text_content = text.unwrap_or_default();
240
241                if has_weights {
242                    let cols = crate::writer::extract_property_fts_columns(&props, &schema);
243                    let json_map: serde_json::Map<String, serde_json::Value> = cols
244                        .into_iter()
245                        .map(|(k, v)| (k, serde_json::Value::String(v)))
246                        .collect();
247                    let columns_json =
248                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
249                    tx.execute(
250                        "INSERT INTO fts_property_rebuild_staging \
251                         (kind, node_logical_id, text_content, positions_blob, columns_json) \
252                         VALUES (?1, ?2, ?3, ?4, ?5) \
253                         ON CONFLICT(kind, node_logical_id) DO UPDATE \
254                         SET text_content = excluded.text_content, \
255                             positions_blob = excluded.positions_blob, \
256                             columns_json = excluded.columns_json",
257                        rusqlite::params![
258                            req.kind,
259                            logical_id,
260                            text_content,
261                            positions_blob,
262                            columns_json
263                        ],
264                    )?;
265                } else {
266                    tx.execute(
267                        "INSERT INTO fts_property_rebuild_staging \
268                         (kind, node_logical_id, text_content, positions_blob) \
269                         VALUES (?1, ?2, ?3, ?4) \
270                         ON CONFLICT(kind, node_logical_id) DO UPDATE \
271                         SET text_content = excluded.text_content, \
272                             positions_blob = excluded.positions_blob",
273                        rusqlite::params![req.kind, logical_id, text_content, positions_blob],
274                    )?;
275                }
276            }
277
278            rows_done += i64::try_from(batch_len).unwrap_or(i64::MAX);
279            let now_ms = now_unix_ms();
280            tx.execute(
281                "UPDATE fts_property_rebuild_state \
282                 SET rows_done = ?1, last_progress_at = ?2 \
283                 WHERE kind = ?3",
284                rusqlite::params![rows_done, now_ms, req.kind],
285            )?;
286            tx.commit()?;
287        }
288
289        let elapsed_ms = batch_start.elapsed().as_millis();
290        // Save the limit used for THIS batch before adjusting for the next one.
291        let limit_used = batch_size;
292        // Dynamically adjust batch size to target ~1s per batch.
293        if let Some(new_size) = (batch_size as u128 * BATCH_TARGET_MS).checked_div(elapsed_ms) {
294            let new_size = new_size.clamp(100, 50_000);
295            batch_size = usize::try_from(new_size).unwrap_or(50_000);
296        }
297
298        offset += i64::try_from(batch_len).unwrap_or(i64::MAX);
299
300        // If the batch was smaller than the limit used for THIS query, we've reached the end.
301        if batch_len < limit_used {
302            break;
303        }
304    }
305
306    // Step 4: mark SWAPPING.
307    {
308        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
309        let now_ms = now_unix_ms();
310        tx.execute(
311            "UPDATE fts_property_rebuild_state \
312             SET state = 'SWAPPING', last_progress_at = ?1 \
313             WHERE kind = ?2",
314            rusqlite::params![now_ms, req.kind],
315        )?;
316        tx.commit()?;
317    }
318
319    // Step 5: Final swap — atomic IMMEDIATE transaction replacing live FTS rows.
320    {
321        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
322
323        let table = fathomdb_schema::fts_kind_table_name(&req.kind);
324
325        // Ensure the per-kind table exists before the swap (defensive: created at write
326        // time normally, but may be absent on async first-time registration with no writes).
327        let tokenizer = fathomdb_schema::DEFAULT_FTS_TOKENIZER;
328        let create_ddl = format!(
329            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
330                node_logical_id UNINDEXED, text_content, \
331                tokenize = '{tokenizer}'\
332            )"
333        );
334        tx.execute_batch(&create_ddl)?;
335
336        // 5a. Delete old live FTS rows for this kind (entire per-kind table).
337        tx.execute(&format!("DELETE FROM {table}"), [])?;
338
339        // 5b. Insert new rows from staging into the per-kind FTS table.
340        // For weighted schemas (columns_json IS NOT NULL), use per-column INSERT.
341        // For non-weighted schemas, use bulk INSERT of text_content.
342        {
343            // Check if any staging rows have columns_json set (weighted schema).
344            let has_columns: bool = tx
345                .query_row(
346                    "SELECT count(*) FROM fts_property_rebuild_staging \
347                     WHERE kind = ?1 AND columns_json IS NOT NULL",
348                    rusqlite::params![req.kind],
349                    |r| r.get::<_, i64>(0),
350                )
351                .unwrap_or(0)
352                > 0;
353
354            if has_columns {
355                // Weighted schema: per-column INSERT row by row.
356                let rows_with_columns: Vec<(String, String)> = {
357                    let mut stmt = tx.prepare(
358                        "SELECT node_logical_id, columns_json \
359                         FROM fts_property_rebuild_staging \
360                         WHERE kind = ?1 AND columns_json IS NOT NULL",
361                    )?;
362                    stmt.query_map(rusqlite::params![req.kind], |r| {
363                        Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
364                    })?
365                    .collect::<Result<Vec<_>, _>>()?
366                };
367
368                for (node_id, columns_json_str) in &rows_with_columns {
369                    let col_map: serde_json::Map<String, serde_json::Value> =
370                        serde_json::from_str(columns_json_str).unwrap_or_default();
371                    let col_names: Vec<String> = col_map.keys().cloned().collect();
372                    let col_values: Vec<String> = col_names
373                        .iter()
374                        .map(|k| {
375                            col_map
376                                .get(k)
377                                .and_then(|v| v.as_str())
378                                .unwrap_or("")
379                                .to_owned()
380                        })
381                        .collect();
382                    let placeholders: Vec<String> =
383                        (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
384                    let sql = format!(
385                        "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
386                        cols = col_names.join(", "),
387                        placeholders = placeholders.join(", "),
388                    );
389                    let mut stmt = tx.prepare(&sql)?;
390                    stmt.execute(rusqlite::params_from_iter(
391                        std::iter::once(node_id.as_str())
392                            .chain(col_values.iter().map(String::as_str)),
393                    ))?;
394                }
395
396                // For weighted schemas, all staging rows should have columns_json set.
397                // Any rows without columns_json are skipped (they have no per-column data
398                // and the weighted table has no text_content column).
399            } else {
400                // Non-weighted schema: bulk INSERT of text_content.
401                tx.execute(
402                    &format!(
403                        "INSERT INTO {table}(node_logical_id, text_content) \
404                         SELECT node_logical_id, text_content \
405                         FROM fts_property_rebuild_staging WHERE kind = ?1"
406                    ),
407                    rusqlite::params![req.kind],
408                )?;
409            }
410        }
411
412        // 5c. Delete old position rows for this kind.
413        tx.execute(
414            "DELETE FROM fts_node_property_positions WHERE kind = ?1",
415            rusqlite::params![req.kind],
416        )?;
417
418        // 5d. Re-populate fts_node_property_positions from positions_blob in staging.
419        {
420            let mut stmt = tx.prepare(
421                "SELECT node_logical_id, positions_blob \
422                 FROM fts_property_rebuild_staging \
423                 WHERE kind = ?1 AND positions_blob IS NOT NULL",
424            )?;
425            let mut ins_pos = tx.prepare(
426                "INSERT INTO fts_node_property_positions \
427                 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
428                 VALUES (?1, ?2, ?3, ?4, ?5)",
429            )?;
430
431            let rows: Vec<(String, Vec<u8>)> = stmt
432                .query_map(rusqlite::params![req.kind], |r| {
433                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
434                })?
435                .collect::<Result<Vec<_>, _>>()?;
436
437            for (node_logical_id, blob) in &rows {
438                // positions_blob is JSON: Vec<(start, end, leaf_path)>
439                let positions: Vec<(usize, usize, String)> =
440                    serde_json::from_slice(blob).unwrap_or_default();
441                for (start, end, leaf_path) in positions {
442                    ins_pos.execute(rusqlite::params![
443                        node_logical_id,
444                        req.kind,
445                        i64::try_from(start).unwrap_or(i64::MAX),
446                        i64::try_from(end).unwrap_or(i64::MAX),
447                        leaf_path,
448                    ])?;
449                }
450            }
451        }
452
453        // 5e. Delete staging rows for this kind.
454        tx.execute(
455            "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
456            rusqlite::params![req.kind],
457        )?;
458
459        // 5f. Mark state COMPLETE.
460        let now_ms = now_unix_ms();
461        tx.execute(
462            "UPDATE fts_property_rebuild_state \
463             SET state = 'COMPLETE', last_progress_at = ?1 \
464             WHERE kind = ?2",
465            rusqlite::params![now_ms, req.kind],
466        )?;
467
468        tx.commit()?;
469    }
470
471    Ok(())
472}
473
474fn mark_failed(
475    conn: &rusqlite::Connection,
476    kind: &str,
477    error_message: &str,
478) -> Result<(), EngineError> {
479    let now_ms = now_unix_ms();
480    conn.execute(
481        "UPDATE fts_property_rebuild_state \
482         SET state = 'FAILED', error_message = ?1, last_progress_at = ?2 \
483         WHERE kind = ?3",
484        rusqlite::params![error_message, now_ms, kind],
485    )?;
486    Ok(())
487}
488
/// Current wall-clock time as Unix milliseconds (module-private alias of
/// [`now_unix_ms_pub`]).
fn now_unix_ms() -> i64 {
    now_unix_ms_pub()
}

/// Public-in-crate version of `now_unix_ms` so `admin.rs` can use it.
///
/// A clock set before the Unix epoch yields `0`; a value that does not fit in
/// an `i64` saturates to `i64::MAX`.
pub(crate) fn now_unix_ms_pub() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
502
/// Rebuild progress row returned from `AdminService::get_property_fts_rebuild_state`.
///
/// The field names match columns of the `fts_property_rebuild_state` table
/// that this module reads and writes.
#[derive(Debug)]
pub struct RebuildStateRow {
    /// Node kind the rebuild applies to.
    pub kind: String,
    /// Schema identifier the rebuild was requested for.
    pub schema_id: i64,
    /// Rebuild state; this module writes `BUILDING`, `SWAPPING`, `COMPLETE`
    /// and `FAILED`.
    pub state: String,
    /// Number of nodes to process, recorded by the rebuild after it has
    /// counted them; presumably `None` before that — TODO confirm.
    pub rows_total: Option<i64>,
    /// Number of nodes staged so far; updated after every batch.
    pub rows_done: i64,
    /// NOTE(review): looks like Unix milliseconds of registration — this
    /// module never writes the column, so confirm against the admin service.
    pub started_at: i64,
    /// NOTE(review): not read or written in this module — presumably marks the
    /// first schema registration for the kind; confirm against the caller.
    pub is_first_registration: bool,
    /// Failure description stored when the state is set to `FAILED`.
    pub error_message: Option<String>,
}
515
/// Public progress snapshot returned from
/// [`crate::coordinator::ExecutionCoordinator::get_property_fts_rebuild_progress`].
///
/// Serializable (`serde::Serialize`) and cloneable so it can be handed to
/// callers as a plain value.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RebuildProgress {
    /// Current state: `"PENDING"`, `"BUILDING"`, `"SWAPPING"`, `"COMPLETE"`, or `"FAILED"`.
    pub state: String,
    /// Total rows to process. `None` until the actor has counted the nodes.
    pub rows_total: Option<i64>,
    /// Rows processed so far.
    pub rows_done: i64,
    /// Unix milliseconds when the rebuild was registered.
    pub started_at: i64,
    /// Unix milliseconds of the last progress update, if any.  The rebuild
    /// stamps this after every batch and on each later state change.
    pub last_progress_at: Option<i64>,
    /// Error message if `state == "FAILED"`.
    pub error_message: Option<String>,
}
533
534/// Run crash recovery: mark any in-progress rebuilds as FAILED and clear their
535/// staging rows.  Called by `EngineRuntime::open` before spawning the actor.
536///
537/// # Errors
538/// Returns [`crate::EngineError`] if database access fails.
539pub(crate) fn recover_interrupted_rebuilds(
540    conn: &rusqlite::Connection,
541) -> Result<(), crate::EngineError> {
542    // Collect kinds that are in a non-terminal state.
543    let kinds: Vec<String> = {
544        let mut stmt = conn.prepare(
545            "SELECT kind FROM fts_property_rebuild_state \
546             WHERE state IN ('BUILDING', 'SWAPPING')",
547        )?;
548        stmt.query_map([], |r| r.get::<_, String>(0))?
549            .collect::<Result<Vec<_>, _>>()?
550    };
551
552    for kind in &kinds {
553        conn.execute(
554            "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
555            rusqlite::params![kind],
556        )?;
557        conn.execute(
558            "UPDATE fts_property_rebuild_state \
559             SET state = 'FAILED', error_message = 'interrupted by engine restart' \
560             WHERE kind = ?1",
561            rusqlite::params![kind],
562        )?;
563    }
564
565    Ok(())
566}
567
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use rusqlite::Connection;

    use fathomdb_schema::SchemaManager;

    use super::recover_interrupted_rebuilds;

    /// Fresh in-memory database with the engine schema bootstrapped.
    fn bootstrapped_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn
    }

    /// Insert a rebuild-state row for `kind` in the given `state`.
    fn insert_rebuild_state(conn: &Connection, kind: &str, state: &str) {
        conn.execute(
            "INSERT INTO fts_property_rebuild_state \
             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
             VALUES (?1, 1, ?2, 0, 0, 0)",
            rusqlite::params![kind, state],
        )
        .expect("insert rebuild state");
    }

    /// Seed a `MyKind` rebuild row in `initial_state`, run crash recovery and
    /// return the state stored afterwards.
    fn state_after_recovery(initial_state: &str) -> String {
        let conn = bootstrapped_conn();
        insert_rebuild_state(&conn, "MyKind", initial_state);

        recover_interrupted_rebuilds(&conn).expect("recover");

        conn.query_row(
            "SELECT state FROM fts_property_rebuild_state WHERE kind = 'MyKind'",
            [],
            |r| r.get(0),
        )
        .expect("state row")
    }

    /// Rowid of the schema row registered for `kind`.
    fn schema_rowid(conn: &Connection, kind: &str) -> i64 {
        conn.query_row(
            "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
            rusqlite::params![kind],
            |r| r.get(0),
        )
        .expect("schema_id")
    }

    /// Insert a PENDING first-registration rebuild row for `kind`.
    fn insert_pending_state(conn: &Connection, kind: &str, schema_id: i64) {
        conn.execute(
            "INSERT INTO fts_property_rebuild_state \
             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
             VALUES (?1, ?2, 'PENDING', 0, 0, 1)",
            rusqlite::params![kind, schema_id],
        )
        .expect("insert rebuild state");
    }

    /// Run the rebuild end-to-end for `kind`.
    fn rebuild(conn: &mut Connection, kind: &str, schema_id: i64) {
        let req = super::RebuildRequest {
            kind: kind.to_owned(),
            schema_id,
        };
        super::run_rebuild(conn, &req).expect("run_rebuild");
    }

    #[test]
    fn pending_row_survives_restart() {
        assert_eq!(
            state_after_recovery("PENDING"),
            "PENDING",
            "PENDING rows must survive engine restart"
        );
    }

    #[test]
    fn building_row_marked_failed_on_restart() {
        assert_eq!(
            state_after_recovery("BUILDING"),
            "FAILED",
            "BUILDING rows must be marked FAILED on restart"
        );
    }

    #[test]
    fn swapping_row_marked_failed_on_restart() {
        assert_eq!(
            state_after_recovery("SWAPPING"),
            "FAILED",
            "SWAPPING rows must be marked FAILED on restart"
        );
    }

    // --- A-6: rebuild swap targets per-kind table ---
    #[test]
    fn rebuild_swap_populates_per_kind_table() {
        // Calls run_rebuild() end-to-end and asserts the final rows land in
        // the per-kind FTS table, NOT in fts_node_properties.
        let mut conn = bootstrapped_conn();
        let kind = "TestKind";
        let table = fathomdb_schema::fts_kind_table_name(kind);

        // NOTE: The per-kind FTS table is intentionally NOT created here.
        // The guard in run_rebuild (Step 5) must create it if absent.

        // One node with an extractable property.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'node-1', ?1, '{\"name\":\"hello world\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        // Schema row (non-weighted, single path).
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, '[\"$.name\"]', ' ')",
            rusqlite::params![kind],
        )
        .expect("insert schema");
        let schema_id = schema_rowid(&conn, kind);

        insert_pending_state(&conn, kind, schema_id);
        rebuild(&mut conn, kind, schema_id);

        // After A-6: the per-kind table must have the rebuilt row.
        let per_kind_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'node-1'"),
                [],
                |r| r.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 1,
            "per-kind table must have the rebuilt row after run_rebuild"
        );
    }

    // --- B-3: rebuild_actor uses per-column INSERT for weighted schemas ---

    #[test]
    fn rebuild_actor_uses_per_column_for_weighted_schema() {
        let mut conn = bootstrapped_conn();
        let kind = "Article";
        let table = fathomdb_schema::fts_kind_table_name(kind);

        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);

        // One node with two extractable properties.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'article-1', ?1, '{\"title\":\"Hello\",\"body\":\"World\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        // Register schema with weights.
        let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, ?2, ' ')",
            rusqlite::params![kind, paths_json],
        )
        .expect("insert schema");
        let schema_id = schema_rowid(&conn, kind);

        // Create the weighted per-kind FTS table.
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
                node_logical_id UNINDEXED, {title_col}, {body_col}, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )"
        ))
        .expect("create weighted per-kind table");

        insert_pending_state(&conn, kind, schema_id);
        rebuild(&mut conn, kind, schema_id);

        // The per-kind table must have the rebuilt row.
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'article-1'"),
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind table must have the rebuilt row");

        // Verify per-column values.
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM {table} \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello",
            "title column must have correct value after rebuild"
        );
        assert_eq!(
            body_val, "World",
            "body column must have correct value after rebuild"
        );
    }
}