1use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use fathomdb_schema::SchemaManager;
13use rusqlite::OptionalExtension;
14
15use crate::{EngineError, sqlite};
16
/// How a property-FTS rebuild is scheduled when a schema is registered.
///
/// `Async` is the default. The exact semantics of each mode live in the
/// registration code, which is not part of this module.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RebuildMode {
    /// Presumably rebuilds synchronously as part of registration — confirm at call sites.
    Eager,
    /// Presumably hands the rebuild to the background rebuild actor — confirm at call sites.
    #[default]
    Async,
}
26
/// A request to rebuild the property-FTS index for one node kind.
///
/// `schema_id` pins the schema version the request was issued for: the rebuild
/// only claims a `PENDING` state row whose `schema_id` still matches.
#[derive(Debug)]
pub struct RebuildRequest {
    /// Node kind whose per-kind FTS table is rebuilt.
    pub kind: String,
    /// Schema version (row id in `fts_property_schemas`) this request targets.
    pub schema_id: i64,
}
33
/// Outcome of [`RebuildClient::try_submit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RebuildSubmit {
    /// The request was placed on the rebuild actor's channel.
    Submitted,
    /// The channel was full or its receiver was gone, so the request was not queued.
    /// The actor also polls `PENDING` state rows, which presumably carry the
    /// request instead — confirm the caller persists it first.
    PersistedPending,
}
39
40#[derive(Clone, Debug)]
41pub struct RebuildClient {
42 sender: mpsc::SyncSender<RebuildRequest>,
43 shutdown: Arc<AtomicBool>,
44}
45
46impl RebuildClient {
47 #[must_use]
48 pub fn new(sender: mpsc::SyncSender<RebuildRequest>, shutdown: Arc<AtomicBool>) -> Self {
49 Self { sender, shutdown }
50 }
51
52 pub fn try_submit(&self, req: RebuildRequest) -> Result<RebuildSubmit, EngineError> {
55 if self.shutdown.load(Ordering::Acquire) {
56 return Err(EngineError::Bridge("engine is shutting down".to_owned()));
57 }
58 match self.sender.try_send(req) {
59 Ok(()) => Ok(RebuildSubmit::Submitted),
60 Err(mpsc::TrySendError::Full(_) | mpsc::TrySendError::Disconnected(_)) => {
61 Ok(RebuildSubmit::PersistedPending)
62 }
63 }
64 }
65}
66
67#[derive(Debug)]
71pub struct RebuildActor {
72 thread_handle: Option<thread::JoinHandle<()>>,
73 shutdown: Arc<AtomicBool>,
74}
75
76impl RebuildActor {
77 pub fn start(
86 path: impl AsRef<Path>,
87 schema_manager: Arc<SchemaManager>,
88 receiver: mpsc::Receiver<RebuildRequest>,
89 shutdown: Arc<AtomicBool>,
90 ) -> Result<Self, EngineError> {
91 let database_path = path.as_ref().to_path_buf();
92 let thread_shutdown = Arc::clone(&shutdown);
93
94 let handle = thread::Builder::new()
95 .name("fathomdb-rebuild".to_owned())
96 .spawn(move || {
97 rebuild_loop(&database_path, &schema_manager, receiver, &thread_shutdown);
98 })
99 .map_err(EngineError::Io)?;
100
101 Ok(Self {
102 thread_handle: Some(handle),
103 shutdown,
104 })
105 }
106}
107
108impl Drop for RebuildActor {
109 fn drop(&mut self) {
110 self.shutdown.store(true, Ordering::Release);
111 if let Some(handle) = self.thread_handle.take() {
112 match handle.join() {
113 Ok(()) => {}
114 Err(payload) => {
115 if std::thread::panicking() {
116 trace_warn!(
117 "rebuild thread panicked during shutdown (suppressed: already panicking)"
118 );
119 } else {
120 std::panic::resume_unwind(payload);
121 }
122 }
123 }
124 }
125 }
126}
127
/// Wall-clock budget (milliseconds) that each staging batch's write
/// transaction is scaled toward when the next batch size is chosen.
const BATCH_TARGET_MS: u128 = 1_000;
/// Number of nodes read in the first staging batch of a rebuild.
const INITIAL_BATCH_SIZE: usize = 5_000;
134
135#[allow(clippy::needless_pass_by_value)]
136fn rebuild_loop(
137 database_path: &Path,
138 schema_manager: &Arc<SchemaManager>,
139 receiver: mpsc::Receiver<RebuildRequest>,
140 shutdown: &AtomicBool,
141) {
142 trace_info!("rebuild thread started");
143
144 let mut conn = match sqlite::open_connection(database_path) {
145 Ok(conn) => conn,
146 #[allow(clippy::used_underscore_binding)]
147 Err(_error) => {
148 trace_error!(error = %_error, "rebuild thread: database connection failed");
149 return;
150 }
151 };
152
153 #[allow(clippy::used_underscore_binding)]
154 if let Err(_error) = schema_manager.bootstrap(&conn) {
155 trace_error!(error = %_error, "rebuild thread: schema bootstrap failed");
156 return;
157 }
158
159 let _ = process_pending_rebuilds(&mut conn, shutdown);
160
161 loop {
162 if shutdown.load(Ordering::Acquire) {
163 break;
164 }
165 match receiver.recv_timeout(Duration::from_millis(250)) {
166 Ok(req) => handle_rebuild_request(&mut conn, &req, shutdown),
167 Err(mpsc::RecvTimeoutError::Timeout) => {
168 let _ = process_pending_rebuilds(&mut conn, shutdown);
169 }
170 Err(mpsc::RecvTimeoutError::Disconnected) => {
171 let _ = process_pending_rebuilds(&mut conn, shutdown);
172 break;
173 }
174 }
175 }
176
177 trace_info!("rebuild thread exiting");
178}
179
180fn handle_rebuild_request(
181 conn: &mut rusqlite::Connection,
182 req: &RebuildRequest,
183 shutdown: &AtomicBool,
184) {
185 trace_info!(kind = %req.kind, schema_id = req.schema_id, "rebuild task started");
186 match run_rebuild(conn, req, shutdown) {
187 Ok(()) => {
188 trace_info!(kind = %req.kind, "rebuild task COMPLETE");
189 }
190 Err(error) => {
191 trace_error!(kind = %req.kind, error = %error, "rebuild task failed");
192 let _ = mark_failed(conn, &req.kind, &error.to_string());
193 }
194 }
195}
196
197fn process_pending_rebuilds(
198 conn: &mut rusqlite::Connection,
199 shutdown: &AtomicBool,
200) -> Result<(), EngineError> {
201 let pending: Vec<RebuildRequest> = {
202 let mut stmt = conn.prepare(
203 "SELECT kind, schema_id FROM fts_property_rebuild_state \
204 WHERE state = 'PENDING' ORDER BY started_at, kind LIMIT 16",
205 )?;
206 stmt.query_map([], |r| {
207 Ok(RebuildRequest {
208 kind: r.get(0)?,
209 schema_id: r.get(1)?,
210 })
211 })?
212 .collect::<Result<Vec<_>, _>>()?
213 };
214 for req in &pending {
215 if shutdown.load(Ordering::Acquire) {
216 break;
217 }
218 handle_rebuild_request(conn, req, shutdown);
219 }
220 Ok(())
221}
222
223#[allow(clippy::too_many_lines)]
224fn run_rebuild(
225 conn: &mut rusqlite::Connection,
226 req: &RebuildRequest,
227 shutdown: &AtomicBool,
228) -> Result<(), EngineError> {
229 {
231 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
232 let claimed = tx.execute(
233 "UPDATE fts_property_rebuild_state SET state = 'BUILDING' \
234 WHERE kind = ?1 AND schema_id = ?2 AND state = 'PENDING'",
235 rusqlite::params![req.kind, req.schema_id],
236 )?;
237 tx.commit()?;
238 if claimed == 0 {
239 trace_warn!(
240 kind = %req.kind,
241 schema_id = req.schema_id,
242 "rebuild request skipped because it is no longer pending"
243 );
244 return Ok(());
245 }
246 }
247
248 let rows_total: i64 = conn.query_row(
250 "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
251 rusqlite::params![req.kind],
252 |r| r.get(0),
253 )?;
254
255 {
256 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
257 tx.execute(
258 "UPDATE fts_property_rebuild_state SET rows_total = ?1 WHERE kind = ?2",
259 rusqlite::params![rows_total, req.kind],
260 )?;
261 tx.commit()?;
262 }
263
264 let (paths_json, separator): (String, String) = conn
266 .query_row(
267 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
268 rusqlite::params![req.kind],
269 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
270 )
271 .optional()?
272 .ok_or_else(|| {
273 EngineError::Bridge(format!("rebuild: schema for kind '{}' missing", req.kind))
274 })?;
275 let schema = crate::writer::parse_property_schema_json(&paths_json, &separator);
276
277 let mut batch_size = INITIAL_BATCH_SIZE;
279 let mut rows_done: i64 = 0;
280 let mut last_logical_id = String::new();
281
282 loop {
283 abort_if_shutdown(conn, &req.kind, shutdown)?;
284 let batch: Vec<(String, String)> = {
286 let mut stmt = conn.prepare(
287 "SELECT logical_id, properties FROM nodes \
288 WHERE kind = ?1 AND superseded_at IS NULL AND logical_id > ?2 \
289 ORDER BY logical_id \
290 LIMIT ?3",
291 )?;
292 stmt.query_map(
293 rusqlite::params![
294 req.kind,
295 last_logical_id,
296 i64::try_from(batch_size).unwrap_or(i64::MAX),
297 ],
298 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
299 )?
300 .collect::<Result<Vec<_>, _>>()?
301 };
302
303 if batch.is_empty() {
304 break;
305 }
306
307 let batch_len = batch.len();
308 let batch_start = Instant::now();
309 if let Some((logical_id, _)) = batch.last() {
310 last_logical_id.clone_from(logical_id);
311 }
312
313 {
315 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
316
317 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
318
319 for (logical_id, properties_str) in &batch {
320 let props: serde_json::Value =
321 serde_json::from_str(properties_str).unwrap_or_default();
322 let (text, positions, _stats) =
323 crate::writer::extract_property_fts(&props, &schema);
324
325 let positions_blob: Option<Vec<u8>> = if positions.is_empty() {
327 None
328 } else {
329 let v: Vec<(usize, usize, &str)> = positions
330 .iter()
331 .map(|p| (p.start_offset, p.end_offset, p.leaf_path.as_str()))
332 .collect();
333 serde_json::to_vec(&v).ok()
334 };
335
336 let text_content = text.unwrap_or_default();
337
338 if has_weights {
339 let cols = crate::writer::extract_property_fts_columns(&props, &schema);
340 let json_map: serde_json::Map<String, serde_json::Value> = cols
341 .into_iter()
342 .map(|(k, v)| (k, serde_json::Value::String(v)))
343 .collect();
344 let columns_json =
345 serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
346 tx.execute(
347 "INSERT INTO fts_property_rebuild_staging \
348 (kind, node_logical_id, text_content, positions_blob, columns_json) \
349 VALUES (?1, ?2, ?3, ?4, ?5) \
350 ON CONFLICT(kind, node_logical_id) DO UPDATE \
351 SET text_content = excluded.text_content, \
352 positions_blob = excluded.positions_blob, \
353 columns_json = excluded.columns_json",
354 rusqlite::params![
355 req.kind,
356 logical_id,
357 text_content,
358 positions_blob,
359 columns_json
360 ],
361 )?;
362 } else {
363 tx.execute(
364 "INSERT INTO fts_property_rebuild_staging \
365 (kind, node_logical_id, text_content, positions_blob) \
366 VALUES (?1, ?2, ?3, ?4) \
367 ON CONFLICT(kind, node_logical_id) DO UPDATE \
368 SET text_content = excluded.text_content, \
369 positions_blob = excluded.positions_blob",
370 rusqlite::params![req.kind, logical_id, text_content, positions_blob],
371 )?;
372 }
373 }
374
375 rows_done += i64::try_from(batch_len).unwrap_or(i64::MAX);
376 let now_ms = now_unix_ms();
377 tx.execute(
378 "UPDATE fts_property_rebuild_state \
379 SET rows_done = ?1, last_progress_at = ?2 \
380 WHERE kind = ?3",
381 rusqlite::params![rows_done, now_ms, req.kind],
382 )?;
383 tx.commit()?;
384 }
385
386 let elapsed_ms = batch_start.elapsed().as_millis();
387 let limit_used = batch_size;
389 if let Some(new_size) = (batch_size as u128 * BATCH_TARGET_MS).checked_div(elapsed_ms) {
391 let new_size = new_size.clamp(100, 50_000);
392 batch_size = usize::try_from(new_size).unwrap_or(50_000);
393 }
394
395 if batch_len < limit_used {
397 break;
398 }
399 }
400
401 {
403 abort_if_shutdown(conn, &req.kind, shutdown)?;
404 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
405 let now_ms = now_unix_ms();
406 tx.execute(
407 "UPDATE fts_property_rebuild_state \
408 SET state = 'SWAPPING', last_progress_at = ?1 \
409 WHERE kind = ?2",
410 rusqlite::params![now_ms, req.kind],
411 )?;
412 tx.commit()?;
413 }
414
415 {
417 abort_if_shutdown(conn, &req.kind, shutdown)?;
418 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
419 let current: Option<(String, i64)> = tx
420 .query_row(
421 "SELECT state, schema_id FROM fts_property_rebuild_state WHERE kind = ?1",
422 rusqlite::params![req.kind],
423 |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)),
424 )
425 .optional()?;
426 if current.as_ref() != Some(&("SWAPPING".to_owned(), req.schema_id)) {
427 trace_warn!(
428 kind = %req.kind,
429 schema_id = req.schema_id,
430 "rebuild swap skipped because state/schema changed"
431 );
432 tx.commit()?;
433 return Ok(());
434 }
435
436 let table = fathomdb_schema::fts_kind_table_name(&req.kind);
437
438 ensure_fts_kind_table_for_schema(&tx, &req.kind, &schema)?;
439
440 tx.execute(&format!("DELETE FROM {table}"), [])?;
442
443 {
447 let has_columns: bool = tx
449 .query_row(
450 "SELECT count(*) FROM fts_property_rebuild_staging \
451 WHERE kind = ?1 AND columns_json IS NOT NULL",
452 rusqlite::params![req.kind],
453 |r| r.get::<_, i64>(0),
454 )
455 .unwrap_or(0)
456 > 0;
457
458 if has_columns {
459 let rows_with_columns: Vec<(String, String)> = {
461 let mut stmt = tx.prepare(
462 "SELECT node_logical_id, columns_json \
463 FROM fts_property_rebuild_staging \
464 WHERE kind = ?1 AND columns_json IS NOT NULL",
465 )?;
466 stmt.query_map(rusqlite::params![req.kind], |r| {
467 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
468 })?
469 .collect::<Result<Vec<_>, _>>()?
470 };
471
472 for (node_id, columns_json_str) in &rows_with_columns {
473 let col_map: serde_json::Map<String, serde_json::Value> =
474 serde_json::from_str(columns_json_str).unwrap_or_default();
475 let col_names: Vec<String> = col_map.keys().cloned().collect();
476 let col_values: Vec<String> = col_names
477 .iter()
478 .map(|k| {
479 col_map
480 .get(k)
481 .and_then(|v| v.as_str())
482 .unwrap_or("")
483 .to_owned()
484 })
485 .collect();
486 let placeholders: Vec<String> =
487 (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
488 let sql = format!(
489 "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
490 cols = col_names.join(", "),
491 placeholders = placeholders.join(", "),
492 );
493 let mut stmt = tx.prepare(&sql)?;
494 stmt.execute(rusqlite::params_from_iter(
495 std::iter::once(node_id.as_str())
496 .chain(col_values.iter().map(String::as_str)),
497 ))?;
498 }
499
500 } else {
504 tx.execute(
506 &format!(
507 "INSERT INTO {table}(node_logical_id, text_content) \
508 SELECT node_logical_id, text_content \
509 FROM fts_property_rebuild_staging WHERE kind = ?1"
510 ),
511 rusqlite::params![req.kind],
512 )?;
513 }
514 }
515
516 tx.execute(
518 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
519 rusqlite::params![req.kind],
520 )?;
521
522 {
524 let mut stmt = tx.prepare(
525 "SELECT node_logical_id, positions_blob \
526 FROM fts_property_rebuild_staging \
527 WHERE kind = ?1 AND positions_blob IS NOT NULL",
528 )?;
529 let mut ins_pos = tx.prepare(
530 "INSERT INTO fts_node_property_positions \
531 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
532 VALUES (?1, ?2, ?3, ?4, ?5)",
533 )?;
534
535 let rows: Vec<(String, Vec<u8>)> = stmt
536 .query_map(rusqlite::params![req.kind], |r| {
537 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
538 })?
539 .collect::<Result<Vec<_>, _>>()?;
540
541 for (node_logical_id, blob) in &rows {
542 let positions: Vec<(usize, usize, String)> =
544 serde_json::from_slice(blob).unwrap_or_default();
545 for (start, end, leaf_path) in positions {
546 ins_pos.execute(rusqlite::params![
547 node_logical_id,
548 req.kind,
549 i64::try_from(start).unwrap_or(i64::MAX),
550 i64::try_from(end).unwrap_or(i64::MAX),
551 leaf_path,
552 ])?;
553 }
554 }
555 }
556
557 tx.execute(
559 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
560 rusqlite::params![req.kind],
561 )?;
562
563 let now_ms = now_unix_ms();
565 tx.execute(
566 "UPDATE fts_property_rebuild_state \
567 SET state = 'COMPLETE', last_progress_at = ?1 \
568 WHERE kind = ?2",
569 rusqlite::params![now_ms, req.kind],
570 )?;
571
572 tx.commit()?;
573 }
574
575 Ok(())
576}
577
578fn mark_failed(
579 conn: &rusqlite::Connection,
580 kind: &str,
581 error_message: &str,
582) -> Result<(), EngineError> {
583 let now_ms = now_unix_ms();
584 conn.execute(
585 "UPDATE fts_property_rebuild_state \
586 SET state = 'FAILED', error_message = ?1, last_progress_at = ?2 \
587 WHERE kind = ?3",
588 rusqlite::params![error_message, now_ms, kind],
589 )?;
590 Ok(())
591}
592
593fn mark_failed_and_clear(
594 conn: &rusqlite::Connection,
595 kind: &str,
596 error_message: &str,
597) -> Result<(), EngineError> {
598 conn.execute(
599 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
600 rusqlite::params![kind],
601 )?;
602 mark_failed(conn, kind, error_message)
603}
604
605fn abort_if_shutdown(
606 conn: &rusqlite::Connection,
607 kind: &str,
608 shutdown: &AtomicBool,
609) -> Result<(), EngineError> {
610 if shutdown.load(Ordering::Acquire) {
611 mark_failed_and_clear(conn, kind, "engine shutdown interrupted rebuild")?;
612 return Err(EngineError::Bridge(
613 "engine shutdown interrupted rebuild".to_owned(),
614 ));
615 }
616 Ok(())
617}
618
619fn ensure_fts_kind_table_for_schema(
620 conn: &rusqlite::Connection,
621 kind: &str,
622 schema: &crate::writer::PropertyFtsSchema,
623) -> Result<(), EngineError> {
624 let table = fathomdb_schema::fts_kind_table_name(kind);
625 let exists: bool = conn
626 .query_row(
627 "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 \
628 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
629 rusqlite::params![table],
630 |_| Ok(true),
631 )
632 .optional()?
633 .unwrap_or(false);
634 if exists {
635 return Ok(());
636 }
637
638 let tokenizer = fathomdb_schema::resolve_fts_tokenizer(conn, kind)?;
639 let tokenizer_sql = tokenizer.replace('\'', "''");
640 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
641 let cols: Vec<String> = if has_weights {
642 std::iter::once("node_logical_id UNINDEXED".to_owned())
643 .chain(schema.paths.iter().map(|p| {
644 let is_recursive = matches!(p.mode, crate::writer::PropertyPathMode::Recursive);
645 fathomdb_schema::fts_column_name(&p.path, is_recursive)
646 }))
647 .collect()
648 } else {
649 vec![
650 "node_logical_id UNINDEXED".to_owned(),
651 "text_content".to_owned(),
652 ]
653 };
654 conn.execute_batch(&format!(
655 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5({cols}, tokenize='{tokenizer_sql}')",
656 cols = cols.join(", "),
657 ))?;
658 Ok(())
659}
660
/// Module-local shorthand for [`now_unix_ms_pub`].
fn now_unix_ms() -> i64 {
    now_unix_ms_pub()
}

/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value too large for `i64` saturates to `i64::MAX`.
pub(crate) fn now_unix_ms_pub() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
674
/// One rebuild-state record; presumably mirrors a row of
/// `fts_property_rebuild_state` (the column names match) — confirm at the loader.
#[derive(Debug)]
pub struct RebuildStateRow {
    /// Node kind being rebuilt.
    pub kind: String,
    /// Schema version the rebuild targets.
    pub schema_id: i64,
    /// State string, e.g. `PENDING`, `BUILDING`, `SWAPPING`, `COMPLETE`, `FAILED`.
    pub state: String,
    /// Number of active nodes to process; `None` until it has been counted.
    pub rows_total: Option<i64>,
    /// Number of nodes staged so far.
    pub rows_done: i64,
    /// Start timestamp; presumably Unix milliseconds — confirm.
    pub started_at: i64,
    /// Whether this rebuild stems from the kind's first schema registration.
    pub is_first_registration: bool,
    /// Failure description when `state` is `FAILED`.
    pub error_message: Option<String>,
}
687
688#[derive(Debug, Clone, serde::Serialize)]
691pub struct RebuildProgress {
692 pub state: String,
694 pub rows_total: Option<i64>,
696 pub rows_done: i64,
698 pub started_at: i64,
700 pub last_progress_at: Option<i64>,
702 pub error_message: Option<String>,
704}
705
706pub(crate) fn recover_interrupted_rebuilds(
712 conn: &rusqlite::Connection,
713) -> Result<(), crate::EngineError> {
714 let kinds: Vec<String> = {
716 let mut stmt = conn.prepare(
717 "SELECT kind FROM fts_property_rebuild_state \
718 WHERE state IN ('BUILDING', 'SWAPPING')",
719 )?;
720 stmt.query_map([], |r| r.get::<_, String>(0))?
721 .collect::<Result<Vec<_>, _>>()?
722 };
723
724 for kind in &kinds {
725 conn.execute(
726 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
727 rusqlite::params![kind],
728 )?;
729 conn.execute(
730 "UPDATE fts_property_rebuild_state \
731 SET state = 'FAILED', error_message = 'interrupted by engine restart' \
732 WHERE kind = ?1",
733 rusqlite::params![kind],
734 )?;
735 }
736
737 Ok(())
738}
739
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use rusqlite::Connection;

    use fathomdb_schema::SchemaManager;

    use super::recover_interrupted_rebuilds;

    /// Fresh in-memory database with the engine schema bootstrapped.
    fn bootstrapped_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let schema_manager = SchemaManager::new();
        schema_manager.bootstrap(&conn).expect("bootstrap");
        conn
    }

    /// Seeds a rebuild-state row for `kind` in `state` (schema_id 1, not a first registration).
    fn insert_rebuild_state(conn: &Connection, kind: &str, state: &str) {
        conn.execute(
            "INSERT INTO fts_property_rebuild_state \
             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
             VALUES (?1, 1, ?2, 0, 0, 0)",
            rusqlite::params![kind, state],
        )
        .expect("insert rebuild state");
    }

    /// Reads back the rebuild state string stored for `kind`.
    fn rebuild_state_of(conn: &Connection, kind: &str) -> String {
        conn.query_row(
            "SELECT state FROM fts_property_rebuild_state WHERE kind = ?1",
            rusqlite::params![kind],
            |r| r.get(0),
        )
        .expect("state row")
    }

    /// Seeds one `MyKind` row in state `before`, runs crash recovery, and
    /// returns the state the row ends up in.
    fn state_after_recovery(before: &str) -> String {
        let conn = bootstrapped_conn();
        insert_rebuild_state(&conn, "MyKind", before);
        recover_interrupted_rebuilds(&conn).expect("recover");
        rebuild_state_of(&conn, "MyKind")
    }

    /// Inserts a single active node (row id `row-1`) of `kind` with the given JSON properties.
    fn insert_node(conn: &Connection, logical_id: &str, kind: &str, properties: &str) {
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', ?1, ?2, ?3, 100, 'seed')",
            rusqlite::params![logical_id, kind, properties],
        )
        .expect("insert node");
    }

    /// Registers `paths_json` as `kind`'s property schema and returns its rowid.
    fn register_schema(conn: &Connection, kind: &str, paths_json: &str) -> i64 {
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, ?2, ' ')",
            rusqlite::params![kind, paths_json],
        )
        .expect("insert schema");
        conn.query_row(
            "SELECT rowid FROM fts_property_schemas WHERE kind = ?1",
            rusqlite::params![kind],
            |r| r.get(0),
        )
        .expect("schema_id")
    }

    /// Marks `kind` PENDING for `schema_id` as a first registration and
    /// returns the matching rebuild request.
    fn pending_request(conn: &Connection, kind: &str, schema_id: i64) -> super::RebuildRequest {
        conn.execute(
            "INSERT INTO fts_property_rebuild_state \
             (kind, schema_id, state, rows_done, started_at, is_first_registration) \
             VALUES (?1, ?2, 'PENDING', 0, 0, 1)",
            rusqlite::params![kind, schema_id],
        )
        .expect("insert rebuild state");
        super::RebuildRequest {
            kind: kind.to_owned(),
            schema_id,
        }
    }

    /// Runs `run_rebuild` for `req` with shutdown never requested.
    fn run_to_completion(conn: &mut Connection, req: &super::RebuildRequest) {
        let shutdown = std::sync::atomic::AtomicBool::new(false);
        super::run_rebuild(conn, req, &shutdown).expect("run_rebuild");
    }

    /// Counts rows in the per-kind FTS `table` for `node_logical_id`.
    fn rows_for_node(conn: &Connection, table: &str, node_logical_id: &str) -> i64 {
        conn.query_row(
            &format!("SELECT count(*) FROM {table} WHERE node_logical_id = ?1"),
            rusqlite::params![node_logical_id],
            |r| r.get(0),
        )
        .expect("per-kind count")
    }

    #[test]
    fn pending_row_survives_restart() {
        assert_eq!(
            state_after_recovery("PENDING"),
            "PENDING",
            "PENDING rows must survive engine restart"
        );
    }

    #[test]
    fn building_row_marked_failed_on_restart() {
        assert_eq!(
            state_after_recovery("BUILDING"),
            "FAILED",
            "BUILDING rows must be marked FAILED on restart"
        );
    }

    #[test]
    fn swapping_row_marked_failed_on_restart() {
        assert_eq!(
            state_after_recovery("SWAPPING"),
            "FAILED",
            "SWAPPING rows must be marked FAILED on restart"
        );
    }

    #[test]
    fn rebuild_swap_populates_per_kind_table() {
        let mut conn = bootstrapped_conn();
        let kind = "TestKind";
        let table = fathomdb_schema::fts_kind_table_name(kind);

        insert_node(&conn, "node-1", kind, r#"{"name":"hello world"}"#);
        let schema_id = register_schema(&conn, kind, r#"["$.name"]"#);
        let req = pending_request(&conn, kind, schema_id);

        run_to_completion(&mut conn, &req);

        assert_eq!(
            rows_for_node(&conn, &table, "node-1"),
            1,
            "per-kind table must have the rebuilt row after run_rebuild"
        );
    }

    #[test]
    fn rebuild_actor_uses_per_column_for_weighted_schema() {
        let mut conn = bootstrapped_conn();
        let kind = "Article";
        let table = fathomdb_schema::fts_kind_table_name(kind);
        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);

        insert_node(&conn, "article-1", kind, r#"{"title":"Hello","body":"World"}"#);
        let schema_id = register_schema(
            &conn,
            kind,
            r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#,
        );

        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
             node_logical_id UNINDEXED, {title_col}, {body_col}, \
             tokenize = 'porter unicode61 remove_diacritics 2'\
             )"
        ))
        .expect("create weighted per-kind table");

        let req = pending_request(&conn, kind, schema_id);
        run_to_completion(&mut conn, &req);

        assert_eq!(
            rows_for_node(&conn, &table, "article-1"),
            1,
            "per-kind table must have the rebuilt row"
        );

        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM {table} \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello",
            "title column must have correct value after rebuild"
        );
        assert_eq!(
            body_val, "World",
            "body column must have correct value after rebuild"
        );
    }
}