Skip to main content

fathomdb_engine/
projection.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::{OptionalExtension, TransactionBehavior};
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
11pub enum ProjectionTarget {
12    Fts,
13    Vec,
14    All,
15}
16
17#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
18pub struct ProjectionRepairReport {
19    pub targets: Vec<ProjectionTarget>,
20    pub rebuilt_rows: usize,
21    pub notes: Vec<String>,
22}
23
24#[derive(Debug)]
25pub struct ProjectionService {
26    database_path: PathBuf,
27    schema_manager: Arc<SchemaManager>,
28}
29
30impl ProjectionService {
31    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32        Self {
33            database_path: path.as_ref().to_path_buf(),
34            schema_manager,
35        }
36    }
37
38    fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39        let conn = sqlite::open_connection(&self.database_path)?;
40        self.schema_manager.bootstrap(&conn)?;
41        Ok(conn)
42    }
43
44    /// # Errors
45    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
46    pub fn rebuild_projections(
47        &self,
48        target: ProjectionTarget,
49    ) -> Result<ProjectionRepairReport, EngineError> {
50        trace_info!(target = ?target, "projection rebuild started");
51        #[cfg(feature = "tracing")]
52        let start = std::time::Instant::now();
53        let mut conn = self.connect()?;
54
55        let mut notes = Vec::new();
56        let rebuilt_rows = match target {
57            ProjectionTarget::Fts => {
58                let fts = rebuild_fts(&mut conn)?;
59                let prop_fts = rebuild_property_fts(&mut conn)?;
60                fts + prop_fts
61            }
62            ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
63            ProjectionTarget::All => {
64                let rebuilt_fts = rebuild_fts(&mut conn)?;
65                let rebuilt_prop_fts = rebuild_property_fts(&mut conn)?;
66                let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
67                rebuilt_fts + rebuilt_prop_fts + rebuilt_vec
68            }
69        };
70
71        trace_info!(
72            target = ?target,
73            rebuilt_rows,
74            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
75            "projection rebuild completed"
76        );
77        Ok(ProjectionRepairReport {
78            targets: expand_targets(target),
79            rebuilt_rows,
80            notes,
81        })
82    }
83
84    /// # Errors
85    /// Returns [`EngineError`] if the database connection fails or the INSERT query fails.
86    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
87        // FIX(review): was bare execute without explicit transaction.
88        // Options: (A) IMMEDIATE tx matching rebuild_fts(), (B) DEFERRED tx, (C) leave as-is
89        // (autocommit wraps single statements atomically). Chose (A): explicit transaction
90        // communicates intent, matches sibling rebuild_fts(), and protects against future
91        // refactoring that might add additional statements.
92        let mut conn = self.connect()?;
93
94        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
95        let inserted_chunk_fts = tx.execute(
96            r"
97            INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
98            SELECT c.id, n.logical_id, n.kind, c.text_content
99            FROM chunks c
100            JOIN nodes n
101              ON n.logical_id = c.node_logical_id
102             AND n.superseded_at IS NULL
103            WHERE NOT EXISTS (
104                SELECT 1
105                FROM fts_nodes f
106                WHERE f.chunk_id = c.id
107            )
108            ",
109            [],
110        )?;
111        let inserted_prop_fts = rebuild_missing_property_fts_in_tx(&tx)?;
112        tx.commit()?;
113
114        Ok(ProjectionRepairReport {
115            targets: vec![ProjectionTarget::Fts],
116            rebuilt_rows: inserted_chunk_fts + inserted_prop_fts,
117            notes: vec![],
118        })
119    }
120}
121
122/// Atomically rebuild the FTS index: delete all existing rows and repopulate
123/// from the canonical `chunks`/`nodes` join.  The DELETE and INSERT are
124/// wrapped in a single `IMMEDIATE` transaction so a mid-rebuild failure
125/// cannot leave the index empty.
126fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
127    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
128    tx.execute("DELETE FROM fts_nodes", [])?;
129    let inserted = tx.execute(
130        r"
131        INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
132        SELECT c.id, n.logical_id, n.kind, c.text_content
133        FROM chunks c
134        JOIN nodes n
135          ON n.logical_id = c.node_logical_id
136         AND n.superseded_at IS NULL
137        ",
138        [],
139    )?;
140    tx.commit()?;
141    Ok(inserted)
142}
143
144/// Atomically rebuild the property FTS index from registered schemas and active nodes.
145fn rebuild_property_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
146    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
147
148    // Delete from ALL per-kind FTS virtual tables (including orphaned ones without schemas).
149    // Filter by sql LIKE 'CREATE VIRTUAL TABLE%' to exclude FTS5 shadow tables.
150    let all_per_kind_tables: Vec<String> = {
151        let mut stmt = tx.prepare(
152            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'fts_props_%' \
153             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
154        )?;
155        stmt.query_map([], |r| r.get::<_, String>(0))?
156            .collect::<Result<Vec<_>, _>>()?
157    };
158    for table in &all_per_kind_tables {
159        tx.execute_batch(&format!("DELETE FROM {table}"))?;
160    }
161    tx.execute("DELETE FROM fts_node_property_positions", [])?;
162
163    let total = insert_property_fts_rows(
164        &tx,
165        "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
166    )?;
167
168    tx.commit()?;
169    Ok(total)
170}
171
172/// Insert missing property FTS rows within an existing transaction.
173///
174/// Two repair passes run inside the caller's transaction:
175///
176/// 1. Nodes of a registered kind that have no row in the per-kind FTS tables are
177///    re-extracted from canonical state and inserted (blob + positions).
178/// 2. Nodes of a recursive-mode kind that *do* have a row in the per-kind FTS tables
179///    but no `fts_node_property_positions` rows have their positions
180///    regenerated in place. This repairs orphaned position map rows caused
181///    by partial drift without requiring a full `rebuild_projections(Fts)`.
182///    (P4-P2-2)
183fn rebuild_missing_property_fts_in_tx(
184    conn: &rusqlite::Connection,
185) -> Result<usize, rusqlite::Error> {
186    // The per-kind table is parameterized: the SQL is built per-kind in
187    // insert_property_fts_rows_missing (below), which passes the table name inline.
188    let inserted = insert_property_fts_rows_missing(conn)?;
189    let repaired = repair_orphaned_position_map_in_tx(conn)?;
190    Ok(inserted + repaired)
191}
192
193/// Repair recursive-mode nodes whose per-kind FTS row exists but
194/// whose position-map rows have been dropped. For each such node the
195/// property FTS is re-extracted from canonical state and the position rows
196/// are re-inserted. The blob row is left untouched — callers that deleted
197/// positions without touching the blob keep the original blob rowid, which
198/// matters because `projection_row_id` in search hits is the blob rowid.
199fn repair_orphaned_position_map_in_tx(
200    conn: &rusqlite::Connection,
201) -> Result<usize, rusqlite::Error> {
202    let schemas = crate::writer::load_fts_property_schemas(conn)?;
203    if schemas.is_empty() {
204        return Ok(0);
205    }
206    let mut total = 0usize;
207    let mut ins_positions = conn.prepare(
208        "INSERT INTO fts_node_property_positions \
209         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
210         VALUES (?1, ?2, ?3, ?4, ?5)",
211    )?;
212    for (kind, schema) in &schemas {
213        let has_recursive = schema
214            .paths
215            .iter()
216            .any(|p| p.mode == crate::writer::PropertyPathMode::Recursive);
217        if !has_recursive {
218            continue;
219        }
220        let table = fathomdb_schema::fts_kind_table_name(kind);
221        // Nodes that have an FTS row in the per-kind table but no position-map rows.
222        let mut stmt = conn.prepare(&format!(
223            "SELECT n.logical_id, n.properties FROM nodes n \
224             WHERE n.kind = ?1 AND n.superseded_at IS NULL \
225               AND EXISTS (SELECT 1 FROM {table} fp \
226                           WHERE fp.node_logical_id = n.logical_id) \
227               AND NOT EXISTS (SELECT 1 FROM fts_node_property_positions p \
228                               WHERE p.node_logical_id = n.logical_id AND p.kind = ?1)"
229        ))?;
230        let rows: Vec<(String, String)> = stmt
231            .query_map([kind.as_str()], |row| {
232                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
233            })?
234            .collect::<Result<Vec<_>, _>>()?;
235        for (logical_id, properties_str) in &rows {
236            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
237            let (_text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
238            for pos in &positions {
239                ins_positions.execute(rusqlite::params![
240                    logical_id,
241                    kind,
242                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
243                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
244                    pos.leaf_path,
245                ])?;
246            }
247            if !positions.is_empty() {
248                total += 1;
249            }
250        }
251    }
252    Ok(total)
253}
254
255/// Rebuild property FTS rows for exactly one kind from its just-registered
256/// schema. Unlike [`insert_property_fts_rows`], this helper does NOT iterate
257/// over every registered schema — so callers that delete rows for a single
258/// kind won't duplicate rows for sibling kinds on the subsequent insert.
259///
260/// The caller is responsible for transaction management and for deleting
261/// stale rows for `kind` before calling this function.
262pub(crate) fn insert_property_fts_rows_for_kind(
263    conn: &rusqlite::Connection,
264    kind: &str,
265) -> Result<usize, rusqlite::Error> {
266    let schemas = crate::writer::load_fts_property_schemas(conn)?;
267    let Some(schema) = schemas
268        .iter()
269        .find(|(k, _)| k == kind)
270        .map(|(_, s)| s.clone())
271    else {
272        return Ok(0);
273    };
274
275    let table = fathomdb_schema::fts_kind_table_name(kind);
276    ensure_property_fts_table(conn, kind, &schema)?;
277    let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
278    let mut ins_positions = conn.prepare(
279        "INSERT INTO fts_node_property_positions \
280         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
281         VALUES (?1, ?2, ?3, ?4, ?5)",
282    )?;
283
284    let mut stmt = conn.prepare(
285        "SELECT logical_id, properties FROM nodes \
286         WHERE kind = ?1 AND superseded_at IS NULL",
287    )?;
288    let rows: Vec<(String, String)> = stmt
289        .query_map([kind], |row| {
290            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
291        })?
292        .collect::<Result<Vec<_>, _>>()?;
293
294    let mut total = 0usize;
295    for (logical_id, properties_str) in &rows {
296        let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
297        let (text, positions, _stats) = crate::writer::extract_property_fts(&props, &schema);
298        if let Some(text) = text {
299            if has_weights {
300                let cols = crate::writer::extract_property_fts_columns(&props, &schema);
301                let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
302                let placeholders: Vec<String> =
303                    (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
304                let sql = format!(
305                    "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
306                    c = col_names.join(", "),
307                    p = placeholders.join(", "),
308                );
309                conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
310                    std::iter::once(logical_id.as_str())
311                        .chain(cols.iter().map(|(_, v)| v.as_str())),
312                ))?;
313            } else {
314                conn.prepare(&format!(
315                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
316                ))?
317                .execute(rusqlite::params![logical_id, text])?;
318            }
319            for pos in &positions {
320                ins_positions.execute(rusqlite::params![
321                    logical_id,
322                    kind,
323                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
324                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
325                    pos.leaf_path,
326                ])?;
327            }
328            total += 1;
329        }
330    }
331    Ok(total)
332}
333
334/// Shared loop: load schemas, query nodes with `node_sql` (parameterized by kind),
335/// extract property FTS text, and insert into the per-kind FTS table.
336/// The caller is responsible for transaction management and for deleting stale rows
337/// before calling this function if a full rebuild is intended.
338pub(crate) fn insert_property_fts_rows(
339    conn: &rusqlite::Connection,
340    node_sql: &str,
341) -> Result<usize, rusqlite::Error> {
342    let schemas = crate::writer::load_fts_property_schemas(conn)?;
343    if schemas.is_empty() {
344        return Ok(0);
345    }
346
347    let mut total = 0usize;
348    let mut ins_positions = conn.prepare(
349        "INSERT INTO fts_node_property_positions \
350         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
351         VALUES (?1, ?2, ?3, ?4, ?5)",
352    )?;
353    for (kind, schema) in &schemas {
354        let table = fathomdb_schema::fts_kind_table_name(kind);
355        ensure_property_fts_table(conn, kind, schema)?;
356        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
357        let mut stmt = conn.prepare(node_sql)?;
358        let rows: Vec<(String, String)> = stmt
359            .query_map([kind.as_str()], |row| {
360                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
361            })?
362            .collect::<Result<Vec<_>, _>>()?;
363        for (logical_id, properties_str) in &rows {
364            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
365            let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
366            if let Some(text) = text {
367                if has_weights {
368                    let cols = crate::writer::extract_property_fts_columns(&props, schema);
369                    let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
370                    let placeholders: Vec<String> =
371                        (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
372                    let sql = format!(
373                        "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
374                        c = col_names.join(", "),
375                        p = placeholders.join(", "),
376                    );
377                    conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
378                        std::iter::once(logical_id.as_str())
379                            .chain(cols.iter().map(|(_, v)| v.as_str())),
380                    ))?;
381                } else {
382                    conn.prepare(&format!(
383                        "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
384                    ))?
385                    .execute(rusqlite::params![logical_id, text])?;
386                }
387                for pos in &positions {
388                    ins_positions.execute(rusqlite::params![
389                        logical_id,
390                        kind,
391                        i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
392                        i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
393                        pos.leaf_path,
394                    ])?;
395                }
396                total += 1;
397            }
398        }
399    }
400    Ok(total)
401}
402
403/// Insert missing property FTS rows: for each registered kind, find nodes that
404/// have no row in the per-kind FTS table and insert them.
405/// The caller is responsible for transaction management.
406fn insert_property_fts_rows_missing(conn: &rusqlite::Connection) -> Result<usize, rusqlite::Error> {
407    let schemas = crate::writer::load_fts_property_schemas(conn)?;
408    if schemas.is_empty() {
409        return Ok(0);
410    }
411
412    let mut total = 0usize;
413    let mut ins_positions = conn.prepare(
414        "INSERT INTO fts_node_property_positions \
415         (node_logical_id, kind, start_offset, end_offset, leaf_path) \
416         VALUES (?1, ?2, ?3, ?4, ?5)",
417    )?;
418    for (kind, schema) in &schemas {
419        let table = fathomdb_schema::fts_kind_table_name(kind);
420        ensure_property_fts_table(conn, kind, schema)?;
421        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
422        // Find nodes of this kind with no row in the per-kind table.
423        let mut stmt = conn.prepare(&format!(
424            "SELECT n.logical_id, n.properties FROM nodes n \
425             WHERE n.kind = ?1 AND n.superseded_at IS NULL \
426               AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
427        ))?;
428        let rows: Vec<(String, String)> = stmt
429            .query_map([kind.as_str()], |row| {
430                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
431            })?
432            .collect::<Result<Vec<_>, _>>()?;
433        for (logical_id, properties_str) in &rows {
434            let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
435            let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
436            if let Some(text) = text {
437                if has_weights {
438                    let cols = crate::writer::extract_property_fts_columns(&props, schema);
439                    let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
440                    let placeholders: Vec<String> =
441                        (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
442                    let sql = format!(
443                        "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
444                        c = col_names.join(", "),
445                        p = placeholders.join(", "),
446                    );
447                    conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
448                        std::iter::once(logical_id.as_str())
449                            .chain(cols.iter().map(|(_, v)| v.as_str())),
450                    ))?;
451                } else {
452                    conn.prepare(&format!(
453                        "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
454                    ))?
455                    .execute(rusqlite::params![logical_id, text])?;
456                }
457                for pos in &positions {
458                    ins_positions.execute(rusqlite::params![
459                        logical_id,
460                        kind,
461                        i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
462                        i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
463                        pos.leaf_path,
464                    ])?;
465                }
466                total += 1;
467            }
468        }
469    }
470    Ok(total)
471}
472
473fn ensure_property_fts_table(
474    conn: &rusqlite::Connection,
475    kind: &str,
476    schema: &crate::writer::PropertyFtsSchema,
477) -> Result<(), rusqlite::Error> {
478    let table = fathomdb_schema::fts_kind_table_name(kind);
479    let exists: bool = conn
480        .query_row(
481            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 \
482             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
483            rusqlite::params![table],
484            |_| Ok(true),
485        )
486        .optional()?
487        .unwrap_or(false);
488    if exists {
489        return Ok(());
490    }
491
492    let tokenizer = fathomdb_schema::resolve_fts_tokenizer(conn, kind)
493        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
494    let tokenizer_sql = tokenizer.replace('\'', "''");
495    let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
496    let cols: Vec<String> = if has_weights {
497        std::iter::once("node_logical_id UNINDEXED".to_owned())
498            .chain(schema.paths.iter().map(|p| {
499                let is_recursive = matches!(p.mode, crate::writer::PropertyPathMode::Recursive);
500                fathomdb_schema::fts_column_name(&p.path, is_recursive)
501            }))
502            .collect()
503    } else {
504        vec![
505            "node_logical_id UNINDEXED".to_owned(),
506            "text_content".to_owned(),
507        ]
508    };
509    conn.execute_batch(&format!(
510        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5({cols}, tokenize='{tokenizer_sql}')",
511        cols = cols.join(", "),
512    ))?;
513    Ok(())
514}
515
516/// Remove stale vec rows: entries whose chunk no longer exists or whose node has been
517/// superseded/retired.  Iterates all per-kind vec tables registered in
518/// `projection_profiles`.  Degrades gracefully when the feature is disabled or tables
519/// are absent.
520#[allow(clippy::unnecessary_wraps, unused_variables)]
521fn rebuild_vec(
522    conn: &mut rusqlite::Connection,
523    notes: &mut Vec<String>,
524) -> Result<usize, rusqlite::Error> {
525    #[cfg(feature = "sqlite-vec")]
526    {
527        let kinds: Vec<String> = {
528            let mut stmt =
529                match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
530                    Ok(s) => s,
531                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
532                        if msg.contains("no such table: projection_profiles") =>
533                    {
534                        notes.push("projection_profiles absent; vec rebuild skipped".to_owned());
535                        return Ok(0);
536                    }
537                    Err(e) => return Err(e),
538                };
539            stmt.query_map([], |row| row.get(0))?
540                .collect::<Result<Vec<_>, _>>()?
541        };
542
543        if kinds.is_empty() {
544            notes.push("no vec profiles registered; vec rebuild skipped".to_owned());
545            return Ok(0);
546        }
547
548        let mut total = 0;
549        for kind in &kinds {
550            let table = fathomdb_schema::vec_kind_table_name(kind);
551            let sql = format!(
552                "DELETE FROM {table} WHERE chunk_id IN (
553                    SELECT v.chunk_id FROM {table} v
554                    LEFT JOIN chunks c ON c.id = v.chunk_id
555                    LEFT JOIN nodes  n ON n.logical_id = c.node_logical_id
556                    WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
557                )"
558            );
559            let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
560            let deleted = match tx.execute(&sql, []) {
561                Ok(n) => n,
562                Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
563                    if msg.contains("no such table:") || msg.contains("no such module: vec0") =>
564                {
565                    notes.push(format!(
566                        "{table} absent; vec rebuild for kind '{kind}' skipped"
567                    ));
568                    tx.rollback()?;
569                    continue;
570                }
571                Err(e) => return Err(e),
572            };
573            tx.commit()?;
574            total += deleted;
575        }
576        Ok(total)
577    }
578    #[cfg(not(feature = "sqlite-vec"))]
579    {
580        notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
581        Ok(0)
582    }
583}
584
585fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
586    match target {
587        ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
588        ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
589        ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
590    }
591}
592
#[cfg(all(test, feature = "sqlite-vec"))]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// Encode an embedding as the little-endian bytes of its components.
    fn embedding_bytes(values: &[f32]) -> Vec<u8> {
        values.iter().flat_map(|f| f.to_le_bytes()).collect()
    }

    /// A vec row whose only node version is superseded must be deleted by a
    /// `Vec` rebuild, and the rebuild must report exactly that one row.
    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());
        let vec_table = fathomdb_schema::vec_kind_table_name("Doc");

        // Seed on a scoped connection so it is closed before the service runs.
        {
            let seed_conn = open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&seed_conn).expect("bootstrap");
            schema
                .ensure_vec_kind_profile(&seed_conn, "Doc", 3)
                .expect("vec kind profile");

            // Insert a superseded node + chunk + vec row (stale state).
            seed_conn
                .execute_batch(
                    r"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                ",
                )
                .expect("seed stale data");

            let bytes = embedding_bytes(&[0.1f32, 0.2f32, 0.3f32]);
            seed_conn
                .execute(
                    &format!(
                        "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-stale', ?1)"
                    ),
                    rusqlite::params![bytes],
                )
                .expect("insert stale vec row");
        }

        let report = ProjectionService::new(db.path(), Arc::clone(&schema))
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        let check_conn = rusqlite::Connection::open(db.path()).expect("conn");
        let remaining: i64 = check_conn
            .query_row(
                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-stale'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(remaining, 0, "stale vec row must be gone after rebuild");
    }
}
662
// --- B-3: projection per-column INSERT for weighted schemas ---

#[cfg(test)]
#[allow(clippy::expect_used)]
mod weighted_schema_tests {
    use fathomdb_schema::SchemaManager;
    use rusqlite::Connection;

    use super::insert_property_fts_rows_for_kind;

    /// In-memory database with the schema bootstrapped.
    fn bootstrapped_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn
    }

    /// With a weighted schema, each extracted property must land in its own
    /// column of the per-kind FTS table.
    #[test]
    fn projection_inserts_per_column_for_weighted_schema() {
        let kind = "Article";
        let table = fathomdb_schema::fts_kind_table_name(kind);
        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);
        let conn = bootstrapped_conn();

        // Insert a node with two extractable properties.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'article-1', ?1, '{\"title\":\"Hello\",\"body\":\"World\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        // Register schema with weights.
        let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, ?2, ' ')",
            rusqlite::params![kind, paths_json],
        )
        .expect("insert schema");

        // Create the weighted per-kind FTS table.
        let create_table_sql = format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
                node_logical_id UNINDEXED, {title_col}, {body_col}, \
                tokenize = 'porter unicode61 remove_diacritics 2'\
            )"
        );
        conn.execute_batch(&create_table_sql)
            .expect("create weighted per-kind table");

        // Run the projection insert.
        insert_property_fts_rows_for_kind(&conn, kind).expect("insert_property_fts_rows_for_kind");

        // Verify one row was inserted.
        let row_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'article-1'"),
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(row_count, 1, "per-kind table must have the inserted row");

        // Verify per-column values.
        let select_columns_sql = format!(
            "SELECT {title_col}, {body_col} FROM {table} \
             WHERE node_logical_id = 'article-1'"
        );
        let (title_val, body_val): (String, String) = conn
            .query_row(&select_columns_sql, [], |r| {
                Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
            })
            .expect("select per-column");
        assert_eq!(title_val, "Hello", "title column must have correct value");
        assert_eq!(body_val, "World", "body column must have correct value");
    }
}