Skip to main content

fathomdb_engine/
projection.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::TransactionBehavior;
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12    Fts,
13    Vec,
14    All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19    pub targets: Vec<ProjectionTarget>,
20    pub rebuilt_rows: usize,
21    pub notes: Vec<String>,
22}
23
/// Rebuilds and repairs the derived FTS / vector projections of a database file.
///
/// Stores only the database path and a shared schema manager; each operation opens
/// its own connection (see `connect`).
#[derive(Debug)]
pub struct ProjectionService {
    /// Filesystem path of the SQLite database to operate on.
    database_path: PathBuf,
    /// Used to bootstrap the schema on every freshly opened connection.
    schema_manager: Arc<SchemaManager>,
}
29
30impl ProjectionService {
31    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32        Self {
33            database_path: path.as_ref().to_path_buf(),
34            schema_manager,
35        }
36    }
37
38    fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39        let conn = sqlite::open_connection(&self.database_path)?;
40        self.schema_manager.bootstrap(&conn)?;
41        Ok(conn)
42    }
43
44    /// # Errors
45    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
46    pub fn rebuild_projections(
47        &self,
48        target: ProjectionTarget,
49    ) -> Result<ProjectionRepairReport, EngineError> {
50        trace_info!(target = ?target, "projection rebuild started");
51        #[cfg(feature = "tracing")]
52        let start = std::time::Instant::now();
53        let mut conn = self.connect()?;
54
55        let mut notes = Vec::new();
56        let rebuilt_rows = match target {
57            ProjectionTarget::Fts => {
58                let fts = rebuild_fts(&mut conn)?;
59                let prop_fts = rebuild_property_fts(&mut conn)?;
60                fts + prop_fts
61            }
62            ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
63            ProjectionTarget::All => {
64                let rebuilt_fts = rebuild_fts(&mut conn)?;
65                let rebuilt_prop_fts = rebuild_property_fts(&mut conn)?;
66                let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
67                rebuilt_fts + rebuilt_prop_fts + rebuilt_vec
68            }
69        };
70
71        trace_info!(
72            target = ?target,
73            rebuilt_rows,
74            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
75            "projection rebuild completed"
76        );
77        Ok(ProjectionRepairReport {
78            targets: expand_targets(target),
79            rebuilt_rows,
80            notes,
81        })
82    }
83
84    /// # Errors
85    /// Returns [`EngineError`] if the database connection fails or the INSERT query fails.
86    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
87        // FIX(review): was bare execute without explicit transaction.
88        // Options: (A) IMMEDIATE tx matching rebuild_fts(), (B) DEFERRED tx, (C) leave as-is
89        // (autocommit wraps single statements atomically). Chose (A): explicit transaction
90        // communicates intent, matches sibling rebuild_fts(), and protects against future
91        // refactoring that might add additional statements.
92        let mut conn = self.connect()?;
93
94        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
95        let inserted_chunk_fts = tx.execute(
96            r"
97            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
98            SELECT c.id, n.logical_id, n.kind, c.text_content
99            FROM chunks c
100            JOIN nodes n
101              ON n.logical_id = c.node_logical_id
102             AND n.superseded_at IS NULL
103            WHERE NOT EXISTS (
104                SELECT 1
105                FROM fts_nodes f
106                WHERE f.chunk_id = c.id
107            )
108            ",
109            [],
110        )?;
111        let inserted_prop_fts = rebuild_missing_property_fts_in_tx(&tx)?;
112        tx.commit()?;
113
114        Ok(ProjectionRepairReport {
115            targets: vec![ProjectionTarget::Fts],
116            rebuilt_rows: inserted_chunk_fts + inserted_prop_fts,
117            notes: vec![],
118        })
119    }
120}
121
122/// Atomically rebuild the FTS index: delete all existing rows and repopulate
123/// from the canonical `chunks`/`nodes` join.  The DELETE and INSERT are
124/// wrapped in a single `IMMEDIATE` transaction so a mid-rebuild failure
125/// cannot leave the index empty.
126fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
127    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
128    tx.execute("DELETE FROM fts_nodes", [])?;
129    let inserted = tx.execute(
130        r"
131        INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
132        SELECT c.id, n.logical_id, n.kind, c.text_content
133        FROM chunks c
134        JOIN nodes n
135          ON n.logical_id = c.node_logical_id
136         AND n.superseded_at IS NULL
137        ",
138        [],
139    )?;
140    tx.commit()?;
141    Ok(inserted)
142}
143
144/// Atomically rebuild the property FTS index from registered schemas and active nodes.
145fn rebuild_property_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
146    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
147    tx.execute("DELETE FROM fts_node_properties", [])?;
148
149    let total = insert_property_fts_rows(
150        &tx,
151        "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
152    )?;
153
154    tx.commit()?;
155    Ok(total)
156}
157
158/// Insert missing property FTS rows within an existing transaction.
159fn rebuild_missing_property_fts_in_tx(
160    conn: &rusqlite::Connection,
161) -> Result<usize, rusqlite::Error> {
162    insert_property_fts_rows(
163        conn,
164        "SELECT n.logical_id, n.properties FROM nodes n \
165         WHERE n.kind = ?1 AND n.superseded_at IS NULL \
166           AND NOT EXISTS (SELECT 1 FROM fts_node_properties fp WHERE fp.node_logical_id = n.logical_id)",
167    )
168}
169
170/// Shared loop: load schemas, query nodes with `node_sql` (parameterized by kind),
171/// extract property FTS text, and insert into `fts_node_properties`.
172/// The caller is responsible for transaction management and for deleting stale rows
173/// before calling this function if a full rebuild is intended.
174pub(crate) fn insert_property_fts_rows(
175    conn: &rusqlite::Connection,
176    node_sql: &str,
177) -> Result<usize, rusqlite::Error> {
178    let schemas = crate::writer::load_fts_property_schemas(conn)?;
179    if schemas.is_empty() {
180        return Ok(0);
181    }
182
183    let mut total = 0usize;
184    let mut ins = conn.prepare(
185        "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) VALUES (?1, ?2, ?3)",
186    )?;
187    for (kind, paths, separator) in &schemas {
188        let mut stmt = conn.prepare(node_sql)?;
189        let rows: Vec<(String, String)> = stmt
190            .query_map([kind.as_str()], |row| {
191                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
192            })?
193            .collect::<Result<Vec<_>, _>>()?;
194        for (logical_id, properties_str) in &rows {
195            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
196            if let Some(text) = crate::writer::compute_property_fts_text(&props, paths, separator) {
197                ins.execute(rusqlite::params![logical_id, kind, text])?;
198                total += 1;
199            }
200        }
201    }
202    Ok(total)
203}
204
205/// Remove stale vec rows: entries whose chunk no longer exists or whose node has been
206/// superseded/retired.  When the `sqlite-vec` feature is disabled or the
207/// `vec_nodes_active` table is absent, degrades gracefully to a no-op and appends a note.
208#[allow(clippy::unnecessary_wraps, unused_variables)]
209fn rebuild_vec(
210    conn: &mut rusqlite::Connection,
211    notes: &mut Vec<String>,
212) -> Result<usize, rusqlite::Error> {
213    #[cfg(feature = "sqlite-vec")]
214    {
215        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
216        let deleted = match tx.execute(
217            r"
218            DELETE FROM vec_nodes_active WHERE chunk_id IN (
219                SELECT v.chunk_id FROM vec_nodes_active v
220                LEFT JOIN chunks c ON c.id = v.chunk_id
221                LEFT JOIN nodes  n ON n.logical_id = c.node_logical_id
222                WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
223            )
224            ",
225            [],
226        ) {
227            Ok(n) => n,
228            Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
229                if msg.contains("vec_nodes_active") || msg.contains("no such module: vec0") =>
230            {
231                notes.push("vec_nodes_active table absent; vec rebuild skipped".to_owned());
232                tx.rollback()?;
233                return Ok(0);
234            }
235            Err(e) => return Err(e),
236        };
237        tx.commit()?;
238        Ok(deleted)
239    }
240    #[cfg(not(feature = "sqlite-vec"))]
241    {
242        notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
243        Ok(0)
244    }
245}
246
247fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
248    match target {
249        ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
250        ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
251        ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
252    }
253}
254
#[cfg(all(test, feature = "sqlite-vec"))]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// Encode `values` as consecutive little-endian `f32` bytes (the embedding blob layout
    /// this test writes into `vec_nodes_active`).
    fn embedding_blob(values: &[f32]) -> Vec<u8> {
        values.iter().flat_map(|value| value.to_le_bytes()).collect()
    }

    /// Bootstrap the schema plus a 3-dimensional vector profile, then seed a node whose
    /// only version is superseded, one chunk for it, and a vector row for that chunk.
    /// The seeding connection is closed before returning.
    fn seed_stale_vec_row(path: &std::path::Path, schema: &SchemaManager) {
        let conn = open_connection_with_vec(path).expect("vec conn");
        schema.bootstrap(&conn).expect("bootstrap");
        schema
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
            .expect("vec profile");

        conn.execute_batch(
            r#"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                "#,
        )
        .expect("seed stale data");

        let blob = embedding_blob(&[0.1f32, 0.2f32, 0.3f32]);
        conn.execute(
            "INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES ('chunk-stale', ?1)",
            rusqlite::params![blob],
        )
        .expect("insert stale vec row");
    }

    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());
        seed_stale_vec_row(db.path(), &schema);

        let report = ProjectionService::new(db.path(), Arc::clone(&schema))
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");
        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        let verify = rusqlite::Connection::open(db.path()).expect("conn");
        let remaining: i64 = verify
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-stale'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(remaining, 0, "stale vec row must be gone after rebuild");
    }
}