1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use rusqlite::{OptionalExtension, TransactionBehavior};
6use serde::Serialize;
7
8use crate::{EngineError, sqlite};
9
/// Selects which derived projections a rebuild should cover.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub enum ProjectionTarget {
    /// Full-text projections: the chunk FTS table and the per-kind property
    /// FTS tables.
    Fts,
    /// Vector projections.
    Vec,
    /// Both of the above; reported as `[Fts, Vec]` in repair reports.
    All,
}
16
/// Outcome of a projection rebuild or repair pass.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct ProjectionRepairReport {
    /// Concrete targets the pass covered (`All` is expanded to its parts).
    pub targets: Vec<ProjectionTarget>,
    /// Sum of the row counts reported by each step: rows inserted for FTS
    /// steps, rows deleted for the vector step.
    pub rebuilt_rows: usize,
    /// Human-readable remarks, e.g. why a step was skipped.
    pub notes: Vec<String>,
}
23
/// Rebuilds and repairs derived projection tables for one database file.
#[derive(Debug)]
pub struct ProjectionService {
    /// Location of the SQLite database; a new connection is opened per call.
    database_path: PathBuf,
    /// Used to bootstrap the schema on every freshly opened connection.
    schema_manager: Arc<SchemaManager>,
}
29
30impl ProjectionService {
31 pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
32 Self {
33 database_path: path.as_ref().to_path_buf(),
34 schema_manager,
35 }
36 }
37
38 fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
39 let conn = sqlite::open_connection(&self.database_path)?;
40 self.schema_manager.bootstrap(&conn)?;
41 Ok(conn)
42 }
43
44 pub fn rebuild_projections(
47 &self,
48 target: ProjectionTarget,
49 ) -> Result<ProjectionRepairReport, EngineError> {
50 trace_info!(target = ?target, "projection rebuild started");
51 #[cfg(feature = "tracing")]
52 let start = std::time::Instant::now();
53 let mut conn = self.connect()?;
54
55 let mut notes = Vec::new();
56 let rebuilt_rows = match target {
57 ProjectionTarget::Fts => {
58 let fts = rebuild_fts(&mut conn)?;
59 let prop_fts = rebuild_property_fts(&mut conn)?;
60 fts + prop_fts
61 }
62 ProjectionTarget::Vec => rebuild_vec(&mut conn, &mut notes)?,
63 ProjectionTarget::All => {
64 let rebuilt_fts = rebuild_fts(&mut conn)?;
65 let rebuilt_prop_fts = rebuild_property_fts(&mut conn)?;
66 let rebuilt_vec = rebuild_vec(&mut conn, &mut notes)?;
67 rebuilt_fts + rebuilt_prop_fts + rebuilt_vec
68 }
69 };
70
71 trace_info!(
72 target = ?target,
73 rebuilt_rows,
74 duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
75 "projection rebuild completed"
76 );
77 Ok(ProjectionRepairReport {
78 targets: expand_targets(target),
79 rebuilt_rows,
80 notes,
81 })
82 }
83
84 pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
87 let mut conn = self.connect()?;
93
94 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
95 let inserted_chunk_fts = tx.execute(
96 r"
97 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
98 SELECT c.id, n.logical_id, n.kind, c.text_content
99 FROM chunks c
100 JOIN nodes n
101 ON n.logical_id = c.node_logical_id
102 AND n.superseded_at IS NULL
103 WHERE NOT EXISTS (
104 SELECT 1
105 FROM fts_nodes f
106 WHERE f.chunk_id = c.id
107 )
108 ",
109 [],
110 )?;
111 let inserted_prop_fts = rebuild_missing_property_fts_in_tx(&tx)?;
112 tx.commit()?;
113
114 Ok(ProjectionRepairReport {
115 targets: vec![ProjectionTarget::Fts],
116 rebuilt_rows: inserted_chunk_fts + inserted_prop_fts,
117 notes: vec![],
118 })
119 }
120}
121
122fn rebuild_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
127 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
128 tx.execute("DELETE FROM fts_nodes", [])?;
129 let inserted = tx.execute(
130 r"
131 INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content)
132 SELECT c.id, n.logical_id, n.kind, c.text_content
133 FROM chunks c
134 JOIN nodes n
135 ON n.logical_id = c.node_logical_id
136 AND n.superseded_at IS NULL
137 ",
138 [],
139 )?;
140 tx.commit()?;
141 Ok(inserted)
142}
143
144fn rebuild_property_fts(conn: &mut rusqlite::Connection) -> Result<usize, rusqlite::Error> {
146 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
147
148 let all_per_kind_tables: Vec<String> = {
151 let mut stmt = tx.prepare(
152 "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'fts_props_%' \
153 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
154 )?;
155 stmt.query_map([], |r| r.get::<_, String>(0))?
156 .collect::<Result<Vec<_>, _>>()?
157 };
158 for table in &all_per_kind_tables {
159 tx.execute_batch(&format!("DELETE FROM {table}"))?;
160 }
161 tx.execute("DELETE FROM fts_node_property_positions", [])?;
162
163 let total = insert_property_fts_rows(
164 &tx,
165 "SELECT logical_id, properties FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
166 )?;
167
168 tx.commit()?;
169 Ok(total)
170}
171
172fn rebuild_missing_property_fts_in_tx(
184 conn: &rusqlite::Connection,
185) -> Result<usize, rusqlite::Error> {
186 let inserted = insert_property_fts_rows_missing(conn)?;
189 let repaired = repair_orphaned_position_map_in_tx(conn)?;
190 Ok(inserted + repaired)
191}
192
193fn repair_orphaned_position_map_in_tx(
200 conn: &rusqlite::Connection,
201) -> Result<usize, rusqlite::Error> {
202 let schemas = crate::writer::load_fts_property_schemas(conn)?;
203 if schemas.is_empty() {
204 return Ok(0);
205 }
206 let mut total = 0usize;
207 let mut ins_positions = conn.prepare(
208 "INSERT INTO fts_node_property_positions \
209 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
210 VALUES (?1, ?2, ?3, ?4, ?5)",
211 )?;
212 for (kind, schema) in &schemas {
213 let has_recursive = schema
214 .paths
215 .iter()
216 .any(|p| p.mode == crate::writer::PropertyPathMode::Recursive);
217 if !has_recursive {
218 continue;
219 }
220 let table = fathomdb_schema::fts_kind_table_name(kind);
221 let mut stmt = conn.prepare(&format!(
223 "SELECT n.logical_id, n.properties FROM nodes n \
224 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
225 AND EXISTS (SELECT 1 FROM {table} fp \
226 WHERE fp.node_logical_id = n.logical_id) \
227 AND NOT EXISTS (SELECT 1 FROM fts_node_property_positions p \
228 WHERE p.node_logical_id = n.logical_id AND p.kind = ?1)"
229 ))?;
230 let rows: Vec<(String, String)> = stmt
231 .query_map([kind.as_str()], |row| {
232 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
233 })?
234 .collect::<Result<Vec<_>, _>>()?;
235 for (logical_id, properties_str) in &rows {
236 let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
237 let (_text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
238 for pos in &positions {
239 ins_positions.execute(rusqlite::params![
240 logical_id,
241 kind,
242 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
243 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
244 pos.leaf_path,
245 ])?;
246 }
247 if !positions.is_empty() {
248 total += 1;
249 }
250 }
251 }
252 Ok(total)
253}
254
255pub(crate) fn insert_property_fts_rows_for_kind(
263 conn: &rusqlite::Connection,
264 kind: &str,
265) -> Result<usize, rusqlite::Error> {
266 let schemas = crate::writer::load_fts_property_schemas(conn)?;
267 let Some(schema) = schemas
268 .iter()
269 .find(|(k, _)| k == kind)
270 .map(|(_, s)| s.clone())
271 else {
272 return Ok(0);
273 };
274
275 let table = fathomdb_schema::fts_kind_table_name(kind);
276 ensure_property_fts_table(conn, kind, &schema)?;
277 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
278 let mut ins_positions = conn.prepare(
279 "INSERT INTO fts_node_property_positions \
280 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
281 VALUES (?1, ?2, ?3, ?4, ?5)",
282 )?;
283
284 let mut stmt = conn.prepare(
285 "SELECT logical_id, properties FROM nodes \
286 WHERE kind = ?1 AND superseded_at IS NULL",
287 )?;
288 let rows: Vec<(String, String)> = stmt
289 .query_map([kind], |row| {
290 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
291 })?
292 .collect::<Result<Vec<_>, _>>()?;
293
294 let mut total = 0usize;
295 for (logical_id, properties_str) in &rows {
296 let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
297 let (text, positions, _stats) = crate::writer::extract_property_fts(&props, &schema);
298 if let Some(text) = text {
299 if has_weights {
300 let cols = crate::writer::extract_property_fts_columns(&props, &schema);
301 let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
302 let placeholders: Vec<String> =
303 (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
304 let sql = format!(
305 "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
306 c = col_names.join(", "),
307 p = placeholders.join(", "),
308 );
309 conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
310 std::iter::once(logical_id.as_str())
311 .chain(cols.iter().map(|(_, v)| v.as_str())),
312 ))?;
313 } else {
314 conn.prepare(&format!(
315 "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
316 ))?
317 .execute(rusqlite::params![logical_id, text])?;
318 }
319 for pos in &positions {
320 ins_positions.execute(rusqlite::params![
321 logical_id,
322 kind,
323 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
324 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
325 pos.leaf_path,
326 ])?;
327 }
328 total += 1;
329 }
330 }
331 Ok(total)
332}
333
334pub(crate) fn insert_property_fts_rows(
339 conn: &rusqlite::Connection,
340 node_sql: &str,
341) -> Result<usize, rusqlite::Error> {
342 let schemas = crate::writer::load_fts_property_schemas(conn)?;
343 if schemas.is_empty() {
344 return Ok(0);
345 }
346
347 let mut total = 0usize;
348 let mut ins_positions = conn.prepare(
349 "INSERT INTO fts_node_property_positions \
350 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
351 VALUES (?1, ?2, ?3, ?4, ?5)",
352 )?;
353 for (kind, schema) in &schemas {
354 let table = fathomdb_schema::fts_kind_table_name(kind);
355 ensure_property_fts_table(conn, kind, schema)?;
356 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
357 let mut stmt = conn.prepare(node_sql)?;
358 let rows: Vec<(String, String)> = stmt
359 .query_map([kind.as_str()], |row| {
360 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
361 })?
362 .collect::<Result<Vec<_>, _>>()?;
363 for (logical_id, properties_str) in &rows {
364 let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
365 let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
366 if let Some(text) = text {
367 if has_weights {
368 let cols = crate::writer::extract_property_fts_columns(&props, schema);
369 let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
370 let placeholders: Vec<String> =
371 (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
372 let sql = format!(
373 "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
374 c = col_names.join(", "),
375 p = placeholders.join(", "),
376 );
377 conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
378 std::iter::once(logical_id.as_str())
379 .chain(cols.iter().map(|(_, v)| v.as_str())),
380 ))?;
381 } else {
382 conn.prepare(&format!(
383 "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
384 ))?
385 .execute(rusqlite::params![logical_id, text])?;
386 }
387 for pos in &positions {
388 ins_positions.execute(rusqlite::params![
389 logical_id,
390 kind,
391 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
392 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
393 pos.leaf_path,
394 ])?;
395 }
396 total += 1;
397 }
398 }
399 }
400 Ok(total)
401}
402
403fn insert_property_fts_rows_missing(conn: &rusqlite::Connection) -> Result<usize, rusqlite::Error> {
407 let schemas = crate::writer::load_fts_property_schemas(conn)?;
408 if schemas.is_empty() {
409 return Ok(0);
410 }
411
412 let mut total = 0usize;
413 let mut ins_positions = conn.prepare(
414 "INSERT INTO fts_node_property_positions \
415 (node_logical_id, kind, start_offset, end_offset, leaf_path) \
416 VALUES (?1, ?2, ?3, ?4, ?5)",
417 )?;
418 for (kind, schema) in &schemas {
419 let table = fathomdb_schema::fts_kind_table_name(kind);
420 ensure_property_fts_table(conn, kind, schema)?;
421 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
422 let mut stmt = conn.prepare(&format!(
424 "SELECT n.logical_id, n.properties FROM nodes n \
425 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
426 AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
427 ))?;
428 let rows: Vec<(String, String)> = stmt
429 .query_map([kind.as_str()], |row| {
430 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
431 })?
432 .collect::<Result<Vec<_>, _>>()?;
433 for (logical_id, properties_str) in &rows {
434 let props: serde_json::Value = serde_json::from_str(properties_str).unwrap_or_default();
435 let (text, positions, _stats) = crate::writer::extract_property_fts(&props, schema);
436 if let Some(text) = text {
437 if has_weights {
438 let cols = crate::writer::extract_property_fts_columns(&props, schema);
439 let col_names: Vec<&str> = cols.iter().map(|(n, _)| n.as_str()).collect();
440 let placeholders: Vec<String> =
441 (2..=cols.len() + 1).map(|i| format!("?{i}")).collect();
442 let sql = format!(
443 "INSERT INTO {table}(node_logical_id, {c}) VALUES (?1, {p})",
444 c = col_names.join(", "),
445 p = placeholders.join(", "),
446 );
447 conn.prepare(&sql)?.execute(rusqlite::params_from_iter(
448 std::iter::once(logical_id.as_str())
449 .chain(cols.iter().map(|(_, v)| v.as_str())),
450 ))?;
451 } else {
452 conn.prepare(&format!(
453 "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
454 ))?
455 .execute(rusqlite::params![logical_id, text])?;
456 }
457 for pos in &positions {
458 ins_positions.execute(rusqlite::params![
459 logical_id,
460 kind,
461 i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
462 i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
463 pos.leaf_path,
464 ])?;
465 }
466 total += 1;
467 }
468 }
469 }
470 Ok(total)
471}
472
473fn ensure_property_fts_table(
474 conn: &rusqlite::Connection,
475 kind: &str,
476 schema: &crate::writer::PropertyFtsSchema,
477) -> Result<(), rusqlite::Error> {
478 let table = fathomdb_schema::fts_kind_table_name(kind);
479 let exists: bool = conn
480 .query_row(
481 "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 \
482 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
483 rusqlite::params![table],
484 |_| Ok(true),
485 )
486 .optional()?
487 .unwrap_or(false);
488 if exists {
489 return Ok(());
490 }
491
492 let tokenizer = fathomdb_schema::resolve_fts_tokenizer(conn, kind)
493 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
494 let tokenizer_sql = tokenizer.replace('\'', "''");
495 let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
496 let cols: Vec<String> = if has_weights {
497 std::iter::once("node_logical_id UNINDEXED".to_owned())
498 .chain(schema.paths.iter().map(|p| {
499 let is_recursive = matches!(p.mode, crate::writer::PropertyPathMode::Recursive);
500 fathomdb_schema::fts_column_name(&p.path, is_recursive)
501 }))
502 .collect()
503 } else {
504 vec![
505 "node_logical_id UNINDEXED".to_owned(),
506 "text_content".to_owned(),
507 ]
508 };
509 conn.execute_batch(&format!(
510 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5({cols}, tokenize='{tokenizer_sql}')",
511 cols = cols.join(", "),
512 ))?;
513 Ok(())
514}
515
516#[allow(clippy::unnecessary_wraps, unused_variables)]
521fn rebuild_vec(
522 conn: &mut rusqlite::Connection,
523 notes: &mut Vec<String>,
524) -> Result<usize, rusqlite::Error> {
525 #[cfg(feature = "sqlite-vec")]
526 {
527 let kinds: Vec<String> = {
528 let mut stmt =
529 match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
530 Ok(s) => s,
531 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
532 if msg.contains("no such table: projection_profiles") =>
533 {
534 notes.push("projection_profiles absent; vec rebuild skipped".to_owned());
535 return Ok(0);
536 }
537 Err(e) => return Err(e),
538 };
539 stmt.query_map([], |row| row.get(0))?
540 .collect::<Result<Vec<_>, _>>()?
541 };
542
543 if kinds.is_empty() {
544 notes.push("no vec profiles registered; vec rebuild skipped".to_owned());
545 return Ok(0);
546 }
547
548 let mut total = 0;
549 for kind in &kinds {
550 let table = fathomdb_schema::vec_kind_table_name(kind);
551 let sql = format!(
552 "DELETE FROM {table} WHERE chunk_id IN (
553 SELECT v.chunk_id FROM {table} v
554 LEFT JOIN chunks c ON c.id = v.chunk_id
555 LEFT JOIN nodes n ON n.logical_id = c.node_logical_id
556 WHERE c.id IS NULL OR n.superseded_at IS NOT NULL
557 )"
558 );
559 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
560 let deleted = match tx.execute(&sql, []) {
561 Ok(n) => n,
562 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
563 if msg.contains("no such table:") || msg.contains("no such module: vec0") =>
564 {
565 notes.push(format!(
566 "{table} absent; vec rebuild for kind '{kind}' skipped"
567 ));
568 tx.rollback()?;
569 continue;
570 }
571 Err(e) => return Err(e),
572 };
573 tx.commit()?;
574 total += deleted;
575 }
576 Ok(total)
577 }
578 #[cfg(not(feature = "sqlite-vec"))]
579 {
580 notes.push("vector projection rebuild skipped: sqlite-vec feature not enabled".to_owned());
581 Ok(0)
582 }
583}
584
585fn expand_targets(target: ProjectionTarget) -> Vec<ProjectionTarget> {
586 match target {
587 ProjectionTarget::Fts => vec![ProjectionTarget::Fts],
588 ProjectionTarget::Vec => vec![ProjectionTarget::Vec],
589 ProjectionTarget::All => vec![ProjectionTarget::Fts, ProjectionTarget::Vec],
590 }
591}
592
/// Integration test for the vector rebuild path; compiled only when both
/// the test harness and the `sqlite-vec` feature are active.
#[cfg(all(test, feature = "sqlite-vec"))]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_schema::SchemaManager;
    use tempfile::NamedTempFile;

    use crate::sqlite::open_connection_with_vec;

    use super::{ProjectionService, ProjectionTarget};

    /// A vector row whose chunk belongs to a superseded node must be deleted
    /// by a `Vec` rebuild and counted in the report.
    #[test]
    fn rebuild_vec_removes_stale_vec_rows_for_superseded_nodes() {
        let db = NamedTempFile::new().expect("temp db");
        let schema = Arc::new(SchemaManager::new());

        // Seed in an inner scope so the connection is dropped before the
        // service opens its own.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            schema
                .ensure_vec_kind_profile(&conn, "Doc", 3)
                .expect("vec kind profile");

            // The node is superseded (`superseded_at` = 200), which makes its
            // chunk's vector row stale.
            conn.execute_batch(
                r"
                INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at)
                VALUES ('row-old', 'lg-stale', 'Doc', '{}', 100, 200);
                INSERT INTO chunks (id, node_logical_id, text_content, created_at)
                VALUES ('chunk-stale', 'lg-stale', 'old text', 100);
                ",
            )
            .expect("seed stale data");

            // Three little-endian f32 values, matching the dimension 3
            // passed to `ensure_vec_kind_profile` above.
            let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
                .iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-stale', ?1)"
                ),
                rusqlite::params![bytes],
            )
            .expect("insert stale vec row");
        }

        let service = ProjectionService::new(db.path(), Arc::clone(&schema));
        let report = service
            .rebuild_projections(ProjectionTarget::Vec)
            .expect("rebuild vec");

        // For the vector target, `rebuilt_rows` counts deleted rows.
        assert_eq!(report.rebuilt_rows, 1, "one stale vec row must be removed");
        assert!(report.notes.is_empty(), "no notes expected on success");

        // Verify through a fresh connection that the row is really gone.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-stale'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "stale vec row must be gone after rebuild");
    }
}
662
/// Tests for property FTS projection when the schema assigns per-path
/// weights (one FTS column per path).
#[cfg(test)]
#[allow(clippy::expect_used)]
mod weighted_schema_tests {
    use fathomdb_schema::SchemaManager;
    use rusqlite::Connection;

    use super::insert_property_fts_rows_for_kind;

    /// Opens an in-memory database with the schema bootstrapped.
    fn bootstrapped_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("in-memory sqlite");
        let manager = SchemaManager::new();
        manager.bootstrap(&conn).expect("bootstrap");
        conn
    }

    /// With a weighted schema, each path's value must land in its own FTS
    /// column.
    #[test]
    fn projection_inserts_per_column_for_weighted_schema() {
        let conn = bootstrapped_conn();
        let kind = "Article";
        let table = fathomdb_schema::fts_kind_table_name(kind);
        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);

        // One active node carrying both properties named by the schema.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('row-1', 'article-1', ?1, '{\"title\":\"Hello\",\"body\":\"World\"}', 100, 'seed')",
            rusqlite::params![kind],
        )
        .expect("insert node");

        // Both paths carry a weight, so the schema is a weighted one.
        let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#;
        conn.execute(
            "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
             VALUES (?1, ?2, ' ')",
            rusqlite::params![kind, paths_json],
        )
        .expect("insert schema");

        // Create the per-kind table up front with one column per path.
        conn.execute_batch(&format!(
            "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
             node_logical_id UNINDEXED, {title_col}, {body_col}, \
             tokenize = 'porter unicode61 remove_diacritics 2'\
             )"
        ))
        .expect("create weighted per-kind table");

        insert_property_fts_rows_for_kind(&conn, kind).expect("insert_property_fts_rows_for_kind");

        // Exactly one row must exist for the node.
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'article-1'"),
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind table must have the inserted row");

        // Each column must hold the value from its own path.
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM {table} \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(title_val, "Hello", "title column must have correct value");
        assert_eq!(body_val, "World", "body column must have correct value");
    }
}