Skip to main content

fathomdb_engine/
rebuild_actor.rs

//! Background actor that serializes async property-FTS rebuild tasks.
//!
//! Modeled on [`crate::writer::WriterActor`]: a single dedicated OS thread fed
//! by a `std::sync::mpsc` channel and joined through its `JoinHandle` on
//! shutdown. No tokio runtime is involved.
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use fathomdb_schema::SchemaManager;
13use rusqlite::OptionalExtension;
14
15use crate::{EngineError, sqlite};
16
/// Selects how `register_fts_property_schema_with_entries` performs the
/// property-FTS rebuild that follows a schema registration.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RebuildMode {
    /// Pre-0.4.1 behavior: the whole rebuild runs inside the registering
    /// transaction.
    Eager,
    /// Default since 0.4.1: the schema is persisted synchronously and the
    /// rebuild itself is left to the background rebuild thread.
    #[default]
    Async,
}
26
/// Work item for the rebuild thread: rebuild the property-FTS index of one
/// node kind against one registered schema.
#[derive(Debug)]
pub struct RebuildRequest {
    /// Node kind whose property-FTS rows should be rebuilt.
    pub kind: String,
    /// Identifier of the schema the rebuild was requested for; the rebuild
    /// only claims a `PENDING` state row carrying this same id.
    pub schema_id: i64,
}
33
/// Outcome of [`RebuildClient::try_submit`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RebuildSubmit {
    /// The request was placed on the rebuild thread's channel.
    Submitted,
    /// The channel was full or disconnected, so nothing was enqueued; the
    /// rebuild relies on its `PENDING` state row being found by a later scan.
    PersistedPending,
}
39
40#[derive(Clone, Debug)]
41pub struct RebuildClient {
42    sender: mpsc::SyncSender<RebuildRequest>,
43    shutdown: Arc<AtomicBool>,
44}
45
46impl RebuildClient {
47    #[must_use]
48    pub fn new(sender: mpsc::SyncSender<RebuildRequest>, shutdown: Arc<AtomicBool>) -> Self {
49        Self { sender, shutdown }
50    }
51
52    /// # Errors
53    /// Returns [`EngineError::Bridge`] when the engine is shutting down.
54    pub fn try_submit(&self, req: RebuildRequest) -> Result<RebuildSubmit, EngineError> {
55        if self.shutdown.load(Ordering::Acquire) {
56            return Err(EngineError::Bridge("engine is shutting down".to_owned()));
57        }
58        match self.sender.try_send(req) {
59            Ok(()) => Ok(RebuildSubmit::Submitted),
60            Err(mpsc::TrySendError::Full(_) | mpsc::TrySendError::Disconnected(_)) => {
61                Ok(RebuildSubmit::PersistedPending)
62            }
63        }
64    }
65}
66
67/// Single-threaded actor that processes property-FTS rebuild requests one at
68/// a time. Shutdown is cooperative and controlled by an explicit token so
69/// public admin-service clones cannot keep the actor alive accidentally.
70#[derive(Debug)]
71pub struct RebuildActor {
72    thread_handle: Option<thread::JoinHandle<()>>,
73    shutdown: Arc<AtomicBool>,
74}
75
76impl RebuildActor {
77    /// Create the mpsc channel used to communicate with the rebuild thread.
78    ///
79    /// Returns `(sender, actor)`.  The sender is given to
80    /// [`crate::admin::AdminService`]; the actor is kept in
81    /// [`crate::runtime::EngineRuntime`] for lifecycle management.
82    ///
83    /// # Errors
84    /// Returns [`EngineError::Io`] if the thread cannot be spawned.
85    pub fn start(
86        path: impl AsRef<Path>,
87        schema_manager: Arc<SchemaManager>,
88        receiver: mpsc::Receiver<RebuildRequest>,
89        shutdown: Arc<AtomicBool>,
90    ) -> Result<Self, EngineError> {
91        let database_path = path.as_ref().to_path_buf();
92        let thread_shutdown = Arc::clone(&shutdown);
93
94        let handle = thread::Builder::new()
95            .name("fathomdb-rebuild".to_owned())
96            .spawn(move || {
97                rebuild_loop(&database_path, &schema_manager, receiver, &thread_shutdown);
98            })
99            .map_err(EngineError::Io)?;
100
101        Ok(Self {
102            thread_handle: Some(handle),
103            shutdown,
104        })
105    }
106}
107
108impl Drop for RebuildActor {
109    fn drop(&mut self) {
110        self.shutdown.store(true, Ordering::Release);
111        if let Some(handle) = self.thread_handle.take() {
112            match handle.join() {
113                Ok(()) => {}
114                Err(payload) => {
115                    if std::thread::panicking() {
116                        trace_warn!(
117                            "rebuild thread panicked during shutdown (suppressed: already panicking)"
118                        );
119                    } else {
120                        std::panic::resume_unwind(payload);
121                    }
122                }
123            }
124        }
125    }
126}
127
// ── rebuild loop ────────────────────────────────────────────────────────────

/// Desired wall-clock duration, in milliseconds, of one staging-batch write
/// transaction; the batch size is rescaled after every batch to approach it.
const BATCH_TARGET_MS: u128 = 1_000;
/// Number of nodes fetched for the first staging batch, before any timing
/// feedback exists.
const INITIAL_BATCH_SIZE: usize = 5_000;
134
135#[allow(clippy::needless_pass_by_value)]
136fn rebuild_loop(
137    database_path: &Path,
138    schema_manager: &Arc<SchemaManager>,
139    receiver: mpsc::Receiver<RebuildRequest>,
140    shutdown: &AtomicBool,
141) {
142    trace_info!("rebuild thread started");
143
144    let mut conn = match sqlite::open_connection(database_path) {
145        Ok(conn) => conn,
146        #[allow(clippy::used_underscore_binding)]
147        Err(_error) => {
148            trace_error!(error = %_error, "rebuild thread: database connection failed");
149            return;
150        }
151    };
152
153    #[allow(clippy::used_underscore_binding)]
154    if let Err(_error) = schema_manager.bootstrap(&conn) {
155        trace_error!(error = %_error, "rebuild thread: schema bootstrap failed");
156        return;
157    }
158
159    let _ = process_pending_rebuilds(&mut conn, shutdown);
160
161    loop {
162        if shutdown.load(Ordering::Acquire) {
163            break;
164        }
165        match receiver.recv_timeout(Duration::from_millis(250)) {
166            Ok(req) => handle_rebuild_request(&mut conn, &req, shutdown),
167            Err(mpsc::RecvTimeoutError::Timeout) => {
168                let _ = process_pending_rebuilds(&mut conn, shutdown);
169            }
170            Err(mpsc::RecvTimeoutError::Disconnected) => {
171                let _ = process_pending_rebuilds(&mut conn, shutdown);
172                break;
173            }
174        }
175    }
176
177    trace_info!("rebuild thread exiting");
178}
179
180fn handle_rebuild_request(
181    conn: &mut rusqlite::Connection,
182    req: &RebuildRequest,
183    shutdown: &AtomicBool,
184) {
185    trace_info!(kind = %req.kind, schema_id = req.schema_id, "rebuild task started");
186    match run_rebuild(conn, req, shutdown) {
187        Ok(()) => {
188            trace_info!(kind = %req.kind, "rebuild task COMPLETE");
189        }
190        Err(error) => {
191            trace_error!(kind = %req.kind, error = %error, "rebuild task failed");
192            let _ = mark_failed(conn, &req.kind, &error.to_string());
193        }
194    }
195}
196
197fn process_pending_rebuilds(
198    conn: &mut rusqlite::Connection,
199    shutdown: &AtomicBool,
200) -> Result<(), EngineError> {
201    let pending: Vec<RebuildRequest> = {
202        let mut stmt = conn.prepare(
203            "SELECT kind, schema_id FROM fts_property_rebuild_state \
204             WHERE state = 'PENDING' ORDER BY started_at, kind LIMIT 16",
205        )?;
206        stmt.query_map([], |r| {
207            Ok(RebuildRequest {
208                kind: r.get(0)?,
209                schema_id: r.get(1)?,
210            })
211        })?
212        .collect::<Result<Vec<_>, _>>()?
213    };
214    for req in &pending {
215        if shutdown.load(Ordering::Acquire) {
216            break;
217        }
218        handle_rebuild_request(conn, req, shutdown);
219    }
220    Ok(())
221}
222
223#[allow(clippy::too_many_lines)]
224fn run_rebuild(
225    conn: &mut rusqlite::Connection,
226    req: &RebuildRequest,
227    shutdown: &AtomicBool,
228) -> Result<(), EngineError> {
229    // Step 1: mark BUILDING.
230    {
231        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
232        let claimed = tx.execute(
233            "UPDATE fts_property_rebuild_state SET state = 'BUILDING' \
234             WHERE kind = ?1 AND schema_id = ?2 AND state = 'PENDING'",
235            rusqlite::params![req.kind, req.schema_id],
236        )?;
237        tx.commit()?;
238        if claimed == 0 {
239            trace_warn!(
240                kind = %req.kind,
241                schema_id = req.schema_id,
242                "rebuild request skipped because it is no longer pending"
243            );
244            return Ok(());
245        }
246    }
247
248    // Step 2: count nodes for this kind (plain SELECT, no tx needed).
249    let rows_total: i64 = conn.query_row(
250        "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
251        rusqlite::params![req.kind],
252        |r| r.get(0),
253    )?;
254
255    {
256        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
257        tx.execute(
258            "UPDATE fts_property_rebuild_state SET rows_total = ?1 WHERE kind = ?2",
259            rusqlite::params![rows_total, req.kind],
260        )?;
261        tx.commit()?;
262    }
263
264    // Load the schema for this kind (plain SELECT).
265    let (paths_json, separator): (String, String) = conn
266        .query_row(
267            "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
268            rusqlite::params![req.kind],
269            |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
270        )
271        .optional()?
272        .ok_or_else(|| {
273            EngineError::Bridge(format!("rebuild: schema for kind '{}' missing", req.kind))
274        })?;
275    let schema = crate::writer::parse_property_schema_json(&paths_json, &separator);
276
277    // Step 3: batch-iterate nodes, insert into staging.
278    let mut batch_size = INITIAL_BATCH_SIZE;
279    let mut rows_done: i64 = 0;
280    let mut last_logical_id = String::new();
281
282    loop {
283        abort_if_shutdown(conn, &req.kind, shutdown)?;
284        // Fetch a batch of node logical_ids + properties using keyset pagination.
285        let batch: Vec<(String, String)> = {
286            let mut stmt = conn.prepare(
287                "SELECT logical_id, properties FROM nodes \
288                 WHERE kind = ?1 AND superseded_at IS NULL AND logical_id > ?2 \
289                 ORDER BY logical_id \
290                 LIMIT ?3",
291            )?;
292            stmt.query_map(
293                rusqlite::params![
294                    req.kind,
295                    last_logical_id,
296                    i64::try_from(batch_size).unwrap_or(i64::MAX),
297                ],
298                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
299            )?
300            .collect::<Result<Vec<_>, _>>()?
301        };
302
303        if batch.is_empty() {
304            break;
305        }
306
307        let batch_len = batch.len();
308        let batch_start = Instant::now();
309        if let Some((logical_id, _)) = batch.last() {
310            last_logical_id.clone_from(logical_id);
311        }
312
313        // Insert staging rows in a single short transaction.
314        {
315            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
316
317            let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
318
319            for (logical_id, properties_str) in &batch {
320                let props: serde_json::Value =
321                    serde_json::from_str(properties_str).unwrap_or_default();
322                let (text, positions, _stats) =
323                    crate::writer::extract_property_fts(&props, &schema);
324
325                // Serialize positions to a compact JSON blob for later use at swap time.
326                let positions_blob: Option<Vec<u8>> = if positions.is_empty() {
327                    None
328                } else {
329                    let v: Vec<(usize, usize, &str)> = positions
330                        .iter()
331                        .map(|p| (p.start_offset, p.end_offset, p.leaf_path.as_str()))
332                        .collect();
333                    serde_json::to_vec(&v).ok()
334                };
335
336                let text_content = text.unwrap_or_default();
337
338                if has_weights {
339                    let cols = crate::writer::extract_property_fts_columns(&props, &schema);
340                    let json_map: serde_json::Map<String, serde_json::Value> = cols
341                        .into_iter()
342                        .map(|(k, v)| (k, serde_json::Value::String(v)))
343                        .collect();
344                    let columns_json =
345                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
346                    tx.execute(
347                        "INSERT INTO fts_property_rebuild_staging \
348                         (kind, node_logical_id, text_content, positions_blob, columns_json) \
349                         VALUES (?1, ?2, ?3, ?4, ?5) \
350                         ON CONFLICT(kind, node_logical_id) DO UPDATE \
351                         SET text_content = excluded.text_content, \
352                             positions_blob = excluded.positions_blob, \
353                             columns_json = excluded.columns_json",
354                        rusqlite::params![
355                            req.kind,
356                            logical_id,
357                            text_content,
358                            positions_blob,
359                            columns_json
360                        ],
361                    )?;
362                } else {
363                    tx.execute(
364                        "INSERT INTO fts_property_rebuild_staging \
365                         (kind, node_logical_id, text_content, positions_blob) \
366                         VALUES (?1, ?2, ?3, ?4) \
367                         ON CONFLICT(kind, node_logical_id) DO UPDATE \
368                         SET text_content = excluded.text_content, \
369                             positions_blob = excluded.positions_blob",
370                        rusqlite::params![req.kind, logical_id, text_content, positions_blob],
371                    )?;
372                }
373            }
374
375            rows_done += i64::try_from(batch_len).unwrap_or(i64::MAX);
376            let now_ms = now_unix_ms();
377            tx.execute(
378                "UPDATE fts_property_rebuild_state \
379                 SET rows_done = ?1, last_progress_at = ?2 \
380                 WHERE kind = ?3",
381                rusqlite::params![rows_done, now_ms, req.kind],
382            )?;
383            tx.commit()?;
384        }
385
386        let elapsed_ms = batch_start.elapsed().as_millis();
387        // Save the limit used for THIS batch before adjusting for the next one.
388        let limit_used = batch_size;
389        // Dynamically adjust batch size to target ~1s per batch.
390        if let Some(new_size) = (batch_size as u128 * BATCH_TARGET_MS).checked_div(elapsed_ms) {
391            let new_size = new_size.clamp(100, 50_000);
392            batch_size = usize::try_from(new_size).unwrap_or(50_000);
393        }
394
395        // If the batch was smaller than the limit used for THIS query, we've reached the end.
396        if batch_len < limit_used {
397            break;
398        }
399    }
400
401    // Step 4: mark SWAPPING.
402    {
403        abort_if_shutdown(conn, &req.kind, shutdown)?;
404        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
405        let now_ms = now_unix_ms();
406        tx.execute(
407            "UPDATE fts_property_rebuild_state \
408             SET state = 'SWAPPING', last_progress_at = ?1 \
409             WHERE kind = ?2",
410            rusqlite::params![now_ms, req.kind],
411        )?;
412        tx.commit()?;
413    }
414
415    // Step 5: Final swap — atomic IMMEDIATE transaction replacing live FTS rows.
416    {
417        abort_if_shutdown(conn, &req.kind, shutdown)?;
418        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
419        let current: Option<(String, i64)> = tx
420            .query_row(
421                "SELECT state, schema_id FROM fts_property_rebuild_state WHERE kind = ?1",
422                rusqlite::params![req.kind],
423                |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)),
424            )
425            .optional()?;
426        if current.as_ref() != Some(&("SWAPPING".to_owned(), req.schema_id)) {
427            trace_warn!(
428                kind = %req.kind,
429                schema_id = req.schema_id,
430                "rebuild swap skipped because state/schema changed"
431            );
432            tx.commit()?;
433            return Ok(());
434        }
435
436        let table = fathomdb_schema::fts_kind_table_name(&req.kind);
437
438        ensure_fts_kind_table_for_schema(&tx, &req.kind, &schema)?;
439
440        // 5a. Delete old live FTS rows for this kind (entire per-kind table).
441        tx.execute(&format!("DELETE FROM {table}"), [])?;
442
443        // 5b. Insert new rows from staging into the per-kind FTS table.
444        // For weighted schemas (columns_json IS NOT NULL), use per-column INSERT.
445        // For non-weighted schemas, use bulk INSERT of text_content.
446        {
447            // Check if any staging rows have columns_json set (weighted schema).
448            let has_columns: bool = tx
449                .query_row(
450                    "SELECT count(*) FROM fts_property_rebuild_staging \
451                     WHERE kind = ?1 AND columns_json IS NOT NULL",
452                    rusqlite::params![req.kind],
453                    |r| r.get::<_, i64>(0),
454                )
455                .unwrap_or(0)
456                > 0;
457
458            if has_columns {
459                // Weighted schema: per-column INSERT row by row.
460                let rows_with_columns: Vec<(String, String)> = {
461                    let mut stmt = tx.prepare(
462                        "SELECT node_logical_id, columns_json \
463                         FROM fts_property_rebuild_staging \
464                         WHERE kind = ?1 AND columns_json IS NOT NULL",
465                    )?;
466                    stmt.query_map(rusqlite::params![req.kind], |r| {
467                        Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
468                    })?
469                    .collect::<Result<Vec<_>, _>>()?
470                };
471
472                for (node_id, columns_json_str) in &rows_with_columns {
473                    let col_map: serde_json::Map<String, serde_json::Value> =
474                        serde_json::from_str(columns_json_str).unwrap_or_default();
475                    let col_names: Vec<String> = col_map.keys().cloned().collect();
476                    let col_values: Vec<String> = col_names
477                        .iter()
478                        .map(|k| {
479                            col_map
480                                .get(k)
481                                .and_then(|v| v.as_str())
482                                .unwrap_or("")
483                                .to_owned()
484                        })
485                        .collect();
486                    let placeholders: Vec<String> =
487                        (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
488                    let sql = format!(
489                        "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
490                        cols = col_names.join(", "),
491                        placeholders = placeholders.join(", "),
492                    );
493                    let mut stmt = tx.prepare(&sql)?;
494                    stmt.execute(rusqlite::params_from_iter(
495                        std::iter::once(node_id.as_str())
496                            .chain(col_values.iter().map(String::as_str)),
497                    ))?;
498                }
499
500                // For weighted schemas, all staging rows should have columns_json set.
501                // Any rows without columns_json are skipped (they have no per-column data
502                // and the weighted table has no text_content column).
503            } else {
504                // Non-weighted schema: bulk INSERT of text_content.
505                tx.execute(
506                    &format!(
507                        "INSERT INTO {table}(node_logical_id, text_content) \
508                         SELECT node_logical_id, text_content \
509                         FROM fts_property_rebuild_staging WHERE kind = ?1"
510                    ),
511                    rusqlite::params![req.kind],
512                )?;
513            }
514        }
515
516        // 5c. Delete old position rows for this kind.
517        tx.execute(
518            "DELETE FROM fts_node_property_positions WHERE kind = ?1",
519            rusqlite::params![req.kind],
520        )?;
521
522        // 5d. Re-populate fts_node_property_positions from positions_blob in staging.
523        {
524            let mut stmt = tx.prepare(
525                "SELECT node_logical_id, positions_blob \
526                 FROM fts_property_rebuild_staging \
527                 WHERE kind = ?1 AND positions_blob IS NOT NULL",
528            )?;
529            let mut ins_pos = tx.prepare(
530                "INSERT INTO fts_node_property_positions \
531                 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
532                 VALUES (?1, ?2, ?3, ?4, ?5)",
533            )?;
534
535            let rows: Vec<(String, Vec<u8>)> = stmt
536                .query_map(rusqlite::params![req.kind], |r| {
537                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
538                })?
539                .collect::<Result<Vec<_>, _>>()?;
540
541            for (node_logical_id, blob) in &rows {
542                // positions_blob is JSON: Vec<(start, end, leaf_path)>
543                let positions: Vec<(usize, usize, String)> =
544                    serde_json::from_slice(blob).unwrap_or_default();
545                for (start, end, leaf_path) in positions {
546                    ins_pos.execute(rusqlite::params![
547                        node_logical_id,
548                        req.kind,
549                        i64::try_from(start).unwrap_or(i64::MAX),
550                        i64::try_from(end).unwrap_or(i64::MAX),
551                        leaf_path,
552                    ])?;
553                }
554            }
555        }
556
557        // 5e. Delete staging rows for this kind.
558        tx.execute(
559            "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
560            rusqlite::params![req.kind],
561        )?;
562
563        // 5f. Mark state COMPLETE.
564        let now_ms = now_unix_ms();
565        tx.execute(
566            "UPDATE fts_property_rebuild_state \
567             SET state = 'COMPLETE', last_progress_at = ?1 \
568             WHERE kind = ?2",
569            rusqlite::params![now_ms, req.kind],
570        )?;
571
572        tx.commit()?;
573    }
574
575    Ok(())
576}
577
578fn mark_failed(
579    conn: &rusqlite::Connection,
580    kind: &str,
581    error_message: &str,
582) -> Result<(), EngineError> {
583    let now_ms = now_unix_ms();
584    conn.execute(
585        "UPDATE fts_property_rebuild_state \
586         SET state = 'FAILED', error_message = ?1, last_progress_at = ?2 \
587         WHERE kind = ?3",
588        rusqlite::params![error_message, now_ms, kind],
589    )?;
590    Ok(())
591}
592
593fn mark_failed_and_clear(
594    conn: &rusqlite::Connection,
595    kind: &str,
596    error_message: &str,
597) -> Result<(), EngineError> {
598    conn.execute(
599        "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
600        rusqlite::params![kind],
601    )?;
602    mark_failed(conn, kind, error_message)
603}
604
605fn abort_if_shutdown(
606    conn: &rusqlite::Connection,
607    kind: &str,
608    shutdown: &AtomicBool,
609) -> Result<(), EngineError> {
610    if shutdown.load(Ordering::Acquire) {
611        mark_failed_and_clear(conn, kind, "engine shutdown interrupted rebuild")?;
612        return Err(EngineError::Bridge(
613            "engine shutdown interrupted rebuild".to_owned(),
614        ));
615    }
616    Ok(())
617}
618
619fn ensure_fts_kind_table_for_schema(
620    conn: &rusqlite::Connection,
621    kind: &str,
622    schema: &crate::writer::PropertyFtsSchema,
623) -> Result<(), EngineError> {
624    let table = fathomdb_schema::fts_kind_table_name(kind);
625    let exists: bool = conn
626        .query_row(
627            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 \
628             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
629            rusqlite::params![table],
630            |_| Ok(true),
631        )
632        .optional()?
633        .unwrap_or(false);
634    if exists {
635        return Ok(());
636    }
637
638    let tokenizer = fathomdb_schema::resolve_fts_tokenizer(conn, kind)?;
639    let tokenizer_sql = tokenizer.replace('\'', "''");
640    let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
641    let cols: Vec<String> = if has_weights {
642        std::iter::once("node_logical_id UNINDEXED".to_owned())
643            .chain(schema.paths.iter().map(|p| {
644                let is_recursive = matches!(p.mode, crate::writer::PropertyPathMode::Recursive);
645                fathomdb_schema::fts_column_name(&p.path, is_recursive)
646            }))
647            .collect()
648    } else {
649        vec![
650            "node_logical_id UNINDEXED".to_owned(),
651            "text_content".to_owned(),
652        ]
653    };
654    conn.execute_batch(&format!(
655        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5({cols}, tokenize='{tokenizer_sql}')",
656        cols = cols.join(", "),
657    ))?;
658    Ok(())
659}
660
/// Current wall-clock time in Unix milliseconds (module-private alias).
fn now_unix_ms() -> i64 {
    now_unix_ms_pub()
}

/// Current wall-clock time in Unix milliseconds, visible crate-wide so
/// `admin.rs` can share it.
///
/// A clock set before the epoch yields `0`; a value too large for `i64`
/// saturates to `i64::MAX`.
pub(crate) fn now_unix_ms_pub() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
674
/// One rebuild-state row as returned from
/// `AdminService::get_property_fts_rebuild_state`.
#[derive(Debug)]
pub struct RebuildStateRow {
    /// Node kind whose property-FTS index the row tracks.
    pub kind: String,
    /// Identifier of the schema the rebuild was requested for.
    pub schema_id: i64,
    /// Lifecycle state: `PENDING`, `BUILDING`, `SWAPPING`, `COMPLETE` or `FAILED`.
    pub state: String,
    /// Live node count; `None` until the rebuild thread has counted them.
    pub rows_total: Option<i64>,
    /// Nodes staged so far.
    pub rows_done: i64,
    /// Presumably the registration time in Unix milliseconds — confirm
    /// against the registering code.
    pub started_at: i64,
    /// Presumably `true` when the rebuild follows the kind's first schema
    /// registration — TODO confirm.
    pub is_first_registration: bool,
    /// Failure reason when `state` is `FAILED`.
    pub error_message: Option<String>,
}
687
688/// Public progress snapshot returned from
689/// [`crate::coordinator::ExecutionCoordinator::get_property_fts_rebuild_progress`].
690#[derive(Debug, Clone, serde::Serialize)]
691pub struct RebuildProgress {
692    /// Current state: `"PENDING"`, `"BUILDING"`, `"SWAPPING"`, `"COMPLETE"`, or `"FAILED"`.
693    pub state: String,
694    /// Total rows to process. `None` until the actor has counted the nodes.
695    pub rows_total: Option<i64>,
696    /// Rows processed so far.
697    pub rows_done: i64,
698    /// Unix milliseconds when the rebuild was registered.
699    pub started_at: i64,
700    /// Unix milliseconds of the last progress update, if any.
701    pub last_progress_at: Option<i64>,
702    /// Error message if `state == "FAILED"`.
703    pub error_message: Option<String>,
704}
705
706/// Run crash recovery: mark any in-progress rebuilds as FAILED and clear their
707/// staging rows.  Called by `EngineRuntime::open` before spawning the actor.
708///
709/// # Errors
710/// Returns [`crate::EngineError`] if database access fails.
711pub(crate) fn recover_interrupted_rebuilds(
712    conn: &rusqlite::Connection,
713) -> Result<(), crate::EngineError> {
714    // Collect kinds that are in a non-terminal state.
715    let kinds: Vec<String> = {
716        let mut stmt = conn.prepare(
717            "SELECT kind FROM fts_property_rebuild_state \
718             WHERE state IN ('BUILDING', 'SWAPPING')",
719        )?;
720        stmt.query_map([], |r| r.get::<_, String>(0))?
721            .collect::<Result<Vec<_>, _>>()?
722    };
723
724    for kind in &kinds {
725        conn.execute(
726            "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
727            rusqlite::params![kind],
728        )?;
729        conn.execute(
730            "UPDATE fts_property_rebuild_state \
731             SET state = 'FAILED', error_message = 'interrupted by engine restart' \
732             WHERE kind = ?1",
733            rusqlite::params![kind],
734        )?;
735    }
736
737    Ok(())
738}
739
740#[cfg(test)]
741#[allow(clippy::expect_used)]
742mod tests {
743    use rusqlite::Connection;
744
745    use fathomdb_schema::SchemaManager;
746
747    use super::recover_interrupted_rebuilds;
748
749    fn bootstrapped_conn() -> Connection {
750        let conn = Connection::open_in_memory().expect("in-memory sqlite");
751        let manager = SchemaManager::new();
752        manager.bootstrap(&conn).expect("bootstrap");
753        conn
754    }
755
756    fn insert_rebuild_state(conn: &Connection, kind: &str, state: &str) {
757        conn.execute(
758            "INSERT INTO fts_property_rebuild_state \
759             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
760             VALUES (?1, 1, ?2, 0, 0, 0)",
761            rusqlite::params![kind, state],
762        )
763        .expect("insert rebuild state");
764    }
765
766    #[test]
767    fn pending_row_survives_restart() {
768        let conn = bootstrapped_conn();
769        insert_rebuild_state(&conn, "MyKind", "PENDING");
770
771        recover_interrupted_rebuilds(&conn).expect("recover");
772
773        let state: String = conn
774            .query_row(
775                "SELECT state FROM fts_property_rebuild_state WHERE kind = 'MyKind'",
776                [],
777                |r| r.get(0),
778            )
779            .expect("state row");
780        assert_eq!(state, "PENDING", "PENDING rows must survive engine restart");
781    }
782
783    #[test]
784    fn building_row_marked_failed_on_restart() {
785        let conn = bootstrapped_conn();
786        insert_rebuild_state(&conn, "MyKind", "BUILDING");
787
788        recover_interrupted_rebuilds(&conn).expect("recover");
789
790        let state: String = conn
791            .query_row(
792                "SELECT state FROM fts_property_rebuild_state WHERE kind = 'MyKind'",
793                [],
794                |r| r.get(0),
795            )
796            .expect("state row");
797        assert_eq!(
798            state, "FAILED",
799            "BUILDING rows must be marked FAILED on restart"
800        );
801    }
802
803    #[test]
804    fn swapping_row_marked_failed_on_restart() {
805        let conn = bootstrapped_conn();
806        insert_rebuild_state(&conn, "MyKind", "SWAPPING");
807
808        recover_interrupted_rebuilds(&conn).expect("recover");
809
810        let state: String = conn
811            .query_row(
812                "SELECT state FROM fts_property_rebuild_state WHERE kind = 'MyKind'",
813                [],
814                |r| r.get(0),
815            )
816            .expect("state row");
817        assert_eq!(
818            state, "FAILED",
819            "SWAPPING rows must be marked FAILED on restart"
820        );
821    }
822
823    // --- A-6: rebuild swap targets per-kind table ---
824    #[test]
825    fn rebuild_swap_populates_per_kind_table() {
826        // This test calls run_rebuild() end-to-end and asserts the final rows
827        // land in the per-kind FTS table (fts_props_testkind), NOT in
828        // fts_node_properties.
829        let mut conn = bootstrapped_conn();
830        let kind = "TestKind";
831        let table = fathomdb_schema::fts_kind_table_name(kind);
832
833        // NOTE: The per-kind FTS table is intentionally NOT created here.
834        // The guard in run_rebuild (Step 5) must create it if absent.
835
836        // Insert a node with extractable property.
837        conn.execute(
838            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
839             VALUES ('row-1', 'node-1', ?1, '{\"name\":\"hello world\"}', 100, 'seed')",
840            rusqlite::params![kind],
841        )
842        .expect("insert node");
843
844        // Insert schema row.
845        let schema_id: i64 = {
846            conn.execute(
847                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
848                 VALUES (?1, '[\"$.name\"]', ' ')",
849                rusqlite::params![kind],
850            )
851            .expect("insert schema");
852            conn.query_row(
853                "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
854                rusqlite::params![kind],
855                |r| r.get(0),
856            )
857            .expect("schema_id")
858        };
859
860        // Insert rebuild state (PENDING).
861        conn.execute(
862            "INSERT INTO fts_property_rebuild_state \
863             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
864             VALUES (?1, ?2, 'PENDING', 0, 0, 1)",
865            rusqlite::params![kind, schema_id],
866        )
867        .expect("insert rebuild state");
868
869        // Run the rebuild end-to-end.
870        let req = super::RebuildRequest {
871            kind: kind.to_owned(),
872            schema_id,
873        };
874        let shutdown = std::sync::atomic::AtomicBool::new(false);
875        super::run_rebuild(&mut conn, &req, &shutdown).expect("run_rebuild");
876
877        // After A-6: the per-kind table must have the rebuilt row.
878        let per_kind_count: i64 = conn
879            .query_row(
880                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'node-1'"),
881                [],
882                |r| r.get(0),
883            )
884            .expect("per-kind count");
885        assert_eq!(
886            per_kind_count, 1,
887            "per-kind table must have the rebuilt row after run_rebuild"
888        );
889    }
890
891    // --- B-3: rebuild_actor uses per-column INSERT for weighted schemas ---
892
893    #[test]
894    fn rebuild_actor_uses_per_column_for_weighted_schema() {
895        let mut conn = bootstrapped_conn();
896        let kind = "Article";
897        let table = fathomdb_schema::fts_kind_table_name(kind);
898
899        let title_col = fathomdb_schema::fts_column_name("$.title", false);
900        let body_col = fathomdb_schema::fts_column_name("$.body", false);
901
902        // Insert a node with two extractable properties.
903        conn.execute(
904            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
905             VALUES ('row-1', 'article-1', ?1, '{\"title\":\"Hello\",\"body\":\"World\"}', 100, 'seed')",
906            rusqlite::params![kind],
907        )
908        .expect("insert node");
909
910        // Register schema with weights.
911        let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
912        let schema_id: i64 = {
913            conn.execute(
914                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
915                 VALUES (?1, ?2, ' ')",
916                rusqlite::params![kind, paths_json],
917            )
918            .expect("insert schema");
919            conn.query_row(
920                "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
921                rusqlite::params![kind],
922                |r| r.get(0),
923            )
924            .expect("schema_id")
925        };
926
927        // Create the weighted per-kind FTS table.
928        conn.execute_batch(&format!(
929            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
930                node_logical_id UNINDEXED, {title_col}, {body_col}, \
931                tokenize = 'porter unicode61 remove_diacritics 2'\
932            )"
933        ))
934        .expect("create weighted per-kind table");
935
936        // Insert rebuild state (PENDING).
937        conn.execute(
938            "INSERT INTO fts_property_rebuild_state \
939             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
940             VALUES (?1, ?2, 'PENDING', 0, 0, 1)",
941            rusqlite::params![kind, schema_id],
942        )
943        .expect("insert rebuild state");
944
945        // Run the rebuild end-to-end.
946        let req = super::RebuildRequest {
947            kind: kind.to_owned(),
948            schema_id,
949        };
950        let shutdown = std::sync::atomic::AtomicBool::new(false);
951        super::run_rebuild(&mut conn, &req, &shutdown).expect("run_rebuild");
952
953        // The per-kind table must have the rebuilt row.
954        let count: i64 = conn
955            .query_row(
956                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'article-1'"),
957                [],
958                |r| r.get(0),
959            )
960            .expect("count");
961        assert_eq!(count, 1, "per-kind table must have the rebuilt row");
962
963        // Verify per-column values.
964        let (title_val, body_val): (String, String) = conn
965            .query_row(
966                &format!(
967                    "SELECT {title_col}, {body_col} FROM {table} \
968                     WHERE node_logical_id = 'article-1'"
969                ),
970                [],
971                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
972            )
973            .expect("select per-column");
974        assert_eq!(
975            title_val, "Hello",
976            "title column must have correct value after rebuild"
977        );
978        assert_eq!(
979            body_val, "World",
980            "body column must have correct value after rebuild"
981        );
982    }
983}