Skip to main content

fathomdb_engine/
projection.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::TransactionBehavior;
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12    Fts,
13    Vec,
14    All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19    pub targets: Vec<ProjectionTarget>,
20    pub rebuilt_rows: usize,
21    pub notes: Vec<String>,
22}
23
24#[derive(Debug)]
25pub struct ProjectionService {
26    database_path: PathBuf,
27    schema_manager: Arc<SchemaManager>,
28}
29
30impl ProjectionService {
31    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32        Self {
33            database_path: path.as_ref().to_path_buf(),
34            schema_manager,
35        }
36    }
37
38    fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39        let conn = sqlite::open_connection(&self.database_path)?;
40        self.schema_manager.bootstrap(&conn)?;
41        Ok(conn)
42    }
43
44    /// # Errors
45    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
46    pub fn rebuild_projections(
47        &self,
48        target: ProjectionTarget,
49    ) -> Result<ProjectionRepairReport, EngineError> {
50        trace_info!(target = ?target, "projection rebuild started");
51        #[cfg(feature = "tracing")]
52        let start = std::time::Instant::now();
53        let mut conn = self.connect()?;
54
55        let mut notes = Vec::new();
56        let rebuilt_rows = match target {
57            ProjectionTarget::Fts => {
58                let fts = rebuild_fts(&mut conn)?;
59                let prop_fts = rebuild_property_fts(&mut conn)?;
60                fts + prop_fts
61            }
62            ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
63            ProjectionTarget::All => {
64                let rebuilt_fts = rebuild_fts(&mut conn)?;
65                let rebuilt_prop_fts = rebuild_property_fts(&mut conn)?;
66                let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
67                rebuilt_fts + rebuilt_prop_fts + rebuilt_vec
68            }
69        };
70
71        trace_info!(
72            target = ?target,
73            rebuilt_rows,
74            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
75            "projection rebuild completed"
76        );
77        Ok(ProjectionRepairReport {
78            targets: expand_targets(target),
79            rebuilt_rows,
80            notes,
81        })
82    }
83
84    /// # Errors
85    /// Returns [`EngineError`] if the database connection fails or the INSERT query fails.
86    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
87        // FIX(review): was bare execute without explicit transaction.
88        // Options: (A) IMMEDIATE tx matching rebuild_fts(), (B) DEFERRED tx, (C) leave as-is
89        // (autocommit wraps single statements atomically). Chose (A): explicit transaction
90        // communicates intent, matches sibling rebuild_fts(), and protects against future
91        // refactoring that might add additional statements.
92        let mut conn = self.connect()?;
93
94        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
95        let inserted_chunk_fts = tx.execute(
96            r"
97            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
98            SELECT c.id, n.logical_id, n.kind, c.text_content
99            FROM chunks c
100            JOIN nodes n
101              ON n.logical_id = c.node_logical_id
102             AND n.superseded_at IS NULL
103            WHERE NOT EXISTS (
104                SELECT 1
105                FROM fts_nodes f
106                WHERE f.chunk_id = c.id
107            )
108            ",
109            [],
110        )?;
111        let inserted_prop_fts = rebuild_missing_property_fts_in_tx(&tx)?;
112        tx.commit()?;
113
114        Ok(ProjectionRepairReport {
115            targets: vec![ProjectionTarget::Fts],
116            rebuilt_rows: inserted_chunk_fts + inserted_prop_fts,
117            notes: vec![],
118        })
119    }
120}
121
122/// Atomically rebuild the FTS index: delete all existing rows and repopulate
123/// from the canonical `chunks`/`nodes` join.  The DELETE and INSERT are
124/// wrapped in a single `IMMEDIATE` transaction so a mid-rebuild failure
125/// cannot leave the index empty.
126fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
127    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
128    tx.execute("DELETE FROM fts_nodes", [])?;
129    let inserted = tx.execute(
130        r"
131        INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
132        SELECT c.id, n.logical_id, n.kind, c.text_content
133        FROM chunks c
134        JOIN nodes n
135          ON n.logical_id = c.node_logical_id
136         AND n.superseded_at IS NULL
137        ",
138        [],
139    )?;
140    tx.commit()?;
141    Ok(inserted)
142}
143
144/// Atomically rebuild the property FTS index from registered schemas and active nodes.
145fn rebuild_property_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
146    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
147    tx.execute("DELETE FROM fts_node_properties", [])?;
148    tx.execute("DELETE FROM fts_node_property_positions", [])?;
149
150    let total = insert_property_fts_rows(
151        &tx,
152        "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
153    )?;
154
155    tx.commit()?;
156    Ok(total)
157}
158
159/// Insert missing property FTS rows within an existing transaction.
160///
161/// Two repair passes run inside the caller's transaction:
162///
163/// 1. Nodes of a registered kind that have no `fts_node_properties` row are
164///    re-extracted from canonical state and inserted (blob + positions).
165/// 2. Nodes of a recursive-mode kind that *do* have an `fts_node_properties`
166///    row but no `fts_node_property_positions` rows have their positions
167///    regenerated in place. This repairs orphaned position map rows caused
168///    by partial drift without requiring a full `rebuild_projections(Fts)`.
169///    (P4-P2-2)
170fn rebuild_missing_property_fts_in_tx(
171    conn: &rusqlite::Connection,
172) -> Result<usize, rusqlite::Error> {
173    let inserted = insert_property_fts_rows(
174        conn,
175        "SELECT n.logical_id, n.properties FROM nodes n \
176         WHERE n.kind = ?1 AND n.superseded_at IS NULL \
177           AND NOT EXISTS (SELECT 1 FROM fts_node_properties fp WHERE fp.node_logical_id = n.logical_id)",
178    )?;
179    let repaired = repair_orphaned_position_map_in_tx(conn)?;
180    Ok(inserted + repaired)
181}
182
183/// Repair recursive-mode nodes whose `fts_node_properties` row exists but
184/// whose position-map rows have been dropped. For each such node the
185/// property FTS is re-extracted from canonical state and the position rows
186/// are re-inserted. The blob row is left untouched — callers that deleted
187/// positions without touching the blob keep the original blob rowid, which
188/// matters because `projection_row_id` in search hits is the blob rowid.
189fn repair_orphaned_position_map_in_tx(
190    conn: &rusqlite::Connection,
191) -> Result<usize, rusqlite::Error> {
192    let schemas = crate::writer::load_fts_property_schemas(conn)?;
193    if schemas.is_empty() {
194        return Ok(0);
195    }
196    let mut total = 0usize;
197    let mut ins_positions = conn.prepare(
198        "INSERT INTO fts_node_property_positions \
199         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
200         VALUES (?1, ?2, ?3, ?4, ?5)",
201    )?;
202    for (kind, schema) in &schemas {
203        let has_recursive = schema
204            .paths
205            .iter()
206            .any(|p| p.mode == crate::writer::PropertyPathMode::Recursive);
207        if !has_recursive {
208            continue;
209        }
210        let mut stmt = conn.prepare(
211            "SELECT n.logical_id, n.properties FROM nodes n \
212             WHERE n.kind = ?1 AND n.superseded_at IS NULL \
213               AND EXISTS (SELECT 1 FROM fts_node_properties fp \
214                           WHERE fp.node_logical_id = n.logical_id AND fp.kind = ?1) \
215               AND NOT EXISTS (SELECT 1 FROM fts_node_property_positions p \
216                               WHERE p.node_logical_id = n.logical_id AND p.kind = ?1)",
217        )?;
218        let rows: Vec<(String, String)> = stmt
219            .query_map([kind.as_str()], |row| {
220                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
221            })?
222            .collect::<Result<Vec<_>, _>>()?;
223        for (logical_id, properties_str) in &rows {
224            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
225            let (_text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
226            for pos in &positions {
227                ins_positions.execute(rusqlite::params![
228                    logical_id,
229                    kind,
230                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
231                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
232                    pos.leaf_path,
233                ])?;
234            }
235            if !positions.is_empty() {
236                total += 1;
237            }
238        }
239    }
240    Ok(total)
241}
242
243/// Rebuild property FTS rows for exactly one kind from its just-registered
244/// schema. Unlike [`insert_property_fts_rows`], this helper does NOT iterate
245/// over every registered schema — so callers that delete rows for a single
246/// kind won't duplicate rows for sibling kinds on the subsequent insert.
247///
248/// The caller is responsible for transaction management and for deleting
249/// stale rows for `kind` before calling this function.
250pub(crate) fn insert_property_fts_rows_for_kind(
251    conn: &rusqlite::Connection,
252    kind: &str,
253) -> Result<usize, rusqlite::Error> {
254    let schemas = crate::writer::load_fts_property_schemas(conn)?;
255    let Some(schema) = schemas
256        .iter()
257        .find(|(k, _)| k == kind)
258        .map(|(_, s)| s.clone())
259    else {
260        return Ok(0);
261    };
262
263    let mut ins = conn.prepare(
264        "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) VALUES (?1, ?2, ?3)",
265    )?;
266    let mut ins_positions = conn.prepare(
267        "INSERT INTO fts_node_property_positions \
268         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
269         VALUES (?1, ?2, ?3, ?4, ?5)",
270    )?;
271
272    let mut stmt = conn.prepare(
273        "SELECT logical_id, properties FROM nodes \
274         WHERE kind = ?1 AND superseded_at IS NULL",
275    )?;
276    let rows: Vec<(String, String)> = stmt
277        .query_map([kind], |row| {
278            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
279        })?
280        .collect::<Result<Vec<_>, _>>()?;
281
282    let mut total = 0usize;
283    for (logical_id, properties_str) in &rows {
284        let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
285        let (text, positions, _stats) = crate::writer::extract_property_fts(&props, &schema);
286        if let Some(text) = text {
287            ins.execute(rusqlite::params![logical_id, kind, text])?;
288            for pos in &positions {
289                ins_positions.execute(rusqlite::params![
290                    logical_id,
291                    kind,
292                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
293                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
294                    pos.leaf_path,
295                ])?;
296            }
297            total += 1;
298        }
299    }
300    Ok(total)
301}
302
303/// Shared loop: load schemas, query nodes with `node_sql` (parameterized by kind),
304/// extract property FTS text, and insert into `fts_node_properties`.
305/// The caller is responsible for transaction management and for deleting stale rows
306/// before calling this function if a full rebuild is intended.
307pub(crate) fn insert_property_fts_rows(
308    conn: &rusqlite::Connection,
309    node_sql: &str,
310) -> Result<usize, rusqlite::Error> {
311    let schemas = crate::writer::load_fts_property_schemas(conn)?;
312    if schemas.is_empty() {
313        return Ok(0);
314    }
315
316    let mut total = 0usize;
317    let mut ins = conn.prepare(
318        "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) VALUES (?1, ?2, ?3)",
319    )?;
320    let mut ins_positions = conn.prepare(
321        "INSERT INTO fts_node_property_positions \
322         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
323         VALUES (?1, ?2, ?3, ?4, ?5)",
324    )?;
325    for (kind, schema) in &schemas {
326        let mut stmt = conn.prepare(node_sql)?;
327        let rows: Vec<(String, String)> = stmt
328            .query_map([kind.as_str()], |row| {
329                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
330            })?
331            .collect::<Result<Vec<_>, _>>()?;
332        for (logical_id, properties_str) in &rows {
333            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
334            let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
335            if let Some(text) = text {
336                ins.execute(rusqlite::params![logical_id, kind, text])?;
337                for pos in &positions {
338                    ins_positions.execute(rusqlite::params![
339                        logical_id,
340                        kind,
341                        i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
342                        i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
343                        pos.leaf_path,
344                    ])?;
345                }
346                total += 1;
347            }
348        }
349    }
350    Ok(total)
351}
352
353/// Remove stale vec rows: entries whose chunk no longer exists or whose node has been
354/// superseded/retired.  When the `sqlite-vec` feature is disabled or the
355/// `vec_nodes_active` table is absent, degrades gracefully to a no-op and appends a note.
356#[allow(clippy::unnecessary_wraps, unused_variables)]
357fn rebuild_vec(
358    conn: &mut rusqlite::Connection,
359    notes: &mut Vec<String>,
360) -> Result<usize, rusqlite::Error> {
361    #[cfg(feature = "sqlite-vec")]
362    {
363        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
364        let deleted = match tx.execute(
365            r"
366            DELETE FROM vec_nodes_active WHERE chunk_id IN (
367                SELECT v.chunk_id FROM vec_nodes_active v
368                LEFT JOIN chunks c ON c.id = v.chunk_id
369                LEFT JOIN nodes  n ON n.logical_id = c.node_logical_id
370                WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
371            )
372            ",
373            [],
374        ) {
375            Ok(n) => n,
376            Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
377                if msg.contains("vec_nodes_active") || msg.contains("no such module: vec0") =>
378            {
379                notes.push("vec_nodes_active table absent; vec rebuild skipped".to_owned());
380                tx.rollback()?;
381                return Ok(0);
382            }
383            Err(e) => return Err(e),
384        };
385        tx.commit()?;
386        Ok(deleted)
387    }
388    #[cfg(not(feature = "sqlite-vec"))]
389    {
390        notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
391        Ok(0)
392    }
393}
394
395fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
396    match target {
397        ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
398        ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
399        ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
400    }
401}
402
#[cfg(all(test, feature = "sqlite-vec"))]
// Test setup failures should abort the test loudly; each `expect` names the step.
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// A vec row whose chunk belongs to a superseded node must be removed by a
    /// `Vec` rebuild, and the removal must be reflected in `rebuilt_rows`.
    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());

        {
            let conn = open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            schema
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("vec profile");

            // Insert a superseded node + chunk + vec row (stale state).
            conn.execute_batch(
                r#"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                "#,
            )
            .expect("seed stale data");

            let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
                .iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            conn.execute(
                "INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES ('chunk-stale', ?1)",
                rusqlite::params![bytes],
            )
            .expect("insert stale vec row");
        }

        let service = ProjectionService::new(db.path(), Arc::clone(&schema));
        let report = service
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        // Verify through a vec-enabled connection, as in the seeding step.
        // `vec_nodes_active` is presumably a `vec0` virtual table, so a plain
        // `rusqlite::Connection::open` would only be able to read it if the
        // extension happened to be registered process-wide.
        let conn = open_connection_with_vec(db.path()).expect("verify conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-stale'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "stale vec row must be gone after rebuild");
    }
}