1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::TransactionBehavior;
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12 Fts,
13 Vec,
14 All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19 pub targets: Vec<ProjectionTarget>,
20 pub rebuilt_rows: usize,
21 pub notes: Vec<String>,
22}
23
24#[derive(Debug)]
25pub struct ProjectionService {
26 database_path: PathBuf,
27 schema_manager: Arc<SchemaManager>,
28}
29
30impl ProjectionService {
31 pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32 Self {
33 database_path: path.as_ref().to_path_buf(),
34 schema_manager,
35 }
36 }
37
38 fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39 let conn = sqlite::open_connection(&self.database_path)?;
40 self.schema_manager.bootstrap(&conn)?;
41 Ok(conn)
42 }
43
44 pub fn rebuild_projections(
47 &self,
48 target: ProjectionTarget,
49 ) -> Result<ProjectionRepairReport, EngineError> {
50 trace_info!(target = ?target, "projection rebuild started");
51 #[cfg(feature = "tracing")]
52 let start = std::time::Instant::now();
53 let mut conn = self.connect()?;
54
55 let mut notes = Vec::new();
56 let rebuilt_rows = match target {
57 ProjectionTarget::Fts => {
58 let fts = rebuild_fts(&mut conn)?;
59 let prop_fts = rebuild_property_fts(&mut conn)?;
60 fts + prop_fts
61 }
62 ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
63 ProjectionTarget::All => {
64 let rebuilt_fts = rebuild_fts(&mut conn)?;
65 let rebuilt_prop_fts = rebuild_property_fts(&mut conn)?;
66 let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
67 rebuilt_fts + rebuilt_prop_fts + rebuilt_vec
68 }
69 };
70
71 trace_info!(
72 target = ?target,
73 rebuilt_rows,
74 duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
75 "projection rebuild completed"
76 );
77 Ok(ProjectionRepairReport {
78 targets: expand_targets(target),
79 rebuilt_rows,
80 notes,
81 })
82 }
83
84 pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
87 let mut conn = self.connect()?;
93
94 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
95 let inserted_chunk_fts = tx.execute(
96 r"
97 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
98 SELECT c.id, n.logical_id, n.kind, c.text_content
99 FROM chunks c
100 JOIN nodes n
101 ON n.logical_id = c.node_logical_id
102 AND n.superseded_at IS NULL
103 WHERE NOT EXISTS (
104 SELECT 1
105 FROM fts_nodes f
106 WHERE f.chunk_id = c.id
107 )
108 ",
109 [],
110 )?;
111 let inserted_prop_fts = rebuild_missing_property_fts_in_tx(&tx)?;
112 tx.commit()?;
113
114 Ok(ProjectionRepairReport {
115 targets: vec![ProjectionTarget::Fts],
116 rebuilt_rows: inserted_chunk_fts + inserted_prop_fts,
117 notes: vec![],
118 })
119 }
120}
121
122fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
127 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
128 tx.execute("DELETE FROM fts_nodes", [])?;
129 let inserted = tx.execute(
130 r"
131 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
132 SELECT c.id, n.logical_id, n.kind, c.text_content
133 FROM chunks c
134 JOIN nodes n
135 ON n.logical_id = c.node_logical_id
136 AND n.superseded_at IS NULL
137 ",
138 [],
139 )?;
140 tx.commit()?;
141 Ok(inserted)
142}
143
144fn rebuild_property_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
146 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
147 tx.execute("DELETE FROM fts_node_properties", [])?;
148
149 let total = insert_property_fts_rows(
150 &tx,
151 "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
152 )?;
153
154 tx.commit()?;
155 Ok(total)
156}
157
158fn rebuild_missing_property_fts_in_tx(
160 conn: &rusqlite::Connection,
161) -> Result<usize, rusqlite::Error> {
162 insert_property_fts_rows(
163 conn,
164 "SELECT n.logical_id, n.properties FROM nodes n \
165 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
166 AND NOT EXISTS (SELECT 1 FROM fts_node_properties fp WHERE fp.node_logical_id = n.logical_id)",
167 )
168}
169
170pub(crate) fn insert_property_fts_rows(
175 conn: &rusqlite::Connection,
176 node_sql: &str,
177) -> Result<usize, rusqlite::Error> {
178 let schemas = crate::writer::load_fts_property_schemas(conn)?;
179 if schemas.is_empty() {
180 return Ok(0);
181 }
182
183 let mut total = 0usize;
184 let mut ins = conn.prepare(
185 "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) VALUES (?1, ?2, ?3)",
186 )?;
187 for (kind, paths, separator) in &schemas {
188 let mut stmt = conn.prepare(node_sql)?;
189 let rows: Vec<(String, String)> = stmt
190 .query_map([kind.as_str()], |row| {
191 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
192 })?
193 .collect::<Result<Vec<_>, _>>()?;
194 for (logical_id, properties_str) in &rows {
195 let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
196 if let Some(text) = crate::writer::compute_property_fts_text(&props, paths, separator) {
197 ins.execute(rusqlite::params![logical_id, kind, text])?;
198 total += 1;
199 }
200 }
201 }
202 Ok(total)
203}
204
205#[allow(clippy::unnecessary_wraps, unused_variables)]
209fn rebuild_vec(
210 conn: &mut rusqlite::Connection,
211 notes: &mut Vec<String>,
212) -> Result<usize, rusqlite::Error> {
213 #[cfg(feature = "sqlite-vec")]
214 {
215 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
216 let deleted = match tx.execute(
217 r"
218 DELETE FROM vec_nodes_active WHERE chunk_id IN (
219 SELECT v.chunk_id FROM vec_nodes_active v
220 LEFT JOIN chunks c ON c.id = v.chunk_id
221 LEFT JOIN nodes n ON n.logical_id = c.node_logical_id
222 WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
223 )
224 ",
225 [],
226 ) {
227 Ok(n) => n,
228 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
229 if msg.contains("vec_nodes_active") || msg.contains("no such module: vec0") =>
230 {
231 notes.push("vec_nodes_active table absent; vec rebuild skipped".to_owned());
232 tx.rollback()?;
233 return Ok(0);
234 }
235 Err(e) => return Err(e),
236 };
237 tx.commit()?;
238 Ok(deleted)
239 }
240 #[cfg(not(feature = "sqlite-vec"))]
241 {
242 notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
243 Ok(0)
244 }
245}
246
247fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
248 match target {
249 ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
250 ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
251 ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
252 }
253}
254
#[cfg(all(test, feature = "sqlite-vec"))]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// Seeds a superseded node, one chunk for it, and a vector row for that
    /// chunk. The connection is closed when this helper returns.
    fn seed_superseded_chunk_with_embedding(path: &std::path::Path, schema: &SchemaManager) {
        let conn = open_connection_with_vec(path).expect("vec conn");
        schema.bootstrap(&conn).expect("bootstrap");
        schema
            .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
            .expect("vec profile");

        conn.execute_batch(
            r#"
            INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
            VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
            INSERT INTO chunks (id, node_logical_id, text_content, created_at)
            VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
            "#,
        )
        .expect("seed stale data");

        // Three f32 components laid out as little-endian bytes.
        let embedding: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
            .iter()
            .copied()
            .flat_map(f32::to_le_bytes)
            .collect();
        conn.execute(
            "INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES ('chunk-stale', ?1)",
            rusqlite::params![embedding],
        )
        .expect("insert stale vec row");
    }

    /// A vector row whose node is superseded is deleted by a `Vec` rebuild
    /// and counted in the report.
    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());

        seed_superseded_chunk_with_embedding(db.path(), &schema);

        let report = ProjectionService::new(db.path(), Arc::clone(&schema))
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        let verify = rusqlite::Connection::open(db.path()).expect("conn");
        let remaining: i64 = verify
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-stale'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(remaining, 0, "stale vec row must be gone after rebuild");
    }
}