Skip to main content

fathomdb_engine/
projection.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
5use rusqlite::TransactionBehavior;
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12    Fts,
13    Vec,
14    All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19    pub targets: Vec<ProjectionTarget>,
20    pub rebuilt_rows: usize,
21    pub notes: Vec<String>,
22}
23
24#[derive(Debug)]
25pub struct ProjectionService {
26    database_path: PathBuf,
27    schema_manager: Arc<SchemaManager>,
28}
29
30impl ProjectionService {
31    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32        Self {
33            database_path: path.as_ref().to_path_buf(),
34            schema_manager,
35        }
36    }
37
38    fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39        let conn = sqlite::open_connection(&self.database_path)?;
40        self.schema_manager.bootstrap(&conn)?;
41        Ok(conn)
42    }
43
44    /// # Errors
45    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
46    pub fn rebuild_projections(
47        &self,
48        target: ProjectionTarget,
49    ) -> Result<ProjectionRepairReport, EngineError> {
50        trace_info!(target = ?target, "projection rebuild started");
51        #[cfg(feature = "tracing")]
52        let start = std::time::Instant::now();
53        let mut conn = self.connect()?;
54
55        let mut notes = Vec::new();
56        let rebuilt_rows = match target {
57            ProjectionTarget::Fts => {
58                let fts = rebuild_fts(&mut conn)?;
59                let prop_fts = rebuild_property_fts(&mut conn)?;
60                fts + prop_fts
61            }
62            ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
63            ProjectionTarget::All => {
64                let rebuilt_fts = rebuild_fts(&mut conn)?;
65                let rebuilt_prop_fts = rebuild_property_fts(&mut conn)?;
66                let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
67                rebuilt_fts + rebuilt_prop_fts + rebuilt_vec
68            }
69        };
70
71        trace_info!(
72            target = ?target,
73            rebuilt_rows,
74            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
75            "projection rebuild completed"
76        );
77        Ok(ProjectionRepairReport {
78            targets: expand_targets(target),
79            rebuilt_rows,
80            notes,
81        })
82    }
83
84    /// # Errors
85    /// Returns [`EngineError`] if the database connection fails or the INSERT query fails.
86    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
87        // FIX(review): was bare execute without explicit transaction.
88        // Options: (A) IMMEDIATE tx matching rebuild_fts(), (B) DEFERRED tx, (C) leave as-is
89        // (autocommit wraps single statements atomically). Chose (A): explicit transaction
90        // communicates intent, matches sibling rebuild_fts(), and protects against future
91        // refactoring that might add additional statements.
92        let mut conn = self.connect()?;
93
94        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
95        let inserted_chunk_fts = tx.execute(
96            r"
97            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
98            SELECT c.id, n.logical_id, n.kind, c.text_content
99            FROM chunks c
100            JOIN nodes n
101              ON n.logical_id = c.node_logical_id
102             AND n.superseded_at IS NULL
103            WHERE NOT EXISTS (
104                SELECT 1
105                FROM fts_nodes f
106                WHERE f.chunk_id = c.id
107            )
108            ",
109            [],
110        )?;
111        let inserted_prop_fts = rebuild_missing_property_fts_in_tx(&tx)?;
112        tx.commit()?;
113
114        Ok(ProjectionRepairReport {
115            targets: vec![ProjectionTarget::Fts],
116            rebuilt_rows: inserted_chunk_fts + inserted_prop_fts,
117            notes: vec![],
118        })
119    }
120}
121
122/// Atomically rebuild the FTS index: delete all existing rows and repopulate
123/// from the canonical `chunks`/`nodes` join.  The DELETE and INSERT are
124/// wrapped in a single `IMMEDIATE` transaction so a mid-rebuild failure
125/// cannot leave the index empty.
126fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
127    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
128    tx.execute("DELETE FROM fts_nodes", [])?;
129    let inserted = tx.execute(
130        r"
131        INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
132        SELECT c.id, n.logical_id, n.kind, c.text_content
133        FROM chunks c
134        JOIN nodes n
135          ON n.logical_id = c.node_logical_id
136         AND n.superseded_at IS NULL
137        ",
138        [],
139    )?;
140    tx.commit()?;
141    Ok(inserted)
142}
143
144/// Atomically rebuild the property FTS index from registered schemas and active nodes.
145fn rebuild_property_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
146    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
147
148    // Delete from ALL per-kind FTS virtual tables (including orphaned ones without schemas).
149    // Filter by sql LIKE 'CREATE VIRTUAL TABLE%' to exclude FTS5 shadow tables.
150    let all_per_kind_tables: Vec<String> = {
151        let mut stmt = tx.prepare(
152            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'fts_props_%' \
153             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
154        )?;
155        stmt.query_map([], |r| r.get::<_, String>(0))?
156            .collect::<Result<Vec<_>, _>>()?
157    };
158    for table in &all_per_kind_tables {
159        tx.execute_batch(&format!("DELETE FROM {table}"))?;
160    }
161    tx.execute("DELETE FROM fts_node_property_positions", [])?;
162
163    let total = insert_property_fts_rows(
164        &tx,
165        "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
166    )?;
167
168    tx.commit()?;
169    Ok(total)
170}
171
172/// Insert missing property FTS rows within an existing transaction.
173///
174/// Two repair passes run inside the caller's transaction:
175///
176/// 1. Nodes of a registered kind that have no row in the per-kind FTS tables are
177///    re-extracted from canonical state and inserted (blob + positions).
178/// 2. Nodes of a recursive-mode kind that *do* have a row in the per-kind FTS tables
179///    but no `fts_node_property_positions` rows have their positions
180///    regenerated in place. This repairs orphaned position map rows caused
181///    by partial drift without requiring a full `rebuild_projections(Fts)`.
182///    (P4-P2-2)
183fn rebuild_missing_property_fts_in_tx(
184    conn: &rusqlite::Connection,
185) -> Result<usize, rusqlite::Error> {
186    // The per-kind table is parameterized: the SQL is built per-kind in
187    // insert_property_fts_rows_missing (below), which passes the table name inline.
188    let inserted = insert_property_fts_rows_missing(conn)?;
189    let repaired = repair_orphaned_position_map_in_tx(conn)?;
190    Ok(inserted + repaired)
191}
192
193/// Repair recursive-mode nodes whose per-kind FTS row exists but
194/// whose position-map rows have been dropped. For each such node the
195/// property FTS is re-extracted from canonical state and the position rows
196/// are re-inserted. The blob row is left untouched — callers that deleted
197/// positions without touching the blob keep the original blob rowid, which
198/// matters because `projection_row_id` in search hits is the blob rowid.
199fn repair_orphaned_position_map_in_tx(
200    conn: &rusqlite::Connection,
201) -> Result<usize, rusqlite::Error> {
202    let schemas = crate::writer::load_fts_property_schemas(conn)?;
203    if schemas.is_empty() {
204        return Ok(0);
205    }
206    let mut total = 0usize;
207    let mut ins_positions = conn.prepare(
208        "INSERT INTO fts_node_property_positions \
209         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
210         VALUES (?1, ?2, ?3, ?4, ?5)",
211    )?;
212    for (kind, schema) in &schemas {
213        let has_recursive = schema
214            .paths
215            .iter()
216            .any(|p| p.mode == crate::writer::PropertyPathMode::Recursive);
217        if !has_recursive {
218            continue;
219        }
220        let table = fathomdb_schema::fts_kind_table_name(kind);
221        // Nodes that have an FTS row in the per-kind table but no position-map rows.
222        let mut stmt = conn.prepare(&format!(
223            "SELECT n.logical_id, n.properties FROM nodes n \
224             WHERE n.kind = ?1 AND n.superseded_at IS NULL \
225               AND EXISTS (SELECT 1 FROM {table} fp \
226                           WHERE fp.node_logical_id = n.logical_id) \
227               AND NOT EXISTS (SELECT 1 FROM fts_node_property_positions p \
228                               WHERE p.node_logical_id = n.logical_id AND p.kind = ?1)"
229        ))?;
230        let rows: Vec<(String, String)> = stmt
231            .query_map([kind.as_str()], |row| {
232                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
233            })?
234            .collect::<Result<Vec<_>, _>>()?;
235        for (logical_id, properties_str) in &rows {
236            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
237            let (_text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
238            for pos in &positions {
239                ins_positions.execute(rusqlite::params![
240                    logical_id,
241                    kind,
242                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
243                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
244                    pos.leaf_path,
245                ])?;
246            }
247            if !positions.is_empty() {
248                total += 1;
249            }
250        }
251    }
252    Ok(total)
253}
254
255/// Rebuild property FTS rows for exactly one kind from its just-registered
256/// schema. Unlike [`insert_property_fts_rows`], this helper does NOT iterate
257/// over every registered schema — so callers that delete rows for a single
258/// kind won't duplicate rows for sibling kinds on the subsequent insert.
259///
260/// The caller is responsible for transaction management and for deleting
261/// stale rows for `kind` before calling this function.
262pub(crate) fn insert_property_fts_rows_for_kind(
263    conn: &rusqlite::Connection,
264    kind: &str,
265) -> Result<usize, rusqlite::Error> {
266    let schemas = crate::writer::load_fts_property_schemas(conn)?;
267    let Some(schema) = schemas
268        .iter()
269        .find(|(k, _)| k == kind)
270        .map(|(_, s)| s.clone())
271    else {
272        return Ok(0);
273    };
274
275    let table = fathomdb_schema::fts_kind_table_name(kind);
276    // Ensure the per-kind table exists.
277    conn.execute_batch(&format!(
278        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
279            node_logical_id UNINDEXED, text_content, \
280            tokenize = '{DEFAULT_FTS_TOKENIZER}'\
281        )"
282    ))?;
283    let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
284    let mut ins_positions = conn.prepare(
285        "INSERT INTO fts_node_property_positions \
286         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
287         VALUES (?1, ?2, ?3, ?4, ?5)",
288    )?;
289
290    let mut stmt = conn.prepare(
291        "SELECT logical_id, properties FROM nodes \
292         WHERE kind = ?1 AND superseded_at IS NULL",
293    )?;
294    let rows: Vec<(String, String)> = stmt
295        .query_map([kind], |row| {
296            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
297        })?
298        .collect::<Result<Vec<_>, _>>()?;
299
300    let mut total = 0usize;
301    for (logical_id, properties_str) in &rows {
302        let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
303        let (text, positions, _stats) = crate::writer::extract_property_fts(&props, &schema);
304        if let Some(text) = text {
305            if has_weights {
306                let cols = crate::writer::extract_property_fts_columns(&props, &schema);
307                let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
308                let placeholders: Vec<String> =
309                    (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
310                let sql = format!(
311                    "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
312                    c = col_names.join(", "),
313                    p = placeholders.join(", "),
314                );
315                conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
316                    std::iter::once(logical_id.as_str())
317                        .chain(cols.iter().map(|(_, v)| v.as_str())),
318                ))?;
319            } else {
320                conn.prepare(&format!(
321                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
322                ))?
323                .execute(rusqlite::params![logical_id, text])?;
324            }
325            for pos in &positions {
326                ins_positions.execute(rusqlite::params![
327                    logical_id,
328                    kind,
329                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
330                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
331                    pos.leaf_path,
332                ])?;
333            }
334            total += 1;
335        }
336    }
337    Ok(total)
338}
339
340/// Shared loop: load schemas, query nodes with `node_sql` (parameterized by kind),
341/// extract property FTS text, and insert into the per-kind FTS table.
342/// The caller is responsible for transaction management and for deleting stale rows
343/// before calling this function if a full rebuild is intended.
344pub(crate) fn insert_property_fts_rows(
345    conn: &rusqlite::Connection,
346    node_sql: &str,
347) -> Result<usize, rusqlite::Error> {
348    let schemas = crate::writer::load_fts_property_schemas(conn)?;
349    if schemas.is_empty() {
350        return Ok(0);
351    }
352
353    let mut total = 0usize;
354    let mut ins_positions = conn.prepare(
355        "INSERT INTO fts_node_property_positions \
356         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
357         VALUES (?1, ?2, ?3, ?4, ?5)",
358    )?;
359    for (kind, schema) in &schemas {
360        let table = fathomdb_schema::fts_kind_table_name(kind);
361        // Ensure per-kind table exists.
362        conn.execute_batch(&format!(
363            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
364                node_logical_id UNINDEXED, text_content, \
365                tokenize = '{DEFAULT_FTS_TOKENIZER}'\
366            )"
367        ))?;
368        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
369        let mut stmt = conn.prepare(node_sql)?;
370        let rows: Vec<(String, String)> = stmt
371            .query_map([kind.as_str()], |row| {
372                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
373            })?
374            .collect::<Result<Vec<_>, _>>()?;
375        for (logical_id, properties_str) in &rows {
376            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
377            let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
378            if let Some(text) = text {
379                if has_weights {
380                    let cols = crate::writer::extract_property_fts_columns(&props, schema);
381                    let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
382                    let placeholders: Vec<String> =
383                        (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
384                    let sql = format!(
385                        "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
386                        c = col_names.join(", "),
387                        p = placeholders.join(", "),
388                    );
389                    conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
390                        std::iter::once(logical_id.as_str())
391                            .chain(cols.iter().map(|(_, v)| v.as_str())),
392                    ))?;
393                } else {
394                    conn.prepare(&format!(
395                        "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
396                    ))?
397                    .execute(rusqlite::params![logical_id, text])?;
398                }
399                for pos in &positions {
400                    ins_positions.execute(rusqlite::params![
401                        logical_id,
402                        kind,
403                        i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
404                        i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
405                        pos.leaf_path,
406                    ])?;
407                }
408                total += 1;
409            }
410        }
411    }
412    Ok(total)
413}
414
415/// Insert missing property FTS rows: for each registered kind, find nodes that
416/// have no row in the per-kind FTS table and insert them.
417/// The caller is responsible for transaction management.
418fn insert_property_fts_rows_missing(conn: &rusqlite::Connection) -> Result<usize, rusqlite::Error> {
419    let schemas = crate::writer::load_fts_property_schemas(conn)?;
420    if schemas.is_empty() {
421        return Ok(0);
422    }
423
424    let mut total = 0usize;
425    let mut ins_positions = conn.prepare(
426        "INSERT INTO fts_node_property_positions \
427         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
428         VALUES (?1, ?2, ?3, ?4, ?5)",
429    )?;
430    for (kind, schema) in &schemas {
431        let table = fathomdb_schema::fts_kind_table_name(kind);
432        // Ensure per-kind table exists.
433        conn.execute_batch(&format!(
434            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
435                node_logical_id UNINDEXED, text_content, \
436                tokenize = '{DEFAULT_FTS_TOKENIZER}'\
437            )"
438        ))?;
439        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
440        // Find nodes of this kind with no row in the per-kind table.
441        let mut stmt = conn.prepare(&format!(
442            "SELECT n.logical_id, n.properties FROM nodes n \
443             WHERE n.kind = ?1 AND n.superseded_at IS NULL \
444               AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
445        ))?;
446        let rows: Vec<(String, String)> = stmt
447            .query_map([kind.as_str()], |row| {
448                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
449            })?
450            .collect::<Result<Vec<_>, _>>()?;
451        for (logical_id, properties_str) in &rows {
452            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
453            let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
454            if let Some(text) = text {
455                if has_weights {
456                    let cols = crate::writer::extract_property_fts_columns(&props, schema);
457                    let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
458                    let placeholders: Vec<String> =
459                        (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
460                    let sql = format!(
461                        "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
462                        c = col_names.join(", "),
463                        p = placeholders.join(", "),
464                    );
465                    conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
466                        std::iter::once(logical_id.as_str())
467                            .chain(cols.iter().map(|(_, v)| v.as_str())),
468                    ))?;
469                } else {
470                    conn.prepare(&format!(
471                        "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
472                    ))?
473                    .execute(rusqlite::params![logical_id, text])?;
474                }
475                for pos in &positions {
476                    ins_positions.execute(rusqlite::params![
477                        logical_id,
478                        kind,
479                        i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
480                        i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
481                        pos.leaf_path,
482                    ])?;
483                }
484                total += 1;
485            }
486        }
487    }
488    Ok(total)
489}
490
491/// Remove stale vec rows: entries whose chunk no longer exists or whose node has been
492/// superseded/retired.  Iterates all per-kind vec tables registered in
493/// `projection_profiles`.  Degrades gracefully when the feature is disabled or tables
494/// are absent.
495#[allow(clippy::unnecessary_wraps, unused_variables)]
496fn rebuild_vec(
497    conn: &mut rusqlite::Connection,
498    notes: &mut Vec<String>,
499) -> Result<usize, rusqlite::Error> {
500    #[cfg(feature = "sqlite-vec")]
501    {
502        let kinds: Vec<String> = {
503            let mut stmt =
504                match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
505                    Ok(s) => s,
506                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
507                        if msg.contains("no such table: projection_profiles") =>
508                    {
509                        notes.push("projection_profiles absent; vec rebuild skipped".to_owned());
510                        return Ok(0);
511                    }
512                    Err(e) => return Err(e),
513                };
514            stmt.query_map([], |row| row.get(0))?
515                .collect::<Result<Vec<_>, _>>()?
516        };
517
518        if kinds.is_empty() {
519            notes.push("no vec profiles registered; vec rebuild skipped".to_owned());
520            return Ok(0);
521        }
522
523        let mut total = 0;
524        for kind in &kinds {
525            let table = fathomdb_schema::vec_kind_table_name(kind);
526            let sql = format!(
527                "DELETE FROM {table} WHERE chunk_id IN (
528                    SELECT v.chunk_id FROM {table} v
529                    LEFT JOIN chunks c ON c.id = v.chunk_id
530                    LEFT JOIN nodes  n ON n.logical_id = c.node_logical_id
531                    WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
532                )"
533            );
534            let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
535            let deleted = match tx.execute(&sql, []) {
536                Ok(n) => n,
537                Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
538                    if msg.contains("no such table:") || msg.contains("no such module: vec0") =>
539                {
540                    notes.push(format!(
541                        "{table} absent; vec rebuild for kind '{kind}' skipped"
542                    ));
543                    tx.rollback()?;
544                    continue;
545                }
546                Err(e) => return Err(e),
547            };
548            tx.commit()?;
549            total += deleted;
550        }
551        Ok(total)
552    }
553    #[cfg(not(feature = "sqlite-vec"))]
554    {
555        notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
556        Ok(0)
557    }
558}
559
560fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
561    match target {
562        ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
563        ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
564        ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
565    }
566}
567
#[cfg(all(test, feature = "sqlite-vec"))]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// A vec row whose node is superseded must be removed by a `Vec` rebuild.
    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());

        // Seed the stale state on its own connection, closed before the service runs.
        let seed_conn = open_connection_with_vec(db.path()).expect("vec conn");
        schema.bootstrap(&seed_conn).expect("bootstrap");
        schema
            .ensure_vec_kind_profile(&seed_conn, "Doc", 3)
            .expect("vec kind profile");

        // A superseded node, its chunk, and (below) a vec row for that chunk.
        seed_conn
            .execute_batch(
                r"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                ",
            )
            .expect("seed stale data");

        // Three f32 components encoded as little-endian bytes.
        let embedding: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
            .iter()
            .flat_map(|component| component.to_le_bytes())
            .collect();
        seed_conn
            .execute(
                "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('chunk-stale', ?1)",
                rusqlite::params![embedding],
            )
            .expect("insert stale vec row");
        drop(seed_conn);

        let service = ProjectionService::new(db.path(), Arc::clone(&schema));
        let report = service
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        let verify_conn = rusqlite::Connection::open(db.path()).expect("conn");
        let remaining: i64 = verify_conn
            .query_row(
                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-stale'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(remaining, 0, "stale vec row must be gone after rebuild");
    }
}
633
634// --- B-3: projection per-column INSERT for weighted schemas ---
635
#[cfg(test)]
#[allow(clippy::expect_used)]
mod weighted_schema_tests {
    use fathomdb_schema::SchemaManager;
    use rusqlite::Connection;

    use super::insert_property_fts_rows_for_kind;

    /// In-memory database with the engine schema bootstrapped.
    fn bootstrapped_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn
    }

    /// With a weighted schema, each extracted property lands in its own FTS column.
    #[test]
    fn projection_inserts_per_column_for_weighted_schema() {
        let conn = bootstrapped_conn();
        let kind = "Article";
        let table = fathomdb_schema::fts_kind_table_name(kind);
        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);

        // One node carrying both extractable properties.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'article-1', ?1, '{\"title\":\"Hello\",\"body\":\"World\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        // Schema whose paths both carry a weight.
        let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, ?2, ' ')",
            rusqlite::params![kind, paths_json],
        )
        .expect("insert schema");

        // The weighted per-kind table has one FTS column per path.
        let create_table_sql = format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
                node_logical_id UNINDEXED, {title_col}, {body_col}, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )"
        );
        conn.execute_batch(&create_table_sql)
            .expect("create weighted per-kind table");

        // Run the projection insert under test.
        insert_property_fts_rows_for_kind(&conn, kind).expect("insert_property_fts_rows_for_kind");

        // Exactly one row for the node.
        let count_sql = format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'article-1'");
        let row_count: i64 = conn
            .query_row(&count_sql, [], |r| r.get(0))
            .expect("count");
        assert_eq!(row_count, 1, "per-kind table must have the inserted row");

        // Each column holds the value of its own property.
        let columns_sql = format!(
            "SELECT {title_col}, {body_col} FROM {table} \
             WHERE node_logical_id = 'article-1'"
        );
        let (title_val, body_val): (String, String) = conn
            .query_row(&columns_sql, [], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
            })
            .expect("select per-column");
        assert_eq!(title_val, "Hello", "title column must have correct value");
        assert_eq!(body_val, "World", "body column must have correct value");
    }
}