Skip to main content

fathomdb_engine/admin/
provenance.rs

1use rusqlite::{DatabaseName, OptionalExtension, TransactionBehavior};
2use sha2::{Digest, Sha256};
3use std::fs;
4use std::io;
5use std::path::Path;
6use std::time::SystemTime;
7
8use crate::ids::new_id;
9use crate::{EngineError, SkippedEdge};
10
11use super::{
12    AdminService, EXPORT_PROTOCOL_VERSION, LogicalPurgeReport, LogicalRestoreReport,
13    ProvenancePurgeOptions, ProvenancePurgeReport, SafeExportManifest, SafeExportOptions,
14    TraceReport, clear_operational_current_rows, i64_to_usize, persist_simple_provenance_event,
15    rebuild_operational_current_rows,
16};
17
18impl AdminService {
19    /// # Errors
20    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
21    pub fn trace_source(&self, source_ref: &str) -> Result<TraceReport, EngineError> {
22        let conn = self.connect()?;
23
24        let node_logical_ids = collect_strings(
25            &conn,
26            "SELECT logical_id FROM nodes WHERE source_ref = ?1 ORDER BY created_at",
27            source_ref,
28        )?;
29        let action_ids = collect_strings(
30            &conn,
31            "SELECT id FROM actions WHERE source_ref = ?1 ORDER BY created_at",
32            source_ref,
33        )?;
34        let operational_mutation_ids = collect_strings(
35            &conn,
36            "SELECT id FROM operational_mutations WHERE source_ref = ?1 ORDER BY mutation_order",
37            source_ref,
38        )?;
39
40        Ok(TraceReport {
41            source_ref: source_ref.to_owned(),
42            node_rows: count_source_ref(&conn, "nodes", source_ref)?,
43            edge_rows: count_source_ref(&conn, "edges", source_ref)?,
44            action_rows: count_source_ref(&conn, "actions", source_ref)?,
45            operational_mutation_rows: count_source_ref(
46                &conn,
47                "operational_mutations",
48                source_ref,
49            )?,
50            node_logical_ids,
51            action_ids,
52            operational_mutation_ids,
53        })
54    }
55
56    /// # Errors
57    /// Returns [`EngineError`] if the database connection fails, the transaction cannot be
58    /// started, or lifecycle restoration prerequisites are missing.
59    #[allow(clippy::too_many_lines)]
60    pub fn restore_logical_id(
61        &self,
62        logical_id: &str,
63    ) -> Result<LogicalRestoreReport, EngineError> {
64        let mut conn = self.connect()?;
65        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
66
67        let active_count: i64 = tx.query_row(
68            "SELECT count(*) FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
69            [logical_id],
70            |row| row.get(0),
71        )?;
72        if active_count > 0 {
73            return Ok(LogicalRestoreReport {
74                logical_id: logical_id.to_owned(),
75                was_noop: true,
76                restored_node_rows: 0,
77                restored_edge_rows: 0,
78                restored_chunk_rows: 0,
79                restored_fts_rows: 0,
80                restored_property_fts_rows: 0,
81                restored_vec_rows: 0,
82                skipped_edges: Vec::new(),
83                notes: vec!["logical_id already active".to_owned()],
84            });
85        }
86
87        let restored_node: Option<(String, String)> = tx
88            .query_row(
89                "SELECT row_id, kind FROM nodes \
90                 WHERE logical_id = ?1 AND superseded_at IS NOT NULL \
91                 ORDER BY superseded_at DESC, created_at DESC, rowid DESC LIMIT 1",
92                [logical_id],
93                |row| Ok((row.get(0)?, row.get(1)?)),
94            )
95            .optional()?;
96        let (restored_node_row_id, restored_kind) = restored_node.ok_or_else(|| {
97            EngineError::InvalidWrite(format!("logical_id '{logical_id}' is not retired"))
98        })?;
99
100        tx.execute(
101            "UPDATE nodes SET superseded_at = NULL WHERE row_id = ?1",
102            [restored_node_row_id.as_str()],
103        )?;
104
105        let retire_scope: Option<(i64, Option<String>, i64)> = tx
106            .query_row(
107                "SELECT rowid, source_ref, created_at FROM provenance_events \
108                 WHERE event_type = 'node_retire' AND subject = ?1 \
109                 ORDER BY created_at DESC, rowid DESC LIMIT 1",
110                [logical_id],
111                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
112            )
113            .optional()?;
114        let (restored_edge_rows, skipped_edges) = if let Some((
115            retire_event_rowid,
116            retire_source_ref,
117            retire_created_at,
118        )) = retire_scope
119        {
120            restore_validated_edges(
121                &tx,
122                logical_id,
123                retire_source_ref.as_deref(),
124                retire_created_at,
125                retire_event_rowid,
126            )?
127        } else {
128            (0, Vec::new())
129        };
130
131        let restored_chunk_rows: usize = tx
132            .query_row(
133                "SELECT count(*) FROM chunks WHERE node_logical_id = ?1",
134                [logical_id],
135                |row| row.get::<_, i64>(0),
136            )
137            .map(i64_to_usize)?;
138        tx.execute(
139            "DELETE FROM fts_nodes WHERE node_logical_id = ?1",
140            [logical_id],
141        )?;
142        let restored_fts_rows = tx.execute(
143            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
144             SELECT id, node_logical_id, ?2, text_content \
145             FROM chunks WHERE node_logical_id = ?1",
146            rusqlite::params![logical_id, restored_kind],
147        )?;
148        let restored_vec_rows = count_vec_rows_for_logical_id(&tx, logical_id)?;
149
150        // Rebuild property FTS for the restored node.
151        // Delete from the per-kind FTS table for this node (if the table exists).
152        let table = fathomdb_schema::fts_kind_table_name(&restored_kind);
153        let fts_table_exists: bool = tx
154            .query_row(
155                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
156                 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
157                rusqlite::params![table],
158                |r| r.get::<_, i64>(0),
159            )
160            .unwrap_or(0)
161            > 0;
162        if fts_table_exists {
163            tx.execute(
164                &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
165                [logical_id],
166            )?;
167        }
168        let restored_property_fts_rows =
169            rebuild_single_node_property_fts(&tx, logical_id, &restored_kind)?;
170
171        persist_simple_provenance_event(
172            &tx,
173            "restore_logical_id",
174            logical_id,
175            Some(serde_json::json!({
176                "restored_node_rows": 1,
177                "restored_edge_rows": restored_edge_rows,
178                "restored_chunk_rows": restored_chunk_rows,
179                "restored_fts_rows": restored_fts_rows,
180                "restored_property_fts_rows": restored_property_fts_rows,
181                "restored_vec_rows": restored_vec_rows,
182            })),
183        )?;
184        tx.commit()?;
185
186        Ok(LogicalRestoreReport {
187            logical_id: logical_id.to_owned(),
188            was_noop: false,
189            restored_node_rows: 1,
190            restored_edge_rows,
191            restored_chunk_rows,
192            restored_fts_rows,
193            restored_property_fts_rows,
194            restored_vec_rows,
195            skipped_edges,
196            notes: Vec::new(),
197        })
198    }
199
200    /// # Errors
201    /// Returns [`EngineError`] if the database connection fails, the transaction cannot be
202    /// started, or the purge mutation fails.
203    pub fn purge_logical_id(&self, logical_id: &str) -> Result<LogicalPurgeReport, EngineError> {
204        let mut conn = self.connect()?;
205        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
206
207        let active_count: i64 = tx.query_row(
208            "SELECT count(*) FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
209            [logical_id],
210            |row| row.get(0),
211        )?;
212        if active_count > 0 {
213            return Ok(LogicalPurgeReport {
214                logical_id: logical_id.to_owned(),
215                was_noop: true,
216                deleted_node_rows: 0,
217                deleted_edge_rows: 0,
218                deleted_chunk_rows: 0,
219                deleted_fts_rows: 0,
220                deleted_vec_rows: 0,
221                notes: vec!["logical_id is active; purge skipped".to_owned()],
222            });
223        }
224
225        let node_rows: i64 = tx.query_row(
226            "SELECT count(*) FROM nodes WHERE logical_id = ?1",
227            [logical_id],
228            |row| row.get(0),
229        )?;
230        if node_rows == 0 {
231            return Err(EngineError::InvalidWrite(format!(
232                "logical_id '{logical_id}' does not exist"
233            )));
234        }
235
236        let deleted_vec_rows = delete_vec_rows_for_logical_id(&tx, logical_id)?;
237        let deleted_fts_rows = tx.execute(
238            "DELETE FROM fts_nodes WHERE node_logical_id = ?1",
239            [logical_id],
240        )?;
241        let deleted_edge_rows = tx.execute(
242            "DELETE FROM edges WHERE source_logical_id = ?1 OR target_logical_id = ?1",
243            [logical_id],
244        )?;
245        let deleted_chunk_rows = tx.execute(
246            "DELETE FROM chunks WHERE node_logical_id = ?1",
247            [logical_id],
248        )?;
249        let deleted_node_rows =
250            tx.execute("DELETE FROM nodes WHERE logical_id = ?1", [logical_id])?;
251        tx.execute(
252            "DELETE FROM node_access_metadata WHERE logical_id = ?1",
253            [logical_id],
254        )?;
255
256        persist_simple_provenance_event(
257            &tx,
258            "purge_logical_id",
259            logical_id,
260            Some(serde_json::json!({
261                "deleted_node_rows": deleted_node_rows,
262                "deleted_edge_rows": deleted_edge_rows,
263                "deleted_chunk_rows": deleted_chunk_rows,
264                "deleted_fts_rows": deleted_fts_rows,
265                "deleted_vec_rows": deleted_vec_rows,
266            })),
267        )?;
268        tx.commit()?;
269
270        Ok(LogicalPurgeReport {
271            logical_id: logical_id.to_owned(),
272            was_noop: false,
273            deleted_node_rows,
274            deleted_edge_rows,
275            deleted_chunk_rows,
276            deleted_fts_rows,
277            deleted_vec_rows,
278            notes: Vec::new(),
279        })
280    }
281
282    /// Purge provenance events older than `before_timestamp`.
283    ///
284    /// By default, `excise` and `purge_logical_id` event types are preserved so that
285    /// data-deletion audit trails survive. Pass an explicit
286    /// `preserve_event_types` list to override this default.
287    ///
288    /// # Errors
289    /// Returns [`EngineError`] if the database connection fails, the transaction
290    /// cannot be started, or any SQL statement fails.
291    pub fn purge_provenance_events(
292        &self,
293        before_timestamp: i64,
294        options: &ProvenancePurgeOptions,
295    ) -> Result<ProvenancePurgeReport, EngineError> {
296        let mut conn = self.connect()?;
297        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
298
299        let preserved_types: Vec<&str> = if options.preserve_event_types.is_empty() {
300            vec!["excise", "purge_logical_id"]
301        } else {
302            options
303                .preserve_event_types
304                .iter()
305                .map(String::as_str)
306                .collect()
307        };
308
309        // Build the NOT IN clause dynamically based on preserved types.
310        let placeholders: String = (0..preserved_types.len())
311            .map(|i| format!("?{}", i + 2))
312            .collect::<Vec<_>>()
313            .join(", ");
314        let count_query = format!(
315            "SELECT count(*) FROM provenance_events \
316             WHERE created_at < ?1 AND event_type NOT IN ({placeholders})"
317        );
318        let delete_query = format!(
319            "DELETE FROM provenance_events WHERE rowid IN (\
320             SELECT rowid FROM provenance_events \
321             WHERE created_at < ?1 AND event_type NOT IN ({placeholders}) \
322             LIMIT 10000)"
323        );
324
325        let bind_params = |stmt: &mut rusqlite::Statement<'_>| -> Result<(), rusqlite::Error> {
326            stmt.raw_bind_parameter(1, before_timestamp)?;
327            for (i, event_type) in preserved_types.iter().enumerate() {
328                stmt.raw_bind_parameter(i + 2, *event_type)?;
329            }
330            Ok(())
331        };
332
333        let events_deleted = if options.dry_run {
334            let mut stmt = tx.prepare(&count_query)?;
335            bind_params(&mut stmt)?;
336            stmt.raw_query()
337                .next()?
338                .map_or(0, |row| row.get::<_, u64>(0).unwrap_or(0))
339        } else {
340            let mut total_deleted: u64 = 0;
341            loop {
342                let mut stmt = tx.prepare(&delete_query)?;
343                bind_params(&mut stmt)?;
344                let deleted = stmt.raw_execute()?;
345                if deleted == 0 {
346                    break;
347                }
348                total_deleted += deleted as u64;
349            }
350            total_deleted
351        };
352
353        let total_after: u64 =
354            tx.query_row("SELECT count(*) FROM provenance_events", [], |row| {
355                row.get(0)
356            })?;
357
358        let oldest_remaining: Option<i64> = tx
359            .query_row("SELECT MIN(created_at) FROM provenance_events", [], |row| {
360                row.get(0)
361            })
362            .optional()?
363            .flatten();
364
365        if !options.dry_run {
366            tx.commit()?;
367        }
368
369        // In dry_run mode nothing was deleted, so total_after includes the
370        // would-be-deleted rows; subtract to get the preserved count.
371        let events_preserved = if options.dry_run {
372            total_after - events_deleted
373        } else {
374            total_after
375        };
376
377        Ok(ProvenancePurgeReport {
378            events_deleted,
379            events_preserved,
380            oldest_remaining,
381        })
382    }
383
384    /// # Errors
385    /// Returns [`EngineError`] if the database connection fails, the transaction cannot be
386    /// started, or any SQL statement fails.
387    #[allow(clippy::too_many_lines)]
388    pub fn excise_source(&self, source_ref: &str) -> Result<TraceReport, EngineError> {
389        let mut conn = self.connect()?;
390
391        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
392        let affected_operational_collections = collect_strings_tx(
393            &tx,
394            "SELECT DISTINCT m.collection_name \
395             FROM operational_mutations m \
396             JOIN operational_collections c ON c.name = m.collection_name \
397             WHERE m.source_ref = ?1 AND c.kind = 'latest_state' \
398             ORDER BY m.collection_name",
399            source_ref,
400        )?;
401
402        // Collect (row_id, logical_id) for active rows that will be excised.
403        let pairs: Vec<(String, String)> = {
404            let mut stmt = tx.prepare(
405                "SELECT row_id, logical_id FROM nodes \
406                 WHERE source_ref = ?1 AND superseded_at IS NULL",
407            )?;
408            stmt.query_map([source_ref], |row| {
409                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
410            })?
411            .collect::<Result<Vec<_>, _>>()?
412        };
413        let affected_logical_ids: Vec<String> = pairs
414            .iter()
415            .map(|(_, logical_id)| logical_id.clone())
416            .collect();
417
418        // Supersede bad rows in all tables.
419        tx.execute(
420            "UPDATE nodes SET superseded_at = unixepoch() \
421             WHERE source_ref = ?1 AND superseded_at IS NULL",
422            [source_ref],
423        )?;
424        tx.execute(
425            "UPDATE edges SET superseded_at = unixepoch() \
426             WHERE source_ref = ?1 AND superseded_at IS NULL",
427            [source_ref],
428        )?;
429        tx.execute(
430            "UPDATE actions SET superseded_at = unixepoch() \
431             WHERE source_ref = ?1 AND superseded_at IS NULL",
432            [source_ref],
433        )?;
434        clear_operational_current_rows(&tx, &affected_operational_collections)?;
435        tx.execute(
436            "DELETE FROM operational_mutations WHERE source_ref = ?1",
437            [source_ref],
438        )?;
439        for logical_id in &affected_logical_ids {
440            delete_vec_rows_for_logical_id(&tx, logical_id)?;
441            tx.execute(
442                "DELETE FROM chunks WHERE node_logical_id = ?1",
443                [logical_id.as_str()],
444            )?;
445        }
446
447        // Restore the most recent prior version for each affected logical_id.
448        for (excised_row_id, logical_id) in &pairs {
449            let prior: Option<String> = tx
450                .query_row(
451                    "SELECT row_id FROM nodes \
452                     WHERE logical_id = ?1 AND row_id != ?2 \
453                     ORDER BY created_at DESC LIMIT 1",
454                    [logical_id.as_str(), excised_row_id.as_str()],
455                    |row| row.get(0),
456                )
457                .optional()?;
458            if let Some(prior_id) = prior {
459                tx.execute(
460                    "UPDATE nodes SET superseded_at = NULL WHERE row_id = ?1",
461                    [prior_id.as_str()],
462                )?;
463            }
464        }
465
466        for logical_id in &affected_logical_ids {
467            let has_active_node = tx
468                .query_row(
469                    "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
470                    [logical_id.as_str()],
471                    |row| row.get::<_, i64>(0),
472                )
473                .optional()?
474                .is_some();
475            if !has_active_node {
476                tx.execute(
477                    "DELETE FROM node_access_metadata WHERE logical_id = ?1",
478                    [logical_id.as_str()],
479                )?;
480            }
481        }
482
483        rebuild_operational_current_rows(&tx, &affected_operational_collections)?;
484
485        // Rebuild FTS atomically within the same transaction so readers never
486        // observe a post-excise node state with a stale FTS index.
487        tx.execute("DELETE FROM fts_nodes", [])?;
488        tx.execute(
489            r"
490            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
491            SELECT c.id, n.logical_id, n.kind, c.text_content
492            FROM chunks c
493            JOIN nodes n
494              ON n.logical_id = c.node_logical_id
495             AND n.superseded_at IS NULL
496            ",
497            [],
498        )?;
499
500        // Rebuild property FTS in the same transaction.
501        rebuild_property_fts_in_tx(&tx)?;
502
503        // Record the audit event inside the same transaction so the excision and its
504        // audit record are committed atomically — no window where the excision is
505        // durable but unaudited.
506        tx.execute(
507            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
508             VALUES (?1, 'excise_source', ?2, ?2)",
509            rusqlite::params![new_id(), source_ref],
510        )?;
511
512        tx.commit()?;
513
514        self.trace_source(source_ref)
515    }
516
517    /// # Errors
518    /// Returns [`EngineError`] if the WAL checkpoint fails, the `SQLite` backup fails,
519    /// the SHA-256 digest cannot be computed, or the manifest file cannot be written.
520    pub fn safe_export(
521        &self,
522        destination_path: impl AsRef<Path>,
523        options: SafeExportOptions,
524    ) -> Result<SafeExportManifest, EngineError> {
525        let destination_path = destination_path.as_ref();
526
527        // 1. Optionally checkpoint WAL before exporting. This keeps the on-disk file tidy for
528        // callers that want a fully checkpointed export, but export correctness does not depend
529        // on it because the backup API copies from the live SQLite connection state.
530        let conn = self.connect()?;
531
532        if options.force_checkpoint {
533            trace_info!("safe_export: wal checkpoint started");
534            let (busy, log, checkpointed): (i64, i64, i64) =
535                conn.query_row("PRAGMA wal_checkpoint(FULL)", [], |row| {
536                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
537                })?;
538            if busy != 0 {
539                trace_warn!(
540                    busy,
541                    log_frames = log,
542                    checkpointed_frames = checkpointed,
543                    "safe_export: wal checkpoint blocked by active readers"
544                );
545                return Err(EngineError::Bridge(format!(
546                    "WAL checkpoint blocked: {busy} active reader(s) prevented a full checkpoint; \
547                     log frames={log}, checkpointed={checkpointed}; \
548                     retry export when no readers are active"
549                )));
550            }
551            trace_info!(
552                log_frames = log,
553                checkpointed_frames = checkpointed,
554                "safe_export: wal checkpoint completed"
555            );
556        }
557
558        let schema_version: u32 = conn
559            .query_row(
560                "SELECT COALESCE(MAX(version), 0) FROM fathom_schema_migrations",
561                [],
562                |row| row.get(0),
563            )
564            .unwrap_or(0);
565
566        // 2. Export the database through SQLite's online backup API so committed data in the WAL
567        // is included even when `force_checkpoint` is false.
568        if let Some(parent) = destination_path.parent() {
569            fs::create_dir_all(parent)?;
570        }
571        conn.backup(DatabaseName::Main, destination_path, None)?;
572
573        drop(conn);
574
575        // 2b. Query page_count from the EXPORTED file so the manifest reflects what was
576        // actually backed up, not the source (which may have changed between the PRAGMA
577        // and the backup call).
578        let page_count: u64 = {
579            let export_conn = rusqlite::Connection::open_with_flags(
580                destination_path,
581                rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
582                    | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
583            )?;
584            export_conn.query_row("PRAGMA page_count", [], |row| row.get(0))?
585        };
586
587        // 3. Compute SHA-256 of the exported file.
588        // FIX(review): was fs::read loading entire DB into memory; use streaming hash.
589        let sha256 = {
590            let mut file = fs::File::open(destination_path)?;
591            let mut hasher = Sha256::new();
592            io::copy(&mut file, &mut hasher)?;
593            format!("{:x}", hasher.finalize())
594        };
595
596        // 4. Record when the export was created.
597        let exported_at = SystemTime::now()
598            .duration_since(SystemTime::UNIX_EPOCH)
599            .map_err(|e| EngineError::Bridge(format!("system clock error: {e}")))?
600            .as_secs();
601
602        let manifest = SafeExportManifest {
603            exported_at,
604            sha256,
605            schema_version,
606            protocol_version: EXPORT_PROTOCOL_VERSION,
607            page_count,
608        };
609
610        // 5. Write manifest alongside the exported file, using Path API for the name.
611        let manifest_path = {
612            let mut p = destination_path.to_path_buf();
613            let stem = p
614                .file_name()
615                .map(|n| format!("{}.export-manifest.json", n.to_string_lossy()))
616                .ok_or_else(|| {
617                    EngineError::Bridge("destination path has no filename".to_owned())
618                })?;
619            p.set_file_name(stem);
620            p
621        };
622        let manifest_json =
623            serde_json::to_string(&manifest).map_err(|e| EngineError::Bridge(e.to_string()))?;
624
625        // Atomic manifest write: write to a temp file then rename so readers never
626        // observe a partially-written manifest.
627        let manifest_tmp = manifest_path.with_extension("json.tmp");
628        if let Err(e) = fs::write(&manifest_tmp, &manifest_json)
629            .and_then(|()| fs::rename(&manifest_tmp, &manifest_path))
630        {
631            let _ = fs::remove_file(&manifest_tmp);
632            return Err(e.into());
633        }
634
635        Ok(manifest)
636    }
637}
638
639pub(super) fn rebuild_property_fts_in_tx(
640    conn: &rusqlite::Connection,
641) -> Result<usize, EngineError> {
642    // Delete from ALL per-kind FTS virtual tables (including orphaned ones without schemas).
643    // Filter by sql LIKE 'CREATE VIRTUAL TABLE%' to exclude FTS5 shadow tables.
644    let all_per_kind_tables: Vec<String> = {
645        let mut stmt = conn.prepare(
646            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'fts_props_%' \
647             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
648        )?;
649        stmt.query_map([], |r| r.get::<_, String>(0))?
650            .collect::<Result<Vec<_>, _>>()?
651    };
652    for table in &all_per_kind_tables {
653        conn.execute_batch(&format!("DELETE FROM {table}"))?;
654    }
655    conn.execute("DELETE FROM fts_node_property_positions", [])?;
656    let inserted = crate::projection::insert_property_fts_rows(
657        conn,
658        "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
659    )?;
660    Ok(inserted)
661}
662
663/// Rebuild property FTS for a single node. Returns 1 if a row was inserted, 0 otherwise.
664/// The caller must delete any existing per-kind FTS row for this node first.
665pub(super) fn rebuild_single_node_property_fts(
666    conn: &rusqlite::Connection,
667    logical_id: &str,
668    kind: &str,
669) -> Result<usize, EngineError> {
670    let schema: Option<(String, String)> = conn
671        .query_row(
672            "SELECT property_paths_json, separator FROM fts_property_schemas WHERE kind = ?1",
673            [kind],
674            |row| {
675                let paths_json: String = row.get(0)?;
676                let separator: String = row.get(1)?;
677                Ok((paths_json, separator))
678            },
679        )
680        .optional()?;
681    let Some((paths_json, separator)) = schema else {
682        return Ok(0);
683    };
684    let parsed = crate::writer::parse_property_schema_json(&paths_json, &separator);
685    let properties_str: Option<String> = conn
686        .query_row(
687            "SELECT properties FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
688            [logical_id],
689            |row| row.get(0),
690        )
691        .optional()?;
692    let Some(properties_str) = properties_str else {
693        return Ok(0);
694    };
695    let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
696    let (text, positions, _stats) = crate::writer::extract_property_fts(&props, &parsed);
697    let Some(text) = text else {
698        return Ok(0);
699    };
700    conn.execute(
701        "DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1",
702        rusqlite::params![logical_id],
703    )?;
704    let table = fathomdb_schema::fts_kind_table_name(kind);
705    let tok = fathomdb_schema::DEFAULT_FTS_TOKENIZER;
706    conn.execute_batch(&format!(
707        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
708         USING fts5(node_logical_id UNINDEXED, text_content, tokenize = '{tok}')"
709    ))?;
710    conn.execute(
711        &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"),
712        rusqlite::params![logical_id, text],
713    )?;
714    for pos in &positions {
715        conn.execute(
716            "INSERT INTO fts_node_property_positions \
717             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
718             VALUES (?1, ?2, ?3, ?4, ?5)",
719            rusqlite::params![
720                logical_id,
721                kind,
722                i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
723                i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
724                pos.leaf_path,
725            ],
726        )?;
727    }
728    Ok(1)
729}
730
731fn count_source_ref(
732    conn: &rusqlite::Connection,
733    table: &str,
734    source_ref: &str,
735) -> Result<usize, EngineError> {
736    let sql = match table {
737        "nodes" => "SELECT count(*) FROM nodes WHERE source_ref = ?1",
738        "edges" => "SELECT count(*) FROM edges WHERE source_ref = ?1",
739        "actions" => "SELECT count(*) FROM actions WHERE source_ref = ?1",
740        "operational_mutations" => {
741            "SELECT count(*) FROM operational_mutations WHERE source_ref = ?1"
742        }
743        other => return Err(EngineError::Bridge(format!("unknown table: {other}"))),
744    };
745    let count: i64 = conn.query_row(sql, [source_ref], |row| row.get(0))?;
746    // FIX(review): was `count as usize` — unsound cast.
747    // Chose option (C) here: propagate error since this is a user-facing helper.
748    usize::try_from(count)
749        .map_err(|_| EngineError::Bridge(format!("count overflow for table {table}: {count}")))
750}
751
752fn collect_strings_tx(
753    tx: &rusqlite::Transaction<'_>,
754    sql: &str,
755    value: &str,
756) -> Result<Vec<String>, EngineError> {
757    let mut stmt = tx.prepare(sql)?;
758    let rows = stmt.query_map([value], |row| row.get::<_, String>(0))?;
759    rows.collect::<Result<Vec<_>, _>>()
760        .map_err(EngineError::from)
761}
762
763/// NOTE(review): sql parameter must be a hardcoded query string, never user input.
764/// Options: (A) doc comment, (B) whitelist refactor like `count_source_ref`, (C) leave as-is.
765/// Chose (A): function is private, only called with hardcoded SQL from `trace_source`.
766/// Whitelist refactor not practical — queries have different SELECT/ORDER BY per table.
767fn collect_strings(
768    conn: &rusqlite::Connection,
769    sql: &str,
770    param: &str,
771) -> Result<Vec<String>, EngineError> {
772    let mut stmt = conn.prepare(sql)?;
773    let values = stmt
774        .query_map([param], |row| row.get::<_, String>(0))?
775        .collect::<Result<Vec<_>, _>>()?;
776    Ok(values)
777}
778
779fn collect_edge_logical_ids_for_restore(
780    tx: &rusqlite::Transaction<'_>,
781    logical_id: &str,
782    retire_source_ref: Option<&str>,
783    retire_created_at: i64,
784    retire_event_rowid: i64,
785) -> Result<Vec<String>, EngineError> {
786    let mut stmt = tx.prepare(
787        "SELECT DISTINCT e.logical_id \
788         FROM edges e \
789         JOIN provenance_events p \
790           ON p.subject = e.logical_id \
791          AND p.event_type = 'edge_retire' \
792          AND ( \
793                p.created_at > ?3 \
794                OR (p.created_at = ?3 AND p.rowid >= ?4) \
795          ) \
796          AND ((?2 IS NULL AND p.source_ref IS NULL) OR p.source_ref = ?2) \
797         WHERE e.superseded_at IS NOT NULL \
798           AND (e.source_logical_id = ?1 OR e.target_logical_id = ?1) \
799           AND NOT EXISTS ( \
800                SELECT 1 FROM edges active \
801                WHERE active.logical_id = e.logical_id \
802                  AND active.superseded_at IS NULL \
803           ) \
804         ORDER BY e.logical_id",
805    )?;
806    let edge_ids = stmt
807        .query_map(
808            rusqlite::params![
809                logical_id,
810                retire_source_ref,
811                retire_created_at,
812                retire_event_rowid
813            ],
814            |row| row.get::<_, String>(0),
815        )?
816        .collect::<Result<Vec<_>, _>>()?;
817    Ok(edge_ids)
818}
819
820/// Restores edges for a node being restored, skipping any whose counterpart
821/// endpoint is not active (e.g. still retired or purged).
822fn restore_validated_edges(
823    tx: &rusqlite::Transaction<'_>,
824    logical_id: &str,
825    retire_source_ref: Option<&str>,
826    retire_created_at: i64,
827    retire_event_rowid: i64,
828) -> Result<(usize, Vec<SkippedEdge>), EngineError> {
829    let edge_logical_ids = collect_edge_logical_ids_for_restore(
830        tx,
831        logical_id,
832        retire_source_ref,
833        retire_created_at,
834        retire_event_rowid,
835    )?;
836    let mut restored = 0usize;
837    let mut skipped = Vec::new();
838    for edge_logical_id in &edge_logical_ids {
839        let edge_detail: Option<(String, String, String)> = tx
840            .query_row(
841                "SELECT row_id, source_logical_id, target_logical_id FROM edges \
842                 WHERE logical_id = ?1 AND superseded_at IS NOT NULL \
843                 ORDER BY superseded_at DESC, created_at DESC, rowid DESC LIMIT 1",
844                [edge_logical_id.as_str()],
845                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
846            )
847            .optional()?;
848        let Some((edge_row_id, source_lid, target_lid)) = edge_detail else {
849            continue;
850        };
851        let other_endpoint = if source_lid == logical_id {
852            &target_lid
853        } else {
854            &source_lid
855        };
856        let endpoint_active: bool = tx
857            .query_row(
858                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
859                [other_endpoint.as_str()],
860                |_| Ok(true),
861            )
862            .optional()?
863            .unwrap_or(false);
864        if !endpoint_active {
865            skipped.push(SkippedEdge {
866                edge_logical_id: edge_logical_id.clone(),
867                missing_endpoint: other_endpoint.clone(),
868            });
869            continue;
870        }
871        restored += tx.execute(
872            "UPDATE edges SET superseded_at = NULL WHERE row_id = ?1",
873            [edge_row_id.as_str()],
874        )?;
875    }
876    Ok((restored, skipped))
877}
878
/// Counts the vector rows linked, via `chunks`, to the node `logical_id`.
///
/// The node's `kind` is read from `nodes` and mapped to a per-kind vector
/// table name with `fathomdb_schema::vec_kind_table_name`. Returns `0` when
/// the node has no row in `nodes`, and also when SQLite reports a failure
/// whose message mentions that table name or `no such module: vec0`
/// (presumably the table or the vec0 module is unavailable — TODO confirm).
///
/// # Errors
/// Returns [`EngineError`] for any other SQLite failure.
#[cfg(feature = "sqlite-vec")]
fn count_vec_rows_for_logical_id(
    tx: &rusqlite::Transaction<'_>,
    logical_id: &str,
) -> Result<usize, EngineError> {
    // Resolve the node's kind first; it determines which vec table to query.
    let kind_lookup: Option<String> = tx
        .query_row(
            "SELECT kind FROM nodes WHERE logical_id = ?1 LIMIT 1",
            [logical_id],
            |row| row.get(0),
        )
        .optional()?;
    let Some(kind) = kind_lookup else {
        return Ok(0);
    };

    let table_name = fathomdb_schema::vec_kind_table_name(&kind);
    let count_sql = format!(
        "SELECT count(*) FROM {table_name} v \
         JOIN chunks c ON c.id = v.chunk_id \
         WHERE c.node_logical_id = ?1"
    );
    let outcome = tx.query_row(&count_sql, [logical_id], |row| row.get::<_, i64>(0));

    match outcome {
        Ok(count) => Ok(i64_to_usize(count)),
        // Failures naming the vec table or the vec0 module count as "no rows".
        Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
            if msg.contains(&table_name) || msg.contains("no such module: vec0") =>
        {
            Ok(0)
        }
        Err(error) => Err(EngineError::Sqlite(error)),
    }
}
914
/// Variant compiled when the `sqlite-vec` feature is disabled: performs no
/// query and always reports zero vector rows.
///
/// The `Result` return type mirrors the signature of the `sqlite-vec` variant,
/// hence the `unnecessary_wraps` allowance.
#[cfg(not(feature = "sqlite-vec"))]
#[allow(clippy::unnecessary_wraps)]
fn count_vec_rows_for_logical_id(
    _tx: &rusqlite::Transaction<'_>,
    _logical_id: &str,
) -> Result<usize, EngineError> {
    Ok(0)
}
923
/// Deletes the vector rows whose chunks belong to the node `logical_id` and
/// returns how many rows were removed.
///
/// The node's `kind` is read from `nodes` and mapped to a per-kind vector
/// table name with `fathomdb_schema::vec_kind_table_name`. Returns `0` when
/// the node has no row in `nodes`, and also when SQLite reports a failure
/// whose message mentions that table name or `no such module: vec0`
/// (presumably the table or the vec0 module is unavailable — TODO confirm).
///
/// # Errors
/// Returns [`EngineError`] for any other SQLite failure.
#[cfg(feature = "sqlite-vec")]
fn delete_vec_rows_for_logical_id(
    tx: &rusqlite::Transaction<'_>,
    logical_id: &str,
) -> Result<usize, EngineError> {
    // Resolve the node's kind first; it determines which vec table to purge.
    let kind_lookup: Option<String> = tx
        .query_row(
            "SELECT kind FROM nodes WHERE logical_id = ?1 LIMIT 1",
            [logical_id],
            |row| row.get(0),
        )
        .optional()?;
    let Some(kind) = kind_lookup else {
        return Ok(0);
    };

    let table_name = fathomdb_schema::vec_kind_table_name(&kind);
    let delete_sql = format!(
        "DELETE FROM {table_name} WHERE chunk_id IN (SELECT id FROM chunks WHERE node_logical_id = ?1)"
    );

    tx.execute(&delete_sql, [logical_id])
        .or_else(|failure| match failure {
            // Failures naming the vec table or the vec0 module count as "nothing deleted".
            rusqlite::Error::SqliteFailure(_, Some(ref msg))
                if msg.contains(&table_name) || msg.contains("no such module: vec0") =>
            {
                Ok(0)
            }
            other => Err(EngineError::Sqlite(other)),
        })
}
956
/// Variant compiled when the `sqlite-vec` feature is disabled: deletes nothing
/// and always reports zero removed rows.
///
/// The `Result` return type mirrors the signature of the `sqlite-vec` variant,
/// hence the `unnecessary_wraps` allowance.
#[cfg(not(feature = "sqlite-vec"))]
#[allow(clippy::unnecessary_wraps)]
fn delete_vec_rows_for_logical_id(
    _tx: &rusqlite::Transaction<'_>,
    _logical_id: &str,
) -> Result<usize, EngineError> {
    Ok(0)
}