1use std::path::Path;
6use std::sync::Arc;
7use std::sync::mpsc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use fathomdb_schema::SchemaManager;
12use rusqlite::OptionalExtension;
13
14use crate::{EngineError, sqlite};
15
/// Selects how a property-FTS rebuild is scheduled.
///
/// NOTE(review): nothing in this file reads the variant; the exact semantics of
/// each mode are defined by the callers — confirm there before relying on the
/// variant descriptions below.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RebuildMode {
    /// Presumably runs the rebuild inline with the triggering call — TODO confirm.
    Eager,
    /// Presumably hands the rebuild to the background worker thread — TODO confirm.
    ///
    /// This is the value returned by [`RebuildMode::default`].
    #[default]
    Async,
}
25
/// One unit of work sent over the channel to the rebuild worker thread.
#[derive(Debug)]
pub struct RebuildRequest {
    /// Node kind whose property FTS rows are rebuilt; every query issued by the
    /// rebuild filters on this value.
    pub kind: String,
    /// Schema identifier matched against `fts_property_rebuild_state.schema_id`
    /// when the state row is moved to `BUILDING`.
    pub schema_id: i64,
}
32
/// Owner of the background rebuild thread.
///
/// The thread is created by `RebuildActor::start` and joined when the actor is
/// dropped.
#[derive(Debug)]
pub struct RebuildActor {
    /// Join handle of the worker thread. Stored as an `Option` so that `Drop`
    /// can move the handle out with `take()` before joining it.
    thread_handle: Option<thread::JoinHandle<()>>,
}
46
47impl RebuildActor {
48 pub fn start(
57 path: impl AsRef<Path>,
58 schema_manager: Arc<SchemaManager>,
59 receiver: mpsc::Receiver<RebuildRequest>,
60 ) -> Result<Self, EngineError> {
61 let database_path = path.as_ref().to_path_buf();
62
63 let handle = thread::Builder::new()
64 .name("fathomdb-rebuild".to_owned())
65 .spawn(move || {
66 rebuild_loop(&database_path, &schema_manager, receiver);
67 })
68 .map_err(EngineError::Io)?;
69
70 Ok(Self {
71 thread_handle: Some(handle),
72 })
73 }
74}
75
76impl Drop for RebuildActor {
77 fn drop(&mut self) {
78 if let Some(handle) = self.thread_handle.take() {
81 match handle.join() {
82 Ok(()) => {}
83 Err(payload) => {
84 if std::thread::panicking() {
85 trace_warn!(
86 "rebuild thread panicked during shutdown (suppressed: already panicking)"
87 );
88 } else {
89 std::panic::resume_unwind(payload);
90 }
91 }
92 }
93 }
94 }
95}
96
/// Wall-clock duration, in milliseconds, that each staging batch is steered
/// toward: after a batch, the next batch size is scaled by
/// `BATCH_TARGET_MS / elapsed_ms`.
const BATCH_TARGET_MS: u128 = 1_000;
/// Row limit used for the first staging batch of a rebuild.
const INITIAL_BATCH_SIZE: usize = 5_000;
103
104fn rebuild_loop(
105 database_path: &Path,
106 schema_manager: &Arc<SchemaManager>,
107 receiver: mpsc::Receiver<RebuildRequest>,
108) {
109 trace_info!("rebuild thread started");
110
111 let mut conn = match sqlite::open_connection(database_path) {
112 Ok(conn) => conn,
113 #[allow(clippy::used_underscore_binding)]
114 Err(_error) => {
115 trace_error!(error = %_error, "rebuild thread: database connection failed");
116 return;
117 }
118 };
119
120 #[allow(clippy::used_underscore_binding)]
121 if let Err(_error) = schema_manager.bootstrap(&conn) {
122 trace_error!(error = %_error, "rebuild thread: schema bootstrap failed");
123 return;
124 }
125
126 for req in receiver {
127 trace_info!(kind = %req.kind, schema_id = req.schema_id, "rebuild task started");
128 match run_rebuild(&mut conn, &req) {
129 Ok(()) => {
130 trace_info!(kind = %req.kind, "rebuild task COMPLETE");
131 }
132 Err(error) => {
133 trace_error!(kind = %req.kind, error = %error, "rebuild task failed");
134 let _ = mark_failed(&conn, &req.kind, &error.to_string());
135 }
136 }
137 }
138
139 trace_info!("rebuild thread exiting");
140}
141
142#[allow(clippy::too_many_lines)]
143fn run_rebuild(conn: &mut rusqlite::Connection, req: &RebuildRequest) -> Result<(), EngineError> {
144 {
146 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
147 tx.execute(
148 "UPDATE fts_property_rebuild_state SET state = 'BUILDING' \
149 WHERE kind = ?1 AND schema_id = ?2",
150 rusqlite::params![req.kind, req.schema_id],
151 )?;
152 tx.commit()?;
153 }
154
155 let rows_total: i64 = conn.query_row(
157 "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
158 rusqlite::params![req.kind],
159 |r| r.get(0),
160 )?;
161
162 {
163 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
164 tx.execute(
165 "UPDATE fts_property_rebuild_state SET rows_total = ?1 WHERE kind = ?2",
166 rusqlite::params![rows_total, req.kind],
167 )?;
168 tx.commit()?;
169 }
170
171 let (paths_json, separator): (String, String) = conn
173 .query_row(
174 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
175 rusqlite::params![req.kind],
176 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
177 )
178 .optional()?
179 .ok_or_else(|| {
180 EngineError::Bridge(format!("rebuild: schema for kind '{}' missing", req.kind))
181 })?;
182 let schema = crate::writer::parse_property_schema_json(&paths_json, &separator);
183
184 let mut offset: i64 = 0;
186 let mut batch_size = INITIAL_BATCH_SIZE;
187 let mut rows_done: i64 = 0;
188
189 loop {
190 let batch: Vec<(String, String)> = {
192 let mut stmt = conn.prepare(
193 "SELECT logical_id, properties FROM nodes \
194 WHERE kind = ?1 AND superseded_at IS NULL \
195 ORDER BY logical_id \
196 LIMIT ?2 OFFSET ?3",
197 )?;
198 stmt.query_map(
199 rusqlite::params![
200 req.kind,
201 i64::try_from(batch_size).unwrap_or(i64::MAX),
202 offset
203 ],
204 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
205 )?
206 .collect::<Result<Vec<_>, _>>()?
207 };
208
209 if batch.is_empty() {
210 break;
211 }
212
213 let batch_len = batch.len();
214 let batch_start = Instant::now();
215
216 {
218 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
219
220 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
221
222 for (logical_id, properties_str) in &batch {
223 let props: serde_json::Value =
224 serde_json::from_str(properties_str).unwrap_or_default();
225 let (text, positions, _stats) =
226 crate::writer::extract_property_fts(&props, &schema);
227
228 let positions_blob: Option<Vec<u8>> = if positions.is_empty() {
230 None
231 } else {
232 let v: Vec<(usize, usize, &str)> = positions
233 .iter()
234 .map(|p| (p.start_offset, p.end_offset, p.leaf_path.as_str()))
235 .collect();
236 serde_json::to_vec(&v).ok()
237 };
238
239 let text_content = text.unwrap_or_default();
240
241 if has_weights {
242 let cols = crate::writer::extract_property_fts_columns(&props, &schema);
243 let json_map: serde_json::Map<String, serde_json::Value> = cols
244 .into_iter()
245 .map(|(k, v)| (k, serde_json::Value::String(v)))
246 .collect();
247 let columns_json =
248 serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
249 tx.execute(
250 "INSERT INTO fts_property_rebuild_staging \
251 (kind, node_logical_id, text_content, positions_blob, columns_json) \
252 VALUES (?1, ?2, ?3, ?4, ?5) \
253 ON CONFLICT(kind, node_logical_id) DO UPDATE \
254 SET text_content = excluded.text_content, \
255 positions_blob = excluded.positions_blob, \
256 columns_json = excluded.columns_json",
257 rusqlite::params![
258 req.kind,
259 logical_id,
260 text_content,
261 positions_blob,
262 columns_json
263 ],
264 )?;
265 } else {
266 tx.execute(
267 "INSERT INTO fts_property_rebuild_staging \
268 (kind, node_logical_id, text_content, positions_blob) \
269 VALUES (?1, ?2, ?3, ?4) \
270 ON CONFLICT(kind, node_logical_id) DO UPDATE \
271 SET text_content = excluded.text_content, \
272 positions_blob = excluded.positions_blob",
273 rusqlite::params![req.kind, logical_id, text_content, positions_blob],
274 )?;
275 }
276 }
277
278 rows_done += i64::try_from(batch_len).unwrap_or(i64::MAX);
279 let now_ms = now_unix_ms();
280 tx.execute(
281 "UPDATE fts_property_rebuild_state \
282 SET rows_done = ?1, last_progress_at = ?2 \
283 WHERE kind = ?3",
284 rusqlite::params![rows_done, now_ms, req.kind],
285 )?;
286 tx.commit()?;
287 }
288
289 let elapsed_ms = batch_start.elapsed().as_millis();
290 let limit_used = batch_size;
292 if let Some(new_size) = (batch_size as u128 * BATCH_TARGET_MS).checked_div(elapsed_ms) {
294 let new_size = new_size.clamp(100, 50_000);
295 batch_size = usize::try_from(new_size).unwrap_or(50_000);
296 }
297
298 offset += i64::try_from(batch_len).unwrap_or(i64::MAX);
299
300 if batch_len < limit_used {
302 break;
303 }
304 }
305
306 {
308 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
309 let now_ms = now_unix_ms();
310 tx.execute(
311 "UPDATE fts_property_rebuild_state \
312 SET state = 'SWAPPING', last_progress_at = ?1 \
313 WHERE kind = ?2",
314 rusqlite::params![now_ms, req.kind],
315 )?;
316 tx.commit()?;
317 }
318
319 {
321 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
322
323 let table = fathomdb_schema::fts_kind_table_name(&req.kind);
324
325 let tokenizer = fathomdb_schema::DEFAULT_FTS_TOKENIZER;
328 let create_ddl = format!(
329 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
330 node_logical_id UNINDEXED, text_content, \
331 tokenize = '{tokenizer}'\
332 )"
333 );
334 tx.execute_batch(&create_ddl)?;
335
336 tx.execute(&format!("DELETE FROM {table}"), [])?;
338
339 {
343 let has_columns: bool = tx
345 .query_row(
346 "SELECT count(*) FROM fts_property_rebuild_staging \
347 WHERE kind = ?1 AND columns_json IS NOT NULL",
348 rusqlite::params![req.kind],
349 |r| r.get::<_, i64>(0),
350 )
351 .unwrap_or(0)
352 > 0;
353
354 if has_columns {
355 let rows_with_columns: Vec<(String, String)> = {
357 let mut stmt = tx.prepare(
358 "SELECT node_logical_id, columns_json \
359 FROM fts_property_rebuild_staging \
360 WHERE kind = ?1 AND columns_json IS NOT NULL",
361 )?;
362 stmt.query_map(rusqlite::params![req.kind], |r| {
363 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
364 })?
365 .collect::<Result<Vec<_>, _>>()?
366 };
367
368 for (node_id, columns_json_str) in &rows_with_columns {
369 let col_map: serde_json::Map<String, serde_json::Value> =
370 serde_json::from_str(columns_json_str).unwrap_or_default();
371 let col_names: Vec<String> = col_map.keys().cloned().collect();
372 let col_values: Vec<String> = col_names
373 .iter()
374 .map(|k| {
375 col_map
376 .get(k)
377 .and_then(|v| v.as_str())
378 .unwrap_or("")
379 .to_owned()
380 })
381 .collect();
382 let placeholders: Vec<String> =
383 (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
384 let sql = format!(
385 "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
386 cols = col_names.join(", "),
387 placeholders = placeholders.join(", "),
388 );
389 let mut stmt = tx.prepare(&sql)?;
390 stmt.execute(rusqlite::params_from_iter(
391 std::iter::once(node_id.as_str())
392 .chain(col_values.iter().map(String::as_str)),
393 ))?;
394 }
395
396 } else {
400 tx.execute(
402 &format!(
403 "INSERT INTO {table}(node_logical_id, text_content) \
404 SELECT node_logical_id, text_content \
405 FROM fts_property_rebuild_staging WHERE kind = ?1"
406 ),
407 rusqlite::params![req.kind],
408 )?;
409 }
410 }
411
412 tx.execute(
414 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
415 rusqlite::params![req.kind],
416 )?;
417
418 {
420 let mut stmt = tx.prepare(
421 "SELECT node_logical_id, positions_blob \
422 FROM fts_property_rebuild_staging \
423 WHERE kind = ?1 AND positions_blob IS NOT NULL",
424 )?;
425 let mut ins_pos = tx.prepare(
426 "INSERT INTO fts_node_property_positions \
427 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
428 VALUES (?1, ?2, ?3, ?4, ?5)",
429 )?;
430
431 let rows: Vec<(String, Vec<u8>)> = stmt
432 .query_map(rusqlite::params![req.kind], |r| {
433 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
434 })?
435 .collect::<Result<Vec<_>, _>>()?;
436
437 for (node_logical_id, blob) in &rows {
438 let positions: Vec<(usize, usize, String)> =
440 serde_json::from_slice(blob).unwrap_or_default();
441 for (start, end, leaf_path) in positions {
442 ins_pos.execute(rusqlite::params![
443 node_logical_id,
444 req.kind,
445 i64::try_from(start).unwrap_or(i64::MAX),
446 i64::try_from(end).unwrap_or(i64::MAX),
447 leaf_path,
448 ])?;
449 }
450 }
451 }
452
453 tx.execute(
455 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
456 rusqlite::params![req.kind],
457 )?;
458
459 let now_ms = now_unix_ms();
461 tx.execute(
462 "UPDATE fts_property_rebuild_state \
463 SET state = 'COMPLETE', last_progress_at = ?1 \
464 WHERE kind = ?2",
465 rusqlite::params![now_ms, req.kind],
466 )?;
467
468 tx.commit()?;
469 }
470
471 Ok(())
472}
473
474fn mark_failed(
475 conn: &rusqlite::Connection,
476 kind: &str,
477 error_message: &str,
478) -> Result<(), EngineError> {
479 let now_ms = now_unix_ms();
480 conn.execute(
481 "UPDATE fts_property_rebuild_state \
482 SET state = 'FAILED', error_message = ?1, last_progress_at = ?2 \
483 WHERE kind = ?3",
484 rusqlite::params![error_message, now_ms, kind],
485 )?;
486 Ok(())
487}
488
/// Module-private shorthand for [`now_unix_ms_pub`].
fn now_unix_ms() -> i64 {
    now_unix_ms_pub()
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value that does not fit in an
/// `i64` saturates to `i64::MAX`.
pub(crate) fn now_unix_ms_pub() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
502
/// In-memory form of one rebuild state record.
///
/// The field names match columns written to `fts_property_rebuild_state`
/// elsewhere in this file. NOTE(review): the code that builds this struct is
/// not in this file — confirm the column mapping at the construction site.
#[derive(Debug)]
pub struct RebuildStateRow {
    /// Node kind the record belongs to.
    pub kind: String,
    /// Identifier of the schema the rebuild was requested for.
    pub schema_id: i64,
    /// State name; this file writes `BUILDING`, `SWAPPING`, `COMPLETE` and
    /// `FAILED`, and its tests also insert `PENDING`.
    pub state: String,
    /// Number of active nodes counted when the rebuild started, if recorded.
    pub rows_total: Option<i64>,
    /// Number of nodes staged so far.
    pub rows_done: i64,
    /// Start timestamp — presumably Unix milliseconds like the other
    /// timestamps here; TODO confirm.
    pub started_at: i64,
    /// Flag stored with the record; its meaning is defined by the code that
    /// registers schemas — TODO confirm.
    pub is_first_registration: bool,
    /// Message stored when the rebuild was marked `FAILED`.
    pub error_message: Option<String>,
}
515
516#[derive(Debug, Clone, serde::Serialize)]
519pub struct RebuildProgress {
520 pub state: String,
522 pub rows_total: Option<i64>,
524 pub rows_done: i64,
526 pub started_at: i64,
528 pub last_progress_at: Option<i64>,
530 pub error_message: Option<String>,
532}
533
534pub(crate) fn recover_interrupted_rebuilds(
540 conn: &rusqlite::Connection,
541) -> Result<(), crate::EngineError> {
542 let kinds: Vec<String> = {
544 let mut stmt = conn.prepare(
545 "SELECT kind FROM fts_property_rebuild_state \
546 WHERE state IN ('BUILDING', 'SWAPPING')",
547 )?;
548 stmt.query_map([], |r| r.get::<_, String>(0))?
549 .collect::<Result<Vec<_>, _>>()?
550 };
551
552 for kind in &kinds {
553 conn.execute(
554 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
555 rusqlite::params![kind],
556 )?;
557 conn.execute(
558 "UPDATE fts_property_rebuild_state \
559 SET state = 'FAILED', error_message = 'interrupted by engine restart' \
560 WHERE kind = ?1",
561 rusqlite::params![kind],
562 )?;
563 }
564
565 Ok(())
566}
567
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use fathomdb_schema::SchemaManager;
    use rusqlite::Connection;

    use super::{RebuildRequest, recover_interrupted_rebuilds, run_rebuild};

    /// Opens an in-memory database with the full schema applied.
    fn bootstrapped_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn
    }

    /// Inserts a rebuild state row for `kind` in the given `state`.
    fn insert_rebuild_state(conn: &Connection, kind: &str, state: &str) {
        conn.execute(
            "INSERT INTO fts_property_rebuild_state \
             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
             VALUES (?1, 1, ?2, 0, 0, 0)",
            rusqlite::params![kind, state],
        )
        .expect("insert rebuild state");
    }

    /// Seeds a `MyKind` row in `initial` state, runs recovery, and returns the
    /// state stored afterwards.
    fn state_after_recovery(initial: &str) -> String {
        let conn = bootstrapped_conn();
        insert_rebuild_state(&conn, "MyKind", initial);

        recover_interrupted_rebuilds(&conn).expect("recover");

        conn.query_row(
            "SELECT state FROM fts_property_rebuild_state WHERE kind = 'MyKind'",
            [],
            |r| r.get(0),
        )
        .expect("state row")
    }

    /// Returns the rowid of the schema registered for `kind`.
    fn schema_rowid(conn: &Connection, kind: &str) -> i64 {
        conn.query_row(
            "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
            rusqlite::params![kind],
            |r| r.get(0),
        )
        .expect("schema_id")
    }

    /// Inserts a `PENDING` first-registration state row for `kind`.
    fn insert_pending_state(conn: &Connection, kind: &str, schema_id: i64) {
        conn.execute(
            "INSERT INTO fts_property_rebuild_state \
             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
             VALUES (?1, ?2, 'PENDING', 0, 0, 1)",
            rusqlite::params![kind, schema_id],
        )
        .expect("insert rebuild state");
    }

    /// Runs a full rebuild for `kind` on `conn`.
    fn rebuild(conn: &mut Connection, kind: &str, schema_id: i64) {
        let req = RebuildRequest {
            kind: kind.to_owned(),
            schema_id,
        };
        run_rebuild(conn, &req).expect("run_rebuild");
    }

    #[test]
    fn pending_row_survives_restart() {
        let state = state_after_recovery("PENDING");
        assert_eq!(state, "PENDING", "PENDING rows must survive engine restart");
    }

    #[test]
    fn building_row_marked_failed_on_restart() {
        let state = state_after_recovery("BUILDING");
        assert_eq!(
            state, "FAILED",
            "BUILDING rows must be marked FAILED on restart"
        );
    }

    #[test]
    fn swapping_row_marked_failed_on_restart() {
        let state = state_after_recovery("SWAPPING");
        assert_eq!(
            state, "FAILED",
            "SWAPPING rows must be marked FAILED on restart"
        );
    }

    /// An unweighted schema must end up with one row per node in the per-kind
    /// FTS table.
    #[test]
    fn rebuild_swap_populates_per_kind_table() {
        let mut conn = bootstrapped_conn();
        let kind = "TestKind";
        let table = fathomdb_schema::fts_kind_table_name(kind);

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'node-1', ?1, '{\"name\":\"hello world\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, '[\"$.name\"]', ' ')",
            rusqlite::params![kind],
        )
        .expect("insert schema");
        let schema_id = schema_rowid(&conn, kind);

        insert_pending_state(&conn, kind, schema_id);
        rebuild(&mut conn, kind, schema_id);

        let per_kind_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'node-1'"),
                [],
                |r| r.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 1,
            "per-kind table must have the rebuilt row after run_rebuild"
        );
    }

    /// A weighted schema must fill one FTS column per path, not the combined
    /// `text_content` column.
    #[test]
    fn rebuild_actor_uses_per_column_for_weighted_schema() {
        let mut conn = bootstrapped_conn();
        let kind = "Article";
        let table = fathomdb_schema::fts_kind_table_name(kind);

        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);

        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'article-1', ?1, '{\"title\":\"Hello\",\"body\":\"World\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, ?2, ' ')",
            rusqlite::params![kind, paths_json],
        )
        .expect("insert schema");
        let schema_id = schema_rowid(&conn, kind);

        // The per-column table has to exist before the rebuild: the swap step
        // only creates the single-column layout when the table is missing.
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
             node_logical_id UNINDEXED, {title_col}, {body_col}, \
             tokenize = 'porter unicode61 remove_diacritics 2'\
             )"
        ))
        .expect("create weighted per-kind table");

        insert_pending_state(&conn, kind, schema_id);
        rebuild(&mut conn, kind, schema_id);

        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'article-1'"),
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind table must have the rebuilt row");

        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM {table} \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello",
            "title column must have correct value after rebuild"
        );
        assert_eq!(
            body_val, "World",
            "body column must have correct value after rebuild"
        );
    }
}