Skip to main content

fathomdb_engine/
rebuild_actor.rs

//! Background actor that serializes async property-FTS rebuild tasks.
//!
//! Modeled exactly on [`crate::writer::WriterActor`]: one OS thread,
//! `std::sync::mpsc`, `JoinHandle` for shutdown.  No tokio.
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::mpsc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use fathomdb_schema::SchemaManager;
12use rusqlite::OptionalExtension;
13
14use crate::{EngineError, sqlite};
15
/// Selects when the property-FTS rebuild runs relative to
/// `register_fts_property_schema_with_entries`.
///
/// The default is [`RebuildMode::Async`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RebuildMode {
    /// Legacy behavior: the full rebuild runs inside the register transaction.
    Eager,
    /// 0.4.1+: the schema is persisted synchronously and the rebuild runs in
    /// the background.
    #[default]
    Async,
}
25
/// A request to rebuild property-FTS for a single kind.
///
/// Values of this type travel over the `mpsc` channel whose receiving end is
/// consumed by the rebuild thread, one request at a time.
#[derive(Debug)]
pub struct RebuildRequest {
    /// Node kind whose property-FTS rows should be rebuilt.
    pub kind: String,
    /// Schema id matched (together with `kind`) against
    /// `fts_property_rebuild_state` when the rebuild is marked `BUILDING`.
    pub schema_id: i64,
}
32
/// Single-threaded actor that processes property-FTS rebuild requests one at
/// a time.  Shutdown is cooperative: drop the sender side to close the channel,
/// then join the thread.
///
/// The `RebuildActor` owns the `JoinHandle` only. The sending half of the
/// channel lives in [`crate::admin::AdminService`] so the service can enqueue
/// rebuild requests directly without going through the runtime.  The receiving
/// half is passed to [`RebuildActor::start`], which moves it into the worker
/// thread.
///
/// NOTE(review): earlier docs said the channel is created by
/// `RebuildActor::create_channel` and distributed by
/// [`crate::runtime::EngineRuntime::open`]; no `create_channel` is defined in
/// this file — confirm where the channel is actually created.
#[derive(Debug)]
pub struct RebuildActor {
    // `Option` so that `Drop` can take the handle out and join it.
    thread_handle: Option<thread::JoinHandle<()>>,
}
46
47impl RebuildActor {
48    /// Create the mpsc channel used to communicate with the rebuild thread.
49    ///
50    /// Returns `(sender, actor)`.  The sender is given to
51    /// [`crate::admin::AdminService`]; the actor is kept in
52    /// [`crate::runtime::EngineRuntime`] for lifecycle management.
53    ///
54    /// # Errors
55    /// Returns [`EngineError::Io`] if the thread cannot be spawned.
56    pub fn start(
57        path: impl AsRef<Path>,
58        schema_manager: Arc<SchemaManager>,
59        receiver: mpsc::Receiver<RebuildRequest>,
60    ) -> Result<Self, EngineError> {
61        let database_path = path.as_ref().to_path_buf();
62
63        let handle = thread::Builder::new()
64            .name("fathomdb-rebuild".to_owned())
65            .spawn(move || {
66                rebuild_loop(&database_path, &schema_manager, receiver);
67            })
68            .map_err(EngineError::Io)?;
69
70        Ok(Self {
71            thread_handle: Some(handle),
72        })
73    }
74}
75
76impl Drop for RebuildActor {
77    fn drop(&mut self) {
78        // The sender was already closed by AdminService (or dropped when the
79        // engine closes).  Just join the thread.
80        if let Some(handle) = self.thread_handle.take() {
81            match handle.join() {
82                Ok(()) => {}
83                Err(payload) => {
84                    if std::thread::panicking() {
85                        trace_warn!(
86                            "rebuild thread panicked during shutdown (suppressed: already panicking)"
87                        );
88                    } else {
89                        std::panic::resume_unwind(payload);
90                    }
91                }
92            }
93        }
94    }
95}
96
97// ── rebuild loop ────────────────────────────────────────────────────────────
98
/// Target wall-clock time, in milliseconds, for each staging batch
/// transaction; the batch size is rescaled after every batch to approach it.
const BATCH_TARGET_MS: u128 = 1_000;
/// Number of nodes fetched for the first batch, before any rescaling.
const INITIAL_BATCH_SIZE: usize = 5_000;
103
104fn rebuild_loop(
105    database_path: &Path,
106    schema_manager: &Arc<SchemaManager>,
107    receiver: mpsc::Receiver<RebuildRequest>,
108) {
109    trace_info!("rebuild thread started");
110
111    let mut conn = match sqlite::open_connection(database_path) {
112        Ok(conn) => conn,
113        #[allow(clippy::used_underscore_binding)]
114        Err(_error) => {
115            trace_error!(error = %_error, "rebuild thread: database connection failed");
116            return;
117        }
118    };
119
120    #[allow(clippy::used_underscore_binding)]
121    if let Err(_error) = schema_manager.bootstrap(&conn) {
122        trace_error!(error = %_error, "rebuild thread: schema bootstrap failed");
123        return;
124    }
125
126    for req in receiver {
127        trace_info!(kind = %req.kind, schema_id = req.schema_id, "rebuild task started");
128        match run_rebuild(&mut conn, &req) {
129            Ok(()) => {
130                trace_info!(kind = %req.kind, "rebuild task COMPLETE");
131            }
132            Err(error) => {
133                trace_error!(kind = %req.kind, error = %error, "rebuild task failed");
134                let _ = mark_failed(&conn, &req.kind, &error.to_string());
135            }
136        }
137    }
138
139    trace_info!("rebuild thread exiting");
140}
141
142#[allow(clippy::too_many_lines)]
143fn run_rebuild(conn: &mut rusqlite::Connection, req: &RebuildRequest) -> Result<(), EngineError> {
144    // Step 1: mark BUILDING.
145    {
146        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
147        tx.execute(
148            "UPDATE fts_property_rebuild_state SET state = 'BUILDING' \
149             WHERE kind = ?1 AND schema_id = ?2",
150            rusqlite::params![req.kind, req.schema_id],
151        )?;
152        tx.commit()?;
153    }
154
155    // Step 2: count nodes for this kind (plain SELECT, no tx needed).
156    let rows_total: i64 = conn.query_row(
157        "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
158        rusqlite::params![req.kind],
159        |r| r.get(0),
160    )?;
161
162    {
163        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
164        tx.execute(
165            "UPDATE fts_property_rebuild_state SET rows_total = ?1 WHERE kind = ?2",
166            rusqlite::params![rows_total, req.kind],
167        )?;
168        tx.commit()?;
169    }
170
171    // Load the schema for this kind (plain SELECT).
172    let (paths_json, separator): (String, String) = conn
173        .query_row(
174            "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
175            rusqlite::params![req.kind],
176            |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
177        )
178        .optional()?
179        .ok_or_else(|| {
180            EngineError::Bridge(format!("rebuild: schema for kind '{}' missing", req.kind))
181        })?;
182    let schema = crate::writer::parse_property_schema_json(&paths_json, &separator);
183
184    // Step 3: batch-iterate nodes, insert into staging.
185    let mut offset: i64 = 0;
186    let mut batch_size = INITIAL_BATCH_SIZE;
187    let mut rows_done: i64 = 0;
188
189    loop {
190        // Fetch a batch of node logical_ids + properties (plain SELECT — no tx needed for reads).
191        let batch: Vec<(String, String)> = {
192            let mut stmt = conn.prepare(
193                "SELECT logical_id, properties FROM nodes \
194                 WHERE kind = ?1 AND superseded_at IS NULL \
195                 ORDER BY logical_id \
196                 LIMIT ?2 OFFSET ?3",
197            )?;
198            stmt.query_map(
199                rusqlite::params![
200                    req.kind,
201                    i64::try_from(batch_size).unwrap_or(i64::MAX),
202                    offset
203                ],
204                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
205            )?
206            .collect::<Result<Vec<_>, _>>()?
207        };
208
209        if batch.is_empty() {
210            break;
211        }
212
213        let batch_len = batch.len();
214        let batch_start = Instant::now();
215
216        // Insert staging rows in a single short transaction.
217        {
218            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
219
220            for (logical_id, properties_str) in &batch {
221                let props: serde_json::Value =
222                    serde_json::from_str(properties_str).unwrap_or_default();
223                let (text, positions, _stats) =
224                    crate::writer::extract_property_fts(&props, &schema);
225
226                // Serialize positions to a compact JSON blob for later use at swap time.
227                let positions_blob: Option<Vec<u8>> = if positions.is_empty() {
228                    None
229                } else {
230                    let v: Vec<(usize, usize, &str)> = positions
231                        .iter()
232                        .map(|p| (p.start_offset, p.end_offset, p.leaf_path.as_str()))
233                        .collect();
234                    serde_json::to_vec(&v).ok()
235                };
236
237                let text_content = text.unwrap_or_default();
238
239                tx.execute(
240                    "INSERT INTO fts_property_rebuild_staging \
241                     (kind, node_logical_id, text_content, positions_blob) \
242                     VALUES (?1, ?2, ?3, ?4) \
243                     ON CONFLICT(kind, node_logical_id) DO UPDATE \
244                     SET text_content = excluded.text_content, \
245                         positions_blob = excluded.positions_blob",
246                    rusqlite::params![req.kind, logical_id, text_content, positions_blob],
247                )?;
248            }
249
250            rows_done += i64::try_from(batch_len).unwrap_or(i64::MAX);
251            let now_ms = now_unix_ms();
252            tx.execute(
253                "UPDATE fts_property_rebuild_state \
254                 SET rows_done = ?1, last_progress_at = ?2 \
255                 WHERE kind = ?3",
256                rusqlite::params![rows_done, now_ms, req.kind],
257            )?;
258            tx.commit()?;
259        }
260
261        let elapsed_ms = batch_start.elapsed().as_millis();
262        // Save the limit used for THIS batch before adjusting for the next one.
263        let limit_used = batch_size;
264        // Dynamically adjust batch size to target ~1s per batch.
265        if elapsed_ms > 0 {
266            let new_size = (batch_size as u128 * BATCH_TARGET_MS / elapsed_ms).clamp(100, 50_000);
267            batch_size = usize::try_from(new_size).unwrap_or(50_000);
268        }
269
270        offset += i64::try_from(batch_len).unwrap_or(i64::MAX);
271
272        // If the batch was smaller than the limit used for THIS query, we've reached the end.
273        if batch_len < limit_used {
274            break;
275        }
276    }
277
278    // Step 4: mark SWAPPING.
279    {
280        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
281        let now_ms = now_unix_ms();
282        tx.execute(
283            "UPDATE fts_property_rebuild_state \
284             SET state = 'SWAPPING', last_progress_at = ?1 \
285             WHERE kind = ?2",
286            rusqlite::params![now_ms, req.kind],
287        )?;
288        tx.commit()?;
289    }
290
291    // Step 5: Final swap — atomic IMMEDIATE transaction replacing live FTS rows.
292    {
293        let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
294
295        // 5a. Delete old live FTS rows for this kind.
296        tx.execute(
297            "DELETE FROM fts_node_properties WHERE kind = ?1",
298            rusqlite::params![req.kind],
299        )?;
300
301        // 5b. Insert new rows from staging into the live FTS table.
302        tx.execute(
303            "INSERT INTO fts_node_properties(node_logical_id, kind, text_content) \
304             SELECT node_logical_id, kind, text_content \
305             FROM fts_property_rebuild_staging WHERE kind = ?1",
306            rusqlite::params![req.kind],
307        )?;
308
309        // 5c. Delete old position rows for this kind.
310        tx.execute(
311            "DELETE FROM fts_node_property_positions WHERE kind = ?1",
312            rusqlite::params![req.kind],
313        )?;
314
315        // 5d. Re-populate fts_node_property_positions from positions_blob in staging.
316        {
317            let mut stmt = tx.prepare(
318                "SELECT node_logical_id, positions_blob \
319                 FROM fts_property_rebuild_staging \
320                 WHERE kind = ?1 AND positions_blob IS NOT NULL",
321            )?;
322            let mut ins_pos = tx.prepare(
323                "INSERT INTO fts_node_property_positions \
324                 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
325                 VALUES (?1, ?2, ?3, ?4, ?5)",
326            )?;
327
328            let rows: Vec<(String, Vec<u8>)> = stmt
329                .query_map(rusqlite::params![req.kind], |r| {
330                    Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
331                })?
332                .collect::<Result<Vec<_>, _>>()?;
333
334            for (node_logical_id, blob) in &rows {
335                // positions_blob is JSON: Vec<(start, end, leaf_path)>
336                let positions: Vec<(usize, usize, String)> =
337                    serde_json::from_slice(blob).unwrap_or_default();
338                for (start, end, leaf_path) in positions {
339                    ins_pos.execute(rusqlite::params![
340                        node_logical_id,
341                        req.kind,
342                        i64::try_from(start).unwrap_or(i64::MAX),
343                        i64::try_from(end).unwrap_or(i64::MAX),
344                        leaf_path,
345                    ])?;
346                }
347            }
348        }
349
350        // 5e. Delete staging rows for this kind.
351        tx.execute(
352            "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
353            rusqlite::params![req.kind],
354        )?;
355
356        // 5f. Mark state COMPLETE.
357        let now_ms = now_unix_ms();
358        tx.execute(
359            "UPDATE fts_property_rebuild_state \
360             SET state = 'COMPLETE', last_progress_at = ?1 \
361             WHERE kind = ?2",
362            rusqlite::params![now_ms, req.kind],
363        )?;
364
365        tx.commit()?;
366    }
367
368    Ok(())
369}
370
371fn mark_failed(
372    conn: &rusqlite::Connection,
373    kind: &str,
374    error_message: &str,
375) -> Result<(), EngineError> {
376    let now_ms = now_unix_ms();
377    conn.execute(
378        "UPDATE fts_property_rebuild_state \
379         SET state = 'FAILED', error_message = ?1, last_progress_at = ?2 \
380         WHERE kind = ?3",
381        rusqlite::params![error_message, now_ms, kind],
382    )?;
383    Ok(())
384}
385
/// Current wall-clock time in Unix milliseconds; delegates to
/// [`now_unix_ms_pub`].
fn now_unix_ms() -> i64 {
    now_unix_ms_pub()
}
389
/// Public-in-crate version of `now_unix_ms` so `admin.rs` can use it.
///
/// Returns milliseconds since the Unix epoch.  A clock set before the epoch
/// yields `0`; a value that does not fit in `i64` saturates at `i64::MAX`.
pub(crate) fn now_unix_ms_pub() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
399
/// Rebuild progress row returned from `AdminService::get_property_fts_rebuild_state`.
///
/// Field names mirror columns of `fts_property_rebuild_state`; the notes below
/// describe only what this file writes to them.
#[derive(Debug)]
pub struct RebuildStateRow {
    /// Node kind the rebuild applies to.
    pub kind: String,
    /// Schema id the rebuild was requested for.
    pub schema_id: i64,
    /// State string; this file writes `BUILDING`, `SWAPPING`, `COMPLETE` and
    /// `FAILED`, and crash recovery also looks for `PENDING`.
    pub state: String,
    /// Count of active nodes for the kind, written once the rebuild has counted them.
    pub rows_total: Option<i64>,
    /// Nodes staged so far, updated after every batch.
    pub rows_done: i64,
    /// Not written in this file — presumably Unix milliseconds at registration; confirm.
    pub started_at: i64,
    /// Not written in this file — meaning should be confirmed against `admin.rs`.
    pub is_first_registration: bool,
    /// Message stored when the rebuild is marked `FAILED`.
    pub error_message: Option<String>,
}
412
/// Public progress snapshot returned from
/// [`crate::coordinator::ExecutionCoordinator::get_property_fts_rebuild_progress`].
///
/// Derives `serde::Serialize`, so a snapshot can be serialized as-is.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RebuildProgress {
    /// Current state: `"PENDING"`, `"BUILDING"`, `"SWAPPING"`, `"COMPLETE"`, or `"FAILED"`.
    pub state: String,
    /// Total rows to process. `None` until the actor has counted the nodes.
    pub rows_total: Option<i64>,
    /// Rows processed so far.
    pub rows_done: i64,
    /// Unix milliseconds when the rebuild was registered.
    pub started_at: i64,
    /// Unix milliseconds of the last progress update, if any.
    pub last_progress_at: Option<i64>,
    /// Error message if `state == "FAILED"`.
    pub error_message: Option<String>,
}
430
431/// Run crash recovery: mark any in-progress rebuilds as FAILED and clear their
432/// staging rows.  Called by `EngineRuntime::open` before spawning the actor.
433///
434/// # Errors
435/// Returns [`crate::EngineError`] if database access fails.
436pub(crate) fn recover_interrupted_rebuilds(
437    conn: &rusqlite::Connection,
438) -> Result<(), crate::EngineError> {
439    // Collect kinds that are in a non-terminal state.
440    let kinds: Vec<String> = {
441        let mut stmt = conn.prepare(
442            "SELECT kind FROM fts_property_rebuild_state \
443             WHERE state IN ('PENDING', 'BUILDING', 'SWAPPING')",
444        )?;
445        stmt.query_map([], |r| r.get::<_, String>(0))?
446            .collect::<Result<Vec<_>, _>>()?
447    };
448
449    for kind in &kinds {
450        conn.execute(
451            "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
452            rusqlite::params![kind],
453        )?;
454        conn.execute(
455            "UPDATE fts_property_rebuild_state \
456             SET state = 'FAILED', error_message = 'interrupted by engine restart' \
457             WHERE kind = ?1",
458            rusqlite::params![kind],
459        )?;
460    }
461
462    Ok(())
463}