Skip to main content

fathomdb_engine/
projection.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::TransactionBehavior;
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12    Fts,
13    Vec,
14    All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19    pub targets: Vec<ProjectionTarget>,
20    pub rebuilt_rows: usize,
21    pub notes: Vec<String>,
22}
23
24#[derive(Debug)]
25pub struct ProjectionService {
26    database_path: PathBuf,
27    schema_manager: Arc<SchemaManager>,
28}
29
30impl ProjectionService {
31    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32        Self {
33            database_path: path.as_ref().to_path_buf(),
34            schema_manager,
35        }
36    }
37
38    fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39        let conn = sqlite::open_connection(&self.database_path)?;
40        self.schema_manager.bootstrap(&conn)?;
41        Ok(conn)
42    }
43
44    /// # Errors
45    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
46    pub fn rebuild_projections(
47        &self,
48        target: ProjectionTarget,
49    ) -> Result<ProjectionRepairReport, EngineError> {
50        trace_info!(target = ?target, "projection rebuild started");
51        #[cfg(feature = "tracing")]
52        let start = std::time::Instant::now();
53        let mut conn = self.connect()?;
54
55        let mut notes = Vec::new();
56        let rebuilt_rows = match target {
57            ProjectionTarget::Fts => rebuild_fts(&mut conn)?,
58            ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
59            ProjectionTarget::All => {
60                let rebuilt_fts = rebuild_fts(&mut conn)?;
61                let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
62                rebuilt_fts + rebuilt_vec
63            }
64        };
65
66        trace_info!(
67            target = ?target,
68            rebuilt_rows,
69            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
70            "projection rebuild completed"
71        );
72        Ok(ProjectionRepairReport {
73            targets: expand_targets(target),
74            rebuilt_rows,
75            notes,
76        })
77    }
78
79    /// # Errors
80    /// Returns [`EngineError`] if the database connection fails or the INSERT query fails.
81    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
82        // FIX(review): was bare execute without explicit transaction.
83        // Options: (A) IMMEDIATE tx matching rebuild_fts(), (B) DEFERRED tx, (C) leave as-is
84        // (autocommit wraps single statements atomically). Chose (A): explicit transaction
85        // communicates intent, matches sibling rebuild_fts(), and protects against future
86        // refactoring that might add additional statements.
87        let mut conn = self.connect()?;
88
89        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
90        let inserted = tx.execute(
91            r"
92            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
93            SELECT c.id, n.logical_id, n.kind, c.text_content
94            FROM chunks c
95            JOIN nodes n
96              ON n.logical_id = c.node_logical_id
97             AND n.superseded_at IS NULL
98            WHERE NOT EXISTS (
99                SELECT 1
100                FROM fts_nodes f
101                WHERE f.chunk_id = c.id
102            )
103            ",
104            [],
105        )?;
106        tx.commit()?;
107
108        Ok(ProjectionRepairReport {
109            targets: vec![ProjectionTarget::Fts],
110            rebuilt_rows: inserted,
111            notes: vec![],
112        })
113    }
114}
115
116/// Atomically rebuild the FTS index: delete all existing rows and repopulate
117/// from the canonical `chunks`/`nodes` join.  The DELETE and INSERT are
118/// wrapped in a single `IMMEDIATE` transaction so a mid-rebuild failure
119/// cannot leave the index empty.
120fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
121    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
122    tx.execute("DELETE FROM fts_nodes", [])?;
123    let inserted = tx.execute(
124        r"
125        INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
126        SELECT c.id, n.logical_id, n.kind, c.text_content
127        FROM chunks c
128        JOIN nodes n
129          ON n.logical_id = c.node_logical_id
130         AND n.superseded_at IS NULL
131        ",
132        [],
133    )?;
134    tx.commit()?;
135    Ok(inserted)
136}
137
138/// Remove stale vec rows: entries whose chunk no longer exists or whose node has been
139/// superseded/retired.  When the `sqlite-vec` feature is disabled or the
140/// `vec_nodes_active` table is absent, degrades gracefully to a no-op and appends a note.
141#[allow(clippy::unnecessary_wraps, unused_variables)]
142fn rebuild_vec(
143    conn: &mut rusqlite::Connection,
144    notes: &mut Vec<String>,
145) -> Result<usize, rusqlite::Error> {
146    #[cfg(feature = "sqlite-vec")]
147    {
148        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
149        let deleted = match tx.execute(
150            r"
151            DELETE FROM vec_nodes_active WHERE chunk_id IN (
152                SELECT v.chunk_id FROM vec_nodes_active v
153                LEFT JOIN chunks c ON c.id = v.chunk_id
154                LEFT JOIN nodes  n ON n.logical_id = c.node_logical_id
155                WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
156            )
157            ",
158            [],
159        ) {
160            Ok(n) => n,
161            Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
162                if msg.contains("vec_nodes_active") || msg.contains("no such module: vec0") =>
163            {
164                notes.push("vec_nodes_active table absent; vec rebuild skipped".to_owned());
165                tx.rollback()?;
166                return Ok(0);
167            }
168            Err(e) => return Err(e),
169        };
170        tx.commit()?;
171        Ok(deleted)
172    }
173    #[cfg(not(feature = "sqlite-vec"))]
174    {
175        notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
176        Ok(0)
177    }
178}
179
180fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
181    match target {
182        ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
183        ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
184        ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
185    }
186}
187
#[cfg(all(test, feature = "sqlite-vec"))]
// `expect` is the intended way for these tests to fail loudly on setup errors.
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// A vector row whose chunk belongs only to a superseded node version is stale and
    /// must be removed by a `Vec` rebuild.
    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());

        {
            let conn = open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            schema
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("vec profile");

            // Insert a superseded node + chunk + vec row (stale state).
            conn.execute_batch(
                r#"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                "#,
            )
            .expect("seed stale data");

            // The embedding is passed as raw little-endian f32 bytes.
            let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
                .iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            conn.execute(
                "INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES ('chunk-stale', ?1)",
                rusqlite::params![bytes],
            )
            .expect("insert stale vec row");
        }

        let service = ProjectionService::new(db.path(), Arc::clone(&schema));
        let report = service
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        // Re-open with the same vec-enabled opener used for seeding. Reading a vec0
        // virtual table needs the sqlite-vec module on the connection.
        let conn = open_connection_with_vec(db.path()).expect("vec conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-stale'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "stale vec row must be gone after rebuild");
    }
}