1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::TransactionBehavior;
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12 Fts,
13 Vec,
14 All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19 pub targets: Vec<ProjectionTarget>,
20 pub rebuilt_rows: usize,
21 pub notes: Vec<String>,
22}
23
24#[derive(Debug)]
25pub struct ProjectionService {
26 database_path: PathBuf,
27 schema_manager: Arc<SchemaManager>,
28}
29
30impl ProjectionService {
31 pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32 Self {
33 database_path: path.as_ref().to_path_buf(),
34 schema_manager,
35 }
36 }
37
38 fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39 let conn = sqlite::open_connection(&self.database_path)?;
40 self.schema_manager.bootstrap(&conn)?;
41 Ok(conn)
42 }
43
44 pub fn rebuild_projections(
47 &self,
48 target: ProjectionTarget,
49 ) -> Result<ProjectionRepairReport, EngineError> {
50 trace_info!(target = ?target, "projection rebuild started");
51 #[cfg(feature = "tracing")]
52 let start = std::time::Instant::now();
53 let mut conn = self.connect()?;
54
55 let mut notes = Vec::new();
56 let rebuilt_rows = match target {
57 ProjectionTarget::Fts => rebuild_fts(&mut conn)?,
58 ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
59 ProjectionTarget::All => {
60 let rebuilt_fts = rebuild_fts(&mut conn)?;
61 let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
62 rebuilt_fts + rebuilt_vec
63 }
64 };
65
66 trace_info!(
67 target = ?target,
68 rebuilt_rows,
69 duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
70 "projection rebuild completed"
71 );
72 Ok(ProjectionRepairReport {
73 targets: expand_targets(target),
74 rebuilt_rows,
75 notes,
76 })
77 }
78
79 pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
82 let mut conn = self.connect()?;
88
89 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
90 let inserted = tx.execute(
91 r"
92 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
93 SELECT c.id, n.logical_id, n.kind, c.text_content
94 FROM chunks c
95 JOIN nodes n
96 ON n.logical_id = c.node_logical_id
97 AND n.superseded_at IS NULL
98 WHERE NOT EXISTS (
99 SELECT 1
100 FROM fts_nodes f
101 WHERE f.chunk_id = c.id
102 )
103 ",
104 [],
105 )?;
106 tx.commit()?;
107
108 Ok(ProjectionRepairReport {
109 targets: vec![ProjectionTarget::Fts],
110 rebuilt_rows: inserted,
111 notes: vec![],
112 })
113 }
114}
115
116fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
121 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
122 tx.execute("DELETE FROM fts_nodes", [])?;
123 let inserted = tx.execute(
124 r"
125 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
126 SELECT c.id, n.logical_id, n.kind, c.text_content
127 FROM chunks c
128 JOIN nodes n
129 ON n.logical_id = c.node_logical_id
130 AND n.superseded_at IS NULL
131 ",
132 [],
133 )?;
134 tx.commit()?;
135 Ok(inserted)
136}
137
138#[allow(clippy::unnecessary_wraps, unused_variables)]
142fn rebuild_vec(
143 conn: &mut rusqlite::Connection,
144 notes: &mut Vec<String>,
145) -> Result<usize, rusqlite::Error> {
146 #[cfg(feature = "sqlite-vec")]
147 {
148 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
149 let deleted = match tx.execute(
150 r"
151 DELETE FROM vec_nodes_active WHERE chunk_id IN (
152 SELECT v.chunk_id FROM vec_nodes_active v
153 LEFT JOIN chunks c ON c.id = v.chunk_id
154 LEFT JOIN nodes n ON n.logical_id = c.node_logical_id
155 WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
156 )
157 ",
158 [],
159 ) {
160 Ok(n) => n,
161 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
162 if msg.contains("vec_nodes_active") || msg.contains("no such module: vec0") =>
163 {
164 notes.push("vec_nodes_active table absent; vec rebuild skipped".to_owned());
165 tx.rollback()?;
166 return Ok(0);
167 }
168 Err(e) => return Err(e),
169 };
170 tx.commit()?;
171 Ok(deleted)
172 }
173 #[cfg(not(feature = "sqlite-vec"))]
174 {
175 notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
176 Ok(0)
177 }
178}
179
180fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
181 match target {
182 ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
183 ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
184 ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
185 }
186}
187
#[cfg(all(test, feature = "sqlite-vec"))]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// Encodes an embedding as a blob of little-endian `f32` bytes.
    fn embedding_blob(values: &[f32]) -> Vec<u8> {
        values.iter().flat_map(|value| value.to_le_bytes()).collect()
    }

    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db_file = NamedTempFile::new().expect("temp db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Seed a node whose only version is superseded, a chunk for it, and a
        // vector row for that chunk: a vector that no longer has a live node.
        {
            let seed_conn = open_connection_with_vec(db_file.path()).expect("vec conn");
            schema_manager.bootstrap(&seed_conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&seed_conn, "default", "vec_nodes_active", 3)
                .expect("vec profile");

            seed_conn
                .execute_batch(
                    r#"
                    INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                    VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                    INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                    VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                    "#,
                )
                .expect("seed stale data");

            let embedding = embedding_blob(&[0.1, 0.2, 0.3]);
            seed_conn
                .execute(
                    "INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES ('chunk-stale', ?1)",
                    rusqlite::params![embedding],
                )
                .expect("insert stale vec row");
        }

        let service = ProjectionService::new(db_file.path(), Arc::clone(&schema_manager));
        let report = service
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        let verify_conn = rusqlite::Connection::open(db_file.path()).expect("conn");
        let remaining: i64 = verify_conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-stale'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(remaining, 0, "stale vec row must be gone after rebuild");
    }
}