1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use serde::{Deserialize, Serialize};
6
7use crate::rebuild_actor::{RebuildClient, RebuildMode, RebuildRequest, RebuildSubmit};
8
9use crate::{
10 EngineError, ProjectionRepairReport, ProjectionService, ids::new_id,
11 projection::ProjectionTarget, sqlite,
12};
13
14mod fts;
15mod introspection;
16mod operational;
17mod provenance;
18mod vector;
19
20pub use introspection::{
21 Capabilities, CurrentConfig, EmbedderCapability, EmbeddingProfileSummary, FtsKindConfig,
22 KindDescription, VecKindConfig, WorkQueueSummary,
23};
24pub(crate) use vector::canonical_chunk_hash;
25pub use vector::{
26 ConfigureEmbeddingOutcome, ConfigureVecOutcome, VecIndexStatus, VectorSource,
27 load_vector_regeneration_config,
28};
29
30#[cfg(test)]
31use fts::{
32 create_or_replace_fts_kind_table, serialize_property_paths_json, validate_fts_property_paths,
33};
34
/// Result of [`AdminService::check_integrity`]: physical, referential, and
/// projection-level consistency counters for one database.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct IntegrityReport {
    /// `true` when `PRAGMA integrity_check` returned the literal string "ok".
    pub physical_ok: bool,
    /// `true` when `pragma_foreign_key_check` reported zero violations.
    pub foreign_keys_ok: bool,
    /// Chunks of active (non-superseded) nodes with no `fts_nodes` row.
    pub missing_fts_rows: usize,
    /// Active nodes missing a row in their per-kind property FTS table.
    pub missing_property_fts_rows: usize,
    /// Logical IDs with more than one non-superseded node version.
    pub duplicate_active_logical_ids: usize,
    /// Operational mutation/current rows referencing missing collections.
    pub operational_missing_collections: usize,
    /// `operational_current` rows whose `last_mutation_id` has no match.
    pub operational_missing_last_mutations: usize,
    /// Human-readable summary of every non-zero finding above, plus any
    /// legacy projection-table name-collision warnings.
    pub warnings: Vec<String>,
}
47
/// Persisted property-FTS schema for one node kind, as surfaced to admin
/// callers.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct FtsPropertySchemaRecord {
    /// Node kind this schema applies to.
    pub kind: String,
    /// Indexed property paths in flat string form.
    pub property_paths: Vec<String>,
    /// Full per-path specs carrying traversal mode and optional weight.
    pub entries: Vec<FtsPropertyPathSpec>,
    /// Paths excluded from indexing.
    pub exclude_paths: Vec<String>,
    // Presumably the separator inserted between extracted values when text is
    // concatenated — TODO confirm against the `fts` module's extraction logic.
    pub separator: String,
    /// Format version of the stored schema record.
    pub format_version: i64,
}
71
/// Traversal mode applied to a property path during FTS text extraction
/// (actual extraction lives in `crate::writer`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FtsPropertyPathMode {
    /// Index only the value directly at the path (the default).
    #[default]
    Scalar,
    /// Recursive traversal mode — see the writer's extraction logic for the
    /// exact descent semantics.
    Recursive,
}
84
/// One indexed property path plus its extraction mode and optional weight.
///
/// Marked `#[non_exhaustive]` so fields can be added without breaking
/// downstream constructors; use the builder methods instead of struct
/// literals.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct FtsPropertyPathSpec {
    /// Property path to index.
    pub path: String,
    /// Traversal mode for the path.
    pub mode: FtsPropertyPathMode,
    /// Optional ranking weight; `None` means unweighted.
    pub weight: Option<f32>,
}
97
// `weight` is an `Option<f32>`, so `Eq` cannot be derived; this manual impl
// promotes the derived `PartialEq` to total equality.
// NOTE(review): sound only while weights are never NaN — confirm upstream
// validation rejects NaN weights.
impl Eq for FtsPropertyPathSpec {}
101
102impl FtsPropertyPathSpec {
103 #[must_use]
104 pub fn scalar(path: impl Into<String>) -> Self {
105 Self {
106 path: path.into(),
107 mode: FtsPropertyPathMode::Scalar,
108 weight: None,
109 }
110 }
111
112 #[must_use]
113 pub fn recursive(path: impl Into<String>) -> Self {
114 Self {
115 path: path.into(),
116 mode: FtsPropertyPathMode::Recursive,
117 weight: None,
118 }
119 }
120
121 #[must_use]
127 pub fn with_weight(mut self, weight: f32) -> Self {
128 self.weight = Some(weight);
129 self
130 }
131}
132
/// Options for the safe-export operation (see [`SafeExportManifest`]).
#[derive(Clone, Copy, Debug)]
pub struct SafeExportOptions {
    /// Whether to force a checkpoint as part of the export; defaults to
    /// `true` (see the `Default` impl below).
    pub force_checkpoint: bool,
}
141
impl Default for SafeExportOptions {
    // Default to the conservative behavior: checkpoint before exporting so
    // the copied file reflects the latest committed state.
    fn default() -> Self {
        Self {
            force_checkpoint: true,
        }
    }
}
149
// Version of the export format produced by this build — presumably written
// into `SafeExportManifest::protocol_version`; the export code itself lives
// in a sibling module.
const EXPORT_PROTOCOL_VERSION: u32 = 1;
152
/// Metadata describing a completed safe export.
#[derive(Clone, Debug, Serialize)]
pub struct SafeExportManifest {
    /// Export timestamp — assumed to be seconds since the Unix epoch; TODO
    /// confirm the unit in the export implementation.
    pub exported_at: u64,
    /// SHA-256 digest recorded for the exported artifact.
    pub sha256: String,
    /// Schema version of the exported database.
    pub schema_version: u32,
    /// Export protocol version (see `EXPORT_PROTOCOL_VERSION`).
    pub protocol_version: u32,
    /// Number of database pages in the export.
    pub page_count: u64,
}
167
/// Summary of every row tied to a single `source_ref`: row counts plus the
/// concrete IDs, for provenance tracing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct TraceReport {
    /// The `source_ref` that was traced.
    pub source_ref: String,
    pub node_rows: usize,
    pub edge_rows: usize,
    pub action_rows: usize,
    pub operational_mutation_rows: usize,
    /// Logical IDs of the matched nodes.
    pub node_logical_ids: Vec<String>,
    /// IDs of the matched actions.
    pub action_ids: Vec<String>,
    /// IDs of the matched operational mutations.
    pub operational_mutation_ids: Vec<String>,
}
180
/// An edge that a logical restore skipped because one endpoint node no
/// longer exists (see [`LogicalRestoreReport::skipped_edges`]).
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SkippedEdge {
    /// Logical ID of the skipped edge.
    pub edge_logical_id: String,
    /// Logical ID of the endpoint that was missing.
    pub missing_endpoint: String,
}
187
/// Per-row accounting for a logical-ID restore operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalRestoreReport {
    /// Logical ID that was restored.
    pub logical_id: String,
    /// `true` when nothing needed restoring.
    pub was_noop: bool,
    pub restored_node_rows: usize,
    pub restored_edge_rows: usize,
    pub restored_chunk_rows: usize,
    pub restored_fts_rows: usize,
    pub restored_property_fts_rows: usize,
    pub restored_vec_rows: usize,
    /// Edges that could not be restored because an endpoint was missing.
    pub skipped_edges: Vec<SkippedEdge>,
    /// Free-form operator-facing notes emitted during the restore.
    pub notes: Vec<String>,
}
202
/// Per-row accounting for a logical-ID purge operation; the deletion-side
/// counterpart of [`LogicalRestoreReport`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalPurgeReport {
    /// Logical ID that was purged.
    pub logical_id: String,
    /// `true` when nothing needed deleting.
    pub was_noop: bool,
    pub deleted_node_rows: usize,
    pub deleted_edge_rows: usize,
    pub deleted_chunk_rows: usize,
    pub deleted_fts_rows: usize,
    pub deleted_vec_rows: usize,
    /// Free-form operator-facing notes emitted during the purge.
    pub notes: Vec<String>,
}
215
/// Options for purging provenance events.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProvenancePurgeOptions {
    /// When `true`, report what would be deleted without deleting.
    pub dry_run: bool,
    /// Event types to keep; defaults to empty (preserve nothing) when the
    /// field is absent from deserialized input.
    #[serde(default)]
    pub preserve_event_types: Vec<String>,
}
223
/// Outcome of a provenance purge.
#[derive(Clone, Debug, Serialize)]
pub struct ProvenancePurgeReport {
    /// Events removed (or that would be removed under `dry_run`).
    pub events_deleted: u64,
    /// Events kept because their type was in `preserve_event_types`.
    pub events_preserved: u64,
    /// Timestamp of the oldest surviving event, if any remain.
    pub oldest_remaining: Option<i64>,
}
231
/// Administrative/maintenance entry point for a single database file.
///
/// The constructor used ([`AdminService::new`], `new_with_rebuild`,
/// `new_with_engine`) determines which optional collaborators are present.
#[derive(Debug)]
pub struct AdminService {
    /// Filesystem path of the SQLite database.
    pub(super) database_path: PathBuf,
    /// Shared schema bootstrapper applied to every connection `connect` opens.
    pub(super) schema_manager: Arc<SchemaManager>,
    /// Projection maintenance service bound to the same database path.
    pub(super) projections: ProjectionService,
    /// Optional client for submitting rebuild jobs; `None` in minimal setups.
    pub(super) rebuild_client: Option<RebuildClient>,
    /// Optional writer actor handle; set only by `new_with_engine`.
    pub(super) writer: Option<Arc<crate::WriterActor>>,
}
247
/// Result of [`AdminService::check_semantics`]: one counter per issue class,
/// mirroring the queries in that method, plus a human-readable warning per
/// non-zero counter.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SemanticReport {
    pub orphaned_chunks: usize,
    pub null_source_ref_nodes: usize,
    pub broken_step_fk: usize,
    pub broken_action_fk: usize,
    pub stale_fts_rows: usize,
    pub fts_rows_for_superseded_nodes: usize,
    pub stale_property_fts_rows: usize,
    pub orphaned_property_fts_rows: usize,
    // Currently always 0: mismatched-kind detection is not implemented in
    // `count_per_kind_property_fts_issues`.
    pub mismatched_kind_property_fts_rows: usize,
    pub duplicate_property_fts_rows: usize,
    pub drifted_property_fts_rows: usize,
    pub dangling_edges: usize,
    pub orphaned_supersession_chains: usize,
    // The two vec counters are always 0 when the `sqlite-vec` feature is off.
    pub stale_vec_rows: usize,
    pub vec_rows_for_superseded_nodes: usize,
    pub missing_operational_current_rows: usize,
    pub stale_operational_current_rows: usize,
    pub disabled_collection_mutations: usize,
    pub orphaned_last_access_metadata_rows: usize,
    /// One message per non-zero counter above.
    pub warnings: Vec<String>,
}
291
/// Configuration for regenerating a kind's vector projection.
///
/// `deny_unknown_fields` makes typos in persisted/user-supplied JSON fail
/// loudly instead of being silently ignored.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct VectorRegenerationConfig {
    /// Node kind whose vectors are regenerated.
    pub kind: String,
    /// Embedding profile to use.
    pub profile: String,
    pub chunking_policy: String,
    pub preprocessing_policy: String,
}
312
/// Outcome of a vector regeneration run.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct VectorRegenerationReport {
    /// Embedding profile that was used.
    pub profile: String,
    /// Vector table that was (re)populated.
    pub table_name: String,
    /// Embedding dimensionality.
    pub dimension: usize,
    /// Total chunks considered.
    pub total_chunks: usize,
    /// Rows actually regenerated.
    pub regenerated_rows: usize,
    /// Whether the vector contract was written back.
    pub contract_persisted: bool,
    /// Free-form operator-facing notes.
    pub notes: Vec<String>,
}
324
/// A stored FTS configuration profile for one kind.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FtsProfile {
    /// Node kind the profile applies to.
    pub kind: String,
    /// FTS5 tokenizer string (see [`TOKENIZER_PRESETS`]).
    pub tokenizer: String,
    /// When the profile became active; `None` if not yet activated.
    pub active_at: Option<i64>,
    pub created_at: i64,
}
339
/// A stored vector/embedding configuration profile.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VecProfile {
    /// Identity of the embedding model.
    pub model_identity: String,
    /// Optional model version string.
    pub model_version: Option<String>,
    /// Embedding dimensionality.
    pub dimensions: u32,
    /// When the profile became active; `None` if not yet activated.
    pub active_at: Option<i64>,
    pub created_at: i64,
}
356
/// Estimated cost of a projection rebuild, shown to operators before they
/// commit to a reconfiguration.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionImpact {
    /// Rows that would be rebuilt.
    pub rows_to_rebuild: u64,
    /// Rough wall-clock estimate for the rebuild.
    pub estimated_seconds: u64,
    /// Expected size of the temporary database used during the rebuild.
    pub temp_db_size_bytes: u64,
    /// Tokenizer currently configured, when applicable.
    pub current_tokenizer: Option<String>,
    /// Tokenizer after the change, when applicable.
    pub target_tokenizer: Option<String>,
}
373
/// Named tokenizer presets mapped to their FTS5 tokenizer strings.
///
/// Callers may pass either a preset name or a raw tokenizer string; see
/// [`resolve_tokenizer_preset`].
pub const TOKENIZER_PRESETS: &[(&str, &str)] = &[
    (
        "recall-optimized-english",
        "porter unicode61 remove_diacritics 2",
    ),
    ("precision-optimized", "unicode61 remove_diacritics 2"),
    ("global-cjk", "icu"),
    ("substring-trigram", "trigram"),
    ("source-code", "unicode61 tokenchars '._-$@'"),
];

/// Resolves a preset name to its FTS5 tokenizer string.
///
/// Unknown inputs are returned unchanged, so callers can also pass a raw
/// tokenizer specification directly.
pub fn resolve_tokenizer_preset(input: &str) -> &str {
    TOKENIZER_PRESETS
        .iter()
        .find_map(|&(name, value)| (name == input).then_some(value))
        .unwrap_or(input)
}
398
/// Format version written into newly persisted vector contracts.
pub(super) const CURRENT_VECTOR_CONTRACT_FORMAT_VERSION: i64 = 1;
// Input-size limits enforced by the admin submodules (validation sites are in
// the `vector`/`operational` modules, not shown here).
pub(super) const MAX_PROFILE_LEN: usize = 128;
pub(super) const MAX_POLICY_LEN: usize = 128;
/// Upper bound on serialized contract JSON (32 KiB).
pub(super) const MAX_CONTRACT_JSON_BYTES: usize = 32 * 1024;
pub(super) const MAX_AUDIT_METADATA_BYTES: usize = 2048;
// Paging bounds for operational read queries.
const DEFAULT_OPERATIONAL_READ_LIMIT: usize = 100;
const MAX_OPERATIONAL_READ_LIMIT: usize = 1000;
406
/// Cheaply cloneable shared handle around an [`AdminService`].
#[derive(Clone, Debug)]
pub struct AdminHandle {
    // Shared ownership: `Clone` only bumps the refcount.
    inner: Arc<AdminService>,
}
412
413impl AdminHandle {
414 #[must_use]
416 pub fn new(service: AdminService) -> Self {
417 Self {
418 inner: Arc::new(service),
419 }
420 }
421
422 #[must_use]
424 pub fn service(&self) -> Arc<AdminService> {
425 Arc::clone(&self.inner)
426 }
427}
428
impl AdminService {
    /// Creates a minimal service: consistency checks work, but rebuild
    /// submission and writer-backed operations are unavailable.
    #[must_use]
    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
        let database_path = path.as_ref().to_path_buf();
        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
        Self {
            database_path,
            schema_manager,
            projections,
            rebuild_client: None,
            writer: None,
        }
    }

    /// Like [`AdminService::new`], but with a rebuild client attached.
    #[must_use]
    pub fn new_with_rebuild(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        rebuild_client: RebuildClient,
    ) -> Self {
        let database_path = path.as_ref().to_path_buf();
        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
        Self {
            database_path,
            schema_manager,
            projections,
            rebuild_client: Some(rebuild_client),
            writer: None,
        }
    }

    /// Fully wired constructor: rebuild client plus writer actor.
    #[must_use]
    pub fn new_with_engine(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        rebuild_client: RebuildClient,
        writer: Arc<crate::WriterActor>,
    ) -> Self {
        let database_path = path.as_ref().to_path_buf();
        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
        Self {
            database_path,
            schema_manager,
            projections,
            rebuild_client: Some(rebuild_client),
            writer: Some(writer),
        }
    }

    /// Opens a fresh connection to the database and bootstraps the schema on
    /// it. With the `sqlite-vec` feature enabled, the connection is opened
    /// with the vector extension loaded.
    pub(super) fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
        #[cfg(feature = "sqlite-vec")]
        let conn = sqlite::open_connection_with_vec(&self.database_path)?;
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&self.database_path)?;
        self.schema_manager.bootstrap(&conn)?;
        Ok(conn)
    }

    /// Runs structural integrity checks: SQLite's physical check, the
    /// foreign-key check, missing FTS / property-FTS projections, duplicate
    /// active logical IDs, and operational-table referential checks.
    ///
    /// # Errors
    ///
    /// Returns an [`EngineError`] if the connection cannot be opened or any
    /// check query fails.
    pub fn check_integrity(&self) -> Result<IntegrityReport, EngineError> {
        let conn = self.connect()?;

        // `PRAGMA integrity_check` returns the literal string "ok" when clean.
        let physical_result: String =
            conn.query_row("PRAGMA integrity_check", [], |row| row.get(0))?;
        let foreign_key_count: i64 =
            conn.query_row("SELECT count(*) FROM pragma_foreign_key_check", [], |row| {
                row.get(0)
            })?;
        // Chunks of active (non-superseded) nodes with no FTS row.
        let missing_fts_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM chunks c
            JOIN nodes n
                ON n.logical_id = c.node_logical_id
                AND n.superseded_at IS NULL
            WHERE NOT EXISTS (
                SELECT 1
                FROM fts_nodes f
                WHERE f.chunk_id = c.id
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // A logical_id should have at most one non-superseded version.
        let duplicate_active: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM (
                SELECT logical_id
                FROM nodes
                WHERE superseded_at IS NULL
                GROUP BY logical_id
                HAVING count(*) > 1
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Both mutation and current rows must point at an existing collection.
        let operational_missing_collections: i64 = conn.query_row(
            r"
            SELECT (
                SELECT count(*)
                FROM operational_mutations m
                LEFT JOIN operational_collections c ON c.name = m.collection_name
                WHERE c.name IS NULL
            ) + (
                SELECT count(*)
                FROM operational_current oc
                LEFT JOIN operational_collections c ON c.name = oc.collection_name
                WHERE c.name IS NULL
            )
            ",
            [],
            |row| row.get(0),
        )?;
        let operational_missing_last_mutations: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_current oc
            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
            WHERE m.id IS NULL
            ",
            [],
            |row| row.get(0),
        )?;

        let missing_property_fts_rows = count_missing_property_fts_rows(&conn)?;

        // One warning per non-zero finding, plus legacy table-name collisions.
        let mut warnings = Vec::new();
        if missing_fts_rows > 0 {
            warnings.push("missing FTS projections detected".to_owned());
        }
        if missing_property_fts_rows > 0 {
            warnings.push("missing property FTS projections detected".to_owned());
        }
        if duplicate_active > 0 {
            warnings.push("duplicate active logical_ids detected".to_owned());
        }
        if operational_missing_collections > 0 {
            warnings.push("operational rows reference missing collections".to_owned());
        }
        if operational_missing_last_mutations > 0 {
            warnings.push("operational current rows reference missing last mutations".to_owned());
        }
        warnings.extend(projection_table_collision_warnings(&conn)?);

        Ok(IntegrityReport {
            physical_ok: physical_result == "ok",
            foreign_keys_ok: foreign_key_count == 0,
            missing_fts_rows: i64_to_usize(missing_fts_rows),
            missing_property_fts_rows: i64_to_usize(missing_property_fts_rows),
            duplicate_active_logical_ids: i64_to_usize(duplicate_active),
            operational_missing_collections: i64_to_usize(operational_missing_collections),
            operational_missing_last_mutations: i64_to_usize(operational_missing_last_mutations),
            warnings,
        })
    }

    /// Runs deeper semantic consistency checks across nodes, edges, chunks,
    /// FTS/vector projections, and operational tables, returning one counter
    /// per issue class plus a warning message for each non-zero counter.
    ///
    /// # Errors
    ///
    /// Returns an [`EngineError`] if the connection cannot be opened or any
    /// check query fails.
    #[allow(clippy::too_many_lines)]
    pub fn check_semantics(&self) -> Result<SemanticReport, EngineError> {
        let conn = self.connect()?;

        // Chunks whose node history has disappeared entirely.
        let orphaned_chunks: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM chunks c
            WHERE NOT EXISTS (
                SELECT 1 FROM nodes n
                WHERE n.logical_id = c.node_logical_id
            )
            ",
            [],
            |row| row.get(0),
        )?;

        let null_source_ref_nodes: i64 = conn.query_row(
            "SELECT count(*) FROM nodes WHERE source_ref IS NULL AND superseded_at IS NULL",
            [],
            |row| row.get(0),
        )?;

        let broken_step_fk: i64 = conn.query_row(
            r"
            SELECT count(*) FROM steps s
            WHERE NOT EXISTS (SELECT 1 FROM runs r WHERE r.id = s.run_id)
            ",
            [],
            |row| row.get(0),
        )?;

        let broken_action_fk: i64 = conn.query_row(
            r"
            SELECT count(*) FROM actions a
            WHERE NOT EXISTS (SELECT 1 FROM steps s WHERE s.id = a.step_id)
            ",
            [],
            |row| row.get(0),
        )?;

        // FTS rows pointing at chunks that no longer exist.
        let stale_fts_rows: i64 = conn.query_row(
            r"
            SELECT count(*) FROM fts_nodes f
            WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = f.chunk_id)
            ",
            [],
            |row| row.get(0),
        )?;

        // FTS rows whose node has no active (non-superseded) version.
        let fts_rows_for_superseded_nodes: i64 = conn.query_row(
            r"
            SELECT count(*) FROM fts_nodes f
            WHERE NOT EXISTS (
                SELECT 1 FROM nodes n
                WHERE n.logical_id = f.node_logical_id AND n.superseded_at IS NULL
            )
            ",
            [],
            |row| row.get(0),
        )?;

        let (
            stale_property_fts_rows,
            orphaned_property_fts_rows,
            mismatched_kind_property_fts_rows,
            duplicate_property_fts_rows,
        ) = count_per_kind_property_fts_issues(&conn)?;

        let drifted_property_fts_rows = count_drifted_property_fts_rows(&conn)?;

        // Active edges where either endpoint lacks an active node version.
        let dangling_edges: i64 = conn.query_row(
            r"
            SELECT count(*) FROM edges e
            WHERE e.superseded_at IS NULL AND (
                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.source_logical_id AND n.superseded_at IS NULL)
                OR
                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.target_logical_id AND n.superseded_at IS NULL)
            )
            ",
            [],
            |row| row.get(0),
        )?;

        // Logical IDs where every version has been superseded.
        let orphaned_supersession_chains: i64 = conn.query_row(
            r"
            SELECT count(*) FROM (
                SELECT logical_id FROM nodes
                GROUP BY logical_id
                HAVING count(*) > 0 AND sum(CASE WHEN superseded_at IS NULL THEN 1 ELSE 0 END) = 0
            )
            ",
            [],
            |row| row.get(0),
        )?;

        // Vector projections are optional infrastructure: tolerate a missing
        // `projection_profiles` table, missing per-kind tables, and a missing
        // `vec0` module by counting zero instead of failing the whole check.
        #[cfg(feature = "sqlite-vec")]
        let (stale_vec_rows, vec_rows_for_superseded_nodes): (i64, i64) = {
            let kinds: Vec<String> =
                match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
                    Ok(mut stmt) => stmt
                        .query_map([], |row| row.get(0))
                        .map_err(EngineError::Sqlite)?
                        .collect::<Result<Vec<_>, _>>()
                        .map_err(EngineError::Sqlite)?,
                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
                        if msg.contains("no such table: projection_profiles") =>
                    {
                        vec![]
                    }
                    Err(e) => return Err(EngineError::Sqlite(e)),
                };
            let mut stale = 0i64;
            let mut superseded = 0i64;
            for kind in &kinds {
                let table = fathomdb_schema::vec_kind_table_name(kind);
                let stale_sql = format!(
                    "SELECT count(*) FROM {table} v \
                     WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = v.chunk_id)"
                );
                let superseded_sql = format!(
                    "SELECT count(*) FROM {table} v \
                     JOIN chunks c ON c.id = v.chunk_id \
                     WHERE NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = c.node_logical_id)"
                );
                stale += match conn.query_row(&stale_sql, [], |row| row.get(0)) {
                    Ok(n) => n,
                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
                        if msg.contains("no such table:")
                            || msg.contains("no such module: vec0") =>
                    {
                        0
                    }
                    Err(e) => return Err(EngineError::Sqlite(e)),
                };
                superseded += match conn.query_row(&superseded_sql, [], |row| row.get(0)) {
                    Ok(n) => n,
                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
                        if msg.contains("no such table:")
                            || msg.contains("no such module: vec0") =>
                    {
                        0
                    }
                    Err(e) => return Err(EngineError::Sqlite(e)),
                };
            }
            (stale, superseded)
        };
        #[cfg(not(feature = "sqlite-vec"))]
        let stale_vec_rows: i64 = 0;
        #[cfg(not(feature = "sqlite-vec"))]
        let vec_rows_for_superseded_nodes: i64 = 0;
        // Latest-state collections: every key whose newest mutation is a
        // 'put' must have a matching operational_current row.
        let missing_operational_current_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_mutations m
            JOIN operational_collections c
                ON c.name = m.collection_name
                AND c.kind = 'latest_state'
            WHERE m.op_kind = 'put'
            AND NOT EXISTS (
                SELECT 1
                FROM operational_mutations newer
                WHERE newer.collection_name = m.collection_name
                AND newer.record_key = m.record_key
                AND newer.mutation_order > m.mutation_order
            )
            AND NOT EXISTS (
                SELECT 1
                FROM operational_current oc
                WHERE oc.collection_name = m.collection_name
                AND oc.record_key = m.record_key
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // A current row is stale if its last mutation is missing, belongs to
        // a different key/collection, is not a 'put', carries a different
        // payload, or has since been superseded by a newer mutation.
        let stale_operational_current_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_current oc
            JOIN operational_collections c
                ON c.name = oc.collection_name
                AND c.kind = 'latest_state'
            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
            WHERE m.id IS NULL
            OR m.collection_name != oc.collection_name
            OR m.record_key != oc.record_key
            OR m.op_kind != 'put'
            OR m.payload_json != oc.payload_json
            OR EXISTS (
                SELECT 1
                FROM operational_mutations newer
                WHERE newer.collection_name = oc.collection_name
                AND newer.record_key = oc.record_key
                AND newer.mutation_order > m.mutation_order
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Mutations written after their collection was disabled.
        let disabled_collection_mutations: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_mutations m
            JOIN operational_collections c ON c.name = m.collection_name
            WHERE c.disabled_at IS NOT NULL AND m.created_at > c.disabled_at
            ",
            [],
            |row| row.get(0),
        )?;
        let orphaned_last_access_metadata_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM node_access_metadata am
            WHERE NOT EXISTS (
                SELECT 1 FROM nodes n WHERE n.logical_id = am.logical_id
            )
            ",
            [],
            |row| row.get(0),
        )?;

        // One human-readable warning per non-zero counter.
        let mut warnings = Vec::new();
        if orphaned_chunks > 0 {
            warnings.push(format!(
                "{orphaned_chunks} orphaned chunk(s) with no surviving node history"
            ));
        }
        if null_source_ref_nodes > 0 {
            warnings.push(format!(
                "{null_source_ref_nodes} active node(s) with null source_ref"
            ));
        }
        if broken_step_fk > 0 {
            warnings.push(format!(
                "{broken_step_fk} step(s) referencing non-existent run"
            ));
        }
        if broken_action_fk > 0 {
            warnings.push(format!(
                "{broken_action_fk} action(s) referencing non-existent step"
            ));
        }
        if stale_fts_rows > 0 {
            warnings.push(format!(
                "{stale_fts_rows} stale FTS row(s) referencing missing chunk"
            ));
        }
        if fts_rows_for_superseded_nodes > 0 {
            warnings.push(format!(
                "{fts_rows_for_superseded_nodes} FTS row(s) for superseded node(s)"
            ));
        }
        if stale_property_fts_rows > 0 {
            warnings.push(format!(
                "{stale_property_fts_rows} stale property FTS row(s) for superseded/missing node(s)"
            ));
        }
        if orphaned_property_fts_rows > 0 {
            warnings.push(format!(
                "{orphaned_property_fts_rows} orphaned property FTS row(s) for unregistered kind(s)"
            ));
        }
        if mismatched_kind_property_fts_rows > 0 {
            warnings.push(format!(
                "{mismatched_kind_property_fts_rows} property FTS row(s) whose kind does not match the active node"
            ));
        }
        if duplicate_property_fts_rows > 0 {
            warnings.push(format!(
                "{duplicate_property_fts_rows} active logical ID(s) with duplicate property FTS rows"
            ));
        }
        if drifted_property_fts_rows > 0 {
            warnings.push(format!(
                "{drifted_property_fts_rows} property FTS row(s) with stale text_content"
            ));
        }
        if dangling_edges > 0 {
            warnings.push(format!(
                "{dangling_edges} active edge(s) with missing endpoint node"
            ));
        }
        if orphaned_supersession_chains > 0 {
            warnings.push(format!(
                "{orphaned_supersession_chains} logical_id(s) with all versions superseded"
            ));
        }
        if stale_vec_rows > 0 {
            warnings.push(format!(
                "{stale_vec_rows} stale vec row(s) referencing missing chunk"
            ));
        }
        if vec_rows_for_superseded_nodes > 0 {
            warnings.push(format!(
                "{vec_rows_for_superseded_nodes} vec row(s) whose node history is missing"
            ));
        }
        if missing_operational_current_rows > 0 {
            warnings.push(format!(
                "{missing_operational_current_rows} latest-state key(s) missing operational_current rows"
            ));
        }
        if stale_operational_current_rows > 0 {
            warnings.push(format!(
                "{stale_operational_current_rows} stale operational_current row(s)"
            ));
        }
        if disabled_collection_mutations > 0 {
            warnings.push(format!(
                "{disabled_collection_mutations} mutation(s) were written after collection disable"
            ));
        }
        if orphaned_last_access_metadata_rows > 0 {
            warnings.push(format!(
                "{orphaned_last_access_metadata_rows} last_access metadata row(s) reference missing node history"
            ));
        }

        Ok(SemanticReport {
            orphaned_chunks: i64_to_usize(orphaned_chunks),
            null_source_ref_nodes: i64_to_usize(null_source_ref_nodes),
            broken_step_fk: i64_to_usize(broken_step_fk),
            broken_action_fk: i64_to_usize(broken_action_fk),
            stale_fts_rows: i64_to_usize(stale_fts_rows),
            fts_rows_for_superseded_nodes: i64_to_usize(fts_rows_for_superseded_nodes),
            stale_property_fts_rows: i64_to_usize(stale_property_fts_rows),
            orphaned_property_fts_rows: i64_to_usize(orphaned_property_fts_rows),
            mismatched_kind_property_fts_rows: i64_to_usize(mismatched_kind_property_fts_rows),
            duplicate_property_fts_rows: i64_to_usize(duplicate_property_fts_rows),
            drifted_property_fts_rows: i64_to_usize(drifted_property_fts_rows),
            dangling_edges: i64_to_usize(dangling_edges),
            orphaned_supersession_chains: i64_to_usize(orphaned_supersession_chains),
            stale_vec_rows: i64_to_usize(stale_vec_rows),
            vec_rows_for_superseded_nodes: i64_to_usize(vec_rows_for_superseded_nodes),
            missing_operational_current_rows: i64_to_usize(missing_operational_current_rows),
            stale_operational_current_rows: i64_to_usize(stale_operational_current_rows),
            disabled_collection_mutations: i64_to_usize(disabled_collection_mutations),
            orphaned_last_access_metadata_rows: i64_to_usize(orphaned_last_access_metadata_rows),
            warnings,
        })
    }
}
948
949fn projection_table_collision_warnings(
950 conn: &rusqlite::Connection,
951) -> Result<Vec<String>, EngineError> {
952 let fts_kinds = projection_kinds(conn, "SELECT kind FROM fts_property_schemas")?;
953 let mut warnings = legacy_projection_collision_warnings(
954 "FTS property",
955 &fts_kinds,
956 fathomdb_schema::legacy_fts_kind_table_name,
957 fathomdb_schema::fts_kind_table_name,
958 );
959
960 let vec_kinds = projection_kinds(
961 conn,
962 "SELECT kind FROM projection_profiles WHERE facet = 'vec'",
963 )?;
964 warnings.extend(legacy_projection_collision_warnings(
965 "vector",
966 &vec_kinds,
967 fathomdb_schema::legacy_vec_kind_table_name,
968 fathomdb_schema::vec_kind_table_name,
969 ));
970 Ok(warnings)
971}
972
973fn projection_kinds(conn: &rusqlite::Connection, sql: &str) -> Result<Vec<String>, EngineError> {
974 let mut stmt = conn.prepare(sql)?;
975 stmt.query_map([], |r| r.get::<_, String>(0))?
976 .collect::<Result<Vec<_>, _>>()
977 .map_err(EngineError::Sqlite)
978}
979
/// Builds one warning per legacy table name that more than one kind maps to.
///
/// `legacy_name`/`current_name` translate a kind into its legacy and current
/// table names; a collision exists when several kinds share a legacy name.
/// Results are deterministic: legacy names in sorted order (via `BTreeMap`)
/// and kinds sorted within each warning.
fn legacy_projection_collision_warnings(
    label: &str,
    kinds: &[String],
    legacy_name: fn(&str) -> String,
    current_name: fn(&str) -> String,
) -> Vec<String> {
    // Group kinds by the legacy table name they would have used.
    let mut grouped: std::collections::BTreeMap<String, Vec<&str>> =
        std::collections::BTreeMap::new();
    for kind in kinds {
        grouped.entry(legacy_name(kind)).or_default().push(kind.as_str());
    }

    grouped
        .into_iter()
        .filter(|(_, members)| members.len() > 1)
        .map(|(legacy_table, mut members)| {
            members.sort_unstable();
            let mapped: Vec<String> = members.iter().map(|kind| current_name(kind)).collect();
            format!(
                "legacy {label} projection table name collision for {legacy_table}: kinds [{}] now map to [{}]",
                members.join(", "),
                mapped.join(", ")
            )
        })
        .collect()
}
1013
/// Scans every per-kind property FTS table and tallies row-level issues.
///
/// Returns `(stale, orphaned, mismatched, duplicate)`:
/// - `stale`: rows whose node is superseded or missing;
/// - `orphaned`: rows in tables that no registered schema maps to;
/// - `mismatched`: always 0 — kind-mismatch detection is not implemented in
///   this function, though callers still consume the slot;
/// - `duplicate`: logical IDs holding more than one row in a table.
fn count_per_kind_property_fts_issues(
    conn: &rusqlite::Connection,
) -> Result<(i64, i64, i64, i64), EngineError> {
    // Candidate tables are discovered from sqlite_master directly: FTS
    // virtual tables with the per-kind name prefix. Table names are
    // interpolated into SQL below, but they originate from sqlite_master,
    // not user input.
    let per_kind_tables: Vec<String> = {
        let mut stmt = conn.prepare(
            "SELECT name FROM sqlite_master \
             WHERE type='table' AND name LIKE 'fts_props_%' \
             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
        )?;
        stmt.query_map([], |r| r.get::<_, String>(0))?
            .collect::<Result<Vec<_>, _>>()?
    };

    let registered_kinds: std::collections::HashSet<String> = {
        let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
        stmt.query_map([], |r| r.get::<_, String>(0))?
            .collect::<Result<std::collections::HashSet<_>, _>>()?
    };

    let mut stale = 0i64;
    let mut orphaned = 0i64;
    let mut duplicate = 0i64;

    for table in &per_kind_tables {
        // Rows whose node has no active (non-superseded) version.
        let kind_stale: i64 = conn.query_row(
            &format!(
                "SELECT count(*) FROM {table} fp \
                 WHERE NOT EXISTS (\
                 SELECT 1 FROM nodes n \
                 WHERE n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL\
                 )"
            ),
            [],
            |r| r.get(0),
        )?;
        stale += kind_stale;

        // Logical IDs with more than one projection row in this table.
        let kind_dup: i64 = conn.query_row(
            &format!(
                "SELECT count(*) FROM (\
                 SELECT node_logical_id FROM {table} \
                 GROUP BY node_logical_id HAVING count(*) > 1\
                 )"
            ),
            [],
            |r| r.get(0),
        )?;
        duplicate += kind_dup;

        // If no registered kind maps to this table name, the whole table is
        // orphaned and every row counts.
        let table_has_schema = registered_kinds
            .iter()
            .any(|k| fathomdb_schema::fts_kind_table_name(k) == *table);
        if !table_has_schema {
            let table_rows: i64 =
                conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))?;
            orphaned += table_rows;
        }
    }

    // Third slot (mismatched-kind rows) is intentionally hardcoded to 0.
    Ok((stale, orphaned, 0, duplicate))
}
1090
1091fn count_missing_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1095 let schemas = crate::writer::load_fts_property_schemas(conn)?;
1096 if schemas.is_empty() {
1097 return Ok(0);
1098 }
1099
1100 let mut missing = 0i64;
1101 for (kind, schema) in &schemas {
1102 let table = fathomdb_schema::fts_kind_table_name(kind);
1103 let table_exists: bool = conn
1105 .query_row(
1106 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1107 [table.as_str()],
1108 |r| r.get::<_, i64>(0),
1109 )
1110 .unwrap_or(0)
1111 > 0;
1112
1113 if table_exists {
1114 let mut stmt = conn.prepare(&format!(
1115 "SELECT n.logical_id, n.properties FROM nodes n \
1116 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
1117 AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
1118 ))?;
1119 let rows = stmt.query_map([kind.as_str()], |row| {
1120 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1121 })?;
1122 for row in rows {
1123 let (_logical_id, properties_str) = row?;
1124 let props: serde_json::Value =
1125 serde_json::from_str(&properties_str).unwrap_or_default();
1126 if crate::writer::extract_property_fts(&props, schema)
1127 .0
1128 .is_some()
1129 {
1130 missing += 1;
1131 }
1132 }
1133 } else {
1134 let mut stmt = conn.prepare(
1136 "SELECT n.logical_id, n.properties FROM nodes n \
1137 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
1138 )?;
1139 let rows = stmt.query_map([kind.as_str()], |row| {
1140 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1141 })?;
1142 for row in rows {
1143 let (_logical_id, properties_str) = row?;
1144 let props: serde_json::Value =
1145 serde_json::from_str(&properties_str).unwrap_or_default();
1146 if crate::writer::extract_property_fts(&props, schema)
1147 .0
1148 .is_some()
1149 {
1150 missing += 1;
1151 }
1152 }
1153 }
1154 }
1155 Ok(missing)
1156}
1157
/// Counts property-FTS rows whose stored text no longer matches what would
/// be extracted from the node's current properties.
///
/// Kinds whose projection table does not exist yet are skipped — nothing can
/// have drifted. Weighted schemas are compared column-by-column, unweighted
/// ones via the single `text_content` column.
fn count_drifted_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
    let schemas = crate::writer::load_fts_property_schemas(conn)?;
    if schemas.is_empty() {
        return Ok(0);
    }

    let mut drifted = 0i64;
    for (kind, schema) in &schemas {
        let table = fathomdb_schema::fts_kind_table_name(kind);
        // `unwrap_or(0)` treats a failed lookup the same as a missing table.
        let table_exists: bool = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
                [table.as_str()],
                |r| r.get::<_, i64>(0),
            )
            .unwrap_or(0)
            > 0;
        if !table_exists {
            continue;
        }

        drifted += if schema.is_weighted() {
            count_drifted_weighted(conn, kind, &table, schema)?
        } else {
            count_drifted_non_weighted(conn, kind, &table, schema)?
        };
    }
    Ok(drifted)
}
1210
1211fn count_drifted_non_weighted(
1214 conn: &rusqlite::Connection,
1215 kind: &str,
1216 table: &str,
1217 schema: &crate::writer::PropertyFtsSchema,
1218) -> Result<i64, EngineError> {
1219 let mut drifted = 0i64;
1220 let mut stmt = conn.prepare(&format!(
1221 "SELECT fp.node_logical_id, fp.text_content, n.properties \
1222 FROM {table} fp \
1223 JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1224 WHERE n.kind = ?1"
1225 ))?;
1226 let rows = stmt.query_map([kind], |row| {
1227 Ok((
1228 row.get::<_, String>(0)?,
1229 row.get::<_, String>(1)?,
1230 row.get::<_, String>(2)?,
1231 ))
1232 })?;
1233 for row in rows {
1234 let (_logical_id, stored_text, properties_str) = row?;
1235 let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1236 let (expected, _positions, _stats) = crate::writer::extract_property_fts(&props, schema);
1237 match expected {
1238 Some(text) if text == stored_text => {}
1239 _ => drifted += 1,
1240 }
1241 }
1242 Ok(drifted)
1243}
1244
/// Drift counter for weighted schemas: compares every per-path FTS column of
/// each projection row against freshly extracted values.
fn count_drifted_weighted(
    conn: &rusqlite::Connection,
    kind: &str,
    table: &str,
    schema: &crate::writer::PropertyFtsSchema,
) -> Result<i64, EngineError> {
    // One FTS column per schema path, named the same way the writer names
    // them so SELECT order matches extraction order.
    let columns: Vec<String> = schema
        .paths
        .iter()
        .map(|entry| {
            let is_recursive = matches!(entry.mode, crate::writer::PropertyPathMode::Recursive);
            fathomdb_schema::fts_column_name(&entry.path, is_recursive)
        })
        .collect();
    if columns.is_empty() {
        return Ok(0);
    }

    let select_cols: String = columns
        .iter()
        .map(|c| format!("fp.\"{c}\""))
        .collect::<Vec<_>>()
        .join(", ");
    let sql = format!(
        "SELECT fp.node_logical_id, {select_cols}, n.properties \
         FROM {table} fp \
         JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
         WHERE n.kind = ?1"
    );
    let mut stmt = conn.prepare(&sql)?;
    // Result layout: index 0 = node_logical_id, 1..=columns.len() = the FTS
    // columns, columns.len() + 1 = n.properties.
    let props_col_idx = columns.len() + 1;
    let rows = stmt.query_map([kind], |row| {
        let mut stored: Vec<String> = Vec::with_capacity(columns.len());
        for i in 0..columns.len() {
            stored.push(row.get::<_, String>(i + 1)?);
        }
        let properties: String = row.get(props_col_idx)?;
        Ok((stored, properties))
    })?;

    let mut drifted = 0i64;
    for row in rows {
        let (stored, properties_str) = row?;
        let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
        let expected = crate::writer::extract_property_fts_columns(&props, schema);
        // A column-count mismatch means schema and table shape disagree;
        // treat that as drift too.
        let row_drifted = if expected.len() == stored.len() {
            expected
                .iter()
                .zip(stored.iter())
                .any(|((_name, exp_text), stored_text)| exp_text != stored_text)
        } else {
            true
        };
        if row_drifted {
            drifted += 1;
        }
    }
    Ok(drifted)
}
1319
/// Converts a SQLite `count(*)` result to `usize`.
///
/// # Panics
/// Panics if `val` is negative — `count(*)` can never be, so a panic here
/// signals a broken invariant in the calling query rather than a recoverable
/// error.
#[allow(clippy::expect_used)]
pub(super) fn i64_to_usize(val: i64) -> usize {
    usize::try_from(val).expect("count(*) must be non-negative")
}
1326
1327pub(super) fn persist_simple_provenance_event(
1328 conn: &rusqlite::Connection,
1329 event_type: &str,
1330 subject: &str,
1331 metadata: Option<serde_json::Value>,
1332) -> Result<(), EngineError> {
1333 let metadata_json = metadata.map(|value| value.to_string()).unwrap_or_default();
1334 conn.execute(
1335 "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1336 rusqlite::params![new_id(), event_type, subject, metadata_json],
1337 )?;
1338 Ok(())
1339}
1340
1341pub(super) fn rebuild_operational_current_rows(
1342 tx: &rusqlite::Transaction<'_>,
1343 collections: &[String],
1344) -> Result<usize, EngineError> {
1345 let mut rebuilt_rows = 0usize;
1346 clear_operational_current_rows(tx, collections)?;
1347 let mut ins_current = tx.prepare_cached(
1348 "INSERT INTO operational_current \
1349 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1350 VALUES (?1, ?2, ?3, ?4, ?5)",
1351 )?;
1352
1353 for collection in collections {
1354 let mut stmt = tx.prepare(
1355 "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
1356 FROM operational_mutations \
1357 WHERE collection_name = ?1 \
1358 ORDER BY record_key, mutation_order",
1359 )?;
1360 let mut latest_by_key: std::collections::HashMap<String, Option<(String, i64, String)>> =
1361 std::collections::HashMap::new();
1362 let rows = stmt.query_map([collection], operational::map_operational_mutation_row)?;
1363 for row in rows {
1364 let mutation = row?;
1365 match mutation.op_kind.as_str() {
1366 "put" => {
1367 latest_by_key.insert(
1368 mutation.record_key,
1369 Some((mutation.payload_json, mutation.created_at, mutation.id)),
1370 );
1371 }
1372 "delete" => {
1373 latest_by_key.insert(mutation.record_key, None);
1374 }
1375 _ => {}
1376 }
1377 }
1378
1379 for (record_key, state) in latest_by_key {
1380 if let Some((payload_json, updated_at, last_mutation_id)) = state {
1381 ins_current.execute(rusqlite::params![
1382 collection,
1383 record_key,
1384 payload_json,
1385 updated_at,
1386 last_mutation_id,
1387 ])?;
1388 rebuilt_rows += 1;
1389 }
1390 }
1391 }
1392
1393 drop(ins_current);
1394 Ok(rebuilt_rows)
1395}
1396
1397pub(super) fn clear_operational_current_rows(
1398 tx: &rusqlite::Transaction<'_>,
1399 collections: &[String],
1400) -> Result<(), EngineError> {
1401 let mut delete_current =
1402 tx.prepare_cached("DELETE FROM operational_current WHERE collection_name = ?1")?;
1403 let mut delete_secondary_current = tx.prepare_cached(
1404 "DELETE FROM operational_secondary_index_entries \
1405 WHERE collection_name = ?1 AND subject_kind = 'current'",
1406 )?;
1407 for collection in collections {
1408 delete_secondary_current.execute([collection])?;
1409 delete_current.execute([collection])?;
1410 }
1411 drop(delete_secondary_current);
1412 drop(delete_current);
1413 Ok(())
1414}
1415
1416#[cfg(test)]
1417#[allow(clippy::expect_used, deprecated)]
1418mod tests {
1419 use std::fs;
1420 use std::sync::Arc;
1421
1422 use fathomdb_schema::SchemaManager;
1423 use tempfile::NamedTempFile;
1424
1425 use super::{
1426 AdminService, FtsPropertyPathMode, FtsPropertyPathSpec, SafeExportOptions,
1427 VectorRegenerationConfig,
1428 };
1429 use crate::embedder::{BatchEmbedder, EmbedderError, QueryEmbedder, QueryEmbedderIdentity};
1430 use crate::projection::ProjectionTarget;
1431 use crate::sqlite;
1432 use crate::{EngineError, OperationalCollectionKind, OperationalRegisterRequest};
1433
1434 #[cfg(feature = "sqlite-vec")]
1435 use crate::{ExecutionCoordinator, TelemetryCounters};
1436
1437 #[cfg(feature = "sqlite-vec")]
1438 use fathomdb_query::QueryBuilder;
1439
1440 #[cfg(feature = "sqlite-vec")]
1441 use super::load_vector_regeneration_config;
1442
    // Deterministic embedder fixture: its impls below return the same fixed
    // vector for every input, so vector tests are reproducible.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct TestEmbedder {
        // Identity reported via the embedder traits' identity() methods.
        identity: QueryEmbedderIdentity,
        // Constant embedding returned for every input text.
        vector: Vec<f32>,
    }
1452
1453 #[allow(dead_code)]
1454 impl TestEmbedder {
1455 fn new(model: &str, dimension: usize) -> Self {
1456 Self {
1457 identity: QueryEmbedderIdentity {
1458 model_identity: model.to_owned(),
1459 model_version: "1.0.0".to_owned(),
1460 dimension,
1461 normalization_policy: "l2".to_owned(),
1462 },
1463 vector: vec![1.0; dimension],
1464 }
1465 }
1466 }
1467
    impl QueryEmbedder for TestEmbedder {
        // Ignores the input text and returns the fixed test vector.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        // Fixed token budget for the test embedder.
        fn max_tokens(&self) -> usize {
            512
        }
    }
1479
1480 impl BatchEmbedder for TestEmbedder {
1481 fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, EmbedderError> {
1482 Ok(texts.iter().map(|_| self.vector.clone()).collect())
1483 }
1484 fn identity(&self) -> QueryEmbedderIdentity {
1485 self.identity.clone()
1486 }
1487 fn max_tokens(&self) -> usize {
1488 512
1489 }
1490 }
1491
    // Embedder fixture whose embed_query impl below always fails, for
    // exercising error paths.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct FailingEmbedder {
        // Identity is still reported via identity() even though embedding fails.
        identity: QueryEmbedderIdentity,
    }
1499
    impl QueryEmbedder for FailingEmbedder {
        // Every embed attempt fails with a fixed error.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Err(EmbedderError::Failed("test failure".to_owned()))
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        fn max_tokens(&self) -> usize {
            512
        }
    }
1511
1512 #[allow(dead_code)]
1513 #[cfg(unix)]
1514 fn set_file_mode(path: &std::path::Path, mode: u32) {
1515 use std::os::unix::fs::PermissionsExt;
1516
1517 let mut permissions = fs::metadata(path).expect("script metadata").permissions();
1518 permissions.set_mode(mode);
1519 fs::set_permissions(path, permissions).expect("chmod");
1520 }
1521
    // No-op on non-Unix targets: there are no Unix permission bits to set.
    #[allow(dead_code)]
    #[cfg(not(unix))]
    fn set_file_mode(_path: &std::path::Path, _mode: u32) {}
1525 fn setup() -> (NamedTempFile, AdminService) {
1526 let db = NamedTempFile::new().expect("temp file");
1527 let schema = Arc::new(SchemaManager::new());
1528 {
1529 let conn = sqlite::open_connection(db.path()).expect("connection");
1530 schema.bootstrap(&conn).expect("bootstrap");
1531 }
1532 let service = AdminService::new(db.path(), Arc::clone(&schema));
1533 (db, service)
1534 }
1535
    // A freshly bootstrapped database must report zero integrity violations
    // for the uniqueness and operational-consistency counters.
    #[test]
    fn check_integrity_includes_active_uniqueness_count() {
        let (_db, service) = setup();
        let report = service.check_integrity().expect("integrity check");
        assert_eq!(report.duplicate_active_logical_ids, 0);
        assert_eq!(report.operational_missing_collections, 0);
        assert_eq!(report.operational_missing_last_mutations, 0);
    }
1544
    // Two FTS property schemas whose kinds differ only by '-' vs '_' should
    // trigger a legacy projection-table name collision warning.
    #[test]
    fn check_integrity_warns_for_legacy_projection_name_collisions() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Foo-Bar', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("insert first schema");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Foo_Bar', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("insert second schema");
        }

        let report = service.check_integrity().expect("integrity check");

        assert!(report.warnings.iter().any(|warning| {
            warning.contains("legacy FTS property projection table name collision")
        }));
    }
1570
    // trace_source should surface the logical ids of nodes ingested from the
    // given source_ref.
    #[test]
    fn trace_source_returns_node_logical_ids() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
        }
        let report = service.trace_source("source-1").expect("trace");
        assert_eq!(report.node_rows, 1);
        assert_eq!(report.node_logical_ids, vec!["lg1"]);
    }
1587
    // trace_source should also report operational mutations attributed to the
    // source_ref, not only graph rows.
    #[test]
    fn trace_source_includes_operational_mutations() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"ok\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert mutation");
        }

        let report = service.trace_source("source-1").expect("trace");
        assert_eq!(report.operational_mutation_rows, 1);
        assert_eq!(report.operational_mutation_ids, vec!["m1"]);
    }
1613
    // Excising the source of the active node revision should reactivate the
    // revision it had superseded.
    #[test]
    fn excise_source_restores_prior_active_node() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // v1 from source-1, already superseded at t=200.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
                [],
            )
            .expect("insert v1 superseded");
            // v2 from source-2 is the currently active revision.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
                [],
            )
            .expect("insert v2 active");
        }
        service.excise_source("source-2").expect("excise");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let active_row_id: String = conn
                .query_row(
                    "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
                    [],
                    |row| row.get(0),
                )
                .expect("active row exists after excise");
            assert_eq!(active_row_id, "r1");
        }
    }
1645
    // Excising a source must delete its operational mutations and rebuild the
    // latest_state current row from the mutations that remain.
    #[test]
    fn excise_source_deletes_operational_mutations_and_repairs_latest_state_current() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert prior mutation");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'source-2', 200, 2)",
                [],
            )
            .expect("insert excised mutation");
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 200, 'm2')",
                [],
            )
            .expect("insert current row");
        }

        let traced = service
            .trace_source("source-2")
            .expect("trace before excise");
        assert_eq!(traced.operational_mutation_rows, 1);
        assert_eq!(traced.operational_mutation_ids, vec!["m2"]);

        let excised = service.excise_source("source-2").expect("excise");
        // After excision nothing attributed to source-2 should remain.
        assert_eq!(excised.operational_mutation_rows, 0);
        assert!(excised.operational_mutation_ids.is_empty());

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let remaining: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_mutations WHERE source_ref = 'source-2'",
                    [],
                    |row| row.get(0),
                )
                .expect("remaining count");
            assert_eq!(remaining, 0);

            // The current row must have been rebuilt from the surviving m1 put.
            let current: (String, String) = conn
                .query_row(
                    "SELECT payload_json, last_mutation_id FROM operational_current \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                    [],
                    |row| Ok((row.get(0)?, row.get(1)?)),
                )
                .expect("rebuilt current row");
            assert_eq!(current.0, "{\"status\":\"old\"}");
            assert_eq!(current.1, "m1");
        }
    }
1714
    // Restoring a logical id retired by a forget operation should bring back
    // the last pre-retire node revision, its chunk and FTS row, and the edge
    // retired by the same operation.
    #[test]
    fn restore_logical_id_reestablishes_last_pre_retire_content_and_attached_edges() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Record the retire provenance events, then retire node and edge
            // and clear the FTS projection, simulating a completed forget.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear fts");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 1);
        assert_eq!(report.restored_chunk_rows, 1);
        assert_eq!(report.restored_fts_rows, 1);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active node count");
        assert_eq!(active_node_count, 1);
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(fts_count, 1);
    }
1805
    // An edge retired at t=201 — after the node's retire event at t=200 —
    // should still be restored together with the node.
    #[test]
    fn restore_logical_id_restores_edges_retired_after_the_node_retire_event() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            // The edge retire event lands one tick later than the node's.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 201, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 201 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_edge_rows, 1);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
    }
1867
    // Two retired revisions share identical created_at/superseded_at values;
    // restore is expected to pick the most recently inserted revision.
    #[test]
    fn restore_logical_id_prefers_latest_retired_revision_when_timestamps_tie() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes \
                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-older', 'doc-1', 'Document', '{\"title\":\"older\"}', 100, 200, 'forget-1')",
                [],
            )
            .expect("insert older retired node");
            conn.execute(
                "INSERT INTO nodes \
                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-newer', 'doc-1', 'Document', '{\"title\":\"newer\"}', 100, 200, 'forget-1')",
                [],
            )
            .expect("insert newer retired node");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-older', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert older retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-newer', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert newer retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");

        assert!(!report.was_noop);
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_row: (String, String) = conn
            .query_row(
                "SELECT row_id, properties FROM nodes \
                 WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("restored active row");
        assert_eq!(active_row.0, "node-row-newer");
        assert_eq!(active_row.1, "{\"title\":\"newer\"}");
    }
1916
    // Purging should hard-delete every row tied to the retired logical id —
    // node, edge, chunk, and FTS row — and leave a purge_logical_id
    // provenance event as a tombstone.
    #[test]
    fn purge_logical_id_removes_retired_content_and_records_tombstone() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired edge");
            conn.execute(
                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-1', 'doc-1', 'Document', 'budget narrative')",
                [],
            )
            .expect("insert fts");
        }

        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.deleted_node_rows, 1);
        assert_eq!(report.deleted_edge_rows, 1);
        assert_eq!(report.deleted_chunk_rows, 1);
        assert_eq!(report.deleted_fts_rows, 1);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_nodes: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining nodes");
        assert_eq!(remaining_nodes, 0);
        let remaining_edges: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining edges");
        assert_eq!(remaining_edges, 0);
        let remaining_chunks: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining chunks");
        assert_eq!(remaining_chunks, 0);
        let purge_events: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'purge_logical_id' AND subject = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("purge events");
        assert_eq!(purge_events, 1);
    }
1991
    // A chunk whose node exists only as a retired revision is preserved on
    // purpose and must not be counted as an orphan.
    #[test]
    fn check_semantics_accepts_preserved_retired_chunks() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
        }

        let report = service.check_semantics().expect("semantics");
        assert_eq!(report.orphaned_chunks, 0);
    }
2014
    // A chunk pointing at a logical id with no node history at all (not even
    // a retired revision) is a genuine orphan.
    #[test]
    fn check_semantics_detects_missing_retired_node_history_for_preserved_chunks() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
                [],
            )
            .expect("insert orphaned chunk");
        }

        let report = service.check_semantics().expect("semantics");
        assert_eq!(report.orphaned_chunks, 1);
    }
2031
    // A vec row whose chunk points at a logical id with no node history should
    // be flagged, alongside the orphaned chunk itself.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn check_semantics_detects_missing_retired_node_history_for_preserved_vec_rows() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Doc", 4)
                .expect("ensure vec kind profile");
            let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
                [],
            )
            .expect("insert orphaned chunk");
            // 16 bytes = 4 f32 lanes, matching the dimension registered above.
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
        }

        let report = service.check_semantics().expect("semantics");
        assert_eq!(report.orphaned_chunks, 1);
        assert_eq!(report.vec_rows_for_superseded_nodes, 1);
    }
2062
    // A vec row preserved through retirement should become visible to vector
    // search again after restore, without any re-ingest.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_reestablishes_vector_search_without_reingest() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            let vec_table = fathomdb_schema::vec_kind_table_name("Document");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // Run an actual vector query to prove the row is reachable again.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restore should make the preserved vec row visible again without re-ingest"
        );
    }
2125
    // Purging a retired logical id should also hard-delete its vec rows.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn purge_logical_id_deletes_vec_rows_for_retired_content() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            let vec_table = fathomdb_schema::vec_kind_table_name("Document");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
        }

        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.deleted_vec_rows, 1);

        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_table = fathomdb_schema::vec_kind_table_name("Document");
        let vec_count: i64 = conn
            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
                row.get(0)
            })
            .expect("vec count");
        assert_eq!(vec_count, 0);
    }
2170
    // Vectors produced by regenerate_vector_embeddings should survive a
    // retire/restore cycle and be visible to vector search afterwards.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_restores_visibility_of_regenerated_vectors() {
        let (db, service) = setup();

        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
        }

        // Produce the vec rows through the regeneration path, not raw SQL.
        let embedder = TestEmbedder::new("test-model", 4);
        service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate");

        // Retire the node after regeneration, then restore it.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restored logical_id should become visible through regenerated vectors"
        );
    }
2244
2245 #[test]
2246 fn check_semantics_clean_db_returns_zeros() {
2247 let (_db, service) = setup();
2248 let report = service.check_semantics().expect("semantics check");
2249 assert_eq!(report.orphaned_chunks, 0);
2250 assert_eq!(report.null_source_ref_nodes, 0);
2251 assert_eq!(report.broken_step_fk, 0);
2252 assert_eq!(report.broken_action_fk, 0);
2253 assert_eq!(report.stale_fts_rows, 0);
2254 assert_eq!(report.fts_rows_for_superseded_nodes, 0);
2255 assert_eq!(report.dangling_edges, 0);
2256 assert_eq!(report.orphaned_supersession_chains, 0);
2257 assert_eq!(report.stale_vec_rows, 0);
2258 assert_eq!(report.vec_rows_for_superseded_nodes, 0);
2259 assert_eq!(report.missing_operational_current_rows, 0);
2260 assert_eq!(report.stale_operational_current_rows, 0);
2261 assert_eq!(report.disabled_collection_mutations, 0);
2262 assert_eq!(report.mismatched_kind_property_fts_rows, 0);
2263 assert_eq!(report.duplicate_property_fts_rows, 0);
2264 assert_eq!(report.drifted_property_fts_rows, 0);
2265 assert!(report.warnings.is_empty());
2266 }
2267
2268 #[test]
2269 fn register_operational_collection_persists_and_emits_provenance() {
2270 let (db, service) = setup();
2271 let record = service
2272 .register_operational_collection(&OperationalRegisterRequest {
2273 name: "connector_health".to_owned(),
2274 kind: OperationalCollectionKind::LatestState,
2275 schema_json: "{}".to_owned(),
2276 retention_json: "{}".to_owned(),
2277 filter_fields_json: "[]".to_owned(),
2278 validation_json: String::new(),
2279 secondary_indexes_json: "[]".to_owned(),
2280 format_version: 1,
2281 })
2282 .expect("register collection");
2283
2284 assert_eq!(record.name, "connector_health");
2285 assert_eq!(record.kind, OperationalCollectionKind::LatestState);
2286 assert_eq!(record.schema_json, "{}");
2287 assert_eq!(record.retention_json, "{}");
2288 assert_eq!(record.filter_fields_json, "[]");
2289 assert!(record.created_at > 0);
2290 assert_eq!(record.disabled_at, None);
2291
2292 let described = service
2293 .describe_operational_collection("connector_health")
2294 .expect("describe collection")
2295 .expect("collection exists");
2296 assert_eq!(described, record);
2297
2298 let conn = sqlite::open_connection(db.path()).expect("conn");
2299 let provenance_count: i64 = conn
2300 .query_row(
2301 "SELECT count(*) FROM provenance_events \
2302 WHERE event_type = 'operational_collection_registered' AND subject = 'connector_health'",
2303 [],
2304 |row| row.get(0),
2305 )
2306 .expect("provenance count");
2307 assert_eq!(provenance_count, 1);
2308 }
2309
2310 #[test]
2311 fn register_and_update_operational_collection_validation_round_trip() {
2312 let (db, service) = setup();
2313 let record = service
2314 .register_operational_collection(&OperationalRegisterRequest {
2315 name: "connector_health".to_owned(),
2316 kind: OperationalCollectionKind::LatestState,
2317 schema_json: "{}".to_owned(),
2318 retention_json: "{}".to_owned(),
2319 filter_fields_json: "[]".to_owned(),
2320 validation_json: String::new(),
2321 secondary_indexes_json: "[]".to_owned(),
2322 format_version: 1,
2323 })
2324 .expect("register collection");
2325 assert_eq!(record.validation_json, "");
2326
2327 let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
2328 let updated = service
2329 .update_operational_collection_validation("connector_health", validation_json)
2330 .expect("update validation");
2331 assert_eq!(updated.validation_json, validation_json);
2332
2333 let described = service
2334 .describe_operational_collection("connector_health")
2335 .expect("describe collection")
2336 .expect("collection exists");
2337 assert_eq!(described.validation_json, validation_json);
2338
2339 let conn = sqlite::open_connection(db.path()).expect("conn");
2340 let provenance_count: i64 = conn
2341 .query_row(
2342 "SELECT count(*) FROM provenance_events \
2343 WHERE event_type = 'operational_collection_validation_updated' \
2344 AND subject = 'connector_health'",
2345 [],
2346 |row| row.get(0),
2347 )
2348 .expect("provenance count");
2349 assert_eq!(provenance_count, 1);
2350 }
2351
    #[test]
    fn register_update_and_rebuild_operational_secondary_indexes_round_trip() {
        // Registers an append-only collection without secondary indexes, seeds
        // two mutations, attaches an index definition (which should backfill),
        // then wipes the index entries out-of-band and verifies the service can
        // rebuild them from the mutation log.
        let (db, service) = setup();
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // No secondary indexes exist at registration time.
        assert_eq!(record.secondary_indexes_json, "[]");

        {
            // Seed two append mutations through the writer actor so the
            // collection has history to index; scoped so the writer is dropped
            // before the service-level operations below.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "secondary-index-seed".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"bob","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("seed writes");
        }

        // Attaching the index definition should index the pre-existing
        // mutations (entry_count check below confirms the backfill).
        let secondary_indexes_json = r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#;
        let updated = service
            .update_operational_collection_secondary_indexes("audit_log", secondary_indexes_json)
            .expect("update secondary indexes");
        assert_eq!(updated.secondary_indexes_json, secondary_indexes_json);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'audit_log' AND index_name = 'actor_ts'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count");
        assert_eq!(entry_count, 2);
        // Simulate index loss by deleting all entries out-of-band.
        conn.execute(
            "DELETE FROM operational_secondary_index_entries WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear index entries");
        // Release this handle before the service performs the rebuild.
        drop(conn);

        let rebuild = service
            .rebuild_operational_secondary_indexes("audit_log")
            .expect("rebuild secondary indexes");
        assert_eq!(rebuild.collection_name, "audit_log");
        // Both seeded mutations are re-indexed; no latest-state ("current")
        // index was configured, so zero current entries are rebuilt.
        assert_eq!(rebuild.mutation_entries_rebuilt, 2);
        assert_eq!(rebuild.current_entries_rebuilt, 0);
    }
2438
2439 #[test]
2440 fn register_operational_collection_rejects_invalid_validation_contract() {
2441 let (_db, service) = setup();
2442
2443 let error = service
2444 .register_operational_collection(&OperationalRegisterRequest {
2445 name: "connector_health".to_owned(),
2446 kind: OperationalCollectionKind::LatestState,
2447 schema_json: "{}".to_owned(),
2448 retention_json: "{}".to_owned(),
2449 filter_fields_json: "[]".to_owned(),
2450 validation_json: r#"{"format_version":1,"mode":"enforce","fields":[{"name":"status","type":"string","minimum":0}]}"#
2451 .to_owned(),
2452 secondary_indexes_json: "[]".to_owned(),
2453 format_version: 1,
2454 })
2455 .expect_err("invalid validation contract should reject");
2456
2457 assert!(matches!(error, EngineError::InvalidWrite(_)));
2458 assert!(error.to_string().contains("minimum/maximum"));
2459 }
2460
    #[test]
    fn validate_operational_collection_history_reports_invalid_rows_without_mutation() {
        // With the contract's mode set to "disabled", writes that violate the
        // contract are accepted; history validation must still flag them
        // afterwards without deleting rows or emitting a provenance event.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#
                    .to_owned(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Seed one valid and one out-of-enum payload; both writes succeed
            // because enforcement is disabled.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "history-validation".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"status":"ok"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"status":"bogus"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Both rows are examined; only the out-of-enum row is reported.
        let report = service
            .validate_operational_collection_history("audit_log")
            .expect("validate history");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.checked_rows, 2);
        assert_eq!(report.invalid_row_count, 1);
        assert_eq!(report.issues.len(), 1);
        assert_eq!(report.issues[0].record_key, "evt-2");
        assert!(report.issues[0].message.contains("must be one of"));

        // Validation is read-only: both mutations are still present...
        let trace = service
            .trace_operational_collection("audit_log", None)
            .expect("trace");
        assert_eq!(trace.mutation_count, 2);

        // ...and no provenance event is recorded for the validation pass.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_history_validated' \
                 AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
2543
    #[test]
    fn trace_operational_collection_returns_mutations_and_current_rows() {
        // Seeds a single latest-state put through the writer actor, then traces
        // that record key and checks both the mutation log and the projected
        // current row appear in the report.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Writer actor is scoped so it is dropped before the trace below.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }

        // Trace filtered to the single record key written above.
        let report = service
            .trace_operational_collection("connector_health", Some("gmail"))
            .expect("trace");
        assert_eq!(report.collection_name, "connector_health");
        assert_eq!(report.record_key.as_deref(), Some("gmail"));
        assert_eq!(report.mutation_count, 1);
        assert_eq!(report.current_count, 1);
        assert_eq!(report.mutations[0].op_kind, "put");
        assert_eq!(report.current_rows[0].payload_json, r#"{"status":"ok"}"#);
    }
2600
2601 #[test]
2602 fn trace_operational_collection_rejects_unknown_collection() {
2603 let (_db, service) = setup();
2604
2605 let error = service
2606 .trace_operational_collection("missing_collection", None)
2607 .expect_err("unknown collection should fail");
2608
2609 assert!(matches!(error, EngineError::InvalidWrite(_)));
2610 assert!(error.to_string().contains("is not registered"));
2611 }
2612
    #[test]
    fn rebuild_operational_current_repairs_missing_latest_state_rows() {
        // Deleting an operational_current row out-of-band must be detected by
        // check_semantics and repaired by rebuild_operational_current, which
        // re-derives the row from the surviving mutation log.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Seed one put so both a mutation and a current row exist.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        {
            // Corrupt the projection: drop the current row while leaving the
            // mutation log intact.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        // The semantics check sees exactly one missing projected row.
        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 1);

        let repair = service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");
        assert_eq!(repair.collections_rebuilt, 1);
        assert_eq!(repair.current_rows_rebuilt, 1);

        // After repair the drift counter returns to zero.
        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);

        // The restored row carries the payload from the seeded mutation.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2690
    #[test]
    fn rebuild_operational_current_restores_latest_state_secondary_index_entries() {
        // A latest-state collection with a configured secondary index should
        // end up with its 'current'-subject index entries restored after the
        // current row is deleted out-of-band and then rebuilt.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Seed one put so the current row and its index entry exist.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        {
            // Confirm the write produced one 'current' index entry, then
            // delete the projected row out-of-band to simulate corruption.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let entry_count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_secondary_index_entries \
                     WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                    [],
                    |row| row.get(0),
                )
                .expect("secondary index count before repair");
            assert_eq!(entry_count, 1);
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        // The rebuild restores exactly one 'current' index entry again.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count after repair");
        assert_eq!(entry_count, 1);
    }
2769
    #[test]
    fn operational_current_semantics_and_rebuild_follow_mutation_order() {
        // The mutation ids ('m3', 'm2', 'm1') deliberately run opposite to
        // mutation_order (1, 2, 3) and all rows share created_at = 100, so only
        // mutation_order identifies the latest write. Both check_semantics and
        // rebuild_operational_current must follow mutation_order: the final
        // state is the 'm1' put with status "new".
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("seed collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m3', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed first put");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'delete', '', 'src-2', 100, 2)",
                [],
            )
            .expect("seed delete");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'src-3', 100, 3)",
                [],
            )
            .expect("seed final put");
            // Current row matches the highest mutation_order ('m1'), so the
            // seeded state is already consistent.
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 100, 'm1')",
                [],
            )
            .expect("seed current");
        }

        // Consistent seed: neither missing nor stale rows are reported.
        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 0);
        assert_eq!(before.stale_operational_current_rows, 0);

        {
            // Remove the projected row out-of-band.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        let missing = service.check_semantics().expect("semantics after delete");
        assert_eq!(missing.missing_operational_current_rows, 1);
        assert_eq!(missing.stale_operational_current_rows, 0);

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);
        assert_eq!(after.stale_operational_current_rows, 0);

        // The rebuilt row must come from the mutation_order-latest put, not
        // from any id- or timestamp-based ordering.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
2847
    #[test]
    fn disable_operational_collection_sets_disabled_at_and_emits_provenance() {
        // Disabling a collection must stamp disabled_at, persist it, reject
        // subsequent writes targeting the collection, and record exactly one
        // provenance event.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        let record = service
            .disable_operational_collection("audit_log")
            .expect("disable collection");
        assert_eq!(record.name, "audit_log");
        assert!(record.disabled_at.is_some());

        // The timestamp is persisted and visible through describe.
        let disabled_at = record.disabled_at.expect("disabled_at");
        let described = service
            .describe_operational_collection("audit_log")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described.disabled_at, Some(disabled_at));

        // A write to the disabled collection is rejected at submit time.
        let writer = crate::WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            crate::ProvenanceMode::Warn,
            Arc::new(crate::TelemetryCounters::default()),
        )
        .expect("writer");
        let error = writer
            .submit(crate::WriteRequest {
                label: "disabled-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![crate::OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("disabled collection should reject writes");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // Exactly one disable event was written for this subject.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_disabled' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2919
    #[test]
    fn purge_operational_collection_deletes_append_only_rows_before_cutoff() {
        // Purge with cutoff 250 removes the mutations created at 100 and 200,
        // keeps the one created at 300, and records a provenance event.
        let (db, service) = setup();
        {
            // Seed the collection and three append mutations directly via SQL.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_all\"}', 1, 100)",
                [],
            )
            .expect("seed collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-1', 'audit_log', 'evt-1', 'append', '{\"seq\":1}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed event 1");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-2', 'audit_log', 'evt-2', 'append', '{\"seq\":2}', 'src-2', 200, 2)",
                [],
            )
            .expect("seed event 2");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-3', 'audit_log', 'evt-3', 'append', '{\"seq\":3}', 'src-3', 300, 3)",
                [],
            )
            .expect("seed event 3");
        }

        let report = service
            .purge_operational_collection("audit_log", 250)
            .expect("purge collection");
        assert_eq!(report.collection_name, "audit_log");
        // evt-1 (100) and evt-2 (200) fall before the cutoff.
        assert_eq!(report.deleted_mutations, 2);
        assert_eq!(report.before_timestamp, 250);

        // Only the post-cutoff mutation survives.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-3".to_owned()]);
        // Purging is a destructive action, so it emits a provenance event.
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_purged' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2985
    #[test]
    fn compact_operational_collection_dry_run_reports_without_mutation() {
        // With retention keep_last/max_rows=2 and three seeded mutations, a
        // dry-run compaction must report one candidate deletion while leaving
        // all rows in place and emitting no provenance event.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            // Three appends with increasing created_at and mutation_order.
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // dry_run = true: report what would be deleted, change nothing.
        let report = service
            .compact_operational_collection("audit_log", true)
            .expect("compact collection");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.deleted_mutations, 1);
        assert!(report.dry_run);
        assert_eq!(report.before_timestamp, None);

        // All three mutations are still present and no event was emitted.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count");
        assert_eq!(remaining_count, 3);
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
3040
    #[test]
    fn compact_operational_collection_keep_last_deletes_oldest_rows() {
        // With retention keep_last/max_rows=2 and three seeded mutations, a
        // real compaction deletes the oldest row (lowest mutation_order),
        // keeps the newest two, and emits a provenance event.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            // Three appends with increasing created_at and mutation_order.
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // dry_run = false: actually delete the over-limit row.
        let report = service
            .compact_operational_collection("audit_log", false)
            .expect("compact collection");
        assert_eq!(report.deleted_mutations, 1);
        assert!(!report.dry_run);

        // evt-1 (oldest) is gone; evt-2 and evt-3 remain in order.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        // The destructive compaction leaves exactly one provenance event.
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
3098
    #[test]
    fn plan_and_run_operational_retention_keep_last() {
        // Full retention flow for a keep_last/max_rows=2 collection with three
        // mutations: plan reports one candidate deletion, a dry run reports the
        // same without mutating or recording a run, and a real run deletes the
        // oldest row and records executed_at.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            // Three appends with increasing created_at and mutation_order.
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // Planning at now=1_000 with no collection filter and a limit of 10.
        let plan = service
            .plan_operational_retention(1_000, None, Some(10))
            .expect("plan retention");
        assert_eq!(plan.collections_examined, 1);
        assert_eq!(plan.items[0].collection_name, "audit_log");
        assert_eq!(
            plan.items[0].action_kind,
            crate::operational::OperationalRetentionActionKind::KeepLast
        );
        assert_eq!(plan.items[0].candidate_deletions, 1);
        assert_eq!(plan.items[0].max_rows, Some(2));
        // Retention has never executed for this collection yet.
        assert_eq!(plan.items[0].last_run_at, None);

        // Dry run reports the same outcome as the plan.
        let dry_run = service
            .run_operational_retention(1_000, None, Some(10), true)
            .expect("dry-run retention");
        assert!(dry_run.dry_run);
        assert_eq!(dry_run.collections_acted_on, 1);
        assert_eq!(dry_run.items[0].deleted_mutations, 1);
        assert_eq!(dry_run.items[0].rows_remaining, 2);

        // The dry run must neither delete rows nor record a retention run.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count after dry run");
        assert_eq!(remaining_count, 3);
        let retention_run_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_retention_runs WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("retention run count");
        assert_eq!(retention_run_count, 0);
        drop(conn);

        // Real execution: the oldest mutation is deleted.
        let executed = service
            .run_operational_retention(1_000, None, Some(10), false)
            .expect("execute retention");
        assert_eq!(executed.collections_acted_on, 1);
        assert_eq!(executed.items[0].deleted_mutations, 1);
        assert_eq!(executed.items[0].rows_remaining, 2);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        // The executed run is recorded with the caller-supplied timestamp.
        let last_run_at: i64 = conn
            .query_row(
                "SELECT executed_at FROM operational_retention_runs \
                 WHERE collection_name = 'audit_log' ORDER BY executed_at DESC LIMIT 1",
                [],
                |row| row.get(0),
            )
            .expect("last run at");
        assert_eq!(last_run_at, 1_000);
    }
3197
3198 #[test]
3199 fn dry_run_operational_retention_does_not_mark_noop_collection_as_acted_on() {
3200 let (db, service) = setup();
3201 let conn = sqlite::open_connection(db.path()).expect("conn");
3202 conn.execute(
3203 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
3204 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
3205 [],
3206 )
3207 .expect("seed collection");
3208 for (index, created_at) in [(1_i64, 100_i64), (2, 200)] {
3209 conn.execute(
3210 "INSERT INTO operational_mutations \
3211 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
3212 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
3213 rusqlite::params![
3214 format!("evt-{index}"),
3215 format!("{{\"seq\":{index}}}"),
3216 created_at,
3217 index,
3218 ],
3219 )
3220 .expect("seed event");
3221 }
3222 drop(conn);
3223
3224 let dry_run = service
3225 .run_operational_retention(1_000, None, Some(10), true)
3226 .expect("dry-run retention");
3227 assert!(dry_run.dry_run);
3228 assert_eq!(dry_run.collections_acted_on, 0);
3229 assert_eq!(dry_run.items[0].deleted_mutations, 0);
3230 assert_eq!(dry_run.items[0].rows_remaining, 2);
3231 }
3232
3233 #[test]
3234 fn compact_operational_collection_rejects_latest_state() {
3235 let (_db, service) = setup();
3236 service
3237 .register_operational_collection(&OperationalRegisterRequest {
3238 name: "connector_health".to_owned(),
3239 kind: OperationalCollectionKind::LatestState,
3240 schema_json: "{}".to_owned(),
3241 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3242 filter_fields_json: "[]".to_owned(),
3243 validation_json: String::new(),
3244 secondary_indexes_json: "[]".to_owned(),
3245 format_version: 1,
3246 })
3247 .expect("register collection");
3248
3249 let error = service
3250 .compact_operational_collection("connector_health", false)
3251 .expect_err("latest_state compaction should be rejected");
3252 assert!(matches!(error, EngineError::InvalidWrite(_)));
3253 assert!(error.to_string().contains("append_only_log"));
3254 }
3255
3256 #[test]
3257 fn register_operational_collection_persists_filter_fields_json() {
3258 let (_db, service) = setup();
3259
3260 let record = service
3261 .register_operational_collection(&OperationalRegisterRequest {
3262 name: "audit_log".to_owned(),
3263 kind: OperationalCollectionKind::AppendOnlyLog,
3264 schema_json: "{}".to_owned(),
3265 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3266 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3267 validation_json: String::new(),
3268 secondary_indexes_json: "[]".to_owned(),
3269 format_version: 1,
3270 })
3271 .expect("register collection");
3272
3273 assert_eq!(
3274 record.filter_fields_json,
3275 r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#
3276 );
3277 }
3278
    #[test]
    fn read_operational_collection_filters_append_only_rows_by_declared_fields() {
        // Filtered reads must honor the declared filter contract: a prefix match
        // on `actor` combined with a range on `ts` selects exactly one of three
        // appended events.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"seq","type":"integer","modes":["exact","range"]},{"name":"ts","type":"timestamp","modes":["exact","range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append three events through the writer actor (the production write
        // path) rather than inserting rows by hand.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","seq":1,"ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","seq":2,"ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-3".to_owned(),
                            payload_json: r#"{"actor":"bob","seq":3,"ts":300}"#.to_owned(),
                            source_ref: Some("src-3".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // prefix("alice") matches evt-1/evt-2; ts in [150, 250] narrows to evt-2.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("filtered read");

        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.row_count, 1);
        assert!(!report.was_limited);
        assert_eq!(report.rows.len(), 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
        assert_eq!(
            report.rows[0].payload_json,
            r#"{"actor":"alice-admin","seq":2,"ts":200}"#
        );
    }
3367
    #[test]
    fn read_operational_collection_uses_secondary_index_when_filter_values_are_missing() {
        // When the derived operational_filter_values rows are absent, filtered
        // reads should still succeed by consulting the declared secondary index.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append two events through the writer actor (the production write path).
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }
        // Wipe the derived filter values so only the secondary index can serve
        // the filtered read below.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        conn.execute(
            "DELETE FROM operational_filter_values WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear filter values");
        drop(conn);

        // prefix("alice") + ts in [150, 250] selects only evt-2 (ts=200).
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("secondary-index read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3450
3451 #[test]
3452 fn read_operational_collection_rejects_undeclared_fields_and_latest_state_collections() {
3453 let (_db, service) = setup();
3454 service
3455 .register_operational_collection(&OperationalRegisterRequest {
3456 name: "connector_health".to_owned(),
3457 kind: OperationalCollectionKind::LatestState,
3458 schema_json: "{}".to_owned(),
3459 retention_json: "{}".to_owned(),
3460 filter_fields_json: r#"[{"name":"status","type":"string","modes":["exact"]}]"#
3461 .to_owned(),
3462 validation_json: String::new(),
3463 secondary_indexes_json: "[]".to_owned(),
3464 format_version: 1,
3465 })
3466 .expect("register collection");
3467
3468 let latest_state_error = service
3469 .read_operational_collection(&crate::operational::OperationalReadRequest {
3470 collection_name: "connector_health".to_owned(),
3471 filters: vec![crate::operational::OperationalFilterClause::Exact {
3472 field: "status".to_owned(),
3473 value: crate::operational::OperationalFilterValue::String("ok".to_owned()),
3474 }],
3475 limit: Some(10),
3476 })
3477 .expect_err("latest_state filtered reads should be rejected");
3478 assert!(latest_state_error.to_string().contains("append_only_log"));
3479
3480 service
3481 .register_operational_collection(&OperationalRegisterRequest {
3482 name: "audit_log".to_owned(),
3483 kind: OperationalCollectionKind::AppendOnlyLog,
3484 schema_json: "{}".to_owned(),
3485 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3486 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact"]}]"#
3487 .to_owned(),
3488 validation_json: String::new(),
3489 secondary_indexes_json: "[]".to_owned(),
3490 format_version: 1,
3491 })
3492 .expect("register append-only collection");
3493
3494 let undeclared_error = service
3495 .read_operational_collection(&crate::operational::OperationalReadRequest {
3496 collection_name: "audit_log".to_owned(),
3497 filters: vec![crate::operational::OperationalFilterClause::Exact {
3498 field: "missing".to_owned(),
3499 value: crate::operational::OperationalFilterValue::String("x".to_owned()),
3500 }],
3501 limit: Some(10),
3502 })
3503 .expect_err("undeclared field should be rejected");
3504 assert!(undeclared_error.to_string().contains("undeclared"));
3505 }
3506
    #[test]
    fn read_operational_collection_applies_limit_and_reports_truncation() {
        // Two rows match the prefix filter but limit=1 is requested, so the read
        // must truncate the result and report was_limited.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["prefix"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append two matching events through the writer actor (the production
        // write path).
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice-1"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-2"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Prefix {
                    field: "actor".to_owned(),
                    value: "alice".to_owned(),
                }],
                limit: Some(1),
            })
            .expect("limited read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.applied_limit, 1);
        assert!(report.was_limited);
        // The later event (evt-2) is the one kept — rows come back newest-first here.
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3578
    #[test]
    fn preexisting_operational_collection_can_gain_filter_contract_after_upgrade() {
        // Hand-builds the pre-v10 operational tables (no filter_fields_json
        // column) to simulate a database created before filter contracts
        // existed, then verifies the contract can be installed explicitly later.
        let db = NamedTempFile::new().expect("temp db");
        let conn = sqlite::open_connection(db.path()).expect("conn");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT 100,
                disabled_at INTEGER
            );
            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT 100,
                mutation_order INTEGER NOT NULL DEFAULT 1
            );
            INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}', 1, 100);
            INSERT INTO operational_mutations
                (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order)
            VALUES
                ('evt-1', 'audit_log', 'evt-1', 'append', '{"actor":"alice","ts":0}', 'src-1', 100, 1);
            "#,
        )
        .expect("seed pre-v10 schema");
        drop(conn);

        // Without a declared contract, every filter field counts as undeclared.
        let service = AdminService::new(db.path(), Arc::new(SchemaManager::new()));
        let pre_update = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "actor".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("alice".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("read should reject undeclared fields before migration update");
        assert!(pre_update.to_string().contains("undeclared"));

        // Explicitly installing the filter contract enables filtered reads.
        let updated = service
            .update_operational_collection_filters(
                "audit_log",
                r#"[{"name":"actor","type":"string","modes":["exact"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#,
            )
            .expect("update filter contract");
        assert!(updated.filter_fields_json.contains("\"actor\""));

        // The pre-existing row (ts=0) is now reachable via a range filter.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Range {
                    field: "ts".to_owned(),
                    lower: Some(0),
                    upper: Some(0),
                }],
                limit: Some(10),
            })
            .expect("read after explicit filter update");
        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-1");
    }
3650
3651 #[cfg(feature = "sqlite-vec")]
3652 #[test]
3653 fn check_semantics_detects_stale_vec_rows() {
3654 use crate::sqlite::open_connection_with_vec;
3655
3656 let db = NamedTempFile::new().expect("temp file");
3657 let schema = Arc::new(SchemaManager::new());
3658 {
3659 let conn = open_connection_with_vec(db.path()).expect("vec conn");
3660 schema.bootstrap(&conn).expect("bootstrap");
3661 schema
3662 .ensure_vec_kind_profile(&conn, "Doc", 3)
3663 .expect("vec kind profile");
3664 let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
3666 .iter()
3667 .flat_map(|f| f.to_le_bytes())
3668 .collect();
3669 let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
3670 conn.execute(
3671 &format!(
3672 "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('ghost-chunk', ?1)"
3673 ),
3674 rusqlite::params![bytes],
3675 )
3676 .expect("insert stale vec row");
3677 }
3678 let service = AdminService::new(db.path(), Arc::clone(&schema));
3679 let report = service.check_semantics().expect("semantics check");
3680 assert_eq!(report.stale_vec_rows, 1);
3681 assert!(
3682 report.warnings.iter().any(|w| w.contains("stale vec")),
3683 "warning must mention stale vec"
3684 );
3685 }
3686
3687 #[cfg(feature = "sqlite-vec")]
3688 #[test]
3689 fn restore_vector_profiles_recreates_vec_table_from_metadata() {
3690 let db = NamedTempFile::new().expect("temp file");
3691 let schema = Arc::new(SchemaManager::new());
3692 {
3693 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3694 schema.bootstrap(&conn).expect("bootstrap");
3695 conn.execute(
3696 "INSERT INTO vector_profiles (profile, table_name, dimension, enabled) \
3697 VALUES ('default', 'vec_nodes_active', 3, 1)",
3698 [],
3699 )
3700 .expect("insert vector profile");
3701 }
3702
3703 let service = AdminService::new(db.path(), Arc::clone(&schema));
3704 let report = service
3705 .restore_vector_profiles()
3706 .expect("restore vector profiles");
3707 assert_eq!(
3708 report.targets,
3709 vec![crate::projection::ProjectionTarget::Vec]
3710 );
3711 assert_eq!(report.rebuilt_rows, 1);
3712
3713 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3714 let count: i64 = conn
3715 .query_row(
3716 "SELECT count(*) FROM sqlite_schema WHERE name = 'vec_nodes_active'",
3717 [],
3718 |row| row.get(0),
3719 )
3720 .expect("vec schema count");
3721 assert_eq!(count, 1, "vec table should exist after restore");
3722 }
3723
3724 #[cfg(feature = "sqlite-vec")]
3725 #[test]
3726 fn load_vector_regeneration_config_supports_json_and_toml() {
3727 let dir = tempfile::tempdir().expect("temp dir");
3728 let json_path = dir.path().join("regen.json");
3729 let toml_path = dir.path().join("regen.toml");
3730
3731 let config = VectorRegenerationConfig {
3732 kind: "Document".to_owned(),
3733 profile: "default".to_owned(),
3734 chunking_policy: "per_chunk".to_owned(),
3735 preprocessing_policy: "trim".to_owned(),
3736 };
3737
3738 fs::write(&json_path, serde_json::to_string(&config).expect("json")).expect("write json");
3739 fs::write(&toml_path, toml::to_string(&config).expect("toml")).expect("write toml");
3740
3741 let parsed_json = load_vector_regeneration_config(&json_path).expect("json parse");
3742 let parsed_toml = load_vector_regeneration_config(&toml_path).expect("toml parse");
3743
3744 assert_eq!(parsed_json, config);
3745 assert_eq!(parsed_toml, config);
3746 }
3747
3748 #[test]
3753 fn regenerate_vector_embeddings_config_rejects_old_identity_fields() {
3754 let legacy_json = r#"{
3757 "kind": "Document",
3758 "profile": "default",
3759 "table_name": "vec_nodes_active",
3760 "model_identity": "old-model",
3761 "model_version": "1.0",
3762 "dimension": 4,
3763 "normalization_policy": "l2",
3764 "chunking_policy": "per_chunk",
3765 "preprocessing_policy": "trim",
3766 "generator_command": ["/bin/echo"]
3767 }"#;
3768 let result: Result<VectorRegenerationConfig, _> = serde_json::from_str(legacy_json);
3769 assert!(
3770 result.is_err(),
3771 "legacy identity fields must be rejected at deserialization"
3772 );
3773 }
3774
    #[cfg(all(not(feature = "sqlite-vec"), unix))]
    #[test]
    fn regenerate_vector_embeddings_unsupported_vec_capability_writes_request_and_failed_audit() {
        // When the binary lacks sqlite-vec support, regeneration must fail but
        // still leave a provenance trail: one 'requested' event and one 'failed'
        // event whose metadata carries the failure class.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        // Seed one node with one chunk so regeneration has work to attempt.
        {
            let conn = sqlite::open_connection(db.path()).expect("connection");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("sqlite-vec capability should be required");

        assert!(error.to_string().contains("unsupported vec capability"));

        // Audit trail: exactly one 'requested' and one 'failed' event per profile.
        let conn = sqlite::open_connection(db.path()).expect("connection");
        let request_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("request count");
        assert_eq!(request_count, 1);
        let failed_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failed count");
        assert_eq!(failed_count, 1);
        // The failure event's metadata carries a machine-readable failure class.
        let metadata_json: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failed metadata");
        assert!(metadata_json.contains("\"failure_class\":\"unsupported vec capability\""));
    }
3840
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_rebuilds_embeddings_via_embedder() {
        // Happy path: regeneration embeds every chunk, persists the embedding
        // contract derived from the embedder's identity, and writes a
        // requested+apply provenance pair.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        // Seed one node with two chunks so regeneration has two rows to embed.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk 1");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-2', 'doc-1', 'travel plan', 101)",
                [],
            )
            .expect("insert chunk 2");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let report = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate vectors");

        // Report reflects the per-kind table name and full chunk coverage.
        let expected_vec_table = fathomdb_schema::vec_kind_table_name("Document");
        assert_eq!(report.profile, "default");
        assert_eq!(report.table_name, expected_vec_table);
        assert_eq!(report.dimension, 4);
        assert_eq!(report.total_chunks, 2);
        assert_eq!(report.regenerated_rows, 2);
        assert!(report.contract_persisted);

        // One vec row per chunk was written.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {expected_vec_table}"),
                [],
                |row| row.get(0),
            )
            .expect("vec count");
        assert_eq!(vec_count, 2);

        // The persisted contract mirrors the embedder identity (TestEmbedder
        // reports version 1.0.0 and l2 normalization here).
        let (model_identity, model_version, dimension, normalization_policy): (
            String,
            String,
            i64,
            String,
        ) = conn
            .query_row(
                "SELECT model_identity, model_version, dimension, normalization_policy \
                 FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .expect("contract row");
        assert_eq!(model_identity, "test-model");
        assert_eq!(model_version, "1.0.0");
        assert_eq!(dimension, 4);
        assert_eq!(normalization_policy, "l2");

        let contract_format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(contract_format_version, 1);
        // Provenance: one 'requested' and one 'apply' event for the profile.
        let request_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("request audit count");
        assert_eq!(request_count, 1);
        let apply_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("apply audit count");
        assert_eq!(apply_count, 1);
        // The apply event records profile, snapshot hash and model identity.
        let apply_metadata: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("apply metadata");
        assert!(apply_metadata.contains("\"profile\":\"default\""));
        assert!(apply_metadata.contains("\"snapshot_hash\":"));
        assert!(apply_metadata.contains("\"model_identity\":\"test-model\""));
    }
3959
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_embedder_failure_leaves_contract_and_vec_rows_unchanged() {
        // Atomicity on failure: if the embedder errors mid-regeneration, the
        // previously applied contract and existing vec rows must survive intact,
        // and a 'failed' provenance event must be recorded.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        // Seed a node + chunk, an already-applied contract ('old-model'), and
        // one existing vec row that must outlive the failed attempt.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
            schema
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                r"
                    INSERT INTO vector_embedding_contracts (
                        profile,
                        table_name,
                        model_identity,
                        model_version,
                        dimension,
                        normalization_policy,
                        chunking_policy,
                        preprocessing_policy,
                        generator_command_json,
                        applied_at,
                        snapshot_hash
                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
                ",
                rusqlite::params![
                    "default",
                    fathomdb_schema::vec_kind_table_name("Document"),
                    "old-model",
                    "0.9.0",
                    4,
                    "l2",
                    "per_chunk",
                    "trim",
                    "[]",
                    111,
                    "old-snapshot"
                ],
            )
            .expect("seed contract");
            let vec_table = fathomdb_schema::vec_kind_table_name("Document");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("seed vec row");
        }

        // The failing embedder advertises a valid identity but errors on embed.
        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let failing = FailingEmbedder {
            identity: QueryEmbedderIdentity {
                model_identity: "new-model".to_owned(),
                model_version: "1.0.0".to_owned(),
                dimension: 4,
                normalization_policy: "l2".to_owned(),
            },
        };
        let error = service
            .regenerate_vector_embeddings(
                &failing,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("embedder should fail");

        assert!(error.to_string().contains("embedder failure"));

        // The previously applied contract is untouched ('old-model'/'old-snapshot').
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let model_identity: String = conn
            .query_row(
                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("model identity");
        assert_eq!(model_identity, "old-model");
        let snapshot_hash: String = conn
            .query_row(
                "SELECT snapshot_hash FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("snapshot hash");
        assert_eq!(snapshot_hash, "old-snapshot");
        // The pre-existing vec row survives the failed regeneration.
        let vec_table = fathomdb_schema::vec_kind_table_name("Document");
        let vec_count: i64 = conn
            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
                row.get(0)
            })
            .expect("vec count");
        assert_eq!(vec_count, 1);
        // Exactly one 'failed' provenance event with the embedder failure class.
        let failure_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failure count");
        assert_eq!(failure_count, 1);
        let failure_metadata: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failure metadata");
        assert!(failure_metadata.contains("\"failure_class\":\"embedder failure\""));
    }
4090
4091 #[cfg(feature = "sqlite-vec")]
4102 #[test]
4103 fn regenerate_vector_embeddings_rejects_whitespace_only_profile_before_mutation() {
4104 let db = NamedTempFile::new().expect("temp file");
4105 let schema = Arc::new(SchemaManager::new());
4106 {
4107 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4108 schema.bootstrap(&conn).expect("bootstrap");
4109 conn.execute(
4110 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4111 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
4112 [],
4113 )
4114 .expect("insert node");
4115 conn.execute(
4116 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4117 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
4118 [],
4119 )
4120 .expect("insert chunk");
4121 }
4122
4123 let service = AdminService::new(db.path(), Arc::clone(&schema));
4124 let embedder = TestEmbedder::new("test-model", 4);
4125 let error = service
4126 .regenerate_vector_embeddings(
4127 &embedder,
4128 &VectorRegenerationConfig {
4129 kind: "Document".to_owned(),
4130 profile: " ".to_owned(),
4131 chunking_policy: "per_chunk".to_owned(),
4132 preprocessing_policy: "trim".to_owned(),
4133 },
4134 )
4135 .expect_err("whitespace profile should be rejected");
4136
4137 assert!(error.to_string().contains("invalid contract"));
4138 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4139 let contract_count: i64 = conn
4140 .query_row(
4141 "SELECT count(*) FROM vector_embedding_contracts",
4142 [],
4143 |row| row.get(0),
4144 )
4145 .expect("contract count");
4146 assert_eq!(contract_count, 0);
4147 let provenance_count: i64 = conn
4148 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
4149 row.get(0)
4150 })
4151 .expect("provenance count");
4152 assert_eq!(provenance_count, 0);
4153 }
4154
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn regenerate_vector_embeddings_rejects_future_contract_format_version() {
        // Seeds a node + chunk plus an existing embedding contract whose
        // contract_format_version (99) is treated as a future/unknown version,
        // then verifies regeneration refuses to proceed rather than silently
        // rewriting a contract it does not understand.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
            // The seeded contract is valid except for its format version (?12 = 99).
            conn.execute(
                r"
                INSERT INTO vector_embedding_contracts (
                    profile,
                    table_name,
                    model_identity,
                    model_version,
                    dimension,
                    normalization_policy,
                    chunking_policy,
                    preprocessing_policy,
                    generator_command_json,
                    applied_at,
                    snapshot_hash,
                    contract_format_version,
                    updated_at
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
                ",
                rusqlite::params![
                    "default",
                    "vec_nodes_active",
                    "old-model",
                    "0.9.0",
                    4,
                    "l2",
                    "per_chunk",
                    "trim",
                    "[]",
                    111,
                    "old-snapshot",
                    99,
                    111,
                ],
            )
            .expect("seed future contract");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("future contract version should be rejected");

        // The error text must mention both the unsupported status and the
        // offending format version so the failure is diagnosable.
        assert!(error.to_string().contains("unsupported"));
        assert!(error.to_string().contains("format version"));
    }
4229
4230 #[test]
4231 fn check_semantics_detects_orphaned_chunk() {
4232 let (db, service) = setup();
4233 {
4234 let conn = sqlite::open_connection(db.path()).expect("conn");
4236 conn.execute(
4237 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4238 VALUES ('c1', 'ghost-node', 'text', 100)",
4239 [],
4240 )
4241 .expect("insert orphaned chunk");
4242 }
4243 let report = service.check_semantics().expect("semantics check");
4244 assert_eq!(report.orphaned_chunks, 1);
4245 }
4246
4247 #[test]
4248 fn check_semantics_detects_null_source_ref() {
4249 let (db, service) = setup();
4250 {
4251 let conn = sqlite::open_connection(db.path()).expect("conn");
4252 conn.execute(
4253 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4254 VALUES ('r1', 'lg1', 'Meeting', '{}', 100)",
4255 [],
4256 )
4257 .expect("insert node with null source_ref");
4258 }
4259 let report = service.check_semantics().expect("semantics check");
4260 assert_eq!(report.null_source_ref_nodes, 1);
4261 }
4262
4263 #[test]
4264 fn check_semantics_detects_broken_step_fk() {
4265 let (db, service) = setup();
4266 {
4267 let conn = sqlite::open_connection(db.path()).expect("conn");
4270 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4271 .expect("disable FK");
4272 conn.execute(
4273 "INSERT INTO steps (id, run_id, kind, status, properties, created_at) \
4274 VALUES ('s1', 'ghost-run', 'llm', 'completed', '{}', 100)",
4275 [],
4276 )
4277 .expect("insert step with ghost run_id");
4278 }
4279 let report = service.check_semantics().expect("semantics check");
4280 assert_eq!(report.broken_step_fk, 1);
4281 }
4282
4283 #[test]
4284 fn check_semantics_detects_broken_action_fk() {
4285 let (db, service) = setup();
4286 {
4287 let conn = sqlite::open_connection(db.path()).expect("conn");
4288 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4289 .expect("disable FK");
4290 conn.execute(
4291 "INSERT INTO actions (id, step_id, kind, status, properties, created_at) \
4292 VALUES ('a1', 'ghost-step', 'emit', 'completed', '{}', 100)",
4293 [],
4294 )
4295 .expect("insert action with ghost step_id");
4296 }
4297 let report = service.check_semantics().expect("semantics check");
4298 assert_eq!(report.broken_action_fk, 1);
4299 }
4300
4301 #[test]
4302 fn check_semantics_detects_stale_fts_rows() {
4303 let (db, service) = setup();
4304 {
4305 let conn = sqlite::open_connection(db.path()).expect("conn");
4306 conn.execute(
4309 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4310 VALUES ('ghost-chunk', 'any-node', 'Meeting', 'stale content')",
4311 [],
4312 )
4313 .expect("insert stale FTS row");
4314 }
4315 let report = service.check_semantics().expect("semantics check");
4316 assert_eq!(report.stale_fts_rows, 1);
4317 }
4318
4319 #[test]
4320 fn check_semantics_detects_fts_rows_for_superseded_nodes() {
4321 let (db, service) = setup();
4322 {
4323 let conn = sqlite::open_connection(db.path()).expect("conn");
4324 conn.execute(
4326 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4327 VALUES ('r1', 'lg-sup', 'Meeting', '{}', 100, 200, 'src-1')",
4328 [],
4329 )
4330 .expect("insert superseded node");
4331 conn.execute(
4333 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4334 VALUES ('ck-x', 'lg-sup', 'Meeting', 'superseded content')",
4335 [],
4336 )
4337 .expect("insert FTS row for superseded node");
4338 }
4339 let report = service.check_semantics().expect("semantics check");
4340 assert_eq!(report.fts_rows_for_superseded_nodes, 1);
4341 }
4342
4343 #[test]
4344 fn check_semantics_detects_dangling_edges() {
4345 let (db, service) = setup();
4346 {
4347 let conn = sqlite::open_connection(db.path()).expect("conn");
4348 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4349 .expect("disable FK");
4350 conn.execute(
4352 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4353 VALUES ('r1', 'lg-src', 'Meeting', '{}', 100, 'src-1')",
4354 [],
4355 )
4356 .expect("insert source node");
4357 conn.execute(
4358 "INSERT INTO edges \
4359 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4360 VALUES ('e1', 'edge-1', 'lg-src', 'ghost-target', 'LINKS', '{}', 100, 'src-1')",
4361 [],
4362 )
4363 .expect("insert dangling edge");
4364 }
4365 let report = service.check_semantics().expect("semantics check");
4366 assert_eq!(report.dangling_edges, 1);
4367 }
4368
4369 #[test]
4370 fn check_semantics_detects_orphaned_supersession_chains() {
4371 let (db, service) = setup();
4372 {
4373 let conn = sqlite::open_connection(db.path()).expect("conn");
4374 conn.execute(
4376 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4377 VALUES ('r1', 'lg-orphaned', 'Meeting', '{}', 100, 200, 'src-1')",
4378 [],
4379 )
4380 .expect("insert fully superseded node");
4381 }
4382 let report = service.check_semantics().expect("semantics check");
4383 assert_eq!(report.orphaned_supersession_chains, 1);
4384 }
4385
    #[test]
    fn check_semantics_detects_mismatched_kind_property_fts_rows() {
        // Fixture is internally consistent: a 'Goal' property schema on $.name,
        // a node whose properties contain that path, and a per-kind FTS row
        // whose text matches the extracted value — so the counter must be 0.
        // NOTE(review): the name says "detects" but the assertion expects zero
        // mismatches on a clean fixture — confirm the name matches intent.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            let table = fathomdb_schema::fts_kind_table_name("Goal");
            // Create the per-kind FTS5 table directly, mirroring what the
            // writer would provision for this schema.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
                [],
            )
            .expect("insert per-kind FTS row");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
    }
4425
4426 #[test]
4427 fn check_semantics_detects_duplicate_property_fts_rows() {
4428 let (db, service) = setup();
4429 {
4430 let conn = sqlite::open_connection(db.path()).expect("conn");
4431 conn.execute(
4432 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4433 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4434 [],
4435 )
4436 .expect("insert node");
4437 let table = fathomdb_schema::fts_kind_table_name("Goal");
4439 conn.execute_batch(&format!(
4440 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4441 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4442 ))
4443 .expect("create per-kind table");
4444 conn.execute(
4445 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4446 [],
4447 )
4448 .expect("insert first property FTS row");
4449 conn.execute(
4450 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2 duplicate')"),
4451 [],
4452 )
4453 .expect("insert duplicate property FTS row");
4454 }
4455 let report = service.check_semantics().expect("semantics check");
4456 assert_eq!(report.duplicate_property_fts_rows, 1);
4457 }
4458
4459 #[test]
4460 fn check_semantics_detects_drifted_property_fts_text() {
4461 let (db, service) = setup();
4462 {
4463 let conn = sqlite::open_connection(db.path()).expect("conn");
4464 conn.execute(
4465 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4466 VALUES ('Goal', '[\"$.name\"]', ' ')",
4467 [],
4468 )
4469 .expect("register schema");
4470 conn.execute(
4471 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4472 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Current name\"}', 100, 'src-1')",
4473 [],
4474 )
4475 .expect("insert node");
4476 let table = fathomdb_schema::fts_kind_table_name("Goal");
4478 conn.execute_batch(&format!(
4479 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4480 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4481 ))
4482 .expect("create per-kind table");
4483 conn.execute(
4484 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Old stale name')"),
4485 [],
4486 )
4487 .expect("insert stale property FTS row");
4488 }
4489 let report = service.check_semantics().expect("semantics check");
4490 assert_eq!(report.drifted_property_fts_rows, 1);
4491 }
4492
4493 #[test]
4494 fn check_semantics_detects_property_fts_row_that_should_not_exist() {
4495 let (db, service) = setup();
4496 {
4497 let conn = sqlite::open_connection(db.path()).expect("conn");
4498 conn.execute(
4499 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4500 VALUES ('Goal', '[\"$.searchable\"]', ' ')",
4501 [],
4502 )
4503 .expect("register schema");
4504 conn.execute(
4506 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4507 VALUES ('r1', 'goal-1', 'Goal', '{\"other\":\"field\"}', 100, 'src-1')",
4508 [],
4509 )
4510 .expect("insert node");
4511 let table = fathomdb_schema::fts_kind_table_name("Goal");
4513 conn.execute_batch(&format!(
4514 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4515 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4516 ))
4517 .expect("create per-kind table");
4518 conn.execute(
4519 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'phantom text')"),
4520 [],
4521 )
4522 .expect("insert phantom property FTS row");
4523 }
4524 let report = service.check_semantics().expect("semantics check");
4525 assert_eq!(
4526 report.drifted_property_fts_rows, 1,
4527 "row that should not exist must be counted as drifted"
4528 );
4529 }
4530
    #[test]
    fn check_semantics_clean_on_weighted_fts_schema_does_not_panic() {
        let (db, service) = setup();
        // Weighted schema: one scalar and one recursive path, each with an
        // explicit weight, so the per-kind table has one column per path.
        let entries = vec![
            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
        ];
        service
            .register_fts_property_schema_with_entries(
                "Article",
                &entries,
                Some(" "),
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register weighted schema");

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let properties = r#"{"title":"Hello","body":{"text":"world"}}"#;
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
                [properties],
            )
            .expect("insert node");

            // Use the writer's own extraction so the inserted FTS row matches
            // exactly what the semantic check will recompute (i.e. no drift).
            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
            let (_kind, schema) = schemas
                .iter()
                .find(|(k, _)| k == "Article")
                .expect("weighted schema present");
            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
            let cols = crate::writer::extract_property_fts_columns(&props, schema);

            // Build an INSERT with one placeholder per extracted column; ?1 is
            // always node_logical_id, so value placeholders start at ?2.
            let table = fathomdb_schema::fts_kind_table_name("Article");
            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
            let placeholders: Vec<String> =
                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
            let sql = format!(
                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
                cols = col_names.join(", "),
                placeholders = placeholders.join(", "),
            );
            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
            let params: Vec<&dyn rusqlite::ToSql> =
                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
                    .collect();
            conn.execute(&sql, params.as_slice())
                .expect("insert weighted FTS row");
        }

        // A clean weighted-schema database must check out with zero drift and,
        // crucially, without panicking on the multi-column table shape.
        let report = service
            .check_semantics()
            .expect("semantics check must not crash on weighted schema");
        assert_eq!(report.drifted_property_fts_rows, 0);
    }
4601
    #[test]
    fn check_semantics_detects_drifted_property_fts_text_weighted() {
        let (db, service) = setup();
        // Same weighted schema shape as the clean-case test: per-path columns
        // with explicit weights.
        let entries = vec![
            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
        ];
        service
            .register_fts_property_schema_with_entries(
                "Article",
                &entries,
                Some(" "),
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register weighted schema");

        // Column name for the $.title path (scalar), used below to tamper with
        // exactly one column of the row.
        let title_col = fathomdb_schema::fts_column_name("$.title", false);

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let properties = r#"{"title":"Current","body":{"text":"body"}}"#;
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
                [properties],
            )
            .expect("insert node");

            // First insert a row that matches the writer's own extraction, so
            // the only drift introduced is the tampering below.
            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
            let (_kind, schema) = schemas
                .iter()
                .find(|(k, _)| k == "Article")
                .expect("weighted schema present");
            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
            let cols = crate::writer::extract_property_fts_columns(&props, schema);

            let table = fathomdb_schema::fts_kind_table_name("Article");
            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
            let placeholders: Vec<String> =
                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
            let sql = format!(
                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
                cols = col_names.join(", "),
                placeholders = placeholders.join(", "),
            );
            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
            let params: Vec<&dyn rusqlite::ToSql> =
                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
                    .collect();
            conn.execute(&sql, params.as_slice())
                .expect("insert weighted FTS row");

            // Tamper with one column so the stored text diverges from what the
            // current node properties would produce.
            conn.execute(
                &format!("UPDATE {table} SET {title_col} = 'tampered' WHERE node_logical_id = 'article-1'"),
                [],
            )
            .expect("tamper weighted FTS row");
        }

        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.drifted_property_fts_rows, 1);
    }
4672
    #[test]
    fn check_semantics_mixed_weighted_and_non_weighted_schemas() {
        let (db, service) = setup();

        // Weighted schema registered via the service API ('Article') alongside
        // a legacy single-column schema seeded by hand ('Goal'); the semantic
        // check must handle both table shapes in one pass.
        let weighted_entries = vec![
            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
        ];
        service
            .register_fts_property_schema_with_entries(
                "Article",
                &weighted_entries,
                Some(" "),
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register weighted schema");

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Legacy, non-weighted schema and its single-text-column table.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register non-weighted schema");
            let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create non-weighted per-kind table");

            let article_props = r#"{"title":"Hello","body":{"text":"world"}}"#;
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
                [article_props],
            )
            .expect("insert article");

            // Insert the weighted row using the writer's own extraction so the
            // Article side of the fixture is drift-free.
            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
            let (_k, article_schema) = schemas
                .iter()
                .find(|(k, _)| k == "Article")
                .expect("Article schema present");
            let props: serde_json::Value =
                serde_json::from_str(article_props).expect("parse article props");
            let cols = crate::writer::extract_property_fts_columns(&props, article_schema);
            let article_table = fathomdb_schema::fts_kind_table_name("Article");
            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
            let placeholders: Vec<String> =
                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
            let sql = format!(
                "INSERT INTO {article_table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
                cols = col_names.join(", "),
                placeholders = placeholders.join(", "),
            );
            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
            let params: Vec<&dyn rusqlite::ToSql> =
                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
                    .collect();
            conn.execute(&sql, params.as_slice())
                .expect("insert weighted FTS row");

            // Goal node + matching single-column FTS row, so the legacy side is
            // drift-free as well.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'goal-1', 'Goal', '{\"name\":\"Goal One\"}', 100, 'src-2')",
                [],
            )
            .expect("insert goal node");
            conn.execute(
                &format!("INSERT INTO {goal_table} (node_logical_id, text_content) VALUES ('goal-1', 'Goal One')"),
                [],
            )
            .expect("insert non-weighted FTS row");
        }

        let report = service
            .check_semantics()
            .expect("semantics check must handle both shapes");
        assert_eq!(
            report.drifted_property_fts_rows, 0,
            "clean mixed weighted + non-weighted DB must report 0 drift"
        );
    }
4771
    #[test]
    fn check_semantics_weighted_schema_with_text_content_path() {
        let (db, service) = setup();
        // The $.text_content path maps to a column name that collides with the
        // legacy single-column name; the weighted comparator must still be used.
        let entries = vec![
            FtsPropertyPathSpec::scalar("$.text_content").with_weight(2.0),
            FtsPropertyPathSpec::scalar("$.title").with_weight(1.0),
        ];
        service
            .register_fts_property_schema_with_entries(
                "Article",
                &entries,
                Some(" "),
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register weighted schema with $.text_content path");

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let properties = r#"{"text_content":"canonical body","title":"Hello"}"#;
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
                [properties],
            )
            .expect("insert node");

            // Insert a row matching the writer's own extraction so the fixture
            // is clean and any reported drift would be a dispatch bug.
            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
            let (_kind, schema) = schemas
                .iter()
                .find(|(k, _)| k == "Article")
                .expect("weighted schema present");
            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
            let cols = crate::writer::extract_property_fts_columns(&props, schema);

            let table = fathomdb_schema::fts_kind_table_name("Article");
            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
            let placeholders: Vec<String> =
                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
            let sql = format!(
                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
                cols = col_names.join(", "),
                placeholders = placeholders.join(", "),
            );
            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
            let params: Vec<&dyn rusqlite::ToSql> =
                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
                    .collect();
            conn.execute(&sql, params.as_slice())
                .expect("insert weighted FTS row");
        }

        let report = service.check_semantics().expect("semantics check");
        assert_eq!(
            report.drifted_property_fts_rows, 0,
            "weighted schema whose path collapses to `text_content` must be \
             dispatched as weighted (per-column comparator); a clean DB \
             must report 0 drift"
        );
    }
4854
4855 #[test]
4856 fn safe_export_writes_manifest_with_sha256() {
4857 let (_db, service) = setup();
4858 let export_dir = tempfile::TempDir::new().expect("temp dir");
4859 let export_path = export_dir.path().join("backup.db");
4860
4861 let manifest = service
4862 .safe_export(
4863 &export_path,
4864 SafeExportOptions {
4865 force_checkpoint: false,
4866 },
4867 )
4868 .expect("export");
4869
4870 assert!(export_path.exists(), "exported db should exist");
4871 let manifest_path = export_dir.path().join("backup.db.export-manifest.json");
4872 assert!(
4873 manifest_path.exists(),
4874 "manifest file should exist at {}",
4875 manifest_path.display()
4876 );
4877 assert_eq!(manifest.sha256.len(), 64, "sha256 should be 64 hex chars");
4878 assert!(
4879 manifest.exported_at > 0,
4880 "exported_at should be a unix timestamp"
4881 );
4882 assert_eq!(
4883 manifest.schema_version,
4884 SchemaManager::new().current_version().0,
4885 "schema_version should match the live schema version"
4886 );
4887 assert_eq!(manifest.protocol_version, 1, "protocol_version should be 1");
4888 assert!(manifest.page_count > 0, "page_count should be positive");
4889 }
4890
4891 #[test]
4892 fn safe_export_preserves_operational_validation_contracts() {
4893 let (_db, service) = setup();
4894 let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
4895 service
4896 .register_operational_collection(&OperationalRegisterRequest {
4897 name: "connector_health".to_owned(),
4898 kind: OperationalCollectionKind::LatestState,
4899 schema_json: "{}".to_owned(),
4900 retention_json: "{}".to_owned(),
4901 filter_fields_json: "[]".to_owned(),
4902 validation_json: validation_json.to_owned(),
4903 secondary_indexes_json: "[]".to_owned(),
4904 format_version: 1,
4905 })
4906 .expect("register collection");
4907
4908 let export_dir = tempfile::TempDir::new().expect("temp dir");
4909 let export_path = export_dir.path().join("backup.db");
4910 service
4911 .safe_export(
4912 &export_path,
4913 SafeExportOptions {
4914 force_checkpoint: false,
4915 },
4916 )
4917 .expect("export");
4918
4919 let exported = sqlite::open_connection(&export_path).expect("exported conn");
4920 let exported_validation_json: String = exported
4921 .query_row(
4922 "SELECT validation_json FROM operational_collections WHERE name = 'connector_health'",
4923 [],
4924 |row| row.get(0),
4925 )
4926 .expect("validation_json");
4927 assert_eq!(exported_validation_json, validation_json);
4928 }
4929
    #[test]
    fn safe_export_force_checkpoint_false_skips_wal_pragma() {
        // Exports with force_checkpoint: false and checks that the manifest is
        // still fully populated (page_count, schema_version, protocol_version).
        // NOTE(review): nothing here observes whether the WAL checkpoint pragma
        // was actually skipped — the name promises more than the assertions
        // cover; confirm the intended coverage.
        let (_db, service) = setup();
        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("no-wal.db");

        let manifest = service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export with no checkpoint");

        assert!(
            manifest.page_count > 0,
            "page_count must be populated regardless of checkpoint mode"
        );
        assert_eq!(
            manifest.schema_version,
            SchemaManager::new().current_version().0
        );
        assert_eq!(manifest.protocol_version, 1);
    }
4956
4957 #[test]
4958 fn safe_export_force_checkpoint_false_still_captures_wal_backed_changes() {
4959 let (db, service) = setup();
4960 let conn = sqlite::open_connection(db.path()).expect("conn");
4961 let journal_mode: String = conn
4962 .query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))
4963 .expect("enable wal");
4964 assert_eq!(journal_mode.to_lowercase(), "wal");
4965 let auto_checkpoint_pages: i64 = conn
4966 .query_row("PRAGMA wal_autocheckpoint=0", [], |row| row.get(0))
4967 .expect("disable auto checkpoint");
4968 assert_eq!(auto_checkpoint_pages, 0);
4969 conn.execute(
4970 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4971 VALUES ('r-wal', 'lg-wal', 'Meeting', '{}', 100, 'src-wal')",
4972 [],
4973 )
4974 .expect("insert wal-backed node");
4975
4976 let export_dir = tempfile::TempDir::new().expect("temp dir");
4977 let export_path = export_dir.path().join("wal-backed.db");
4978 service
4979 .safe_export(
4980 &export_path,
4981 SafeExportOptions {
4982 force_checkpoint: false,
4983 },
4984 )
4985 .expect("export wal-backed db");
4986
4987 let exported = sqlite::open_connection(&export_path).expect("open exported db");
4988 let exported_count: i64 = exported
4989 .query_row(
4990 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-wal'",
4991 [],
4992 |row| row.get(0),
4993 )
4994 .expect("count exported nodes");
4995 assert_eq!(
4996 exported_count, 1,
4997 "safe_export must include committed rows that are still resident in the WAL"
4998 );
4999 }
5000
5001 #[test]
5002 fn excise_source_removes_searchable_content_after_excision() {
5003 let (db, service) = setup();
5004 {
5005 let conn = sqlite::open_connection(db.path()).expect("conn");
5006 conn.execute(
5007 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
5008 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
5009 [],
5010 )
5011 .expect("insert v1");
5012 conn.execute(
5013 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5014 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
5015 [],
5016 )
5017 .expect("insert v2");
5018 conn.execute(
5019 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5020 VALUES ('ck1', 'lg1', 'hello world', 100)",
5021 [],
5022 )
5023 .expect("insert chunk");
5024 }
5025 service.excise_source("source-2").expect("excise");
5026 {
5027 let conn = sqlite::open_connection(db.path()).expect("conn");
5028 let fts_count: i64 = conn
5029 .query_row(
5030 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'ck1'",
5031 [],
5032 |row| row.get(0),
5033 )
5034 .expect("fts count");
5035 assert_eq!(
5036 fts_count, 0,
5037 "excised content should not remain searchable after excise"
5038 );
5039 }
5040 }
5041
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn excise_source_cleans_chunks_and_vec_rows_for_excised_version() {
        // Two versions of 'lg1': r1 (source-1, superseded) and r2 (source-2,
        // active), with a chunk and a vec row attached to the logical node.
        // Excising 'source-2' must make r1 the active row again and remove the
        // chunk, vec, and FTS rows (all asserted below).
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Meeting", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
                [],
            )
            .expect("insert v1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
                [],
            )
            .expect("insert v2");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('ck1', 'lg1', 'new content', 200)",
                [],
            )
            .expect("insert chunk");
            // A placeholder 16-byte embedding (dimension 4 × f32) for the chunk.
            let vec_table = fathomdb_schema::vec_kind_table_name("Meeting");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('ck1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
        }

        service.excise_source("source-2").expect("excise");

        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        // The superseded v1 row must have been restored as the active version.
        let active_row: String = conn
            .query_row(
                "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("restored active row");
        assert_eq!(active_row, "r1");
        let chunk_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE node_logical_id = 'lg1'",
                [],
                |row| row.get(0),
            )
            .expect("chunk count");
        assert_eq!(
            chunk_count, 0,
            "excised source content must not survive as chunks"
        );
        let vec_table = fathomdb_schema::vec_kind_table_name("Meeting");
        let vec_count: i64 = conn
            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
                row.get(0)
            })
            .expect("vec count");
        assert_eq!(vec_count, 0, "excised source vec rows must be removed");
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE node_logical_id = 'lg1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(
            fts_count, 0,
            "excised source content must not remain searchable"
        );
    }
5121
5122 #[test]
5123 fn export_page_count_matches_exported_file() {
5124 let (_db, service) = setup();
5125 let export_dir = tempfile::TempDir::new().expect("temp dir");
5126 let export_path = export_dir.path().join("page-count.db");
5127
5128 let manifest = service
5129 .safe_export(
5130 &export_path,
5131 SafeExportOptions {
5132 force_checkpoint: false,
5133 },
5134 )
5135 .expect("export");
5136
5137 let exported = sqlite::open_connection(&export_path).expect("open exported db");
5138 let actual_page_count: u64 = exported
5139 .query_row("PRAGMA page_count", [], |row| row.get(0))
5140 .expect("page_count from exported file");
5141
5142 assert_eq!(
5143 manifest.page_count, actual_page_count,
5144 "manifest page_count must match the exported file's PRAGMA page_count"
5145 );
5146 }
5147
5148 #[test]
5149 fn no_temp_file_after_successful_export() {
5150 let (_db, service) = setup();
5151 let export_dir = tempfile::TempDir::new().expect("temp dir");
5152 let export_path = export_dir.path().join("no-tmp.db");
5153
5154 service
5155 .safe_export(
5156 &export_path,
5157 SafeExportOptions {
5158 force_checkpoint: false,
5159 },
5160 )
5161 .expect("export");
5162
5163 let tmp_files: Vec<_> = fs::read_dir(export_dir.path())
5164 .expect("read export dir")
5165 .filter_map(Result::ok)
5166 .filter(|e| e.path().extension().is_some_and(|ext| ext == "tmp"))
5167 .collect();
5168
5169 assert!(
5170 tmp_files.is_empty(),
5171 "no .tmp files should remain after a successful export, found: {tmp_files:?}"
5172 );
5173 }
5174
5175 #[test]
5176 fn export_manifest_is_valid_json() {
5177 let (_db, service) = setup();
5178 let export_dir = tempfile::TempDir::new().expect("temp dir");
5179 let export_path = export_dir.path().join("valid-json.db");
5180
5181 service
5182 .safe_export(
5183 &export_path,
5184 SafeExportOptions {
5185 force_checkpoint: false,
5186 },
5187 )
5188 .expect("export");
5189
5190 let manifest_path = export_dir.path().join("valid-json.db.export-manifest.json");
5191 let manifest_contents = fs::read_to_string(&manifest_path).expect("read manifest");
5192 let parsed: serde_json::Value =
5193 serde_json::from_str(&manifest_contents).expect("manifest must be valid JSON");
5194
5195 assert!(
5196 parsed.get("exported_at").is_some(),
5197 "manifest must contain exported_at"
5198 );
5199 assert!(
5200 parsed.get("sha256").is_some(),
5201 "manifest must contain sha256"
5202 );
5203 assert!(
5204 parsed.get("schema_version").is_some(),
5205 "manifest must contain schema_version"
5206 );
5207 assert!(
5208 parsed.get("protocol_version").is_some(),
5209 "manifest must contain protocol_version"
5210 );
5211 assert!(
5212 parsed.get("page_count").is_some(),
5213 "manifest must contain page_count"
5214 );
5215 }
5216
    /// A dry-run purge must report how many events would be deleted and
    /// preserved without actually removing anything from the table.
    #[test]
    fn provenance_purge_dry_run_reports_counts() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Two events older than the cutoff (100, 200) and one newer (300).
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
                [],
            )
            .expect("insert p1");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
                [],
            )
            .expect("insert p2");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
                 VALUES ('p3', 'excise', 'lg3', 'src-1', 300)",
                [],
            )
            .expect("insert p3");
        }

        // Dry run: only counts are computed, nothing may be removed.
        let options = super::ProvenancePurgeOptions {
            dry_run: true,
            preserve_event_types: Vec::new(),
        };
        let report = service
            .purge_provenance_events(250, &options)
            .expect("dry run purge");

        // p1 and p2 fall before the cutoff (250); p3 survives on timestamp.
        assert_eq!(report.events_deleted, 2);
        assert_eq!(report.events_preserved, 1);
        assert!(report.oldest_remaining.is_some());

        // Confirm the dry run really left the table untouched.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let total: i64 = conn
            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
                row.get(0)
            })
            .expect("count");
        assert_eq!(total, 3, "dry_run must not delete any events");
    }
5262
5263 #[test]
5264 fn provenance_purge_deletes_old_events() {
5265 let (db, service) = setup();
5266 {
5267 let conn = sqlite::open_connection(db.path()).expect("conn");
5268 conn.execute(
5269 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5270 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5271 [],
5272 )
5273 .expect("insert p1");
5274 conn.execute(
5275 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5276 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5277 [],
5278 )
5279 .expect("insert p2");
5280 }
5281
5282 let options = super::ProvenancePurgeOptions {
5283 dry_run: false,
5284 preserve_event_types: Vec::new(),
5285 };
5286 let report = service
5287 .purge_provenance_events(150, &options)
5288 .expect("purge");
5289
5290 assert_eq!(report.events_deleted, 1);
5291 assert_eq!(report.events_preserved, 1);
5292 assert_eq!(report.oldest_remaining, Some(200));
5293
5294 let conn = sqlite::open_connection(db.path()).expect("conn");
5295 let remaining: i64 = conn
5296 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5297 row.get(0)
5298 })
5299 .expect("count");
5300 assert_eq!(remaining, 1);
5301 }
5302
5303 #[test]
5304 fn provenance_purge_preserves_specified_types() {
5305 let (db, service) = setup();
5306 {
5307 let conn = sqlite::open_connection(db.path()).expect("conn");
5308 conn.execute(
5309 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5310 VALUES ('p1', 'excise', 'lg1', 'src-1', 100)",
5311 [],
5312 )
5313 .expect("insert p1");
5314 conn.execute(
5315 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5316 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 100)",
5317 [],
5318 )
5319 .expect("insert p2");
5320 conn.execute(
5321 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5322 VALUES ('p3', 'node_insert', 'lg3', 'src-1', 100)",
5323 [],
5324 )
5325 .expect("insert p3");
5326 }
5327
5328 let options = super::ProvenancePurgeOptions {
5329 dry_run: false,
5330 preserve_event_types: Vec::new(),
5331 };
5332 let report = service
5333 .purge_provenance_events(500, &options)
5334 .expect("purge");
5335
5336 assert_eq!(report.events_deleted, 2);
5337 assert_eq!(report.events_preserved, 1);
5338
5339 let conn = sqlite::open_connection(db.path()).expect("conn");
5340 let remaining_type: String = conn
5341 .query_row("SELECT event_type FROM provenance_events", [], |row| {
5342 row.get(0)
5343 })
5344 .expect("remaining event type");
5345 assert_eq!(remaining_type, "excise");
5346 }
5347
5348 #[test]
5349 fn provenance_purge_noop_with_zero_timestamp() {
5350 let (db, service) = setup();
5351 {
5352 let conn = sqlite::open_connection(db.path()).expect("conn");
5353 conn.execute(
5354 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5355 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5356 [],
5357 )
5358 .expect("insert p1");
5359 }
5360
5361 let options = super::ProvenancePurgeOptions {
5362 dry_run: false,
5363 preserve_event_types: Vec::new(),
5364 };
5365 let report = service.purge_provenance_events(0, &options).expect("purge");
5366
5367 assert_eq!(report.events_deleted, 0);
5368 assert_eq!(report.events_preserved, 1);
5369 assert_eq!(report.oldest_remaining, Some(100));
5370 }
5371
    /// Restoring a node must skip edges whose other endpoint no longer exists
    /// (its rows were purged) and report the skip instead of failing.
    #[test]
    fn restore_skips_edge_when_counterpart_purged() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Edge connecting A -> B.
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Provenance trail recording the retire of A and of the edge;
            // presumably restore_logical_id consults these — confirm in impl.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            // Retire all three, then hard-delete B's rows to simulate a purge.
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            conn.execute("DELETE FROM nodes WHERE logical_id = 'doc-2'", [])
                .expect("purge node B rows");
        }

        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 0, "edge should not be restored");
        // The skipped edge and its missing endpoint must be surfaced.
        assert_eq!(report.skipped_edges.len(), 1);
        assert_eq!(report.skipped_edges[0].edge_logical_id, "edge-1");
        assert_eq!(report.skipped_edges[0].missing_endpoint, "doc-2");

        // The edge itself must stay retired in the database.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 0, "edge must remain retired");
    }
5452
    /// Restoring a node must also reactivate its retired edges when the other
    /// endpoint is still an active node.
    #[test]
    fn restore_restores_edges_to_active_nodes() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            // Node B stays active throughout — only A and the edge are retired.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Provenance events recording the retire of A and of the edge.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert!(report.restored_edge_rows > 0, "edge should be restored");
        assert!(
            report.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        // The edge must be active again in the database.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1, "edge must be active");
    }
5524
    /// An edge whose endpoints were both retired must become active once BOTH
    /// endpoints have been restored.
    #[test]
    fn restore_restores_edges_when_both_restored() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire events for both nodes and the edge.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-b', 'node_retire', 'doc-2', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event B");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore B first; A is still retired at that point.
        let report_b = service.restore_logical_id("doc-2").expect("restore B");
        assert!(!report_b.was_noop);

        // Restoring A completes the pair, so the edge comes back with it.
        let report_a = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report_a.was_noop);
        assert_eq!(report_a.restored_node_rows, 1);
        assert!(
            report_a.restored_edge_rows > 0,
            "edge should be restored when both endpoints active"
        );
        assert!(
            report_a.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(
            active_edge_count, 1,
            "edge must be active after both endpoints restored"
        );
    }
5617
5618 #[test]
5621 fn fts_property_schema_crud_round_trip() {
5622 let (_db, service) = setup();
5623
5624 let record = service
5626 .register_fts_property_schema(
5627 "Meeting",
5628 &["$.title".to_owned(), "$.summary".to_owned()],
5629 None,
5630 )
5631 .expect("register");
5632 assert_eq!(record.kind, "Meeting");
5633 assert_eq!(record.property_paths, vec!["$.title", "$.summary"]);
5634 assert_eq!(record.separator, " ");
5635 assert_eq!(record.format_version, 1);
5636
5637 let described = service
5639 .describe_fts_property_schema("Meeting")
5640 .expect("describe")
5641 .expect("should exist");
5642 assert_eq!(described, record);
5643
5644 let missing = service
5646 .describe_fts_property_schema("NoSuchKind")
5647 .expect("describe missing");
5648 assert!(missing.is_none());
5649
5650 let list = service.list_fts_property_schemas().expect("list");
5652 assert_eq!(list.len(), 1);
5653 assert_eq!(list[0].kind, "Meeting");
5654
5655 let updated = service
5657 .register_fts_property_schema(
5658 "Meeting",
5659 &["$.title".to_owned(), "$.notes".to_owned()],
5660 Some("\n"),
5661 )
5662 .expect("update");
5663 assert_eq!(updated.property_paths, vec!["$.title", "$.notes"]);
5664 assert_eq!(updated.separator, "\n");
5665
5666 service
5668 .remove_fts_property_schema("Meeting")
5669 .expect("remove");
5670 let after_remove = service
5671 .describe_fts_property_schema("Meeting")
5672 .expect("describe after remove");
5673 assert!(after_remove.is_none());
5674
5675 let err = service.remove_fts_property_schema("Meeting");
5677 assert!(err.is_err());
5678 }
5679
5680 #[test]
5681 fn describe_fts_property_schema_round_trips_recursive_entries() {
5682 let (_db, service) = setup();
5683
5684 let entries = vec![
5685 FtsPropertyPathSpec::scalar("$.title"),
5686 FtsPropertyPathSpec::recursive("$.payload"),
5687 ];
5688 let exclude = vec!["$.payload.private".to_owned()];
5689 let registered = service
5690 .register_fts_property_schema_with_entries(
5691 "KnowledgeItem",
5692 &entries,
5693 Some(" "),
5694 &exclude,
5695 crate::rebuild_actor::RebuildMode::Eager,
5696 )
5697 .expect("register recursive");
5698
5699 assert_eq!(registered.entries, entries);
5702 assert_eq!(registered.exclude_paths, exclude);
5703 assert_eq!(registered.property_paths, vec!["$.title", "$.payload"]);
5704
5705 let described = service
5706 .describe_fts_property_schema("KnowledgeItem")
5707 .expect("describe")
5708 .expect("should exist");
5709 assert_eq!(described.kind, "KnowledgeItem");
5710 assert_eq!(described.entries, entries);
5711 assert_eq!(described.exclude_paths, exclude);
5712 assert_eq!(described.property_paths, vec!["$.title", "$.payload"]);
5713 assert_eq!(described.separator, " ");
5714 assert_eq!(described.format_version, 1);
5715 }
5716
5717 #[test]
5718 fn list_fts_property_schemas_round_trips_recursive_entries() {
5719 let (_db, service) = setup();
5720
5721 let entries = vec![
5722 FtsPropertyPathSpec::scalar("$.title"),
5723 FtsPropertyPathSpec::recursive("$.payload"),
5724 ];
5725 let exclude = vec!["$.payload.secret".to_owned()];
5726 service
5727 .register_fts_property_schema_with_entries(
5728 "KnowledgeItem",
5729 &entries,
5730 Some(" "),
5731 &exclude,
5732 crate::rebuild_actor::RebuildMode::Eager,
5733 )
5734 .expect("register recursive");
5735
5736 let listed = service.list_fts_property_schemas().expect("list");
5737 assert_eq!(listed.len(), 1);
5738 let record = &listed[0];
5739 assert_eq!(record.kind, "KnowledgeItem");
5740 assert_eq!(record.entries, entries);
5741 assert_eq!(record.exclude_paths, exclude);
5742 assert_eq!(record.property_paths, vec!["$.title", "$.payload"]);
5743 }
5744
5745 #[test]
5746 fn describe_fts_property_schema_round_trips_scalar_only_entries() {
5747 let (_db, service) = setup();
5748
5749 service
5750 .register_fts_property_schema(
5751 "Meeting",
5752 &["$.title".to_owned(), "$.summary".to_owned()],
5753 None,
5754 )
5755 .expect("register scalar");
5756
5757 let described = service
5758 .describe_fts_property_schema("Meeting")
5759 .expect("describe")
5760 .expect("should exist");
5761 assert_eq!(described.property_paths, vec!["$.title", "$.summary"]);
5762 assert_eq!(described.entries.len(), 2);
5763 for entry in &described.entries {
5764 assert_eq!(
5765 entry.mode,
5766 FtsPropertyPathMode::Scalar,
5767 "scalar-only schema should deserialize every entry as Scalar"
5768 );
5769 }
5770 assert!(described.exclude_paths.is_empty());
5771 }
5772
    /// Restoring a node must repopulate its per-kind (property) FTS row from
    /// the node's canonical properties so the content is searchable again.
    #[test]
    fn restore_reestablishes_property_fts_visibility() {
        let (db, service) = setup();
        let doc_table = fathomdb_schema::fts_kind_table_name("Document");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register the property schema directly via SQL.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Document', '[\"$.title\", \"$.body\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Per-kind FTS5 table, mirroring what the service would create.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {doc_table} USING fts5(\
                 node_logical_id UNINDEXED, text_content, \
                 tokenize = 'porter unicode61 remove_diacritics 2'\
                 )"
            ))
            .expect("create per-kind table");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{\"title\":\"Budget\",\"body\":\"Q3 forecast\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget text', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                &format!(
                    "INSERT INTO {doc_table} (node_logical_id, text_content) \
                     VALUES ('doc-1', 'Budget Q3 forecast')"
                ),
                [],
            )
            .expect("insert property fts");
            // Retire the node (with its provenance event) and wipe both FTS
            // projections, simulating a forget operation.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("supersede");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear chunk fts");
            conn.execute(&format!("DELETE FROM {doc_table}"), [])
                .expect("clear property fts");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_property_fts_rows, 1);

        // The property FTS row must be rebuilt from '$.title' + '$.body'
        // joined with the registered separator.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let prop_fts_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts count");
        assert_eq!(prop_fts_count, 1, "property FTS must be restored");

        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts text");
        assert_eq!(text, "Budget Q3 forecast");
    }
5858
5859 #[test]
5860 fn safe_export_preserves_fts_property_schemas() {
5861 let (_db, service) = setup();
5862 service
5863 .register_fts_property_schema(
5864 "Goal",
5865 &["$.name".to_owned(), "$.rationale".to_owned()],
5866 None,
5867 )
5868 .expect("register schema");
5869
5870 let export_dir = tempfile::TempDir::new().expect("temp dir");
5871 let export_path = export_dir.path().join("backup.db");
5872 service
5873 .safe_export(
5874 &export_path,
5875 SafeExportOptions {
5876 force_checkpoint: false,
5877 },
5878 )
5879 .expect("export");
5880
5881 let exported_conn = rusqlite::Connection::open(&export_path).expect("open exported db");
5883 let kind: String = exported_conn
5884 .query_row(
5885 "SELECT kind FROM fts_property_schemas WHERE kind = 'Goal'",
5886 [],
5887 |row| row.get(0),
5888 )
5889 .expect("schema must exist in export");
5890 assert_eq!(kind, "Goal");
5891 let paths_json: String = exported_conn
5892 .query_row(
5893 "SELECT property_paths_json FROM fts_property_schemas WHERE kind = 'Goal'",
5894 [],
5895 |row| row.get(0),
5896 )
5897 .expect("paths must exist");
5898 let paths: Vec<String> = serde_json::from_str(&paths_json).expect("valid json");
5899 assert_eq!(paths, vec!["$.name", "$.rationale"]);
5900 }
5901
    /// End-to-end export-recovery drill: corrupt and delete property FTS rows
    /// in an exported copy, then prove a full FTS rebuild restores canonical
    /// state and both consistency checks come back clean.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn export_recovery_rebuilds_property_fts_from_canonical_state() {
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        service
            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
            .expect("register");
        {
            // Seed two nodes, each with a matching property FTS row.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 1");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property FTS row 1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-2', 'goal-2', 'Goal', '{\"name\":\"Launch redesign\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 2");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-2', 'Launch redesign')"
                ),
                [],
            )
            .expect("insert property FTS row 2");
        }

        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("backup.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export");

        // Sabotage the exported copy: corrupt goal-1's FTS text and delete
        // goal-2's row entirely. Bootstrap first so schema objects exist.
        {
            let conn = rusqlite::Connection::open(&export_path).expect("open export");
            SchemaManager::new()
                .bootstrap(&conn)
                .expect("bootstrap export");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
            )
            .expect("delete old row");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'completely wrong stale text')"
                ),
                [],
            )
            .expect("insert corrupted row");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
            )
            .expect("delete goal-2 row");
        }

        // Recover by running the FTS rebuild against the exported database
        // through a fresh AdminService pointed at the export.
        let schema = Arc::new(SchemaManager::new());
        let exported_service = AdminService::new(&export_path, Arc::clone(&schema));
        exported_service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");

        let conn = rusqlite::Connection::open(&export_path).expect("open export for verify");
        // Corrupted text replaced with the canonical value from the node.
        let goal1_text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-1 text after rebuild");
        assert_eq!(
            goal1_text, "Ship v2",
            "goal-1 text must be corrected by rebuild"
        );

        // Deleted row re-created from canonical state.
        let goal2_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-2 count");
        assert_eq!(goal2_count, 1, "goal-2 row must be restored by rebuild");

        // No trace of the sabotaged content anywhere in the table.
        let stale_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE text_content = 'completely wrong stale text'"),
                [],
                |r| r.get(0),
            )
            .expect("stale count");
        assert_eq!(stale_count, 0, "corrupted text must be gone after rebuild");

        // Both integrity and semantic checks must be clean afterwards.
        let integrity = exported_service.check_integrity().expect("integrity");
        assert_eq!(integrity.missing_property_fts_rows, 0);
        let semantics = exported_service.check_semantics().expect("semantics");
        assert_eq!(semantics.drifted_property_fts_rows, 0);
        assert_eq!(semantics.orphaned_property_fts_rows, 0);
        assert_eq!(semantics.duplicate_property_fts_rows, 0);
    }
6031
6032 #[test]
6033 fn check_integrity_no_false_positives_for_empty_extraction() {
6034 let (db, service) = setup();
6035 {
6036 let conn = sqlite::open_connection(db.path()).expect("conn");
6037 conn.execute(
6039 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6040 VALUES ('Ticket', '[\"$.searchable\"]', ' ')",
6041 [],
6042 )
6043 .expect("register schema");
6044 conn.execute(
6047 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6048 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"status\":\"open\"}', 100, 'seed')",
6049 [],
6050 )
6051 .expect("insert node");
6052 }
6053
6054 let report = service.check_integrity().expect("integrity");
6055 assert_eq!(
6056 report.missing_property_fts_rows, 0,
6057 "node with no extractable values must not be counted as missing"
6058 );
6059 }
6060
6061 #[test]
6062 fn check_integrity_detects_genuinely_missing_property_fts_rows() {
6063 let (db, service) = setup();
6064 {
6065 let conn = sqlite::open_connection(db.path()).expect("conn");
6066 conn.execute(
6067 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6068 VALUES ('Ticket', '[\"$.title\"]', ' ')",
6069 [],
6070 )
6071 .expect("register schema");
6072 conn.execute(
6074 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6075 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"title\":\"fix login bug\"}', 100, 'seed')",
6076 [],
6077 )
6078 .expect("insert node");
6079 }
6080
6081 let report = service.check_integrity().expect("integrity");
6082 assert_eq!(
6083 report.missing_property_fts_rows, 1,
6084 "node with extractable values but no property FTS row must be detected"
6085 );
6086 }
6087
6088 #[test]
6089 fn rebuild_projections_fts_restores_missing_property_fts_rows() {
6090 let (db, service) = setup();
6091 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6092 {
6093 let conn = sqlite::open_connection(db.path()).expect("conn");
6094 conn.execute(
6095 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6096 VALUES ('Goal', '[\"$.name\"]', ' ')",
6097 [],
6098 )
6099 .expect("register schema");
6100 conn.execute(
6101 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6102 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6103 [],
6104 )
6105 .expect("insert node");
6106 }
6108
6109 let report = service
6110 .rebuild_projections(ProjectionTarget::Fts)
6111 .expect("rebuild");
6112 assert!(
6113 report.rebuilt_rows >= 1,
6114 "rebuild must insert at least one property FTS row"
6115 );
6116
6117 let conn = sqlite::open_connection(db.path()).expect("conn");
6118 let text: String = conn
6119 .query_row(
6120 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6121 [],
6122 |row| row.get(0),
6123 )
6124 .expect("property FTS row must exist after rebuild");
6125 assert_eq!(text, "Ship v2");
6126 }
6127
6128 #[test]
6129 fn rebuild_missing_projections_fills_gap_for_deleted_property_fts_row() {
6130 let (db, service) = setup();
6131 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6132 {
6133 let conn = sqlite::open_connection(db.path()).expect("conn");
6134 conn.execute(
6135 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6136 VALUES ('Goal', '[\"$.name\"]', ' ')",
6137 [],
6138 )
6139 .expect("register schema");
6140 conn.execute(
6141 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6142 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6143 [],
6144 )
6145 .expect("insert node");
6146 conn.execute_batch(&format!(
6148 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
6149 node_logical_id UNINDEXED, text_content, \
6150 tokenize = 'porter unicode61 remove_diacritics 2'\
6151 )"
6152 ))
6153 .expect("create per-kind table");
6154 conn.execute(
6155 &format!(
6156 "INSERT INTO {goal_table} (node_logical_id, text_content) \
6157 VALUES ('goal-1', 'Ship v2')"
6158 ),
6159 [],
6160 )
6161 .expect("insert property fts");
6162 conn.execute(
6163 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6164 [],
6165 )
6166 .expect("delete property fts");
6167 }
6168
6169 let report = service
6170 .rebuild_missing_projections()
6171 .expect("rebuild missing");
6172 assert!(
6173 report.rebuilt_rows >= 1,
6174 "missing rebuild must insert the gap-fill row"
6175 );
6176
6177 let conn = sqlite::open_connection(db.path()).expect("conn");
6178 let count: i64 = conn
6179 .query_row(
6180 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6181 [],
6182 |row| row.get(0),
6183 )
6184 .expect("count");
6185 assert_eq!(
6186 count, 1,
6187 "gap-fill must restore exactly one property FTS row"
6188 );
6189 }
6190
6191 #[test]
6192 fn remove_schema_then_rebuild_cleans_stale_property_fts_rows() {
6193 let (db, service) = setup();
6199 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6200 {
6201 let conn = sqlite::open_connection(db.path()).expect("conn");
6202 conn.execute(
6203 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6204 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6205 [],
6206 )
6207 .expect("insert node");
6208 conn.execute_batch(&format!(
6211 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
6212 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
6213 ))
6214 .expect("create per-kind table");
6215 conn.execute(
6216 &format!(
6217 "INSERT INTO {goal_table} (node_logical_id, text_content) \
6218 VALUES ('goal-1', 'Ship v2')"
6219 ),
6220 [],
6221 )
6222 .expect("insert property fts");
6223 }
6224
6225 let semantics = service.check_semantics().expect("semantics");
6227 assert_eq!(
6228 semantics.orphaned_property_fts_rows, 1,
6229 "orphaned property FTS rows must be detected with no registered schema"
6230 );
6231
6232 service
6234 .rebuild_projections(ProjectionTarget::Fts)
6235 .expect("rebuild");
6236
6237 let conn = sqlite::open_connection(db.path()).expect("conn");
6238 let count: i64 = conn
6239 .query_row(
6240 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6241 [],
6242 |row| row.get(0),
6243 )
6244 .expect("count");
6245 assert_eq!(
6246 count, 0,
6247 "rebuild must delete rows from per-kind tables with no registered schema"
6248 );
6249 }
6250
6251 mod validate_fts_property_paths_tests {
6252 use super::super::validate_fts_property_paths;
6253
6254 #[test]
6255 fn valid_simple_path() {
6256 assert!(validate_fts_property_paths(&["$.name".to_owned()]).is_ok());
6257 }
6258
6259 #[test]
6260 fn valid_nested_path() {
6261 assert!(validate_fts_property_paths(&["$.address.city".to_owned()]).is_ok());
6262 }
6263
6264 #[test]
6265 fn valid_underscore_segment() {
6266 assert!(validate_fts_property_paths(&["$.a_b".to_owned()]).is_ok());
6267 }
6268
6269 #[test]
6270 fn rejects_bare_prefix() {
6271 let result = validate_fts_property_paths(&["$.".to_owned()]);
6272 assert!(result.is_err(), "path '$.' must be rejected");
6273 }
6274
6275 #[test]
6276 fn rejects_double_dot() {
6277 let result = validate_fts_property_paths(&["$..x".to_owned()]);
6278 assert!(result.is_err(), "path '$..x' must be rejected");
6279 }
6280
6281 #[test]
6282 fn rejects_trailing_dot() {
6283 let result = validate_fts_property_paths(&["$.foo.".to_owned()]);
6284 assert!(result.is_err(), "path '$.foo.' must be rejected");
6285 }
6286
6287 #[test]
6288 fn rejects_space_in_segment() {
6289 let result = validate_fts_property_paths(&["$.foo bar".to_owned()]);
6290 assert!(result.is_err(), "path '$.foo bar' must be rejected");
6291 }
6292
6293 #[test]
6294 fn rejects_bracket_syntax() {
6295 let result = validate_fts_property_paths(&["$.foo[0]".to_owned()]);
6296 assert!(result.is_err(), "path '$.foo[0]' must be rejected");
6297 }
6298
6299 #[test]
6300 fn rejects_duplicates() {
6301 let result = validate_fts_property_paths(&["$.name".to_owned(), "$.name".to_owned()]);
6302 assert!(result.is_err(), "duplicate paths must be rejected");
6303 }
6304
6305 #[test]
6306 fn rejects_empty_list() {
6307 let result = validate_fts_property_paths(&[]);
6308 assert!(result.is_err(), "empty path list must be rejected");
6309 }
6310 }
6311
6312 #[test]
6315 fn register_fts_schema_writes_to_per_kind_table() {
6316 let (db, service) = setup();
6319 {
6320 let conn = sqlite::open_connection(db.path()).expect("conn");
6321 conn.execute(
6323 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6324 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6325 [],
6326 )
6327 .expect("insert node");
6328 }
6329
6330 service
6332 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6333 .expect("register schema");
6334
6335 let conn = sqlite::open_connection(db.path()).expect("conn");
6336 let table = fathomdb_schema::fts_kind_table_name("Goal");
6337 let per_kind_count: i64 = conn
6339 .query_row(
6340 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6341 [],
6342 |row| row.get(0),
6343 )
6344 .expect("per-kind count");
6345 assert_eq!(
6346 per_kind_count, 1,
6347 "per-kind table must have the row after registration"
6348 );
6349 }
6350
6351 #[test]
6352 fn remove_fts_schema_deletes_from_per_kind_table() {
6353 let (db, service) = setup();
6355 {
6356 let conn = sqlite::open_connection(db.path()).expect("conn");
6357 conn.execute(
6358 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6359 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6360 [],
6361 )
6362 .expect("insert node");
6363 }
6364
6365 service
6366 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6367 .expect("register schema");
6368 service
6369 .remove_fts_property_schema("Goal")
6370 .expect("remove schema");
6371
6372 let conn = sqlite::open_connection(db.path()).expect("conn");
6373 let table = fathomdb_schema::fts_kind_table_name("Goal");
6374 let per_kind_count: i64 = conn
6375 .query_row(
6376 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6377 [],
6378 |row| row.get(0),
6379 )
6380 .expect("per-kind count");
6381 assert_eq!(
6382 per_kind_count, 0,
6383 "per-kind table must be empty after schema removal"
6384 );
6385 }
6386
6387 #[test]
6390 fn fts_path_spec_with_weight_builder() {
6391 let spec = FtsPropertyPathSpec::scalar("$.title").with_weight(5.0);
6392 assert_eq!(spec.weight, Some(5.0));
6393 assert_eq!(spec.path, "$.title");
6394 assert_eq!(spec.mode, FtsPropertyPathMode::Scalar);
6395 }
6396
6397 #[test]
6398 fn fts_path_spec_serialize_with_weight() {
6399 use super::serialize_property_paths_json;
6400 let entries = vec![
6401 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6402 FtsPropertyPathSpec::scalar("$.body"),
6403 ];
6404 let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6405 let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6407 let paths = v
6408 .get("paths")
6409 .expect("paths key")
6410 .as_array()
6411 .expect("array");
6412 assert_eq!(paths.len(), 2);
6413 assert_eq!(
6415 paths[0].get("path").and_then(serde_json::Value::as_str),
6416 Some("$.title")
6417 );
6418 assert_eq!(
6419 paths[0].get("weight").and_then(serde_json::Value::as_f64),
6420 Some(2.0)
6421 );
6422 assert!(
6424 paths[1].get("weight").is_none(),
6425 "unweighted spec must omit weight field"
6426 );
6427 }
6428
6429 #[test]
6430 fn fts_path_spec_serialize_no_weights() {
6431 use super::serialize_property_paths_json;
6432 let entries = vec![
6433 FtsPropertyPathSpec::scalar("$.title"),
6434 FtsPropertyPathSpec::scalar("$.payload"),
6435 ];
6436 let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6437 let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6439 assert!(
6440 v.is_array(),
6441 "all-scalar no-weight schema must serialize as bare string array"
6442 );
6443 let arr = v.as_array().expect("array");
6444 assert_eq!(arr.len(), 2);
6445 assert_eq!(arr[0].as_str(), Some("$.title"));
6446 assert_eq!(arr[1].as_str(), Some("$.payload"));
6447 }
6448
6449 #[test]
6450 fn fts_weight_validation_out_of_range() {
6451 let (_db, service) = setup();
6452 let entries_zero = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(0.0)];
6454 let result = service.register_fts_property_schema_with_entries(
6455 "Article",
6456 &entries_zero,
6457 None,
6458 &[],
6459 crate::rebuild_actor::RebuildMode::Eager,
6460 );
6461 assert!(result.is_err(), "weight 0.0 must be rejected");
6462 let err_msg = result.expect_err("weight 0.0 must be rejected").to_string();
6463 assert!(
6464 err_msg.contains("weight"),
6465 "error must mention weight: {err_msg}"
6466 );
6467
6468 let entries_big = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(1001.0)];
6470 let result = service.register_fts_property_schema_with_entries(
6471 "Article",
6472 &entries_big,
6473 None,
6474 &[],
6475 crate::rebuild_actor::RebuildMode::Eager,
6476 );
6477 assert!(result.is_err(), "weight 1001.0 must be rejected");
6478 }
6479
6480 #[test]
6481 fn fts_weight_validation_valid() {
6482 let (_db, service) = setup();
6483 let entries = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(10.0)];
6484 let result = service.register_fts_property_schema_with_entries(
6485 "Article",
6486 &entries,
6487 None,
6488 &[],
6489 crate::rebuild_actor::RebuildMode::Eager,
6490 );
6491 assert!(
6492 result.is_ok(),
6493 "weight 10.0 must be accepted: {:?}",
6494 result.err()
6495 );
6496 }
6497
6498 #[test]
6501 fn create_or_replace_creates_multi_column_table() {
6502 use super::create_or_replace_fts_kind_table;
6503 let (db, _service) = setup();
6504 let conn = sqlite::open_connection(db.path()).expect("conn");
6505 let specs = vec![
6506 FtsPropertyPathSpec::scalar("$.title"),
6507 FtsPropertyPathSpec::recursive("$.payload"),
6508 ];
6509 create_or_replace_fts_kind_table(
6510 &conn,
6511 "Article",
6512 &specs,
6513 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6514 )
6515 .expect("create table");
6516
6517 let table = fathomdb_schema::fts_kind_table_name("Article");
6519 let count: i64 = conn
6521 .query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))
6522 .expect("count");
6523 assert_eq!(count, 0, "new table must be empty");
6524
6525 let title_col = fathomdb_schema::fts_column_name("$.title", false);
6527 let payload_col = fathomdb_schema::fts_column_name("$.payload", true);
6528 conn.execute(
6529 &format!(
6530 "INSERT INTO {table} (node_logical_id, {title_col}, {payload_col}) VALUES ('id1', 'hello', 'world')"
6531 ),
6532 [],
6533 )
6534 .expect("insert with per-spec columns must succeed");
6535 }
6536
6537 #[test]
6538 fn create_or_replace_drops_and_recreates() {
6539 use super::create_or_replace_fts_kind_table;
6540 let (db, _service) = setup();
6541 let conn = sqlite::open_connection(db.path()).expect("conn");
6542
6543 let specs_v1 = vec![FtsPropertyPathSpec::scalar("$.title")];
6545 create_or_replace_fts_kind_table(
6546 &conn,
6547 "Post",
6548 &specs_v1,
6549 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6550 )
6551 .expect("create v1");
6552
6553 let specs_v2 = vec![
6555 FtsPropertyPathSpec::scalar("$.title"),
6556 FtsPropertyPathSpec::scalar("$.summary"),
6557 ];
6558 create_or_replace_fts_kind_table(
6559 &conn,
6560 "Post",
6561 &specs_v2,
6562 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6563 )
6564 .expect("create v2");
6565
6566 let table = fathomdb_schema::fts_kind_table_name("Post");
6568 let summary_col = fathomdb_schema::fts_column_name("$.summary", false);
6569 conn.execute(
6570 &format!("INSERT INTO {table} (node_logical_id, {summary_col}) VALUES ('id1', 'text')"),
6571 [],
6572 )
6573 .expect("second layout must allow summary column");
6574 }
6575
6576 #[test]
6577 fn create_or_replace_invalid_tokenizer() {
6578 use super::create_or_replace_fts_kind_table;
6579 let (db, _service) = setup();
6580 let conn = sqlite::open_connection(db.path()).expect("conn");
6581 let specs = vec![FtsPropertyPathSpec::scalar("$.title")];
6582 let result = create_or_replace_fts_kind_table(&conn, "Post", &specs, "'; DROP TABLE --");
6583 assert!(result.is_err(), "invalid tokenizer must be rejected");
6584 let err_msg = result
6585 .expect_err("invalid tokenizer must be rejected")
6586 .to_string();
6587 assert!(
6588 err_msg.contains("tokenizer"),
6589 "error must mention tokenizer: {err_msg}"
6590 );
6591 }
6592
6593 #[test]
6594 fn register_with_weights_creates_per_column_table() {
6595 let (db, service) = setup();
6596 let entries = vec![
6597 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6598 FtsPropertyPathSpec::scalar("$.body"),
6599 ];
6600 service
6601 .register_fts_property_schema_with_entries(
6602 "Article",
6603 &entries,
6604 None,
6605 &[],
6606 crate::rebuild_actor::RebuildMode::Eager,
6607 )
6608 .expect("register");
6609
6610 let conn = sqlite::open_connection(db.path()).expect("conn");
6612 let table = fathomdb_schema::fts_kind_table_name("Article");
6613 let title_col = fathomdb_schema::fts_column_name("$.title", false);
6614 let body_col = fathomdb_schema::fts_column_name("$.body", false);
6615 conn.execute(
6617 &format!(
6618 "INSERT INTO {table} (node_logical_id, {title_col}, {body_col}) VALUES ('art-1', 'hello', 'world')"
6619 ),
6620 [],
6621 )
6622 .expect("per-spec columns must exist after registration with weights");
6623 }
6624
6625 #[test]
6626 fn weighted_to_unweighted_downgrade_recreates_table() {
6627 let (db, service) = setup();
6628
6629 let weighted_entries = vec![
6631 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6632 FtsPropertyPathSpec::scalar("$.body"),
6633 ];
6634 service
6635 .register_fts_property_schema_with_entries(
6636 "Article",
6637 &weighted_entries,
6638 None,
6639 &[],
6640 crate::rebuild_actor::RebuildMode::Eager,
6641 )
6642 .expect("register weighted");
6643
6644 let unweighted_entries = vec![
6646 FtsPropertyPathSpec::scalar("$.title"),
6647 FtsPropertyPathSpec::scalar("$.body"),
6648 ];
6649 service
6650 .register_fts_property_schema_with_entries(
6651 "Article",
6652 &unweighted_entries,
6653 None,
6654 &[],
6655 crate::rebuild_actor::RebuildMode::Eager,
6656 )
6657 .expect("re-register unweighted");
6658
6659 let conn = sqlite::open_connection(db.path()).expect("conn");
6662 let table = fathomdb_schema::fts_kind_table_name("Article");
6663 let result = conn.execute(
6664 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('art-1', 'hello world')"),
6665 [],
6666 );
6667 assert!(
6668 result.is_ok(),
6669 "text_content column must exist after weighted-to-unweighted downgrade"
6670 );
6671 }
6672
6673 #[test]
6676 fn set_get_fts_profile_roundtrip() {
6677 let (_db, service) = setup();
6678 let profile = service
6679 .set_fts_profile("book", "unicode61")
6680 .expect("set_fts_profile");
6681 assert_eq!(profile.kind, "book");
6682 assert_eq!(profile.tokenizer, "unicode61");
6683
6684 let got = service
6685 .get_fts_profile("book")
6686 .expect("get_fts_profile")
6687 .expect("should be Some");
6688 assert_eq!(got.kind, "book");
6689 assert_eq!(got.tokenizer, "unicode61");
6690 }
6691
6692 #[test]
6693 fn fts_profile_upsert() {
6694 let (_db, service) = setup();
6695 service
6696 .set_fts_profile("article", "unicode61")
6697 .expect("first set");
6698 service
6699 .set_fts_profile("article", "porter unicode61 remove_diacritics 2")
6700 .expect("second set");
6701 let got = service
6702 .get_fts_profile("article")
6703 .expect("get")
6704 .expect("Some");
6705 assert_eq!(got.tokenizer, "porter unicode61 remove_diacritics 2");
6706 }
6707
6708 #[test]
6709 fn invalid_tokenizer_rejected() {
6710 let (_db, service) = setup();
6711 let result = service.set_fts_profile("book", "'; DROP TABLE nodes --");
6712 assert!(result.is_err(), "invalid tokenizer must be rejected");
6713 let msg = result.expect_err("must be Err").to_string();
6714 assert!(
6715 msg.contains("tokenizer") || msg.contains("invalid"),
6716 "error must mention tokenizer or invalid: {msg}"
6717 );
6718 }
6719
6720 #[test]
6721 fn preset_recall_optimized_english() {
6722 assert_eq!(
6723 super::resolve_tokenizer_preset("recall-optimized-english"),
6724 "porter unicode61 remove_diacritics 2"
6725 );
6726 }
6727
6728 #[test]
6729 fn preset_precision_optimized() {
6730 assert_eq!(
6731 super::resolve_tokenizer_preset("precision-optimized"),
6732 "unicode61 remove_diacritics 2"
6733 );
6734 }
6735
6736 #[test]
6737 fn preset_global_cjk() {
6738 assert_eq!(super::resolve_tokenizer_preset("global-cjk"), "icu");
6739 }
6740
6741 #[test]
6742 fn preset_substring_trigram() {
6743 assert_eq!(
6744 super::resolve_tokenizer_preset("substring-trigram"),
6745 "trigram"
6746 );
6747 }
6748
6749 #[test]
6750 fn preset_source_code() {
6751 assert_eq!(
6752 super::resolve_tokenizer_preset("source-code"),
6753 "unicode61 tokenchars '._-$@'"
6754 );
6755 }
6756
6757 #[test]
6758 fn preview_fts_row_count() {
6759 let (db, service) = setup();
6760 {
6761 let conn = sqlite::open_connection(db.path()).expect("conn");
6762 for i in 0..5u32 {
6763 conn.execute(
6764 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6765 VALUES (?1, ?2, 'book', '{}', 100, 'src')",
6766 rusqlite::params![format!("r{i}"), format!("lg{i}")],
6767 )
6768 .expect("insert node");
6769 }
6770 conn.execute(
6772 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, superseded_at) \
6773 VALUES ('r99', 'lg99', 'book', '{}', 100, 'src', 200)",
6774 [],
6775 )
6776 .expect("insert superseded");
6777 }
6778 let impact = service
6779 .preview_projection_impact("book", "fts")
6780 .expect("preview");
6781 assert_eq!(impact.rows_to_rebuild, 5);
6782 }
6783
6784 #[test]
6785 fn preview_populates_current_tokenizer() {
6786 let (_db, service) = setup();
6787 service
6788 .set_fts_profile("doc", "trigram")
6789 .expect("set profile");
6790 let impact = service
6791 .preview_projection_impact("doc", "fts")
6792 .expect("preview");
6793 assert_eq!(impact.current_tokenizer, Some("trigram".to_owned()));
6794 assert_eq!(impact.target_tokenizer, None);
6795 }
6796
6797 #[test]
6800 fn create_or_replace_source_code_tokenizer_is_accepted() {
6801 use super::create_or_replace_fts_kind_table;
6805 let (db, _service) = setup();
6806 let conn = sqlite::open_connection(db.path()).expect("conn");
6807 let specs = vec![FtsPropertyPathSpec::scalar("$.symbol")];
6808 let source_code_tokenizer = "unicode61 tokenchars '._-$@'";
6809 let result =
6810 create_or_replace_fts_kind_table(&conn, "Symbol", &specs, source_code_tokenizer);
6811 assert!(
6812 result.is_ok(),
6813 "source-code tokenizer string must be accepted by create_or_replace_fts_kind_table: {:?}",
6814 result.err()
6815 );
6816 }
6817
6818 #[test]
6819 fn source_code_profile_round_trip_through_register_fts_schema() {
6820 let db = tempfile::NamedTempFile::new().expect("temp file");
6825 let schema = Arc::new(fathomdb_schema::SchemaManager::new());
6826
6827 {
6829 let _coord = crate::ExecutionCoordinator::open(
6830 db.path(),
6831 Arc::clone(&schema),
6832 None,
6833 1,
6834 Arc::new(crate::TelemetryCounters::default()),
6835 None,
6836 )
6837 .expect("coordinator opens for bootstrap");
6838 }
6839
6840 let service = AdminService::new(db.path(), Arc::clone(&schema));
6841
6842 service
6844 .set_fts_profile("Symbol", "source-code")
6845 .expect("set_fts_profile with source-code preset must succeed");
6846
6847 let result = service.register_fts_property_schema("Symbol", &["$.name".to_owned()], None);
6850 assert!(
6851 result.is_ok(),
6852 "register_fts_property_schema must succeed when source-code profile is active: {:?}",
6853 result.err()
6854 );
6855 }
6856
6857 #[cfg(feature = "sqlite-vec")]
6866 #[test]
6867 fn embedder_max_tokens_8192_handles_chunk_exceeding_512_words() {
6868 let long_text = (0..600u32)
6869 .map(|i| format!("word{i}"))
6870 .collect::<Vec<_>>()
6871 .join(" ");
6872
6873 let db = NamedTempFile::new().expect("temp file");
6874 let schema = Arc::new(SchemaManager::new());
6875
6876 {
6877 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6878 schema.bootstrap(&conn).expect("bootstrap");
6879 conn.execute(
6880 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6881 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'src-1')",
6882 [],
6883 )
6884 .expect("insert node");
6885 conn.execute(
6886 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6887 VALUES (?1, 'doc-1', ?2, 100)",
6888 rusqlite::params!["chunk-long", long_text],
6889 )
6890 .expect("insert long chunk");
6891 }
6892
6893 let embedder = LargeContextTestEmbedder::new("long-context-model", 4, 8192);
6894 let service = AdminService::new(db.path(), Arc::clone(&schema));
6895 let report = service
6896 .regenerate_vector_embeddings(
6897 &embedder,
6898 &VectorRegenerationConfig {
6899 kind: "Document".to_owned(),
6900 profile: "default".to_owned(),
6901 chunking_policy: "per_chunk".to_owned(),
6902 preprocessing_policy: "trim".to_owned(),
6903 },
6904 )
6905 .expect("regenerate with long-context embedder");
6906
6907 assert_eq!(
6908 report.total_chunks, 1,
6909 "600-word text pre-written as one chunk must result in exactly one embedded row"
6910 );
6911 assert_eq!(report.regenerated_rows, 1);
6912 assert_eq!(
6913 embedder.max_tokens(),
6914 8192,
6915 "embedder must advertise 8192 token capacity"
6916 );
6917 }
6918
    #[cfg(feature = "sqlite-vec")]
    /// Test double for an embedder with a configurable token capacity.
    /// Returns a fixed all-ones vector for every query (see the
    /// `QueryEmbedder` impl below in this file).
    #[derive(Debug)]
    struct LargeContextTestEmbedder {
        // Identity reported via `QueryEmbedder::identity`.
        identity: QueryEmbedderIdentity,
        // Constant vector returned from `embed_query`.
        vector: Vec<f32>,
        // Advertised capacity returned from `max_tokens`.
        max_tokens: usize,
    }
6927
6928 #[cfg(feature = "sqlite-vec")]
6929 impl LargeContextTestEmbedder {
6930 fn new(model: &str, dimension: usize, max_tokens: usize) -> Self {
6931 Self {
6932 identity: QueryEmbedderIdentity {
6933 model_identity: model.to_owned(),
6934 model_version: "1.0.0".to_owned(),
6935 dimension,
6936 normalization_policy: "l2".to_owned(),
6937 },
6938 vector: vec![1.0; dimension],
6939 max_tokens,
6940 }
6941 }
6942 }
6943
    #[cfg(feature = "sqlite-vec")]
    impl QueryEmbedder for LargeContextTestEmbedder {
        /// Returns the fixed vector regardless of the input text.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        /// Reports the identity configured at construction.
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        /// Reports the token capacity configured at construction.
        fn max_tokens(&self) -> usize {
            self.max_tokens
        }
    }
6956
6957 #[cfg(feature = "sqlite-vec")]
6961 #[test]
6962 #[allow(clippy::too_many_lines)]
6963 fn regenerate_vector_embeddings_in_process_writes_contract_and_vec_rows() {
6964 let db = NamedTempFile::new().expect("temp file");
6965 let schema = Arc::new(SchemaManager::new());
6966
6967 {
6968 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6969 schema.bootstrap(&conn).expect("bootstrap");
6970 for (row_id, logical_id, created_at, src) in [
6971 ("r1", "node-1", 100, "src1"),
6972 ("r2", "node-2", 101, "src2"),
6973 ("r3", "node-3", 102, "src3"),
6974 ] {
6975 conn.execute(
6976 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6977 VALUES (?1, ?2, 'Doc', '{}', ?3, ?4)",
6978 rusqlite::params![row_id, logical_id, created_at, src],
6979 )
6980 .expect("insert node");
6981 }
6982 for (chunk_id, node_id, text, created_at) in [
6983 ("c1", "node-1", "first document text", 100),
6984 ("c2", "node-2", "second document text", 101),
6985 ("c3", "node-3", "third document text", 102),
6986 ] {
6987 conn.execute(
6988 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6989 VALUES (?1, ?2, ?3, ?4)",
6990 rusqlite::params![chunk_id, node_id, text, created_at],
6991 )
6992 .expect("insert chunk");
6993 }
6994 }
6995
6996 let service = AdminService::new(db.path(), Arc::clone(&schema));
6997 let embedder = TestEmbedder::new("batch-test-model", 4);
6998 let config = VectorRegenerationConfig {
6999 kind: "Doc".to_owned(),
7000 profile: "default".to_owned(),
7001 chunking_policy: "per_chunk".to_owned(),
7002 preprocessing_policy: "trim".to_owned(),
7003 };
7004 let report = service
7005 .regenerate_vector_embeddings_in_process(&embedder, &config)
7006 .expect("in-process regen must succeed");
7007
7008 assert_eq!(report.total_chunks, 3);
7009 assert_eq!(report.regenerated_rows, 3);
7010 assert!(report.contract_persisted);
7011
7012 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7013 let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
7014 let vec_count: i64 = conn
7015 .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
7016 row.get(0)
7017 })
7018 .expect("vec_doc count");
7019 assert_eq!(vec_count, 3, "one vec row per chunk");
7020
7021 let model_identity: String = conn
7022 .query_row(
7023 "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
7024 [],
7025 |row| row.get(0),
7026 )
7027 .expect("contract row");
7028 assert_eq!(model_identity, "batch-test-model");
7029 }
7030
7031 #[cfg(feature = "sqlite-vec")]
7034 #[test]
7035 #[allow(clippy::too_many_lines)]
7036 fn regenerate_vector_embeddings_targets_per_kind_table() {
7037 let db = NamedTempFile::new().expect("temp file");
7038 let schema = Arc::new(SchemaManager::new());
7039
7040 {
7041 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7042 schema.bootstrap(&conn).expect("bootstrap");
7043 conn.execute(
7044 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
7045 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
7046 [],
7047 )
7048 .expect("insert node");
7049 conn.execute(
7050 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
7051 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
7052 [],
7053 )
7054 .expect("insert chunk");
7055 }
7056
7057 let service = AdminService::new(db.path(), Arc::clone(&schema));
7058 let embedder = TestEmbedder::new("test-model", 4);
7059 let report = service
7060 .regenerate_vector_embeddings(
7061 &embedder,
7062 &VectorRegenerationConfig {
7063 kind: "Document".to_owned(),
7064 profile: "default".to_owned(),
7065 chunking_policy: "per_chunk".to_owned(),
7066 preprocessing_policy: "trim".to_owned(),
7067 },
7068 )
7069 .expect("regenerate vectors");
7070
7071 let expected_vec_table = fathomdb_schema::vec_kind_table_name("Document");
7072 assert_eq!(report.table_name, expected_vec_table);
7073 assert_eq!(report.regenerated_rows, 1);
7074
7075 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7076 let vec_count: i64 = conn
7077 .query_row(
7078 &format!("SELECT count(*) FROM {expected_vec_table}"),
7079 [],
7080 |row| row.get(0),
7081 )
7082 .expect("vec_document count");
7083 assert_eq!(vec_count, 1, "rows must be in vec_document");
7084
7085 let old_count: i64 = conn
7086 .query_row(
7087 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
7088 [],
7089 |r| r.get(0),
7090 )
7091 .expect("sqlite_master check");
7092 assert_eq!(
7093 old_count, 0,
7094 "vec_nodes_active must NOT be created for per-kind regen"
7095 );
7096 }
7097
7098 #[test]
7101 fn get_vec_profile_returns_none_when_no_profile_exists() {
7102 let (db, service) = setup();
7103 let _ = db;
7104 let result = service.get_vec_profile("MyKind").expect("should not error");
7105 assert!(
7106 result.is_none(),
7107 "must return None when no profile registered"
7108 );
7109 }
7110
7111 #[cfg(feature = "sqlite-vec")]
7112 #[test]
7113 fn get_vec_profile_returns_profile_for_registered_kind() {
7114 let db = NamedTempFile::new().expect("temp file");
7115 let schema = Arc::new(SchemaManager::new());
7116 {
7117 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7118 schema.bootstrap(&conn).expect("bootstrap");
7119 schema
7120 .ensure_vec_kind_profile(&conn, "MyKind", 128)
7121 .expect("ensure_vec_kind_profile");
7122 }
7123
7124 let service = AdminService::new(db.path(), Arc::clone(&schema));
7125 let profile = service.get_vec_profile("MyKind").expect("should not error");
7126 assert!(profile.is_some(), "must return profile after registration");
7127 assert_eq!(profile.expect("profile present").dimensions, 128);
7128 }
7129
7130 #[test]
7131 fn get_vec_profile_does_not_return_global_sentinel_row() {
7132 let (db, service) = setup();
7133 {
7134 let conn = sqlite::open_connection(db.path()).expect("conn");
7135 conn.execute(
7136 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
7137 VALUES ('*', 'vec', '{\"model_identity\":\"old-model\",\"dimensions\":384}', 0, 0)",
7138 [],
7139 )
7140 .expect("insert global sentinel");
7141 }
7142 let result = service
7143 .get_vec_profile("SomeKind")
7144 .expect("should not error");
7145 assert!(
7146 result.is_none(),
7147 "per-kind query must not return global ('*', 'vec') row"
7148 );
7149 }
7150}