1use rusqlite::{DatabaseName, OptionalExtension, TransactionBehavior};
2use sha2::{Digest, Sha256};
3use std::fs;
4use std::io;
5use std::path::Path;
6use std::time::SystemTime;
7
8use crate::ids::new_id;
9use crate::{EngineError, SkippedEdge};
10
11use super::{
12 AdminService, EXPORT_PROTOCOL_VERSION, LogicalPurgeReport, LogicalRestoreReport,
13 ProvenancePurgeOptions, ProvenancePurgeReport, SafeExportManifest, SafeExportOptions,
14 TraceReport, clear_operational_current_rows, i64_to_usize, persist_simple_provenance_event,
15 rebuild_operational_current_rows,
16};
17
18impl AdminService {
19 pub fn trace_source(&self, source_ref: &str) -> Result<TraceReport, EngineError> {
22 let conn = self.connect()?;
23
24 let node_logical_ids = collect_strings(
25 &conn,
26 "SELECT logical_id FROM nodes WHERE source_ref = ?1 ORDER BY created_at",
27 source_ref,
28 )?;
29 let action_ids = collect_strings(
30 &conn,
31 "SELECT id FROM actions WHERE source_ref = ?1 ORDER BY created_at",
32 source_ref,
33 )?;
34 let operational_mutation_ids = collect_strings(
35 &conn,
36 "SELECT id FROM operational_mutations WHERE source_ref = ?1 ORDER BY mutation_order",
37 source_ref,
38 )?;
39
40 Ok(TraceReport {
41 source_ref: source_ref.to_owned(),
42 node_rows: count_source_ref(&conn, "nodes", source_ref)?,
43 edge_rows: count_source_ref(&conn, "edges", source_ref)?,
44 action_rows: count_source_ref(&conn, "actions", source_ref)?,
45 operational_mutation_rows: count_source_ref(
46 &conn,
47 "operational_mutations",
48 source_ref,
49 )?,
50 node_logical_ids,
51 action_ids,
52 operational_mutation_ids,
53 })
54 }
55
56 #[allow(clippy::too_many_lines)]
60 pub fn restore_logical_id(
61 &self,
62 logical_id: &str,
63 ) -> Result<LogicalRestoreReport, EngineError> {
64 let mut conn = self.connect()?;
65 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
66
67 let active_count: i64 = tx.query_row(
68 "SELECT count(*) FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
69 [logical_id],
70 |row| row.get(0),
71 )?;
72 if active_count > 0 {
73 return Ok(LogicalRestoreReport {
74 logical_id: logical_id.to_owned(),
75 was_noop: true,
76 restored_node_rows: 0,
77 restored_edge_rows: 0,
78 restored_chunk_rows: 0,
79 restored_fts_rows: 0,
80 restored_property_fts_rows: 0,
81 restored_vec_rows: 0,
82 skipped_edges: Vec::new(),
83 notes: vec!["logical_id already active".to_owned()],
84 });
85 }
86
87 let restored_node: Option<(String, String)> = tx
88 .query_row(
89 "SELECT row_id, kind FROM nodes \
90 WHERE logical_id = ?1 AND superseded_at IS NOT NULL \
91 ORDER BY superseded_at DESC, created_at DESC, rowid DESC LIMIT 1",
92 [logical_id],
93 |row| Ok((row.get(0)?, row.get(1)?)),
94 )
95 .optional()?;
96 let (restored_node_row_id, restored_kind) = restored_node.ok_or_else(|| {
97 EngineError::InvalidWrite(format!("logical_id '{logical_id}' is not retired"))
98 })?;
99
100 tx.execute(
101 "UPDATE nodes SET superseded_at = NULL WHERE row_id = ?1",
102 [restored_node_row_id.as_str()],
103 )?;
104
105 let retire_scope: Option<(i64, Option<String>, i64)> = tx
106 .query_row(
107 "SELECT rowid, source_ref, created_at FROM provenance_events \
108 WHERE event_type = 'node_retire' AND subject = ?1 \
109 ORDER BY created_at DESC, rowid DESC LIMIT 1",
110 [logical_id],
111 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
112 )
113 .optional()?;
114 let (restored_edge_rows, skipped_edges) = if let Some((
115 retire_event_rowid,
116 retire_source_ref,
117 retire_created_at,
118 )) = retire_scope
119 {
120 restore_validated_edges(
121 &tx,
122 logical_id,
123 retire_source_ref.as_deref(),
124 retire_created_at,
125 retire_event_rowid,
126 )?
127 } else {
128 (0, Vec::new())
129 };
130
131 let restored_chunk_rows: usize = tx
132 .query_row(
133 "SELECT count(*) FROM chunks WHERE node_logical_id = ?1",
134 [logical_id],
135 |row| row.get::<_, i64>(0),
136 )
137 .map(i64_to_usize)?;
138 tx.execute(
139 "DELETE FROM fts_nodes WHERE node_logical_id = ?1",
140 [logical_id],
141 )?;
142 let restored_fts_rows = tx.execute(
143 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
144 SELECT id, node_logical_id, ?2, text_content \
145 FROM chunks WHERE node_logical_id = ?1",
146 rusqlite::params![logical_id, restored_kind],
147 )?;
148 let restored_vec_rows = count_vec_rows_for_logical_id(&tx, logical_id)?;
149
150 let table = fathomdb_schema::fts_kind_table_name(&restored_kind);
153 let fts_table_exists: bool = tx
154 .query_row(
155 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
156 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
157 rusqlite::params![table],
158 |r| r.get::<_, i64>(0),
159 )
160 .unwrap_or(0)
161 > 0;
162 if fts_table_exists {
163 tx.execute(
164 &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
165 [logical_id],
166 )?;
167 }
168 let restored_property_fts_rows =
169 rebuild_single_node_property_fts(&tx, logical_id, &restored_kind)?;
170
171 persist_simple_provenance_event(
172 &tx,
173 "restore_logical_id",
174 logical_id,
175 Some(serde_json::json!({
176 "restored_node_rows": 1,
177 "restored_edge_rows": restored_edge_rows,
178 "restored_chunk_rows": restored_chunk_rows,
179 "restored_fts_rows": restored_fts_rows,
180 "restored_property_fts_rows": restored_property_fts_rows,
181 "restored_vec_rows": restored_vec_rows,
182 })),
183 )?;
184 tx.commit()?;
185
186 Ok(LogicalRestoreReport {
187 logical_id: logical_id.to_owned(),
188 was_noop: false,
189 restored_node_rows: 1,
190 restored_edge_rows,
191 restored_chunk_rows,
192 restored_fts_rows,
193 restored_property_fts_rows,
194 restored_vec_rows,
195 skipped_edges,
196 notes: Vec::new(),
197 })
198 }
199
200 pub fn purge_logical_id(&self, logical_id: &str) -> Result<LogicalPurgeReport, EngineError> {
204 let mut conn = self.connect()?;
205 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
206
207 let active_count: i64 = tx.query_row(
208 "SELECT count(*) FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
209 [logical_id],
210 |row| row.get(0),
211 )?;
212 if active_count > 0 {
213 return Ok(LogicalPurgeReport {
214 logical_id: logical_id.to_owned(),
215 was_noop: true,
216 deleted_node_rows: 0,
217 deleted_edge_rows: 0,
218 deleted_chunk_rows: 0,
219 deleted_fts_rows: 0,
220 deleted_vec_rows: 0,
221 notes: vec!["logical_id is active; purge skipped".to_owned()],
222 });
223 }
224
225 let node_rows: i64 = tx.query_row(
226 "SELECT count(*) FROM nodes WHERE logical_id = ?1",
227 [logical_id],
228 |row| row.get(0),
229 )?;
230 if node_rows == 0 {
231 return Err(EngineError::InvalidWrite(format!(
232 "logical_id '{logical_id}' does not exist"
233 )));
234 }
235
236 let deleted_vec_rows = delete_vec_rows_for_logical_id(&tx, logical_id)?;
237 let deleted_fts_rows = tx.execute(
238 "DELETE FROM fts_nodes WHERE node_logical_id = ?1",
239 [logical_id],
240 )?;
241 let deleted_edge_rows = tx.execute(
242 "DELETE FROM edges WHERE source_logical_id = ?1 OR target_logical_id = ?1",
243 [logical_id],
244 )?;
245 let deleted_chunk_rows = tx.execute(
246 "DELETE FROM chunks WHERE node_logical_id = ?1",
247 [logical_id],
248 )?;
249 let deleted_node_rows =
250 tx.execute("DELETE FROM nodes WHERE logical_id = ?1", [logical_id])?;
251 tx.execute(
252 "DELETE FROM node_access_metadata WHERE logical_id = ?1",
253 [logical_id],
254 )?;
255
256 persist_simple_provenance_event(
257 &tx,
258 "purge_logical_id",
259 logical_id,
260 Some(serde_json::json!({
261 "deleted_node_rows": deleted_node_rows,
262 "deleted_edge_rows": deleted_edge_rows,
263 "deleted_chunk_rows": deleted_chunk_rows,
264 "deleted_fts_rows": deleted_fts_rows,
265 "deleted_vec_rows": deleted_vec_rows,
266 })),
267 )?;
268 tx.commit()?;
269
270 Ok(LogicalPurgeReport {
271 logical_id: logical_id.to_owned(),
272 was_noop: false,
273 deleted_node_rows,
274 deleted_edge_rows,
275 deleted_chunk_rows,
276 deleted_fts_rows,
277 deleted_vec_rows,
278 notes: Vec::new(),
279 })
280 }
281
282 pub fn purge_provenance_events(
292 &self,
293 before_timestamp: i64,
294 options: &ProvenancePurgeOptions,
295 ) -> Result<ProvenancePurgeReport, EngineError> {
296 let mut conn = self.connect()?;
297 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299 let preserved_types: Vec<&str> = if options.preserve_event_types.is_empty() {
300 vec!["excise", "purge_logical_id"]
301 } else {
302 options
303 .preserve_event_types
304 .iter()
305 .map(String::as_str)
306 .collect()
307 };
308
309 let placeholders: String = (0..preserved_types.len())
311 .map(|i| format!("?{}", i + 2))
312 .collect::<Vec<_>>()
313 .join(", ");
314 let count_query = format!(
315 "SELECT count(*) FROM provenance_events \
316 WHERE created_at < ?1 AND event_type NOT IN ({placeholders})"
317 );
318 let delete_query = format!(
319 "DELETE FROM provenance_events WHERE rowid IN (\
320 SELECT rowid FROM provenance_events \
321 WHERE created_at < ?1 AND event_type NOT IN ({placeholders}) \
322 LIMIT 10000)"
323 );
324
325 let bind_params = |stmt: &mut rusqlite::Statement<'_>| -> Result<(), rusqlite::Error> {
326 stmt.raw_bind_parameter(1, before_timestamp)?;
327 for (i, event_type) in preserved_types.iter().enumerate() {
328 stmt.raw_bind_parameter(i + 2, *event_type)?;
329 }
330 Ok(())
331 };
332
333 let events_deleted = if options.dry_run {
334 let mut stmt = tx.prepare(&count_query)?;
335 bind_params(&mut stmt)?;
336 stmt.raw_query()
337 .next()?
338 .map_or(0, |row| row.get::<_, u64>(0).unwrap_or(0))
339 } else {
340 let mut total_deleted: u64 = 0;
341 loop {
342 let mut stmt = tx.prepare(&delete_query)?;
343 bind_params(&mut stmt)?;
344 let deleted = stmt.raw_execute()?;
345 if deleted == 0 {
346 break;
347 }
348 total_deleted += deleted as u64;
349 }
350 total_deleted
351 };
352
353 let total_after: u64 =
354 tx.query_row("SELECT count(*) FROM provenance_events", [], |row| {
355 row.get(0)
356 })?;
357
358 let oldest_remaining: Option<i64> = tx
359 .query_row("SELECT MIN(created_at) FROM provenance_events", [], |row| {
360 row.get(0)
361 })
362 .optional()?
363 .flatten();
364
365 if !options.dry_run {
366 tx.commit()?;
367 }
368
369 let events_preserved = if options.dry_run {
372 total_after - events_deleted
373 } else {
374 total_after
375 };
376
377 Ok(ProvenancePurgeReport {
378 events_deleted,
379 events_preserved,
380 oldest_remaining,
381 })
382 }
383
384 #[allow(clippy::too_many_lines)]
388 pub fn excise_source(&self, source_ref: &str) -> Result<TraceReport, EngineError> {
389 let mut conn = self.connect()?;
390
391 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
392 let affected_operational_collections = collect_strings_tx(
393 &tx,
394 "SELECT DISTINCT m.collection_name \
395 FROM operational_mutations m \
396 JOIN operational_collections c ON c.name = m.collection_name \
397 WHERE m.source_ref = ?1 AND c.kind = 'latest_state' \
398 ORDER BY m.collection_name",
399 source_ref,
400 )?;
401
402 let pairs: Vec<(String, String)> = {
404 let mut stmt = tx.prepare(
405 "SELECT row_id, logical_id FROM nodes \
406 WHERE source_ref = ?1 AND superseded_at IS NULL",
407 )?;
408 stmt.query_map([source_ref], |row| {
409 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
410 })?
411 .collect::<Result<Vec<_>, _>>()?
412 };
413 let affected_logical_ids: Vec<String> = pairs
414 .iter()
415 .map(|(_, logical_id)| logical_id.clone())
416 .collect();
417
418 tx.execute(
420 "UPDATE nodes SET superseded_at = unixepoch() \
421 WHERE source_ref = ?1 AND superseded_at IS NULL",
422 [source_ref],
423 )?;
424 tx.execute(
425 "UPDATE edges SET superseded_at = unixepoch() \
426 WHERE source_ref = ?1 AND superseded_at IS NULL",
427 [source_ref],
428 )?;
429 tx.execute(
430 "UPDATE actions SET superseded_at = unixepoch() \
431 WHERE source_ref = ?1 AND superseded_at IS NULL",
432 [source_ref],
433 )?;
434 clear_operational_current_rows(&tx, &affected_operational_collections)?;
435 tx.execute(
436 "DELETE FROM operational_mutations WHERE source_ref = ?1",
437 [source_ref],
438 )?;
439 for logical_id in &affected_logical_ids {
440 delete_vec_rows_for_logical_id(&tx, logical_id)?;
441 tx.execute(
442 "DELETE FROM chunks WHERE node_logical_id = ?1",
443 [logical_id.as_str()],
444 )?;
445 }
446
447 for (excised_row_id, logical_id) in &pairs {
449 let prior: Option<String> = tx
450 .query_row(
451 "SELECT row_id FROM nodes \
452 WHERE logical_id = ?1 AND row_id != ?2 \
453 ORDER BY created_at DESC LIMIT 1",
454 [logical_id.as_str(), excised_row_id.as_str()],
455 |row| row.get(0),
456 )
457 .optional()?;
458 if let Some(prior_id) = prior {
459 tx.execute(
460 "UPDATE nodes SET superseded_at = NULL WHERE row_id = ?1",
461 [prior_id.as_str()],
462 )?;
463 }
464 }
465
466 for logical_id in &affected_logical_ids {
467 let has_active_node = tx
468 .query_row(
469 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
470 [logical_id.as_str()],
471 |row| row.get::<_, i64>(0),
472 )
473 .optional()?
474 .is_some();
475 if !has_active_node {
476 tx.execute(
477 "DELETE FROM node_access_metadata WHERE logical_id = ?1",
478 [logical_id.as_str()],
479 )?;
480 }
481 }
482
483 rebuild_operational_current_rows(&tx, &affected_operational_collections)?;
484
485 tx.execute("DELETE FROM fts_nodes", [])?;
488 tx.execute(
489 r"
490 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
491 SELECT c.id, n.logical_id, n.kind, c.text_content
492 FROM chunks c
493 JOIN nodes n
494 ON n.logical_id = c.node_logical_id
495 AND n.superseded_at IS NULL
496 ",
497 [],
498 )?;
499
500 rebuild_property_fts_in_tx(&tx)?;
502
503 tx.execute(
507 "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
508 VALUES (?1, 'excise_source', ?2, ?2)",
509 rusqlite::params![new_id(), source_ref],
510 )?;
511
512 tx.commit()?;
513
514 self.trace_source(source_ref)
515 }
516
517 pub fn safe_export(
521 &self,
522 destination_path: impl AsRef<Path>,
523 options: SafeExportOptions,
524 ) -> Result<SafeExportManifest, EngineError> {
525 let destination_path = destination_path.as_ref();
526
527 let conn = self.connect()?;
531
532 if options.force_checkpoint {
533 trace_info!("safe_export: wal checkpoint started");
534 let (busy, log, checkpointed): (i64, i64, i64) =
535 conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
536 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
537 })?;
538 if busy != 0 {
539 trace_warn!(
540 busy,
541 log_frames = log,
542 checkpointed_frames = checkpointed,
543 "safe_export: wal checkpoint blocked by active readers"
544 );
545 return Err(EngineError::Bridge(format!(
546 "WAL checkpoint blocked: {busy} active reader(s) prevented a full checkpoint; \
547 log frames={log}, checkpointed={checkpointed}; \
548 retry export when no readers are active"
549 )));
550 }
551 trace_info!(
552 log_frames = log,
553 checkpointed_frames = checkpointed,
554 "safe_export: wal checkpoint completed"
555 );
556 }
557
558 let schema_version: u32 = conn
559 .query_row(
560 "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
561 [],
562 |row| row.get(0),
563 )
564 .unwrap_or(0);
565
566 if let Some(parent) = destination_path.parent() {
569 fs::create_dir_all(parent)?;
570 }
571 conn.backup(DatabaseName::Main, destination_path, None)?;
572
573 drop(conn);
574
575 let page_count: u64 = {
579 let export_conn = rusqlite::Connection::open_with_flags(
580 destination_path,
581 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
582 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
583 )?;
584 export_conn.query_row("PRAGMA page_count", [], |row| row.get(0))?
585 };
586
587 let sha256 = {
590 let mut file = fs::File::open(destination_path)?;
591 let mut hasher = Sha256::new();
592 io::copy(&mut file, &mut hasher)?;
593 format!("{:x}", hasher.finalize())
594 };
595
596 let exported_at = SystemTime::now()
598 .duration_since(SystemTime::UNIX_EPOCH)
599 .map_err(|e| EngineError::Bridge(format!("system clock error: {e}")))?
600 .as_secs();
601
602 let manifest = SafeExportManifest {
603 exported_at,
604 sha256,
605 schema_version,
606 protocol_version: EXPORT_PROTOCOL_VERSION,
607 page_count,
608 };
609
610 let manifest_path = {
612 let mut p = destination_path.to_path_buf();
613 let stem = p
614 .file_name()
615 .map(|n| format!("{}.export-manifest.json", n.to_string_lossy()))
616 .ok_or_else(|| {
617 EngineError::Bridge("destination path has no filename".to_owned())
618 })?;
619 p.set_file_name(stem);
620 p
621 };
622 let manifest_json =
623 serde_json::to_string(&manifest).map_err(|e| EngineError::Bridge(e.to_string()))?;
624
625 let manifest_tmp = manifest_path.with_extension("json.tmp");
628 if let Err(e) = fs::write(&manifest_tmp, &manifest_json)
629 .and_then(|()| fs::rename(&manifest_tmp, &manifest_path))
630 {
631 let _ = fs::remove_file(&manifest_tmp);
632 return Err(e.into());
633 }
634
635 Ok(manifest)
636 }
637}
638
639pub(super) fn rebuild_property_fts_in_tx(
640 conn: &rusqlite::Connection,
641) -> Result<usize, EngineError> {
642 let all_per_kind_tables: Vec<String> = {
645 let mut stmt = conn.prepare(
646 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'fts_props_%' \
647 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
648 )?;
649 stmt.query_map([], |r| r.get::<_, String>(0))?
650 .collect::<Result<Vec<_>, _>>()?
651 };
652 for table in &all_per_kind_tables {
653 conn.execute_batch(&format!("DELETE FROM {table}"))?;
654 }
655 conn.execute("DELETE FROM fts_node_property_positions", [])?;
656 let inserted = crate::projection::insert_property_fts_rows(
657 conn,
658 "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
659 )?;
660 Ok(inserted)
661}
662
663pub(super) fn rebuild_single_node_property_fts(
666 conn: &rusqlite::Connection,
667 logical_id: &str,
668 kind: &str,
669) -> Result<usize, EngineError> {
670 let schema: Option<(String, String)> = conn
671 .query_row(
672 "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
673 [kind],
674 |row| {
675 let paths_json: String = row.get(0)?;
676 let separator: String = row.get(1)?;
677 Ok((paths_json, separator))
678 },
679 )
680 .optional()?;
681 let Some((paths_json, separator)) = schema else {
682 return Ok(0);
683 };
684 let parsed = crate::writer::parse_property_schema_json(&paths_json, &separator);
685 let properties_str: Option<String> = conn
686 .query_row(
687 "SELECT properties FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
688 [logical_id],
689 |row| row.get(0),
690 )
691 .optional()?;
692 let Some(properties_str) = properties_str else {
693 return Ok(0);
694 };
695 let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
696 let (text, positions, _stats) = crate::writer::extract_property_fts(&props, &parsed);
697 let Some(text) = text else {
698 return Ok(0);
699 };
700 conn.execute(
701 "DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1",
702 rusqlite::params![logical_id],
703 )?;
704 let table = fathomdb_schema::fts_kind_table_name(kind);
705 let tok = fathomdb_schema::DEFAULT_FTS_TOKENIZER;
706 conn.execute_batch(&format!(
707 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
708 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = '{tok}')"
709 ))?;
710 conn.execute(
711 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"),
712 rusqlite::params![logical_id, text],
713 )?;
714 for pos in &positions {
715 conn.execute(
716 "INSERT INTO fts_node_property_positions \
717 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
718 VALUES (?1, ?2, ?3, ?4, ?5)",
719 rusqlite::params![
720 logical_id,
721 kind,
722 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
723 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
724 pos.leaf_path,
725 ],
726 )?;
727 }
728 Ok(1)
729}
730
731fn count_source_ref(
732 conn: &rusqlite::Connection,
733 table: &str,
734 source_ref: &str,
735) -> Result<usize, EngineError> {
736 let sql = match table {
737 "nodes" => "SELECT count(*) FROM nodes WHERE source_ref = ?1",
738 "edges" => "SELECT count(*) FROM edges WHERE source_ref = ?1",
739 "actions" => "SELECT count(*) FROM actions WHERE source_ref = ?1",
740 "operational_mutations" => {
741 "SELECT count(*) FROM operational_mutations WHERE source_ref = ?1"
742 }
743 other => return Err(EngineError::Bridge(format!("unknown table: {other}"))),
744 };
745 let count: i64 = conn.query_row(sql, [source_ref], |row| row.get(0))?;
746 usize::try_from(count)
749 .map_err(|_| EngineError::Bridge(format!("count overflow for table {table}: {count}")))
750}
751
752fn collect_strings_tx(
753 tx: &rusqlite::Transaction<'_>,
754 sql: &str,
755 value: &str,
756) -> Result<Vec<String>, EngineError> {
757 let mut stmt = tx.prepare(sql)?;
758 let rows = stmt.query_map([value], |row| row.get::<_, String>(0))?;
759 rows.collect::<Result<Vec<_>, _>>()
760 .map_err(EngineError::from)
761}
762
763fn collect_strings(
768 conn: &rusqlite::Connection,
769 sql: &str,
770 param: &str,
771) -> Result<Vec<String>, EngineError> {
772 let mut stmt = conn.prepare(sql)?;
773 let values = stmt
774 .query_map([param], |row| row.get::<_, String>(0))?
775 .collect::<Result<Vec<_>, _>>()?;
776 Ok(values)
777}
778
779fn collect_edge_logical_ids_for_restore(
780 tx: &rusqlite::Transaction<'_>,
781 logical_id: &str,
782 retire_source_ref: Option<&str>,
783 retire_created_at: i64,
784 retire_event_rowid: i64,
785) -> Result<Vec<String>, EngineError> {
786 let mut stmt = tx.prepare(
787 "SELECT DISTINCT e.logical_id \
788 FROM edges e \
789 JOIN provenance_events p \
790 ON p.subject = e.logical_id \
791 AND p.event_type = 'edge_retire' \
792 AND ( \
793 p.created_at > ?3 \
794 OR (p.created_at = ?3 AND p.rowid >= ?4) \
795 ) \
796 AND ((?2 IS NULL AND p.source_ref IS NULL) OR p.source_ref = ?2) \
797 WHERE e.superseded_at IS NOT NULL \
798 AND (e.source_logical_id = ?1 OR e.target_logical_id = ?1) \
799 AND NOT EXISTS ( \
800 SELECT 1 FROM edges active \
801 WHERE active.logical_id = e.logical_id \
802 AND active.superseded_at IS NULL \
803 ) \
804 ORDER BY e.logical_id",
805 )?;
806 let edge_ids = stmt
807 .query_map(
808 rusqlite::params![
809 logical_id,
810 retire_source_ref,
811 retire_created_at,
812 retire_event_rowid
813 ],
814 |row| row.get::<_, String>(0),
815 )?
816 .collect::<Result<Vec<_>, _>>()?;
817 Ok(edge_ids)
818}
819
820fn restore_validated_edges(
823 tx: &rusqlite::Transaction<'_>,
824 logical_id: &str,
825 retire_source_ref: Option<&str>,
826 retire_created_at: i64,
827 retire_event_rowid: i64,
828) -> Result<(usize, Vec<SkippedEdge>), EngineError> {
829 let edge_logical_ids = collect_edge_logical_ids_for_restore(
830 tx,
831 logical_id,
832 retire_source_ref,
833 retire_created_at,
834 retire_event_rowid,
835 )?;
836 let mut restored = 0usize;
837 let mut skipped = Vec::new();
838 for edge_logical_id in &edge_logical_ids {
839 let edge_detail: Option<(String, String, String)> = tx
840 .query_row(
841 "SELECT row_id, source_logical_id, target_logical_id FROM edges \
842 WHERE logical_id = ?1 AND superseded_at IS NOT NULL \
843 ORDER BY superseded_at DESC, created_at DESC, rowid DESC LIMIT 1",
844 [edge_logical_id.as_str()],
845 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
846 )
847 .optional()?;
848 let Some((edge_row_id, source_lid, target_lid)) = edge_detail else {
849 continue;
850 };
851 let other_endpoint = if source_lid == logical_id {
852 &target_lid
853 } else {
854 &source_lid
855 };
856 let endpoint_active: bool = tx
857 .query_row(
858 "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
859 [other_endpoint.as_str()],
860 |_| Ok(true),
861 )
862 .optional()?
863 .unwrap_or(false);
864 if !endpoint_active {
865 skipped.push(SkippedEdge {
866 edge_logical_id: edge_logical_id.clone(),
867 missing_endpoint: other_endpoint.clone(),
868 });
869 continue;
870 }
871 restored += tx.execute(
872 "UPDATE edges SET superseded_at = NULL WHERE row_id = ?1",
873 [edge_row_id.as_str()],
874 )?;
875 }
876 Ok((restored, skipped))
877}
878
/// Counts vector rows attached to the chunks of `logical_id`.
///
/// The vector table is chosen from the kind of one node row with that
/// logical id. Returns 0 when there is no such node. Also returns 0 when the
/// query fails with an SQLite message that names the vector table or says
/// `no such module: vec0`, which covers a missing table or an unloaded
/// extension.
///
/// # Errors
/// Propagates any other SQLite failure as [`EngineError::Sqlite`].
#[cfg(feature = "sqlite-vec")]
fn count_vec_rows_for_logical_id(
    tx: &rusqlite::Transaction<'_>,
    logical_id: &str,
) -> Result<usize, EngineError> {
    let node_kind = tx
        .query_row(
            "SELECT kind FROM nodes WHERE logical_id = ?1 LIMIT 1",
            [logical_id],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    let Some(node_kind) = node_kind else {
        return Ok(0);
    };

    let table_name = fathomdb_schema::vec_kind_table_name(&node_kind);
    let count_sql = format!(
        "SELECT count(*) FROM {table_name} v \
         JOIN chunks c ON c.id = v.chunk_id \
         WHERE c.node_logical_id = ?1"
    );
    tx.query_row(&count_sql, [logical_id], |row| row.get::<_, i64>(0))
        .map(i64_to_usize)
        .or_else(|err| match err {
            rusqlite::Error::SqliteFailure(_, Some(ref message))
                if message.contains(&table_name) || message.contains("no such module: vec0") =>
            {
                Ok(0)
            }
            other => Err(EngineError::Sqlite(other)),
        })
}
914
915#[cfg(not(feature = "sqlite-vec"))]
916#[allow(clippy::unnecessary_wraps)]
917fn count_vec_rows_for_logical_id(
918 _tx: &rusqlite::Transaction<'_>,
919 _logical_id: &str,
920) -> Result<usize, EngineError> {
921 Ok(0)
922}
923
/// Deletes vector rows attached to the chunks of `logical_id` and returns how
/// many were removed.
///
/// The vector table is chosen from the kind of one node row with that
/// logical id, so this must run before the node's chunks or rows are
/// deleted. Returns 0 when there is no such node. Also returns 0 when the
/// delete fails with an SQLite message that names the vector table or says
/// `no such module: vec0`.
///
/// # Errors
/// Propagates any other SQLite failure as [`EngineError::Sqlite`].
#[cfg(feature = "sqlite-vec")]
fn delete_vec_rows_for_logical_id(
    tx: &rusqlite::Transaction<'_>,
    logical_id: &str,
) -> Result<usize, EngineError> {
    let node_kind = tx
        .query_row(
            "SELECT kind FROM nodes WHERE logical_id = ?1 LIMIT 1",
            [logical_id],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    let Some(node_kind) = node_kind else {
        return Ok(0);
    };

    let table_name = fathomdb_schema::vec_kind_table_name(&node_kind);
    let delete_sql = format!(
        "DELETE FROM {table_name} WHERE chunk_id IN (SELECT id FROM chunks WHERE node_logical_id = ?1)"
    );
    tx.execute(&delete_sql, [logical_id]).or_else(|err| match err {
        rusqlite::Error::SqliteFailure(_, Some(ref message))
            if message.contains(&table_name) || message.contains("no such module: vec0") =>
        {
            Ok(0)
        }
        other => Err(EngineError::Sqlite(other)),
    })
}
956
957#[cfg(not(feature = "sqlite-vec"))]
958#[allow(clippy::unnecessary_wraps)]
959fn delete_vec_rows_for_logical_id(
960 _tx: &rusqlite::Transaction<'_>,
961 _logical_id: &str,
962) -> Result<usize, EngineError> {
963 Ok(0)
964}