1use std::path::Path;
6use std::sync::Arc;
7use std::sync::mpsc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use fathomdb_schema::SchemaManager;
12use rusqlite::OptionalExtension;
13
14use crate::{EngineError, sqlite};
15
/// Selects how a property-FTS rebuild is carried out.
///
/// `Async` is the default variant. How each mode is acted upon is decided by
/// code outside this file.
// NOTE(review): presumably `Eager` rebuilds inline while `Async` hands the work
// to the `RebuildActor` thread — confirm against the call sites.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RebuildMode {
    /// Non-default mode.
    Eager,
    /// Default mode (see `#[default]`).
    #[default]
    Async,
}
25
/// A unit of work sent over the channel to the rebuild thread.
///
/// The worker uses `kind` to select the nodes and the property schema to
/// rebuild, and `kind` + `schema_id` to locate the row in
/// `fts_property_rebuild_state` that it flips to `BUILDING`.
#[derive(Debug)]
pub struct RebuildRequest {
    /// Node kind whose property FTS rows should be rebuilt.
    pub kind: String,
    /// Schema identifier matched against `fts_property_rebuild_state.schema_id`.
    pub schema_id: i64,
}
32
/// Owner of the background rebuild thread.
///
/// The join handle is kept in an `Option` so that `Drop` can take it out and
/// join the thread exactly once.
#[derive(Debug)]
pub struct RebuildActor {
    /// Handle of the spawned worker; `None` once it has been taken for joining.
    thread_handle: Option<thread::JoinHandle<()>>,
}
46
47impl RebuildActor {
48 pub fn start(
57 path: impl AsRef<Path>,
58 schema_manager: Arc<SchemaManager>,
59 receiver: mpsc::Receiver<RebuildRequest>,
60 ) -> Result<Self, EngineError> {
61 let database_path = path.as_ref().to_path_buf();
62
63 let handle = thread::Builder::new()
64 .name("fathomdb-rebuild".to_owned())
65 .spawn(move || {
66 rebuild_loop(&database_path, &schema_manager, receiver);
67 })
68 .map_err(EngineError::Io)?;
69
70 Ok(Self {
71 thread_handle: Some(handle),
72 })
73 }
74}
75
76impl Drop for RebuildActor {
77 fn drop(&mut self) {
78 if let Some(handle) = self.thread_handle.take() {
81 match handle.join() {
82 Ok(()) => {}
83 Err(payload) => {
84 if std::thread::panicking() {
85 trace_warn!(
86 "rebuild thread panicked during shutdown (suppressed: already panicking)"
87 );
88 } else {
89 std::panic::resume_unwind(payload);
90 }
91 }
92 }
93 }
94 }
95}
96
/// Wall-clock time, in milliseconds, that one staging batch should take.
/// `run_rebuild` scales the next batch size towards this target.
const BATCH_TARGET_MS: u128 = 1_000;
/// Number of rows requested for the first batch, before any timing is known.
const INITIAL_BATCH_SIZE: usize = 5_000;
103
104fn rebuild_loop(
105 database_path: &Path,
106 schema_manager: &Arc<SchemaManager>,
107 receiver: mpsc::Receiver<RebuildRequest>,
108) {
109 trace_info!("rebuild thread started");
110
111 let mut conn = match sqlite::open_connection(database_path) {
112 Ok(conn) => conn,
113 #[allow(clippy::used_underscore_binding)]
114 Err(_error) => {
115 trace_error!(error = %_error, "rebuild thread: database connection failed");
116 return;
117 }
118 };
119
120 #[allow(clippy::used_underscore_binding)]
121 if let Err(_error) = schema_manager.bootstrap(&conn) {
122 trace_error!(error = %_error, "rebuild thread: schema bootstrap failed");
123 return;
124 }
125
126 for req in receiver {
127 trace_info!(kind = %req.kind, schema_id = req.schema_id, "rebuild task started");
128 match run_rebuild(&mut conn, &req) {
129 Ok(()) => {
130 trace_info!(kind = %req.kind, "rebuild task COMPLETE");
131 }
132 Err(error) => {
133 trace_error!(kind = %req.kind, error = %error, "rebuild task failed");
134 let _ = mark_failed(&conn, &req.kind, &error.to_string());
135 }
136 }
137 }
138
139 trace_info!("rebuild thread exiting");
140}
141
142#[allow(clippy::too_many_lines)]
143fn run_rebuild(conn: &mut rusqlite::Connection, req: &RebuildRequest) -> Result<(), EngineError> {
144 {
146 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
147 tx.execute(
148 "UPDATE fts_property_rebuild_state SET state = 'BUILDING' \
149 WHERE kind = ?1 AND schema_id = ?2",
150 rusqlite::params![req.kind, req.schema_id],
151 )?;
152 tx.commit()?;
153 }
154
155 let rows_total: i64 = conn.query_row(
157 "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
158 rusqlite::params![req.kind],
159 |r| r.get(0),
160 )?;
161
162 {
163 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
164 tx.execute(
165 "UPDATE fts_property_rebuild_state SET rows_total = ?1 WHERE kind = ?2",
166 rusqlite::params![rows_total, req.kind],
167 )?;
168 tx.commit()?;
169 }
170
171 let (paths_json, separator): (String, String) = conn
173 .query_row(
174 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
175 rusqlite::params![req.kind],
176 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
177 )
178 .optional()?
179 .ok_or_else(|| {
180 EngineError::Bridge(format!("rebuild: schema for kind '{}' missing", req.kind))
181 })?;
182 let schema = crate::writer::parse_property_schema_json(&paths_json, &separator);
183
184 let mut offset: i64 = 0;
186 let mut batch_size = INITIAL_BATCH_SIZE;
187 let mut rows_done: i64 = 0;
188
189 loop {
190 let batch: Vec<(String, String)> = {
192 let mut stmt = conn.prepare(
193 "SELECT logical_id, properties FROM nodes \
194 WHERE kind = ?1 AND superseded_at IS NULL \
195 ORDER BY logical_id \
196 LIMIT ?2 OFFSET ?3",
197 )?;
198 stmt.query_map(
199 rusqlite::params![
200 req.kind,
201 i64::try_from(batch_size).unwrap_or(i64::MAX),
202 offset
203 ],
204 |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
205 )?
206 .collect::<Result<Vec<_>, _>>()?
207 };
208
209 if batch.is_empty() {
210 break;
211 }
212
213 let batch_len = batch.len();
214 let batch_start = Instant::now();
215
216 {
218 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
219
220 for (logical_id, properties_str) in &batch {
221 let props: serde_json::Value =
222 serde_json::from_str(properties_str).unwrap_or_default();
223 let (text, positions, _stats) =
224 crate::writer::extract_property_fts(&props, &schema);
225
226 let positions_blob: Option<Vec<u8>> = if positions.is_empty() {
228 None
229 } else {
230 let v: Vec<(usize, usize, &str)> = positions
231 .iter()
232 .map(|p| (p.start_offset, p.end_offset, p.leaf_path.as_str()))
233 .collect();
234 serde_json::to_vec(&v).ok()
235 };
236
237 let text_content = text.unwrap_or_default();
238
239 tx.execute(
240 "INSERT INTO fts_property_rebuild_staging \
241 (kind, node_logical_id, text_content, positions_blob) \
242 VALUES (?1, ?2, ?3, ?4) \
243 ON CONFLICT(kind, node_logical_id) DO UPDATE \
244 SET text_content = excluded.text_content, \
245 positions_blob = excluded.positions_blob",
246 rusqlite::params![req.kind, logical_id, text_content, positions_blob],
247 )?;
248 }
249
250 rows_done += i64::try_from(batch_len).unwrap_or(i64::MAX);
251 let now_ms = now_unix_ms();
252 tx.execute(
253 "UPDATE fts_property_rebuild_state \
254 SET rows_done = ?1, last_progress_at = ?2 \
255 WHERE kind = ?3",
256 rusqlite::params![rows_done, now_ms, req.kind],
257 )?;
258 tx.commit()?;
259 }
260
261 let elapsed_ms = batch_start.elapsed().as_millis();
262 let limit_used = batch_size;
264 if elapsed_ms > 0 {
266 let new_size = (batch_size as u128 * BATCH_TARGET_MS / elapsed_ms).clamp(100, 50_000);
267 batch_size = usize::try_from(new_size).unwrap_or(50_000);
268 }
269
270 offset += i64::try_from(batch_len).unwrap_or(i64::MAX);
271
272 if batch_len < limit_used {
274 break;
275 }
276 }
277
278 {
280 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
281 let now_ms = now_unix_ms();
282 tx.execute(
283 "UPDATE fts_property_rebuild_state \
284 SET state = 'SWAPPING', last_progress_at = ?1 \
285 WHERE kind = ?2",
286 rusqlite::params![now_ms, req.kind],
287 )?;
288 tx.commit()?;
289 }
290
291 {
293 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
294
295 tx.execute(
297 "DELETE FROM fts_node_properties WHERE kind = ?1",
298 rusqlite::params![req.kind],
299 )?;
300
301 tx.execute(
303 "INSERT INTO fts_node_properties(node_logical_id, kind, text_content) \
304 SELECT node_logical_id, kind, text_content \
305 FROM fts_property_rebuild_staging WHERE kind = ?1",
306 rusqlite::params![req.kind],
307 )?;
308
309 tx.execute(
311 "DELETE FROM fts_node_property_positions WHERE kind = ?1",
312 rusqlite::params![req.kind],
313 )?;
314
315 {
317 let mut stmt = tx.prepare(
318 "SELECT node_logical_id, positions_blob \
319 FROM fts_property_rebuild_staging \
320 WHERE kind = ?1 AND positions_blob IS NOT NULL",
321 )?;
322 let mut ins_pos = tx.prepare(
323 "INSERT INTO fts_node_property_positions \
324 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
325 VALUES (?1, ?2, ?3, ?4, ?5)",
326 )?;
327
328 let rows: Vec<(String, Vec<u8>)> = stmt
329 .query_map(rusqlite::params![req.kind], |r| {
330 Ok((r.get::<_, String>(0)?, r.get::<_, Vec<u8>>(1)?))
331 })?
332 .collect::<Result<Vec<_>, _>>()?;
333
334 for (node_logical_id, blob) in &rows {
335 let positions: Vec<(usize, usize, String)> =
337 serde_json::from_slice(blob).unwrap_or_default();
338 for (start, end, leaf_path) in positions {
339 ins_pos.execute(rusqlite::params![
340 node_logical_id,
341 req.kind,
342 i64::try_from(start).unwrap_or(i64::MAX),
343 i64::try_from(end).unwrap_or(i64::MAX),
344 leaf_path,
345 ])?;
346 }
347 }
348 }
349
350 tx.execute(
352 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
353 rusqlite::params![req.kind],
354 )?;
355
356 let now_ms = now_unix_ms();
358 tx.execute(
359 "UPDATE fts_property_rebuild_state \
360 SET state = 'COMPLETE', last_progress_at = ?1 \
361 WHERE kind = ?2",
362 rusqlite::params![now_ms, req.kind],
363 )?;
364
365 tx.commit()?;
366 }
367
368 Ok(())
369}
370
371fn mark_failed(
372 conn: &rusqlite::Connection,
373 kind: &str,
374 error_message: &str,
375) -> Result<(), EngineError> {
376 let now_ms = now_unix_ms();
377 conn.execute(
378 "UPDATE fts_property_rebuild_state \
379 SET state = 'FAILED', error_message = ?1, last_progress_at = ?2 \
380 WHERE kind = ?3",
381 rusqlite::params![error_message, now_ms, kind],
382 )?;
383 Ok(())
384}
385
386fn now_unix_ms() -> i64 {
387 now_unix_ms_pub()
388}
389
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Never fails: a system clock set before the epoch yields `0`, and a
/// millisecond count that does not fit in an `i64` yields `i64::MAX`.
pub(crate) fn now_unix_ms_pub() -> i64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970-01-01: report the epoch itself.
        Err(_) => Duration::ZERO,
    };
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
399
/// In-memory form of a rebuild state record.
///
/// The field names match columns this file reads and writes on
/// `fts_property_rebuild_state` (`kind`, `schema_id`, `state`, `rows_total`,
/// `rows_done`, `error_message`).
// NOTE(review): `started_at` and `is_first_registration` are not touched in
// this file; their meaning should be confirmed where the row is loaded.
#[derive(Debug)]
pub struct RebuildStateRow {
    /// Node kind the rebuild belongs to.
    pub kind: String,
    /// Schema identifier the rebuild was requested for.
    pub schema_id: i64,
    /// State name; this file writes `BUILDING`, `SWAPPING`, `COMPLETE` and
    /// `FAILED`, and also recognises `PENDING`.
    pub state: String,
    /// Number of rows to process, if it has been counted.
    pub rows_total: Option<i64>,
    /// Number of rows processed so far.
    pub rows_done: i64,
    /// Start timestamp (presumably Unix milliseconds — confirm).
    pub started_at: i64,
    /// Whether this is the first registration for the kind (set elsewhere).
    pub is_first_registration: bool,
    /// Failure description, if any.
    pub error_message: Option<String>,
}
412
413#[derive(Debug, Clone, serde::Serialize)]
416pub struct RebuildProgress {
417 pub state: String,
419 pub rows_total: Option<i64>,
421 pub rows_done: i64,
423 pub started_at: i64,
425 pub last_progress_at: Option<i64>,
427 pub error_message: Option<String>,
429}
430
431pub(crate) fn recover_interrupted_rebuilds(
437 conn: &rusqlite::Connection,
438) -> Result<(), crate::EngineError> {
439 let kinds: Vec<String> = {
441 let mut stmt = conn.prepare(
442 "SELECT kind FROM fts_property_rebuild_state \
443 WHERE state IN ('PENDING', 'BUILDING', 'SWAPPING')",
444 )?;
445 stmt.query_map([], |r| r.get::<_, String>(0))?
446 .collect::<Result<Vec<_>, _>>()?
447 };
448
449 for kind in &kinds {
450 conn.execute(
451 "DELETE FROM fts_property_rebuild_staging WHERE kind = ?1",
452 rusqlite::params![kind],
453 )?;
454 conn.execute(
455 "UPDATE fts_property_rebuild_state \
456 SET state = 'FAILED', error_message = 'interrupted by engine restart' \
457 WHERE kind = ?1",
458 rusqlite::params![kind],
459 )?;
460 }
461
462 Ok(())
463}