1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::mpsc::SyncSender;
4
5use fathomdb_schema::SchemaManager;
6use serde::{Deserialize, Serialize};
7
8use crate::rebuild_actor::{RebuildMode, RebuildRequest};
9
10use crate::{
11 EngineError, ProjectionRepairReport, ProjectionService, ids::new_id,
12 projection::ProjectionTarget, sqlite,
13};
14
15mod fts;
16mod operational;
17mod provenance;
18mod vector;
19
20pub use vector::load_vector_regeneration_config;
21
22#[cfg(test)]
23use fts::{
24 create_or_replace_fts_kind_table, serialize_property_paths_json, validate_fts_property_paths,
25};
26
27#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
29pub struct IntegrityReport {
30 pub physical_ok: bool,
31 pub foreign_keys_ok: bool,
32 pub missing_fts_rows: usize,
33 pub missing_property_fts_rows: usize,
34 pub duplicate_active_logical_ids: usize,
35 pub operational_missing_collections: usize,
36 pub operational_missing_last_mutations: usize,
37 pub warnings: Vec<String>,
38}
39
40#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
42pub struct FtsPropertySchemaRecord {
43 pub kind: String,
45 pub property_paths: Vec<String>,
50 pub entries: Vec<FtsPropertyPathSpec>,
55 pub exclude_paths: Vec<String>,
58 pub separator: String,
60 pub format_version: i64,
62}
63
64#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
66#[serde(rename_all = "snake_case")]
67pub enum FtsPropertyPathMode {
68 #[default]
71 Scalar,
72 Recursive,
75}
76
77#[non_exhaustive]
79#[derive(Clone, Debug, PartialEq, Serialize)]
80pub struct FtsPropertyPathSpec {
81 pub path: String,
83 pub mode: FtsPropertyPathMode,
85 pub weight: Option<f32>,
88}
89
90impl Eq for FtsPropertyPathSpec {}
93
94impl FtsPropertyPathSpec {
95 #[must_use]
96 pub fn scalar(path: impl Into<String>) -> Self {
97 Self {
98 path: path.into(),
99 mode: FtsPropertyPathMode::Scalar,
100 weight: None,
101 }
102 }
103
104 #[must_use]
105 pub fn recursive(path: impl Into<String>) -> Self {
106 Self {
107 path: path.into(),
108 mode: FtsPropertyPathMode::Recursive,
109 weight: None,
110 }
111 }
112
113 #[must_use]
119 pub fn with_weight(mut self, weight: f32) -> Self {
120 self.weight = Some(weight);
121 self
122 }
123}
124
125#[derive(Clone, Copy, Debug)]
127pub struct SafeExportOptions {
128 pub force_checkpoint: bool,
132}
133
134impl Default for SafeExportOptions {
135 fn default() -> Self {
136 Self {
137 force_checkpoint: true,
138 }
139 }
140}
141
142const EXPORT_PROTOCOL_VERSION: u32 = 1;
144
145#[derive(Clone, Debug, Serialize)]
147pub struct SafeExportManifest {
148 pub exported_at: u64,
150 pub sha256: String,
152 pub schema_version: u32,
154 pub protocol_version: u32,
156 pub page_count: u64,
158}
159
160#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
162pub struct TraceReport {
163 pub source_ref: String,
164 pub node_rows: usize,
165 pub edge_rows: usize,
166 pub action_rows: usize,
167 pub operational_mutation_rows: usize,
168 pub node_logical_ids: Vec<String>,
169 pub action_ids: Vec<String>,
170 pub operational_mutation_ids: Vec<String>,
171}
172
173#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
175pub struct SkippedEdge {
176 pub edge_logical_id: String,
177 pub missing_endpoint: String,
178}
179
180#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
182pub struct LogicalRestoreReport {
183 pub logical_id: String,
184 pub was_noop: bool,
185 pub restored_node_rows: usize,
186 pub restored_edge_rows: usize,
187 pub restored_chunk_rows: usize,
188 pub restored_fts_rows: usize,
189 pub restored_property_fts_rows: usize,
190 pub restored_vec_rows: usize,
191 pub skipped_edges: Vec<SkippedEdge>,
192 pub notes: Vec<String>,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
197pub struct LogicalPurgeReport {
198 pub logical_id: String,
199 pub was_noop: bool,
200 pub deleted_node_rows: usize,
201 pub deleted_edge_rows: usize,
202 pub deleted_chunk_rows: usize,
203 pub deleted_fts_rows: usize,
204 pub deleted_vec_rows: usize,
205 pub notes: Vec<String>,
206}
207
208#[derive(Clone, Debug, Serialize, Deserialize)]
210pub struct ProvenancePurgeOptions {
211 pub dry_run: bool,
212 #[serde(default)]
213 pub preserve_event_types: Vec<String>,
214}
215
216#[derive(Clone, Debug, Serialize)]
218pub struct ProvenancePurgeReport {
219 pub events_deleted: u64,
220 pub events_preserved: u64,
221 pub oldest_remaining: Option<i64>,
222}
223
224#[derive(Debug)]
226pub struct AdminService {
227 pub(super) database_path: PathBuf,
228 pub(super) schema_manager: Arc<SchemaManager>,
229 pub(super) projections: ProjectionService,
230 pub(super) rebuild_sender: Option<SyncSender<RebuildRequest>>,
234}
235
236#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
238pub struct SemanticReport {
239 pub orphaned_chunks: usize,
241 pub null_source_ref_nodes: usize,
243 pub broken_step_fk: usize,
245 pub broken_action_fk: usize,
247 pub stale_fts_rows: usize,
249 pub fts_rows_for_superseded_nodes: usize,
251 pub stale_property_fts_rows: usize,
253 pub orphaned_property_fts_rows: usize,
255 pub mismatched_kind_property_fts_rows: usize,
257 pub duplicate_property_fts_rows: usize,
259 pub drifted_property_fts_rows: usize,
261 pub dangling_edges: usize,
263 pub orphaned_supersession_chains: usize,
265 pub stale_vec_rows: usize,
267 pub vec_rows_for_superseded_nodes: usize,
269 pub missing_operational_current_rows: usize,
271 pub stale_operational_current_rows: usize,
273 pub disabled_collection_mutations: usize,
275 pub orphaned_last_access_metadata_rows: usize,
277 pub warnings: Vec<String>,
278}
279
280#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
293#[serde(rename_all = "snake_case", deny_unknown_fields)]
294pub struct VectorRegenerationConfig {
295 pub kind: String,
296 pub profile: String,
297 pub chunking_policy: String,
298 pub preprocessing_policy: String,
299}
300
301#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
303pub struct VectorRegenerationReport {
304 pub profile: String,
305 pub table_name: String,
306 pub dimension: usize,
307 pub total_chunks: usize,
308 pub regenerated_rows: usize,
309 pub contract_persisted: bool,
310 pub notes: Vec<String>,
311}
312
313#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
317pub struct FtsProfile {
318 pub kind: String,
320 pub tokenizer: String,
322 pub active_at: Option<i64>,
324 pub created_at: i64,
326}
327
328#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
332pub struct VecProfile {
333 pub model_identity: String,
335 pub model_version: Option<String>,
337 pub dimensions: u32,
339 pub active_at: Option<i64>,
341 pub created_at: i64,
343}
344
345#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
349pub struct ProjectionImpact {
350 pub rows_to_rebuild: u64,
352 pub estimated_seconds: u64,
354 pub temp_db_size_bytes: u64,
356 pub current_tokenizer: Option<String>,
358 pub target_tokenizer: Option<String>,
360}
361
362pub const TOKENIZER_PRESETS: &[(&str, &str)] = &[
364 (
365 "recall-optimized-english",
366 "porter unicode61 remove_diacritics 2",
367 ),
368 ("precision-optimized", "unicode61 remove_diacritics 2"),
369 ("global-cjk", "icu"),
370 ("substring-trigram", "trigram"),
371 ("source-code", "unicode61 tokenchars '._-$@'"),
372];
373
374pub fn resolve_tokenizer_preset(input: &str) -> &str {
379 for (name, value) in TOKENIZER_PRESETS {
380 if *name == input {
381 return value;
382 }
383 }
384 input
385}
386
387pub(super) const CURRENT_VECTOR_CONTRACT_FORMAT_VERSION: i64 = 1;
388pub(super) const MAX_PROFILE_LEN: usize = 128;
389pub(super) const MAX_POLICY_LEN: usize = 128;
390pub(super) const MAX_CONTRACT_JSON_BYTES: usize = 32 * 1024;
391pub(super) const MAX_AUDIT_METADATA_BYTES: usize = 2048;
392const DEFAULT_OPERATIONAL_READ_LIMIT: usize = 100;
393const MAX_OPERATIONAL_READ_LIMIT: usize = 1000;
394
395#[derive(Clone, Debug)]
397pub struct AdminHandle {
398 inner: Arc<AdminService>,
399}
400
401impl AdminHandle {
402 #[must_use]
404 pub fn new(service: AdminService) -> Self {
405 Self {
406 inner: Arc::new(service),
407 }
408 }
409
410 #[must_use]
412 pub fn service(&self) -> Arc<AdminService> {
413 Arc::clone(&self.inner)
414 }
415}
416
417impl AdminService {
418 #[must_use]
420 pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
421 let database_path = path.as_ref().to_path_buf();
422 let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
423 Self {
424 database_path,
425 schema_manager,
426 projections,
427 rebuild_sender: None,
428 }
429 }
430
431 #[must_use]
433 pub fn new_with_rebuild(
434 path: impl AsRef<Path>,
435 schema_manager: Arc<SchemaManager>,
436 rebuild_sender: SyncSender<RebuildRequest>,
437 ) -> Self {
438 let database_path = path.as_ref().to_path_buf();
439 let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
440 Self {
441 database_path,
442 schema_manager,
443 projections,
444 rebuild_sender: Some(rebuild_sender),
445 }
446 }
447
448 pub(super) fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
449 #[cfg(feature = "sqlite-vec")]
450 let conn = sqlite::open_connection_with_vec(&self.database_path)?;
451 #[cfg(not(feature = "sqlite-vec"))]
452 let conn = sqlite::open_connection(&self.database_path)?;
453 self.schema_manager.bootstrap(&conn)?;
454 Ok(conn)
455 }
456
457 pub fn check_integrity(&self) -> Result<IntegrityReport, EngineError> {
460 let conn = self.connect()?;
461
462 let physical_result: String =
463 conn.query_row("PRAGMA integrity_check", [], |row| row.get(0))?;
464 let foreign_key_count: i64 =
465 conn.query_row("SELECT count(*) FROM pragma_foreign_key_check", [], |row| {
466 row.get(0)
467 })?;
468 let missing_fts_rows: i64 = conn.query_row(
469 r"
470 SELECT count(*)
471 FROM chunks c
472 JOIN nodes n
473 ON n.logical_id = c.node_logical_id
474 AND n.superseded_at IS NULL
475 WHERE NOT EXISTS (
476 SELECT 1
477 FROM fts_nodes f
478 WHERE f.chunk_id = c.id
479 )
480 ",
481 [],
482 |row| row.get(0),
483 )?;
484 let duplicate_active: i64 = conn.query_row(
485 r"
486 SELECT count(*)
487 FROM (
488 SELECT logical_id
489 FROM nodes
490 WHERE superseded_at IS NULL
491 GROUP BY logical_id
492 HAVING count(*) > 1
493 )
494 ",
495 [],
496 |row| row.get(0),
497 )?;
498 let operational_missing_collections: i64 = conn.query_row(
499 r"
500 SELECT (
501 SELECT count(*)
502 FROM operational_mutations m
503 LEFT JOIN operational_collections c ON c.name = m.collection_name
504 WHERE c.name IS NULL
505 ) + (
506 SELECT count(*)
507 FROM operational_current oc
508 LEFT JOIN operational_collections c ON c.name = oc.collection_name
509 WHERE c.name IS NULL
510 )
511 ",
512 [],
513 |row| row.get(0),
514 )?;
515 let operational_missing_last_mutations: i64 = conn.query_row(
516 r"
517 SELECT count(*)
518 FROM operational_current oc
519 LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
520 WHERE m.id IS NULL
521 ",
522 [],
523 |row| row.get(0),
524 )?;
525
526 let missing_property_fts_rows = count_missing_property_fts_rows(&conn)?;
530
531 let mut warnings = Vec::new();
532 if missing_fts_rows > 0 {
533 warnings.push("missing FTS projections detected".to_owned());
534 }
535 if missing_property_fts_rows > 0 {
536 warnings.push("missing property FTS projections detected".to_owned());
537 }
538 if duplicate_active > 0 {
539 warnings.push("duplicate active logical_ids detected".to_owned());
540 }
541 if operational_missing_collections > 0 {
542 warnings.push("operational rows reference missing collections".to_owned());
543 }
544 if operational_missing_last_mutations > 0 {
545 warnings.push("operational current rows reference missing last mutations".to_owned());
546 }
547
548 Ok(IntegrityReport {
553 physical_ok: physical_result == "ok",
554 foreign_keys_ok: foreign_key_count == 0,
555 missing_fts_rows: i64_to_usize(missing_fts_rows),
556 missing_property_fts_rows: i64_to_usize(missing_property_fts_rows),
557 duplicate_active_logical_ids: i64_to_usize(duplicate_active),
558 operational_missing_collections: i64_to_usize(operational_missing_collections),
559 operational_missing_last_mutations: i64_to_usize(operational_missing_last_mutations),
560 warnings,
561 })
562 }
563
564 #[allow(clippy::too_many_lines)]
567 pub fn check_semantics(&self) -> Result<SemanticReport, EngineError> {
568 let conn = self.connect()?;
569
570 let orphaned_chunks: i64 = conn.query_row(
571 r"
572 SELECT count(*)
573 FROM chunks c
574 WHERE NOT EXISTS (
575 SELECT 1 FROM nodes n
576 WHERE n.logical_id = c.node_logical_id
577 )
578 ",
579 [],
580 |row| row.get(0),
581 )?;
582
583 let null_source_ref_nodes: i64 = conn.query_row(
584 "SELECT count(*) FROM nodes WHERE source_ref IS NULL AND superseded_at IS NULL",
585 [],
586 |row| row.get(0),
587 )?;
588
589 let broken_step_fk: i64 = conn.query_row(
590 r"
591 SELECT count(*) FROM steps s
592 WHERE NOT EXISTS (SELECT 1 FROM runs r WHERE r.id = s.run_id)
593 ",
594 [],
595 |row| row.get(0),
596 )?;
597
598 let broken_action_fk: i64 = conn.query_row(
599 r"
600 SELECT count(*) FROM actions a
601 WHERE NOT EXISTS (SELECT 1 FROM steps s WHERE s.id = a.step_id)
602 ",
603 [],
604 |row| row.get(0),
605 )?;
606
607 let stale_fts_rows: i64 = conn.query_row(
608 r"
609 SELECT count(*) FROM fts_nodes f
610 WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = f.chunk_id)
611 ",
612 [],
613 |row| row.get(0),
614 )?;
615
616 let fts_rows_for_superseded_nodes: i64 = conn.query_row(
617 r"
618 SELECT count(*) FROM fts_nodes f
619 WHERE NOT EXISTS (
620 SELECT 1 FROM nodes n
621 WHERE n.logical_id = f.node_logical_id AND n.superseded_at IS NULL
622 )
623 ",
624 [],
625 |row| row.get(0),
626 )?;
627
628 let (
629 stale_property_fts_rows,
630 orphaned_property_fts_rows,
631 mismatched_kind_property_fts_rows,
632 duplicate_property_fts_rows,
633 ) = count_per_kind_property_fts_issues(&conn)?;
634
635 let drifted_property_fts_rows = count_drifted_property_fts_rows(&conn)?;
636
637 let dangling_edges: i64 = conn.query_row(
638 r"
639 SELECT count(*) FROM edges e
640 WHERE e.superseded_at IS NULL AND (
641 NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.source_logical_id AND n.superseded_at IS NULL)
642 OR
643 NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.target_logical_id AND n.superseded_at IS NULL)
644 )
645 ",
646 [],
647 |row| row.get(0),
648 )?;
649
650 let orphaned_supersession_chains: i64 = conn.query_row(
651 r"
652 SELECT count(*) FROM (
653 SELECT logical_id FROM nodes
654 GROUP BY logical_id
655 HAVING count(*) > 0 AND sum(CASE WHEN superseded_at IS NULL THEN 1 ELSE 0 END) = 0
656 )
657 ",
658 [],
659 |row| row.get(0),
660 )?;
661
662 #[cfg(feature = "sqlite-vec")]
664 let (stale_vec_rows, vec_rows_for_superseded_nodes): (i64, i64) = {
665 let kinds: Vec<String> =
666 match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
667 Ok(mut stmt) => stmt
668 .query_map([], |row| row.get(0))
669 .map_err(EngineError::Sqlite)?
670 .collect::<Result<Vec<_>, _>>()
671 .map_err(EngineError::Sqlite)?,
672 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
673 if msg.contains("no such table: projection_profiles") =>
674 {
675 vec![]
676 }
677 Err(e) => return Err(EngineError::Sqlite(e)),
678 };
679 let mut stale = 0i64;
680 let mut superseded = 0i64;
681 for kind in &kinds {
682 let table = fathomdb_schema::vec_kind_table_name(kind);
683 let stale_sql = format!(
684 "SELECT count(*) FROM {table} v \
685 WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = v.chunk_id)"
686 );
687 let superseded_sql = format!(
688 "SELECT count(*) FROM {table} v \
689 JOIN chunks c ON c.id = v.chunk_id \
690 WHERE NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = c.node_logical_id)"
691 );
692 stale += match conn.query_row(&stale_sql, [], |row| row.get(0)) {
693 Ok(n) => n,
694 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
695 if msg.contains("no such table:")
696 || msg.contains("no such module: vec0") =>
697 {
698 0
699 }
700 Err(e) => return Err(EngineError::Sqlite(e)),
701 };
702 superseded += match conn.query_row(&superseded_sql, [], |row| row.get(0)) {
703 Ok(n) => n,
704 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
705 if msg.contains("no such table:")
706 || msg.contains("no such module: vec0") =>
707 {
708 0
709 }
710 Err(e) => return Err(EngineError::Sqlite(e)),
711 };
712 }
713 (stale, superseded)
714 };
715 #[cfg(not(feature = "sqlite-vec"))]
716 let stale_vec_rows: i64 = 0;
717 #[cfg(not(feature = "sqlite-vec"))]
718 let vec_rows_for_superseded_nodes: i64 = 0;
719 let missing_operational_current_rows: i64 = conn.query_row(
720 r"
721 SELECT count(*)
722 FROM operational_mutations m
723 JOIN operational_collections c
724 ON c.name = m.collection_name
725 AND c.kind = 'latest_state'
726 WHERE m.op_kind = 'put'
727 AND NOT EXISTS (
728 SELECT 1
729 FROM operational_mutations newer
730 WHERE newer.collection_name = m.collection_name
731 AND newer.record_key = m.record_key
732 AND newer.mutation_order > m.mutation_order
733 )
734 AND NOT EXISTS (
735 SELECT 1
736 FROM operational_current oc
737 WHERE oc.collection_name = m.collection_name
738 AND oc.record_key = m.record_key
739 )
740 ",
741 [],
742 |row| row.get(0),
743 )?;
744 let stale_operational_current_rows: i64 = conn.query_row(
745 r"
746 SELECT count(*)
747 FROM operational_current oc
748 JOIN operational_collections c
749 ON c.name = oc.collection_name
750 AND c.kind = 'latest_state'
751 LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
752 WHERE m.id IS NULL
753 OR m.collection_name != oc.collection_name
754 OR m.record_key != oc.record_key
755 OR m.op_kind != 'put'
756 OR m.payload_json != oc.payload_json
757 OR EXISTS (
758 SELECT 1
759 FROM operational_mutations newer
760 WHERE newer.collection_name = oc.collection_name
761 AND newer.record_key = oc.record_key
762 AND newer.mutation_order > m.mutation_order
763 )
764 ",
765 [],
766 |row| row.get(0),
767 )?;
768 let disabled_collection_mutations: i64 = conn.query_row(
769 r"
770 SELECT count(*)
771 FROM operational_mutations m
772 JOIN operational_collections c ON c.name = m.collection_name
773 WHERE c.disabled_at IS NOT NULL AND m.created_at > c.disabled_at
774 ",
775 [],
776 |row| row.get(0),
777 )?;
778 let orphaned_last_access_metadata_rows: i64 = conn.query_row(
779 r"
780 SELECT count(*)
781 FROM node_access_metadata am
782 WHERE NOT EXISTS (
783 SELECT 1 FROM nodes n WHERE n.logical_id = am.logical_id
784 )
785 ",
786 [],
787 |row| row.get(0),
788 )?;
789
790 let mut warnings = Vec::new();
791 if orphaned_chunks > 0 {
792 warnings.push(format!(
793 "{orphaned_chunks} orphaned chunk(s) with no surviving node history"
794 ));
795 }
796 if null_source_ref_nodes > 0 {
797 warnings.push(format!(
798 "{null_source_ref_nodes} active node(s) with null source_ref"
799 ));
800 }
801 if broken_step_fk > 0 {
802 warnings.push(format!(
803 "{broken_step_fk} step(s) referencing non-existent run"
804 ));
805 }
806 if broken_action_fk > 0 {
807 warnings.push(format!(
808 "{broken_action_fk} action(s) referencing non-existent step"
809 ));
810 }
811 if stale_fts_rows > 0 {
812 warnings.push(format!(
813 "{stale_fts_rows} stale FTS row(s) referencing missing chunk"
814 ));
815 }
816 if fts_rows_for_superseded_nodes > 0 {
817 warnings.push(format!(
818 "{fts_rows_for_superseded_nodes} FTS row(s) for superseded node(s)"
819 ));
820 }
821 if stale_property_fts_rows > 0 {
822 warnings.push(format!(
823 "{stale_property_fts_rows} stale property FTS row(s) for superseded/missing node(s)"
824 ));
825 }
826 if orphaned_property_fts_rows > 0 {
827 warnings.push(format!(
828 "{orphaned_property_fts_rows} orphaned property FTS row(s) for unregistered kind(s)"
829 ));
830 }
831 if mismatched_kind_property_fts_rows > 0 {
832 warnings.push(format!(
833 "{mismatched_kind_property_fts_rows} property FTS row(s) whose kind does not match the active node"
834 ));
835 }
836 if duplicate_property_fts_rows > 0 {
837 warnings.push(format!(
838 "{duplicate_property_fts_rows} active logical ID(s) with duplicate property FTS rows"
839 ));
840 }
841 if drifted_property_fts_rows > 0 {
842 warnings.push(format!(
843 "{drifted_property_fts_rows} property FTS row(s) with stale text_content"
844 ));
845 }
846 if dangling_edges > 0 {
847 warnings.push(format!(
848 "{dangling_edges} active edge(s) with missing endpoint node"
849 ));
850 }
851 if orphaned_supersession_chains > 0 {
852 warnings.push(format!(
853 "{orphaned_supersession_chains} logical_id(s) with all versions superseded"
854 ));
855 }
856 if stale_vec_rows > 0 {
857 warnings.push(format!(
858 "{stale_vec_rows} stale vec row(s) referencing missing chunk"
859 ));
860 }
861 if vec_rows_for_superseded_nodes > 0 {
862 warnings.push(format!(
863 "{vec_rows_for_superseded_nodes} vec row(s) whose node history is missing"
864 ));
865 }
866 if missing_operational_current_rows > 0 {
867 warnings.push(format!(
868 "{missing_operational_current_rows} latest-state key(s) missing operational_current rows"
869 ));
870 }
871 if stale_operational_current_rows > 0 {
872 warnings.push(format!(
873 "{stale_operational_current_rows} stale operational_current row(s)"
874 ));
875 }
876 if disabled_collection_mutations > 0 {
877 warnings.push(format!(
878 "{disabled_collection_mutations} mutation(s) were written after collection disable"
879 ));
880 }
881 if orphaned_last_access_metadata_rows > 0 {
882 warnings.push(format!(
883 "{orphaned_last_access_metadata_rows} last_access metadata row(s) reference missing node history"
884 ));
885 }
886
887 Ok(SemanticReport {
888 orphaned_chunks: i64_to_usize(orphaned_chunks),
889 null_source_ref_nodes: i64_to_usize(null_source_ref_nodes),
890 broken_step_fk: i64_to_usize(broken_step_fk),
891 broken_action_fk: i64_to_usize(broken_action_fk),
892 stale_fts_rows: i64_to_usize(stale_fts_rows),
893 fts_rows_for_superseded_nodes: i64_to_usize(fts_rows_for_superseded_nodes),
894 stale_property_fts_rows: i64_to_usize(stale_property_fts_rows),
895 orphaned_property_fts_rows: i64_to_usize(orphaned_property_fts_rows),
896 mismatched_kind_property_fts_rows: i64_to_usize(mismatched_kind_property_fts_rows),
897 duplicate_property_fts_rows: i64_to_usize(duplicate_property_fts_rows),
898 drifted_property_fts_rows: i64_to_usize(drifted_property_fts_rows),
899 dangling_edges: i64_to_usize(dangling_edges),
900 orphaned_supersession_chains: i64_to_usize(orphaned_supersession_chains),
901 stale_vec_rows: i64_to_usize(stale_vec_rows),
902 vec_rows_for_superseded_nodes: i64_to_usize(vec_rows_for_superseded_nodes),
903 missing_operational_current_rows: i64_to_usize(missing_operational_current_rows),
904 stale_operational_current_rows: i64_to_usize(stale_operational_current_rows),
905 disabled_collection_mutations: i64_to_usize(disabled_collection_mutations),
906 orphaned_last_access_metadata_rows: i64_to_usize(orphaned_last_access_metadata_rows),
907 warnings,
908 })
909 }
910}
911
912fn count_per_kind_property_fts_issues(
920 conn: &rusqlite::Connection,
921) -> Result<(i64, i64, i64, i64), EngineError> {
922 let per_kind_tables: Vec<String> = {
926 let mut stmt = conn.prepare(
927 "SELECT name FROM sqlite_master \
928 WHERE type='table' AND name LIKE 'fts_props_%' \
929 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
930 )?;
931 stmt.query_map([], |r| r.get::<_, String>(0))?
932 .collect::<Result<Vec<_>, _>>()?
933 };
934
935 let registered_kinds: std::collections::HashSet<String> = {
936 let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
937 stmt.query_map([], |r| r.get::<_, String>(0))?
938 .collect::<Result<std::collections::HashSet<_>, _>>()?
939 };
940
941 let mut stale = 0i64;
942 let mut orphaned = 0i64;
943 let mut duplicate = 0i64;
944
945 for table in &per_kind_tables {
946 let kind_stale: i64 = conn.query_row(
948 &format!(
949 "SELECT count(*) FROM {table} fp \
950 WHERE NOT EXISTS (\
951 SELECT 1 FROM nodes n \
952 WHERE n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL\
953 )"
954 ),
955 [],
956 |r| r.get(0),
957 )?;
958 stale += kind_stale;
959
960 let kind_dup: i64 = conn.query_row(
962 &format!(
963 "SELECT count(*) FROM (\
964 SELECT node_logical_id FROM {table} \
965 GROUP BY node_logical_id HAVING count(*) > 1\
966 )"
967 ),
968 [],
969 |r| r.get(0),
970 )?;
971 duplicate += kind_dup;
972
973 let table_has_schema = registered_kinds
976 .iter()
977 .any(|k| fathomdb_schema::fts_kind_table_name(k) == *table);
978 if !table_has_schema {
979 let table_rows: i64 =
980 conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))?;
981 orphaned += table_rows;
982 }
983 }
984
985 Ok((stale, orphaned, 0, duplicate))
987}
988
989fn count_missing_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
993 let schemas = crate::writer::load_fts_property_schemas(conn)?;
994 if schemas.is_empty() {
995 return Ok(0);
996 }
997
998 let mut missing = 0i64;
999 for (kind, schema) in &schemas {
1000 let table = fathomdb_schema::fts_kind_table_name(kind);
1001 let table_exists: bool = conn
1003 .query_row(
1004 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1005 [table.as_str()],
1006 |r| r.get::<_, i64>(0),
1007 )
1008 .unwrap_or(0)
1009 > 0;
1010
1011 if table_exists {
1012 let mut stmt = conn.prepare(&format!(
1013 "SELECT n.logical_id, n.properties FROM nodes n \
1014 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
1015 AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
1016 ))?;
1017 let rows = stmt.query_map([kind.as_str()], |row| {
1018 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1019 })?;
1020 for row in rows {
1021 let (_logical_id, properties_str) = row?;
1022 let props: serde_json::Value =
1023 serde_json::from_str(&properties_str).unwrap_or_default();
1024 if crate::writer::extract_property_fts(&props, schema)
1025 .0
1026 .is_some()
1027 {
1028 missing += 1;
1029 }
1030 }
1031 } else {
1032 let mut stmt = conn.prepare(
1034 "SELECT n.logical_id, n.properties FROM nodes n \
1035 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
1036 )?;
1037 let rows = stmt.query_map([kind.as_str()], |row| {
1038 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1039 })?;
1040 for row in rows {
1041 let (_logical_id, properties_str) = row?;
1042 let props: serde_json::Value =
1043 serde_json::from_str(&properties_str).unwrap_or_default();
1044 if crate::writer::extract_property_fts(&props, schema)
1045 .0
1046 .is_some()
1047 {
1048 missing += 1;
1049 }
1050 }
1051 }
1052 }
1053 Ok(missing)
1054}
1055
1056fn count_drifted_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1070 let schemas = crate::writer::load_fts_property_schemas(conn)?;
1071 if schemas.is_empty() {
1072 return Ok(0);
1073 }
1074
1075 let mut drifted = 0i64;
1076 for (kind, schema) in &schemas {
1077 let table = fathomdb_schema::fts_kind_table_name(kind);
1078 let table_exists: bool = conn
1080 .query_row(
1081 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1082 [table.as_str()],
1083 |r| r.get::<_, i64>(0),
1084 )
1085 .unwrap_or(0)
1086 > 0;
1087 if !table_exists {
1088 continue;
1089 }
1090
1091 drifted += if schema.is_weighted() {
1101 count_drifted_weighted(conn, kind, &table, schema)?
1102 } else {
1103 count_drifted_non_weighted(conn, kind, &table, schema)?
1104 };
1105 }
1106 Ok(drifted)
1107}
1108
1109fn count_drifted_non_weighted(
1112 conn: &rusqlite::Connection,
1113 kind: &str,
1114 table: &str,
1115 schema: &crate::writer::PropertyFtsSchema,
1116) -> Result<i64, EngineError> {
1117 let mut drifted = 0i64;
1118 let mut stmt = conn.prepare(&format!(
1119 "SELECT fp.node_logical_id, fp.text_content, n.properties \
1120 FROM {table} fp \
1121 JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1122 WHERE n.kind = ?1"
1123 ))?;
1124 let rows = stmt.query_map([kind], |row| {
1125 Ok((
1126 row.get::<_, String>(0)?,
1127 row.get::<_, String>(1)?,
1128 row.get::<_, String>(2)?,
1129 ))
1130 })?;
1131 for row in rows {
1132 let (_logical_id, stored_text, properties_str) = row?;
1133 let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1134 let (expected, _positions, _stats) = crate::writer::extract_property_fts(&props, schema);
1135 match expected {
1136 Some(text) if text == stored_text => {}
1137 _ => drifted += 1,
1138 }
1139 }
1140 Ok(drifted)
1141}
1142
1143fn count_drifted_weighted(
1147 conn: &rusqlite::Connection,
1148 kind: &str,
1149 table: &str,
1150 schema: &crate::writer::PropertyFtsSchema,
1151) -> Result<i64, EngineError> {
1152 let columns: Vec<String> = schema
1155 .paths
1156 .iter()
1157 .map(|entry| {
1158 let is_recursive = matches!(entry.mode, crate::writer::PropertyPathMode::Recursive);
1159 fathomdb_schema::fts_column_name(&entry.path, is_recursive)
1160 })
1161 .collect();
1162 if columns.is_empty() {
1163 return Ok(0);
1167 }
1168
1169 let select_cols: String = columns
1170 .iter()
1171 .map(|c| format!("fp.\"{c}\""))
1172 .collect::<Vec<_>>()
1173 .join(", ");
1174 let sql = format!(
1175 "SELECT fp.node_logical_id, {select_cols}, n.properties \
1176 FROM {table} fp \
1177 JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1178 WHERE n.kind = ?1"
1179 );
1180 let mut stmt = conn.prepare(&sql)?;
1181 let props_col_idx = columns.len() + 1;
1186 let rows = stmt.query_map([kind], |row| {
1187 let mut stored: Vec<String> = Vec::with_capacity(columns.len());
1188 for i in 0..columns.len() {
1189 stored.push(row.get::<_, String>(i + 1)?);
1190 }
1191 let properties: String = row.get(props_col_idx)?;
1192 Ok((stored, properties))
1193 })?;
1194
1195 let mut drifted = 0i64;
1196 for row in rows {
1197 let (stored, properties_str) = row?;
1198 let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1199 let expected = crate::writer::extract_property_fts_columns(&props, schema);
1200 let row_drifted = if expected.len() == stored.len() {
1204 expected
1205 .iter()
1206 .zip(stored.iter())
1207 .any(|((_name, exp_text), stored_text)| exp_text != stored_text)
1208 } else {
1209 true
1210 };
1211 if row_drifted {
1212 drifted += 1;
1213 }
1214 }
1215 Ok(drifted)
1216}
1217
1218#[allow(clippy::expect_used)]
1221pub(super) fn i64_to_usize(val: i64) -> usize {
1222 usize::try_from(val).expect("count(*) must be non-negative")
1223}
1224
1225pub(super) fn persist_simple_provenance_event(
1226 conn: &rusqlite::Connection,
1227 event_type: &str,
1228 subject: &str,
1229 metadata: Option<serde_json::Value>,
1230) -> Result<(), EngineError> {
1231 let metadata_json = metadata.map(|value| value.to_string()).unwrap_or_default();
1232 conn.execute(
1233 "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1234 rusqlite::params![new_id(), event_type, subject, metadata_json],
1235 )?;
1236 Ok(())
1237}
1238
1239pub(super) fn rebuild_operational_current_rows(
1240 tx: &rusqlite::Transaction<'_>,
1241 collections: &[String],
1242) -> Result<usize, EngineError> {
1243 let mut rebuilt_rows = 0usize;
1244 clear_operational_current_rows(tx, collections)?;
1245 let mut ins_current = tx.prepare_cached(
1246 "INSERT INTO operational_current \
1247 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1248 VALUES (?1, ?2, ?3, ?4, ?5)",
1249 )?;
1250
1251 for collection in collections {
1252 let mut stmt = tx.prepare(
1253 "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
1254 FROM operational_mutations \
1255 WHERE collection_name = ?1 \
1256 ORDER BY record_key, mutation_order",
1257 )?;
1258 let mut latest_by_key: std::collections::HashMap<String, Option<(String, i64, String)>> =
1259 std::collections::HashMap::new();
1260 let rows = stmt.query_map([collection], operational::map_operational_mutation_row)?;
1261 for row in rows {
1262 let mutation = row?;
1263 match mutation.op_kind.as_str() {
1264 "put" => {
1265 latest_by_key.insert(
1266 mutation.record_key,
1267 Some((mutation.payload_json, mutation.created_at, mutation.id)),
1268 );
1269 }
1270 "delete" => {
1271 latest_by_key.insert(mutation.record_key, None);
1272 }
1273 _ => {}
1274 }
1275 }
1276
1277 for (record_key, state) in latest_by_key {
1278 if let Some((payload_json, updated_at, last_mutation_id)) = state {
1279 ins_current.execute(rusqlite::params![
1280 collection,
1281 record_key,
1282 payload_json,
1283 updated_at,
1284 last_mutation_id,
1285 ])?;
1286 rebuilt_rows += 1;
1287 }
1288 }
1289 }
1290
1291 drop(ins_current);
1292 Ok(rebuilt_rows)
1293}
1294
1295pub(super) fn clear_operational_current_rows(
1296 tx: &rusqlite::Transaction<'_>,
1297 collections: &[String],
1298) -> Result<(), EngineError> {
1299 let mut delete_current =
1300 tx.prepare_cached("DELETE FROM operational_current WHERE collection_name = ?1")?;
1301 let mut delete_secondary_current = tx.prepare_cached(
1302 "DELETE FROM operational_secondary_index_entries \
1303 WHERE collection_name = ?1 AND subject_kind = 'current'",
1304 )?;
1305 for collection in collections {
1306 delete_secondary_current.execute([collection])?;
1307 delete_current.execute([collection])?;
1308 }
1309 drop(delete_secondary_current);
1310 drop(delete_current);
1311 Ok(())
1312}
1313
1314#[cfg(test)]
1315#[allow(clippy::expect_used)]
1316mod tests {
1317 use std::fs;
1318 use std::sync::Arc;
1319
1320 use fathomdb_schema::SchemaManager;
1321 use tempfile::NamedTempFile;
1322
1323 use super::{
1324 AdminService, FtsPropertyPathMode, FtsPropertyPathSpec, SafeExportOptions,
1325 VectorRegenerationConfig,
1326 };
1327 use crate::embedder::{BatchEmbedder, EmbedderError, QueryEmbedder, QueryEmbedderIdentity};
1328 use crate::projection::ProjectionTarget;
1329 use crate::sqlite;
1330 use crate::{EngineError, OperationalCollectionKind, OperationalRegisterRequest};
1331
1332 #[cfg(feature = "sqlite-vec")]
1333 use crate::{ExecutionCoordinator, TelemetryCounters};
1334
1335 #[cfg(feature = "sqlite-vec")]
1336 use fathomdb_query::QueryBuilder;
1337
1338 #[cfg(feature = "sqlite-vec")]
1339 use super::load_vector_regeneration_config;
1340
1341 #[derive(Debug)]
1345 #[allow(dead_code)]
1346 struct TestEmbedder {
1347 identity: QueryEmbedderIdentity,
1348 vector: Vec<f32>,
1349 }
1350
1351 #[allow(dead_code)]
1352 impl TestEmbedder {
1353 fn new(model: &str, dimension: usize) -> Self {
1354 Self {
1355 identity: QueryEmbedderIdentity {
1356 model_identity: model.to_owned(),
1357 model_version: "1.0.0".to_owned(),
1358 dimension,
1359 normalization_policy: "l2".to_owned(),
1360 },
1361 vector: vec![1.0; dimension],
1362 }
1363 }
1364 }
1365
1366 impl QueryEmbedder for TestEmbedder {
1367 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
1368 Ok(self.vector.clone())
1369 }
1370 fn identity(&self) -> QueryEmbedderIdentity {
1371 self.identity.clone()
1372 }
1373 fn max_tokens(&self) -> usize {
1374 512
1375 }
1376 }
1377
1378 impl BatchEmbedder for TestEmbedder {
1379 fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, EmbedderError> {
1380 Ok(texts.iter().map(|_| self.vector.clone()).collect())
1381 }
1382 fn identity(&self) -> QueryEmbedderIdentity {
1383 self.identity.clone()
1384 }
1385 fn max_tokens(&self) -> usize {
1386 512
1387 }
1388 }
1389
1390 #[derive(Debug)]
1393 #[allow(dead_code)]
1394 struct FailingEmbedder {
1395 identity: QueryEmbedderIdentity,
1396 }
1397
1398 impl QueryEmbedder for FailingEmbedder {
1399 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
1400 Err(EmbedderError::Failed("test failure".to_owned()))
1401 }
1402 fn identity(&self) -> QueryEmbedderIdentity {
1403 self.identity.clone()
1404 }
1405 fn max_tokens(&self) -> usize {
1406 512
1407 }
1408 }
1409
1410 #[allow(dead_code)]
1411 #[cfg(unix)]
1412 fn set_file_mode(path: &std::path::Path, mode: u32) {
1413 use std::os::unix::fs::PermissionsExt;
1414
1415 let mut permissions = fs::metadata(path).expect("script metadata").permissions();
1416 permissions.set_mode(mode);
1417 fs::set_permissions(path, permissions).expect("chmod");
1418 }
1419
1420 #[allow(dead_code)]
1421 #[cfg(not(unix))]
1422 fn set_file_mode(_path: &std::path::Path, _mode: u32) {}
1423 fn setup() -> (NamedTempFile, AdminService) {
1424 let db = NamedTempFile::new().expect("temp file");
1425 let schema = Arc::new(SchemaManager::new());
1426 {
1427 let conn = sqlite::open_connection(db.path()).expect("connection");
1428 schema.bootstrap(&conn).expect("bootstrap");
1429 }
1430 let service = AdminService::new(db.path(), Arc::clone(&schema));
1431 (db, service)
1432 }
1433
1434 #[test]
1435 fn check_integrity_includes_active_uniqueness_count() {
1436 let (_db, service) = setup();
1437 let report = service.check_integrity().expect("integrity check");
1438 assert_eq!(report.duplicate_active_logical_ids, 0);
1439 assert_eq!(report.operational_missing_collections, 0);
1440 assert_eq!(report.operational_missing_last_mutations, 0);
1441 }
1442
1443 #[test]
1444 fn trace_source_returns_node_logical_ids() {
1445 let (db, service) = setup();
1446 {
1447 let conn = sqlite::open_connection(db.path()).expect("conn");
1448 conn.execute(
1449 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1450 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 'source-1')",
1451 [],
1452 )
1453 .expect("insert node");
1454 }
1455 let report = service.trace_source("source-1").expect("trace");
1456 assert_eq!(report.node_rows, 1);
1457 assert_eq!(report.node_logical_ids, vec!["lg1"]);
1458 }
1459
1460 #[test]
1461 fn trace_source_includes_operational_mutations() {
1462 let (db, service) = setup();
1463 {
1464 let conn = sqlite::open_connection(db.path()).expect("conn");
1465 conn.execute(
1466 "INSERT INTO operational_collections \
1467 (name, kind, schema_json, retention_json, format_version, created_at) \
1468 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
1469 [],
1470 )
1471 .expect("insert collection");
1472 conn.execute(
1473 "INSERT INTO operational_mutations \
1474 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1475 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"ok\"}', 'source-1', 100, 1)",
1476 [],
1477 )
1478 .expect("insert mutation");
1479 }
1480
1481 let report = service.trace_source("source-1").expect("trace");
1482 assert_eq!(report.operational_mutation_rows, 1);
1483 assert_eq!(report.operational_mutation_ids, vec!["m1"]);
1484 }
1485
1486 #[test]
1487 fn excise_source_restores_prior_active_node() {
1488 let (db, service) = setup();
1489 {
1490 let conn = sqlite::open_connection(db.path()).expect("conn");
1491 conn.execute(
1492 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1493 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
1494 [],
1495 )
1496 .expect("insert v1 superseded");
1497 conn.execute(
1498 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1499 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
1500 [],
1501 )
1502 .expect("insert v2 active");
1503 }
1504 service.excise_source("source-2").expect("excise");
1505 {
1506 let conn = sqlite::open_connection(db.path()).expect("conn");
1507 let active_row_id: String = conn
1508 .query_row(
1509 "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
1510 [],
1511 |row| row.get(0),
1512 )
1513 .expect("active row exists after excise");
1514 assert_eq!(active_row_id, "r1");
1515 }
1516 }
1517
1518 #[test]
1519 fn excise_source_deletes_operational_mutations_and_repairs_latest_state_current() {
1520 let (db, service) = setup();
1521 {
1522 let conn = sqlite::open_connection(db.path()).expect("conn");
1523 conn.execute(
1524 "INSERT INTO operational_collections \
1525 (name, kind, schema_json, retention_json, format_version, created_at) \
1526 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
1527 [],
1528 )
1529 .expect("insert collection");
1530 conn.execute(
1531 "INSERT INTO operational_mutations \
1532 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1533 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'source-1', 100, 1)",
1534 [],
1535 )
1536 .expect("insert prior mutation");
1537 conn.execute(
1538 "INSERT INTO operational_mutations \
1539 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1540 VALUES ('m2', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'source-2', 200, 2)",
1541 [],
1542 )
1543 .expect("insert excised mutation");
1544 conn.execute(
1545 "INSERT INTO operational_current \
1546 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1547 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 200, 'm2')",
1548 [],
1549 )
1550 .expect("insert current row");
1551 }
1552
1553 let traced = service
1554 .trace_source("source-2")
1555 .expect("trace before excise");
1556 assert_eq!(traced.operational_mutation_rows, 1);
1557 assert_eq!(traced.operational_mutation_ids, vec!["m2"]);
1558
1559 let excised = service.excise_source("source-2").expect("excise");
1560 assert_eq!(excised.operational_mutation_rows, 0);
1561 assert!(excised.operational_mutation_ids.is_empty());
1562
1563 {
1564 let conn = sqlite::open_connection(db.path()).expect("conn");
1565 let remaining: i64 = conn
1566 .query_row(
1567 "SELECT count(*) FROM operational_mutations WHERE source_ref = 'source-2'",
1568 [],
1569 |row| row.get(0),
1570 )
1571 .expect("remaining count");
1572 assert_eq!(remaining, 0);
1573
1574 let current: (String, String) = conn
1575 .query_row(
1576 "SELECT payload_json, last_mutation_id FROM operational_current \
1577 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
1578 [],
1579 |row| Ok((row.get(0)?, row.get(1)?)),
1580 )
1581 .expect("rebuilt current row");
1582 assert_eq!(current.0, "{\"status\":\"old\"}");
1583 assert_eq!(current.1, "m1");
1584 }
1585 }
1586
1587 #[test]
1588 fn restore_logical_id_reestablishes_last_pre_retire_content_and_attached_edges() {
1589 let (db, service) = setup();
1590 {
1591 let conn = sqlite::open_connection(db.path()).expect("conn");
1592 conn.execute(
1593 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1594 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
1595 [],
1596 )
1597 .expect("insert node");
1598 conn.execute(
1599 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1600 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
1601 [],
1602 )
1603 .expect("insert target node");
1604 conn.execute(
1605 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1606 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1607 [],
1608 )
1609 .expect("insert chunk");
1610 conn.execute(
1611 "INSERT INTO edges \
1612 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1613 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
1614 [],
1615 )
1616 .expect("insert edge");
1617 conn.execute(
1618 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1619 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1620 [],
1621 )
1622 .expect("insert node retire event");
1623 conn.execute(
1624 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1625 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
1626 [],
1627 )
1628 .expect("insert edge retire event");
1629 conn.execute(
1630 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
1631 [],
1632 )
1633 .expect("retire node");
1634 conn.execute(
1635 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
1636 [],
1637 )
1638 .expect("retire edge");
1639 conn.execute("DELETE FROM fts_nodes", [])
1640 .expect("clear fts");
1641 }
1642
1643 let report = service.restore_logical_id("doc-1").expect("restore");
1644 assert_eq!(report.logical_id, "doc-1");
1645 assert!(!report.was_noop);
1646 assert_eq!(report.restored_node_rows, 1);
1647 assert_eq!(report.restored_edge_rows, 1);
1648 assert_eq!(report.restored_chunk_rows, 1);
1649 assert_eq!(report.restored_fts_rows, 1);
1650
1651 let conn = sqlite::open_connection(db.path()).expect("conn");
1652 let active_node_count: i64 = conn
1653 .query_row(
1654 "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
1655 [],
1656 |row| row.get(0),
1657 )
1658 .expect("active node count");
1659 assert_eq!(active_node_count, 1);
1660 let active_edge_count: i64 = conn
1661 .query_row(
1662 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
1663 [],
1664 |row| row.get(0),
1665 )
1666 .expect("active edge count");
1667 assert_eq!(active_edge_count, 1);
1668 let fts_count: i64 = conn
1669 .query_row(
1670 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
1671 [],
1672 |row| row.get(0),
1673 )
1674 .expect("fts count");
1675 assert_eq!(fts_count, 1);
1676 }
1677
1678 #[test]
1679 fn restore_logical_id_restores_edges_retired_after_the_node_retire_event() {
1680 let (db, service) = setup();
1681 {
1682 let conn = sqlite::open_connection(db.path()).expect("conn");
1683 conn.execute(
1684 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1685 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
1686 [],
1687 )
1688 .expect("insert node");
1689 conn.execute(
1690 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1691 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
1692 [],
1693 )
1694 .expect("insert target node");
1695 conn.execute(
1696 "INSERT INTO edges \
1697 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1698 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
1699 [],
1700 )
1701 .expect("insert edge");
1702 conn.execute(
1703 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1704 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1705 [],
1706 )
1707 .expect("insert node retire event");
1708 conn.execute(
1709 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1710 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 201, '')",
1711 [],
1712 )
1713 .expect("insert edge retire event");
1714 conn.execute(
1715 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
1716 [],
1717 )
1718 .expect("retire node");
1719 conn.execute(
1720 "UPDATE edges SET superseded_at = 201 WHERE logical_id = 'edge-1'",
1721 [],
1722 )
1723 .expect("retire edge");
1724 }
1725
1726 let report = service.restore_logical_id("doc-1").expect("restore");
1727 assert_eq!(report.restored_edge_rows, 1);
1728
1729 let conn = sqlite::open_connection(db.path()).expect("conn");
1730 let active_edge_count: i64 = conn
1731 .query_row(
1732 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
1733 [],
1734 |row| row.get(0),
1735 )
1736 .expect("active edge count");
1737 assert_eq!(active_edge_count, 1);
1738 }
1739
1740 #[test]
1741 fn restore_logical_id_prefers_latest_retired_revision_when_timestamps_tie() {
1742 let (db, service) = setup();
1743 {
1744 let conn = sqlite::open_connection(db.path()).expect("conn");
1745 conn.execute(
1746 "INSERT INTO nodes \
1747 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1748 VALUES ('node-row-older', 'doc-1', 'Document', '{\"title\":\"older\"}', 100, 200, 'forget-1')",
1749 [],
1750 )
1751 .expect("insert older retired node");
1752 conn.execute(
1753 "INSERT INTO nodes \
1754 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1755 VALUES ('node-row-newer', 'doc-1', 'Document', '{\"title\":\"newer\"}', 100, 200, 'forget-1')",
1756 [],
1757 )
1758 .expect("insert newer retired node");
1759 conn.execute(
1760 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1761 VALUES ('evt-retire-older', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1762 [],
1763 )
1764 .expect("insert older retire event");
1765 conn.execute(
1766 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1767 VALUES ('evt-retire-newer', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1768 [],
1769 )
1770 .expect("insert newer retire event");
1771 }
1772
1773 let report = service.restore_logical_id("doc-1").expect("restore");
1774
1775 assert!(!report.was_noop);
1776 let conn = sqlite::open_connection(db.path()).expect("conn");
1777 let active_row: (String, String) = conn
1778 .query_row(
1779 "SELECT row_id, properties FROM nodes \
1780 WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
1781 [],
1782 |row| Ok((row.get(0)?, row.get(1)?)),
1783 )
1784 .expect("restored active row");
1785 assert_eq!(active_row.0, "node-row-newer");
1786 assert_eq!(active_row.1, "{\"title\":\"newer\"}");
1787 }
1788
1789 #[test]
1790 fn purge_logical_id_removes_retired_content_and_records_tombstone() {
1791 let (db, service) = setup();
1792 {
1793 let conn = sqlite::open_connection(db.path()).expect("conn");
1794 conn.execute(
1795 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1796 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
1797 [],
1798 )
1799 .expect("insert retired node");
1800 conn.execute(
1801 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1802 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1803 [],
1804 )
1805 .expect("insert chunk");
1806 conn.execute(
1807 "INSERT INTO edges \
1808 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, superseded_at, source_ref) \
1809 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 200, 'seed')",
1810 [],
1811 )
1812 .expect("insert retired edge");
1813 conn.execute(
1814 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
1815 VALUES ('chunk-1', 'doc-1', 'Document', 'budget narrative')",
1816 [],
1817 )
1818 .expect("insert fts");
1819 }
1820
1821 let report = service.purge_logical_id("doc-1").expect("purge");
1822 assert_eq!(report.logical_id, "doc-1");
1823 assert!(!report.was_noop);
1824 assert_eq!(report.deleted_node_rows, 1);
1825 assert_eq!(report.deleted_edge_rows, 1);
1826 assert_eq!(report.deleted_chunk_rows, 1);
1827 assert_eq!(report.deleted_fts_rows, 1);
1828
1829 let conn = sqlite::open_connection(db.path()).expect("conn");
1830 let remaining_nodes: i64 = conn
1831 .query_row(
1832 "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1'",
1833 [],
1834 |row| row.get(0),
1835 )
1836 .expect("remaining nodes");
1837 assert_eq!(remaining_nodes, 0);
1838 let remaining_edges: i64 = conn
1839 .query_row(
1840 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1'",
1841 [],
1842 |row| row.get(0),
1843 )
1844 .expect("remaining edges");
1845 assert_eq!(remaining_edges, 0);
1846 let remaining_chunks: i64 = conn
1847 .query_row(
1848 "SELECT count(*) FROM chunks WHERE id = 'chunk-1'",
1849 [],
1850 |row| row.get(0),
1851 )
1852 .expect("remaining chunks");
1853 assert_eq!(remaining_chunks, 0);
1854 let purge_events: i64 = conn
1855 .query_row(
1856 "SELECT count(*) FROM provenance_events WHERE event_type = 'purge_logical_id' AND subject = 'doc-1'",
1857 [],
1858 |row| row.get(0),
1859 )
1860 .expect("purge events");
1861 assert_eq!(purge_events, 1);
1862 }
1863
1864 #[test]
1865 fn check_semantics_accepts_preserved_retired_chunks() {
1866 let (db, service) = setup();
1867 {
1868 let conn = sqlite::open_connection(db.path()).expect("conn");
1869 conn.execute(
1870 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1871 VALUES ('node-row-1', 'doc-1', 'Document', '{}', 100, 200, 'seed')",
1872 [],
1873 )
1874 .expect("insert retired node");
1875 conn.execute(
1876 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1877 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1878 [],
1879 )
1880 .expect("insert chunk");
1881 }
1882
1883 let report = service.check_semantics().expect("semantics");
1884 assert_eq!(report.orphaned_chunks, 0);
1885 }
1886
1887 #[test]
1888 fn check_semantics_detects_missing_retired_node_history_for_preserved_chunks() {
1889 let (db, service) = setup();
1890 {
1891 let conn = sqlite::open_connection(db.path()).expect("conn");
1892 conn.execute(
1893 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1894 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
1895 [],
1896 )
1897 .expect("insert orphaned chunk");
1898 }
1899
1900 let report = service.check_semantics().expect("semantics");
1901 assert_eq!(report.orphaned_chunks, 1);
1902 }
1903
1904 #[cfg(feature = "sqlite-vec")]
1905 #[test]
1906 fn check_semantics_detects_missing_retired_node_history_for_preserved_vec_rows() {
1907 let (db, service) = setup();
1908 {
1909 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1910 service
1911 .schema_manager
1912 .ensure_vec_kind_profile(&conn, "Doc", 4)
1913 .expect("ensure vec kind profile");
1914 conn.execute(
1915 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1916 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
1917 [],
1918 )
1919 .expect("insert orphaned chunk");
1920 conn.execute(
1921 "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
1922 [],
1923 )
1924 .expect("insert vec row");
1925 }
1926
1927 let report = service.check_semantics().expect("semantics");
1928 assert_eq!(report.orphaned_chunks, 1);
1929 assert_eq!(report.vec_rows_for_superseded_nodes, 1);
1930 }
1931
1932 #[cfg(feature = "sqlite-vec")]
1933 #[test]
1934 fn restore_logical_id_reestablishes_vector_search_without_reingest() {
1935 let (db, service) = setup();
1936 {
1937 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1938 service
1939 .schema_manager
1940 .ensure_vec_kind_profile(&conn, "Document", 4)
1941 .expect("ensure vec kind profile");
1942 conn.execute(
1943 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1944 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
1945 [],
1946 )
1947 .expect("insert retired node");
1948 conn.execute(
1949 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1950 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1951 [],
1952 )
1953 .expect("insert chunk");
1954 conn.execute(
1955 "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
1956 [],
1957 )
1958 .expect("insert vec row");
1959 conn.execute(
1960 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1961 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1962 [],
1963 )
1964 .expect("insert retire event");
1965 }
1966
1967 let report = service.restore_logical_id("doc-1").expect("restore");
1968 assert_eq!(report.restored_vec_rows, 1);
1969
1970 let coordinator = ExecutionCoordinator::open(
1971 db.path(),
1972 Arc::new(SchemaManager::new()),
1973 Some(4),
1974 1,
1975 Arc::new(TelemetryCounters::default()),
1976 None,
1977 )
1978 .expect("coordinator");
1979 let compiled = QueryBuilder::nodes("Document")
1980 .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
1981 .compile()
1982 .expect("compile");
1983 let rows = coordinator
1984 .execute_compiled_read(&compiled)
1985 .expect("vector read");
1986 assert!(
1987 rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
1988 "restore should make the preserved vec row visible again without re-ingest"
1989 );
1990 }
1991
1992 #[cfg(feature = "sqlite-vec")]
1993 #[test]
1994 fn purge_logical_id_deletes_vec_rows_for_retired_content() {
1995 let (db, service) = setup();
1996 {
1997 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1998 service
1999 .schema_manager
2000 .ensure_vec_kind_profile(&conn, "Document", 4)
2001 .expect("ensure vec kind profile");
2002 conn.execute(
2003 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
2004 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
2005 [],
2006 )
2007 .expect("insert retired node");
2008 conn.execute(
2009 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
2010 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
2011 [],
2012 )
2013 .expect("insert chunk");
2014 conn.execute(
2015 "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
2016 [],
2017 )
2018 .expect("insert vec row");
2019 }
2020
2021 let report = service.purge_logical_id("doc-1").expect("purge");
2022 assert_eq!(report.deleted_vec_rows, 1);
2023
2024 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
2025 let vec_count: i64 = conn
2026 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
2027 .expect("vec count");
2028 assert_eq!(vec_count, 0);
2029 }
2030
2031 #[cfg(feature = "sqlite-vec")]
2032 #[test]
2033 fn restore_logical_id_restores_visibility_of_regenerated_vectors() {
2034 let (db, service) = setup();
2035
2036 {
2037 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
2038 conn.execute(
2039 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
2040 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
2041 [],
2042 )
2043 .expect("insert node");
2044 conn.execute(
2045 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
2046 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
2047 [],
2048 )
2049 .expect("insert chunk");
2050 }
2051
2052 let embedder = TestEmbedder::new("test-model", 4);
2053 service
2054 .regenerate_vector_embeddings(
2055 &embedder,
2056 &VectorRegenerationConfig {
2057 kind: "Document".to_owned(),
2058 profile: "default".to_owned(),
2059 chunking_policy: "per_chunk".to_owned(),
2060 preprocessing_policy: "trim".to_owned(),
2061 },
2062 )
2063 .expect("regenerate");
2064
2065 {
2066 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
2067 conn.execute(
2068 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
2069 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
2070 [],
2071 )
2072 .expect("insert retire event");
2073 conn.execute(
2074 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
2075 [],
2076 )
2077 .expect("retire node");
2078 }
2079
2080 let report = service.restore_logical_id("doc-1").expect("restore");
2081 assert_eq!(report.restored_vec_rows, 1);
2082
2083 let coordinator = ExecutionCoordinator::open(
2084 db.path(),
2085 Arc::new(SchemaManager::new()),
2086 Some(4),
2087 1,
2088 Arc::new(TelemetryCounters::default()),
2089 None,
2090 )
2091 .expect("coordinator");
2092 let compiled = QueryBuilder::nodes("Document")
2093 .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
2094 .compile()
2095 .expect("compile");
2096 let rows = coordinator
2097 .execute_compiled_read(&compiled)
2098 .expect("vector read");
2099 assert!(
2100 rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
2101 "restored logical_id should become visible through regenerated vectors"
2102 );
2103 }
2104
2105 #[test]
2106 fn check_semantics_clean_db_returns_zeros() {
2107 let (_db, service) = setup();
2108 let report = service.check_semantics().expect("semantics check");
2109 assert_eq!(report.orphaned_chunks, 0);
2110 assert_eq!(report.null_source_ref_nodes, 0);
2111 assert_eq!(report.broken_step_fk, 0);
2112 assert_eq!(report.broken_action_fk, 0);
2113 assert_eq!(report.stale_fts_rows, 0);
2114 assert_eq!(report.fts_rows_for_superseded_nodes, 0);
2115 assert_eq!(report.dangling_edges, 0);
2116 assert_eq!(report.orphaned_supersession_chains, 0);
2117 assert_eq!(report.stale_vec_rows, 0);
2118 assert_eq!(report.vec_rows_for_superseded_nodes, 0);
2119 assert_eq!(report.missing_operational_current_rows, 0);
2120 assert_eq!(report.stale_operational_current_rows, 0);
2121 assert_eq!(report.disabled_collection_mutations, 0);
2122 assert_eq!(report.mismatched_kind_property_fts_rows, 0);
2123 assert_eq!(report.duplicate_property_fts_rows, 0);
2124 assert_eq!(report.drifted_property_fts_rows, 0);
2125 assert!(report.warnings.is_empty());
2126 }
2127
2128 #[test]
2129 fn register_operational_collection_persists_and_emits_provenance() {
2130 let (db, service) = setup();
2131 let record = service
2132 .register_operational_collection(&OperationalRegisterRequest {
2133 name: "connector_health".to_owned(),
2134 kind: OperationalCollectionKind::LatestState,
2135 schema_json: "{}".to_owned(),
2136 retention_json: "{}".to_owned(),
2137 filter_fields_json: "[]".to_owned(),
2138 validation_json: String::new(),
2139 secondary_indexes_json: "[]".to_owned(),
2140 format_version: 1,
2141 })
2142 .expect("register collection");
2143
2144 assert_eq!(record.name, "connector_health");
2145 assert_eq!(record.kind, OperationalCollectionKind::LatestState);
2146 assert_eq!(record.schema_json, "{}");
2147 assert_eq!(record.retention_json, "{}");
2148 assert_eq!(record.filter_fields_json, "[]");
2149 assert!(record.created_at > 0);
2150 assert_eq!(record.disabled_at, None);
2151
2152 let described = service
2153 .describe_operational_collection("connector_health")
2154 .expect("describe collection")
2155 .expect("collection exists");
2156 assert_eq!(described, record);
2157
2158 let conn = sqlite::open_connection(db.path()).expect("conn");
2159 let provenance_count: i64 = conn
2160 .query_row(
2161 "SELECT count(*) FROM provenance_events \
2162 WHERE event_type = 'operational_collection_registered' AND subject = 'connector_health'",
2163 [],
2164 |row| row.get(0),
2165 )
2166 .expect("provenance count");
2167 assert_eq!(provenance_count, 1);
2168 }
2169
2170 #[test]
2171 fn register_and_update_operational_collection_validation_round_trip() {
2172 let (db, service) = setup();
2173 let record = service
2174 .register_operational_collection(&OperationalRegisterRequest {
2175 name: "connector_health".to_owned(),
2176 kind: OperationalCollectionKind::LatestState,
2177 schema_json: "{}".to_owned(),
2178 retention_json: "{}".to_owned(),
2179 filter_fields_json: "[]".to_owned(),
2180 validation_json: String::new(),
2181 secondary_indexes_json: "[]".to_owned(),
2182 format_version: 1,
2183 })
2184 .expect("register collection");
2185 assert_eq!(record.validation_json, "");
2186
2187 let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
2188 let updated = service
2189 .update_operational_collection_validation("connector_health", validation_json)
2190 .expect("update validation");
2191 assert_eq!(updated.validation_json, validation_json);
2192
2193 let described = service
2194 .describe_operational_collection("connector_health")
2195 .expect("describe collection")
2196 .expect("collection exists");
2197 assert_eq!(described.validation_json, validation_json);
2198
2199 let conn = sqlite::open_connection(db.path()).expect("conn");
2200 let provenance_count: i64 = conn
2201 .query_row(
2202 "SELECT count(*) FROM provenance_events \
2203 WHERE event_type = 'operational_collection_validation_updated' \
2204 AND subject = 'connector_health'",
2205 [],
2206 |row| row.get(0),
2207 )
2208 .expect("provenance count");
2209 assert_eq!(provenance_count, 1);
2210 }
2211
2212 #[test]
2213 fn register_update_and_rebuild_operational_secondary_indexes_round_trip() {
2214 let (db, service) = setup();
2215 let record = service
2216 .register_operational_collection(&OperationalRegisterRequest {
2217 name: "audit_log".to_owned(),
2218 kind: OperationalCollectionKind::AppendOnlyLog,
2219 schema_json: "{}".to_owned(),
2220 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2221 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
2222 validation_json: String::new(),
2223 secondary_indexes_json: "[]".to_owned(),
2224 format_version: 1,
2225 })
2226 .expect("register collection");
2227 assert_eq!(record.secondary_indexes_json, "[]");
2228
2229 {
2230 let writer = crate::WriterActor::start(
2231 db.path(),
2232 Arc::new(SchemaManager::new()),
2233 crate::ProvenanceMode::Warn,
2234 Arc::new(crate::TelemetryCounters::default()),
2235 )
2236 .expect("writer");
2237 writer
2238 .submit(crate::WriteRequest {
2239 label: "secondary-index-seed".to_owned(),
2240 nodes: vec![],
2241 node_retires: vec![],
2242 edges: vec![],
2243 edge_retires: vec![],
2244 chunks: vec![],
2245 runs: vec![],
2246 steps: vec![],
2247 actions: vec![],
2248 optional_backfills: vec![],
2249 vec_inserts: vec![],
2250 operational_writes: vec![
2251 crate::OperationalWrite::Append {
2252 collection: "audit_log".to_owned(),
2253 record_key: "evt-1".to_owned(),
2254 payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
2255 source_ref: Some("src-1".to_owned()),
2256 },
2257 crate::OperationalWrite::Append {
2258 collection: "audit_log".to_owned(),
2259 record_key: "evt-2".to_owned(),
2260 payload_json: r#"{"actor":"bob","ts":200}"#.to_owned(),
2261 source_ref: Some("src-2".to_owned()),
2262 },
2263 ],
2264 })
2265 .expect("seed writes");
2266 }
2267
2268 let secondary_indexes_json = r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#;
2269 let updated = service
2270 .update_operational_collection_secondary_indexes("audit_log", secondary_indexes_json)
2271 .expect("update secondary indexes");
2272 assert_eq!(updated.secondary_indexes_json, secondary_indexes_json);
2273
2274 let conn = sqlite::open_connection(db.path()).expect("conn");
2275 let entry_count: i64 = conn
2276 .query_row(
2277 "SELECT count(*) FROM operational_secondary_index_entries \
2278 WHERE collection_name = 'audit_log' AND index_name = 'actor_ts'",
2279 [],
2280 |row| row.get(0),
2281 )
2282 .expect("secondary index count");
2283 assert_eq!(entry_count, 2);
2284 conn.execute(
2285 "DELETE FROM operational_secondary_index_entries WHERE collection_name = 'audit_log'",
2286 [],
2287 )
2288 .expect("clear index entries");
2289 drop(conn);
2290
2291 let rebuild = service
2292 .rebuild_operational_secondary_indexes("audit_log")
2293 .expect("rebuild secondary indexes");
2294 assert_eq!(rebuild.collection_name, "audit_log");
2295 assert_eq!(rebuild.mutation_entries_rebuilt, 2);
2296 assert_eq!(rebuild.current_entries_rebuilt, 0);
2297 }
2298
2299 #[test]
2300 fn register_operational_collection_rejects_invalid_validation_contract() {
2301 let (_db, service) = setup();
2302
2303 let error = service
2304 .register_operational_collection(&OperationalRegisterRequest {
2305 name: "connector_health".to_owned(),
2306 kind: OperationalCollectionKind::LatestState,
2307 schema_json: "{}".to_owned(),
2308 retention_json: "{}".to_owned(),
2309 filter_fields_json: "[]".to_owned(),
2310 validation_json: r#"{"format_version":1,"mode":"enforce","fields":[{"name":"status","type":"string","minimum":0}]}"#
2311 .to_owned(),
2312 secondary_indexes_json: "[]".to_owned(),
2313 format_version: 1,
2314 })
2315 .expect_err("invalid validation contract should reject");
2316
2317 assert!(matches!(error, EngineError::InvalidWrite(_)));
2318 assert!(error.to_string().contains("minimum/maximum"));
2319 }
2320
2321 #[test]
2322 fn validate_operational_collection_history_reports_invalid_rows_without_mutation() {
2323 let (db, service) = setup();
2324 service
2325 .register_operational_collection(&OperationalRegisterRequest {
2326 name: "audit_log".to_owned(),
2327 kind: OperationalCollectionKind::AppendOnlyLog,
2328 schema_json: "{}".to_owned(),
2329 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2330 filter_fields_json: "[]".to_owned(),
2331 validation_json: r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#
2332 .to_owned(),
2333 secondary_indexes_json: "[]".to_owned(),
2334 format_version: 1,
2335 })
2336 .expect("register collection");
2337 {
2338 let writer = crate::WriterActor::start(
2339 db.path(),
2340 Arc::new(SchemaManager::new()),
2341 crate::ProvenanceMode::Warn,
2342 Arc::new(crate::TelemetryCounters::default()),
2343 )
2344 .expect("writer");
2345 writer
2346 .submit(crate::WriteRequest {
2347 label: "history-validation".to_owned(),
2348 nodes: vec![],
2349 node_retires: vec![],
2350 edges: vec![],
2351 edge_retires: vec![],
2352 chunks: vec![],
2353 runs: vec![],
2354 steps: vec![],
2355 actions: vec![],
2356 optional_backfills: vec![],
2357 vec_inserts: vec![],
2358 operational_writes: vec![
2359 crate::OperationalWrite::Append {
2360 collection: "audit_log".to_owned(),
2361 record_key: "evt-1".to_owned(),
2362 payload_json: r#"{"status":"ok"}"#.to_owned(),
2363 source_ref: Some("src-1".to_owned()),
2364 },
2365 crate::OperationalWrite::Append {
2366 collection: "audit_log".to_owned(),
2367 record_key: "evt-2".to_owned(),
2368 payload_json: r#"{"status":"bogus"}"#.to_owned(),
2369 source_ref: Some("src-2".to_owned()),
2370 },
2371 ],
2372 })
2373 .expect("write");
2374 }
2375
2376 let report = service
2377 .validate_operational_collection_history("audit_log")
2378 .expect("validate history");
2379 assert_eq!(report.collection_name, "audit_log");
2380 assert_eq!(report.checked_rows, 2);
2381 assert_eq!(report.invalid_row_count, 1);
2382 assert_eq!(report.issues.len(), 1);
2383 assert_eq!(report.issues[0].record_key, "evt-2");
2384 assert!(report.issues[0].message.contains("must be one of"));
2385
2386 let trace = service
2387 .trace_operational_collection("audit_log", None)
2388 .expect("trace");
2389 assert_eq!(trace.mutation_count, 2);
2390
2391 let conn = sqlite::open_connection(db.path()).expect("conn");
2392 let provenance_count: i64 = conn
2393 .query_row(
2394 "SELECT count(*) FROM provenance_events \
2395 WHERE event_type = 'operational_collection_history_validated' \
2396 AND subject = 'audit_log'",
2397 [],
2398 |row| row.get(0),
2399 )
2400 .expect("provenance count");
2401 assert_eq!(provenance_count, 0);
2402 }
2403
2404 #[test]
2405 fn trace_operational_collection_returns_mutations_and_current_rows() {
2406 let (db, service) = setup();
2407 service
2408 .register_operational_collection(&OperationalRegisterRequest {
2409 name: "connector_health".to_owned(),
2410 kind: OperationalCollectionKind::LatestState,
2411 schema_json: "{}".to_owned(),
2412 retention_json: "{}".to_owned(),
2413 filter_fields_json: "[]".to_owned(),
2414 validation_json: String::new(),
2415 secondary_indexes_json: "[]".to_owned(),
2416 format_version: 1,
2417 })
2418 .expect("register collection");
2419 {
2420 let writer = crate::WriterActor::start(
2421 db.path(),
2422 Arc::new(SchemaManager::new()),
2423 crate::ProvenanceMode::Warn,
2424 Arc::new(crate::TelemetryCounters::default()),
2425 )
2426 .expect("writer");
2427 writer
2428 .submit(crate::WriteRequest {
2429 label: "operational".to_owned(),
2430 nodes: vec![],
2431 node_retires: vec![],
2432 edges: vec![],
2433 edge_retires: vec![],
2434 chunks: vec![],
2435 runs: vec![],
2436 steps: vec![],
2437 actions: vec![],
2438 optional_backfills: vec![],
2439 vec_inserts: vec![],
2440 operational_writes: vec![crate::OperationalWrite::Put {
2441 collection: "connector_health".to_owned(),
2442 record_key: "gmail".to_owned(),
2443 payload_json: r#"{"status":"ok"}"#.to_owned(),
2444 source_ref: Some("src-1".to_owned()),
2445 }],
2446 })
2447 .expect("write");
2448 }
2449
2450 let report = service
2451 .trace_operational_collection("connector_health", Some("gmail"))
2452 .expect("trace");
2453 assert_eq!(report.collection_name, "connector_health");
2454 assert_eq!(report.record_key.as_deref(), Some("gmail"));
2455 assert_eq!(report.mutation_count, 1);
2456 assert_eq!(report.current_count, 1);
2457 assert_eq!(report.mutations[0].op_kind, "put");
2458 assert_eq!(report.current_rows[0].payload_json, r#"{"status":"ok"}"#);
2459 }
2460
2461 #[test]
2462 fn trace_operational_collection_rejects_unknown_collection() {
2463 let (_db, service) = setup();
2464
2465 let error = service
2466 .trace_operational_collection("missing_collection", None)
2467 .expect_err("unknown collection should fail");
2468
2469 assert!(matches!(error, EngineError::InvalidWrite(_)));
2470 assert!(error.to_string().contains("is not registered"));
2471 }
2472
2473 #[test]
2474 fn rebuild_operational_current_repairs_missing_latest_state_rows() {
2475 let (db, service) = setup();
2476 service
2477 .register_operational_collection(&OperationalRegisterRequest {
2478 name: "connector_health".to_owned(),
2479 kind: OperationalCollectionKind::LatestState,
2480 schema_json: "{}".to_owned(),
2481 retention_json: "{}".to_owned(),
2482 filter_fields_json: "[]".to_owned(),
2483 validation_json: String::new(),
2484 secondary_indexes_json: "[]".to_owned(),
2485 format_version: 1,
2486 })
2487 .expect("register collection");
2488 {
2489 let writer = crate::WriterActor::start(
2490 db.path(),
2491 Arc::new(SchemaManager::new()),
2492 crate::ProvenanceMode::Warn,
2493 Arc::new(crate::TelemetryCounters::default()),
2494 )
2495 .expect("writer");
2496 writer
2497 .submit(crate::WriteRequest {
2498 label: "operational".to_owned(),
2499 nodes: vec![],
2500 node_retires: vec![],
2501 edges: vec![],
2502 edge_retires: vec![],
2503 chunks: vec![],
2504 runs: vec![],
2505 steps: vec![],
2506 actions: vec![],
2507 optional_backfills: vec![],
2508 vec_inserts: vec![],
2509 operational_writes: vec![crate::OperationalWrite::Put {
2510 collection: "connector_health".to_owned(),
2511 record_key: "gmail".to_owned(),
2512 payload_json: r#"{"status":"ok"}"#.to_owned(),
2513 source_ref: Some("src-1".to_owned()),
2514 }],
2515 })
2516 .expect("write");
2517 }
2518 {
2519 let conn = sqlite::open_connection(db.path()).expect("conn");
2520 conn.execute(
2521 "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2522 [],
2523 )
2524 .expect("delete current row");
2525 }
2526
2527 let before = service.check_semantics().expect("semantics before rebuild");
2528 assert_eq!(before.missing_operational_current_rows, 1);
2529
2530 let repair = service
2531 .rebuild_operational_current(Some("connector_health"))
2532 .expect("rebuild current");
2533 assert_eq!(repair.collections_rebuilt, 1);
2534 assert_eq!(repair.current_rows_rebuilt, 1);
2535
2536 let after = service.check_semantics().expect("semantics after rebuild");
2537 assert_eq!(after.missing_operational_current_rows, 0);
2538
2539 let conn = sqlite::open_connection(db.path()).expect("conn");
2540 let payload: String = conn
2541 .query_row(
2542 "SELECT payload_json FROM operational_current \
2543 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2544 [],
2545 |row| row.get(0),
2546 )
2547 .expect("restored payload");
2548 assert_eq!(payload, r#"{"status":"ok"}"#);
2549 }
2550
2551 #[test]
2552 fn rebuild_operational_current_restores_latest_state_secondary_index_entries() {
2553 let (db, service) = setup();
2554 service
2555 .register_operational_collection(&OperationalRegisterRequest {
2556 name: "connector_health".to_owned(),
2557 kind: OperationalCollectionKind::LatestState,
2558 schema_json: "{}".to_owned(),
2559 retention_json: "{}".to_owned(),
2560 filter_fields_json: "[]".to_owned(),
2561 validation_json: String::new(),
2562 secondary_indexes_json: r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"}]"#.to_owned(),
2563 format_version: 1,
2564 })
2565 .expect("register collection");
2566 {
2567 let writer = crate::WriterActor::start(
2568 db.path(),
2569 Arc::new(SchemaManager::new()),
2570 crate::ProvenanceMode::Warn,
2571 Arc::new(crate::TelemetryCounters::default()),
2572 )
2573 .expect("writer");
2574 writer
2575 .submit(crate::WriteRequest {
2576 label: "operational".to_owned(),
2577 nodes: vec![],
2578 node_retires: vec![],
2579 edges: vec![],
2580 edge_retires: vec![],
2581 chunks: vec![],
2582 runs: vec![],
2583 steps: vec![],
2584 actions: vec![],
2585 optional_backfills: vec![],
2586 vec_inserts: vec![],
2587 operational_writes: vec![crate::OperationalWrite::Put {
2588 collection: "connector_health".to_owned(),
2589 record_key: "gmail".to_owned(),
2590 payload_json: r#"{"status":"ok"}"#.to_owned(),
2591 source_ref: Some("src-1".to_owned()),
2592 }],
2593 })
2594 .expect("write");
2595 }
2596 {
2597 let conn = sqlite::open_connection(db.path()).expect("conn");
2598 let entry_count: i64 = conn
2599 .query_row(
2600 "SELECT count(*) FROM operational_secondary_index_entries \
2601 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2602 [],
2603 |row| row.get(0),
2604 )
2605 .expect("secondary index count before repair");
2606 assert_eq!(entry_count, 1);
2607 conn.execute(
2608 "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2609 [],
2610 )
2611 .expect("delete current row");
2612 }
2613
2614 service
2615 .rebuild_operational_current(Some("connector_health"))
2616 .expect("rebuild current");
2617
2618 let conn = sqlite::open_connection(db.path()).expect("conn");
2619 let entry_count: i64 = conn
2620 .query_row(
2621 "SELECT count(*) FROM operational_secondary_index_entries \
2622 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2623 [],
2624 |row| row.get(0),
2625 )
2626 .expect("secondary index count after repair");
2627 assert_eq!(entry_count, 1);
2628 }
2629
2630 #[test]
2631 fn operational_current_semantics_and_rebuild_follow_mutation_order() {
2632 let (db, service) = setup();
2633 {
2634 let conn = sqlite::open_connection(db.path()).expect("conn");
2635 conn.execute(
2636 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2637 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
2638 [],
2639 )
2640 .expect("seed collection");
2641 conn.execute(
2642 "INSERT INTO operational_mutations \
2643 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2644 VALUES ('m3', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'src-1', 100, 1)",
2645 [],
2646 )
2647 .expect("seed first put");
2648 conn.execute(
2649 "INSERT INTO operational_mutations \
2650 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2651 VALUES ('m2', 'connector_health', 'gmail', 'delete', '', 'src-2', 100, 2)",
2652 [],
2653 )
2654 .expect("seed delete");
2655 conn.execute(
2656 "INSERT INTO operational_mutations \
2657 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2658 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'src-3', 100, 3)",
2659 [],
2660 )
2661 .expect("seed final put");
2662 conn.execute(
2663 "INSERT INTO operational_current \
2664 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
2665 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 100, 'm1')",
2666 [],
2667 )
2668 .expect("seed current");
2669 }
2670
2671 let before = service.check_semantics().expect("semantics before rebuild");
2672 assert_eq!(before.missing_operational_current_rows, 0);
2673 assert_eq!(before.stale_operational_current_rows, 0);
2674
2675 {
2676 let conn = sqlite::open_connection(db.path()).expect("conn");
2677 conn.execute(
2678 "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2679 [],
2680 )
2681 .expect("delete current row");
2682 }
2683
2684 let missing = service.check_semantics().expect("semantics after delete");
2685 assert_eq!(missing.missing_operational_current_rows, 1);
2686 assert_eq!(missing.stale_operational_current_rows, 0);
2687
2688 service
2689 .rebuild_operational_current(Some("connector_health"))
2690 .expect("rebuild current");
2691
2692 let after = service.check_semantics().expect("semantics after rebuild");
2693 assert_eq!(after.missing_operational_current_rows, 0);
2694 assert_eq!(after.stale_operational_current_rows, 0);
2695
2696 let conn = sqlite::open_connection(db.path()).expect("conn");
2697 let payload: String = conn
2698 .query_row(
2699 "SELECT payload_json FROM operational_current \
2700 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2701 [],
2702 |row| row.get(0),
2703 )
2704 .expect("restored payload");
2705 assert_eq!(payload, r#"{"status":"new"}"#);
2706 }
2707
2708 #[test]
2709 fn disable_operational_collection_sets_disabled_at_and_emits_provenance() {
2710 let (db, service) = setup();
2711 service
2712 .register_operational_collection(&OperationalRegisterRequest {
2713 name: "audit_log".to_owned(),
2714 kind: OperationalCollectionKind::AppendOnlyLog,
2715 schema_json: "{}".to_owned(),
2716 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2717 filter_fields_json: "[]".to_owned(),
2718 validation_json: String::new(),
2719 secondary_indexes_json: "[]".to_owned(),
2720 format_version: 1,
2721 })
2722 .expect("register collection");
2723
2724 let record = service
2725 .disable_operational_collection("audit_log")
2726 .expect("disable collection");
2727 assert_eq!(record.name, "audit_log");
2728 assert!(record.disabled_at.is_some());
2729
2730 let disabled_at = record.disabled_at.expect("disabled_at");
2731 let described = service
2732 .describe_operational_collection("audit_log")
2733 .expect("describe collection")
2734 .expect("collection exists");
2735 assert_eq!(described.disabled_at, Some(disabled_at));
2736
2737 let writer = crate::WriterActor::start(
2738 db.path(),
2739 Arc::new(SchemaManager::new()),
2740 crate::ProvenanceMode::Warn,
2741 Arc::new(crate::TelemetryCounters::default()),
2742 )
2743 .expect("writer");
2744 let error = writer
2745 .submit(crate::WriteRequest {
2746 label: "disabled-operational".to_owned(),
2747 nodes: vec![],
2748 node_retires: vec![],
2749 edges: vec![],
2750 edge_retires: vec![],
2751 chunks: vec![],
2752 runs: vec![],
2753 steps: vec![],
2754 actions: vec![],
2755 optional_backfills: vec![],
2756 vec_inserts: vec![],
2757 operational_writes: vec![crate::OperationalWrite::Append {
2758 collection: "audit_log".to_owned(),
2759 record_key: "evt-1".to_owned(),
2760 payload_json: r#"{"type":"sync"}"#.to_owned(),
2761 source_ref: Some("src-1".to_owned()),
2762 }],
2763 })
2764 .expect_err("disabled collection should reject writes");
2765 assert!(matches!(error, EngineError::InvalidWrite(_)));
2766 assert!(error.to_string().contains("is disabled"));
2767
2768 let conn = sqlite::open_connection(db.path()).expect("conn");
2769 let provenance_count: i64 = conn
2770 .query_row(
2771 "SELECT count(*) FROM provenance_events \
2772 WHERE event_type = 'operational_collection_disabled' AND subject = 'audit_log'",
2773 [],
2774 |row| row.get(0),
2775 )
2776 .expect("provenance count");
2777 assert_eq!(provenance_count, 1);
2778 }
2779
2780 #[test]
2781 fn purge_operational_collection_deletes_append_only_rows_before_cutoff() {
2782 let (db, service) = setup();
2783 {
2784 let conn = sqlite::open_connection(db.path()).expect("conn");
2785 conn.execute(
2786 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2787 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_all\"}', 1, 100)",
2788 [],
2789 )
2790 .expect("seed collection");
2791 conn.execute(
2792 "INSERT INTO operational_mutations \
2793 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2794 VALUES ('evt-1', 'audit_log', 'evt-1', 'append', '{\"seq\":1}', 'src-1', 100, 1)",
2795 [],
2796 )
2797 .expect("seed event 1");
2798 conn.execute(
2799 "INSERT INTO operational_mutations \
2800 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2801 VALUES ('evt-2', 'audit_log', 'evt-2', 'append', '{\"seq\":2}', 'src-2', 200, 2)",
2802 [],
2803 )
2804 .expect("seed event 2");
2805 conn.execute(
2806 "INSERT INTO operational_mutations \
2807 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2808 VALUES ('evt-3', 'audit_log', 'evt-3', 'append', '{\"seq\":3}', 'src-3', 300, 3)",
2809 [],
2810 )
2811 .expect("seed event 3");
2812 }
2813
2814 let report = service
2815 .purge_operational_collection("audit_log", 250)
2816 .expect("purge collection");
2817 assert_eq!(report.collection_name, "audit_log");
2818 assert_eq!(report.deleted_mutations, 2);
2819 assert_eq!(report.before_timestamp, 250);
2820
2821 let conn = sqlite::open_connection(db.path()).expect("conn");
2822 let remaining: Vec<String> = {
2823 let mut stmt = conn
2824 .prepare(
2825 "SELECT id FROM operational_mutations \
2826 WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2827 )
2828 .expect("stmt");
2829 stmt.query_map([], |row| row.get(0))
2830 .expect("rows")
2831 .collect::<Result<_, _>>()
2832 .expect("collect")
2833 };
2834 assert_eq!(remaining, vec!["evt-3".to_owned()]);
2835 let provenance_count: i64 = conn
2836 .query_row(
2837 "SELECT count(*) FROM provenance_events \
2838 WHERE event_type = 'operational_collection_purged' AND subject = 'audit_log'",
2839 [],
2840 |row| row.get(0),
2841 )
2842 .expect("provenance count");
2843 assert_eq!(provenance_count, 1);
2844 }
2845
2846 #[test]
2847 fn compact_operational_collection_dry_run_reports_without_mutation() {
2848 let (db, service) = setup();
2849 {
2850 let conn = sqlite::open_connection(db.path()).expect("conn");
2851 conn.execute(
2852 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2853 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2854 [],
2855 )
2856 .expect("seed collection");
2857 for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2858 conn.execute(
2859 "INSERT INTO operational_mutations \
2860 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2861 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2862 rusqlite::params![
2863 format!("evt-{index}"),
2864 format!("{{\"seq\":{index}}}"),
2865 created_at,
2866 index,
2867 ],
2868 )
2869 .expect("seed event");
2870 }
2871 }
2872
2873 let report = service
2874 .compact_operational_collection("audit_log", true)
2875 .expect("compact collection");
2876 assert_eq!(report.collection_name, "audit_log");
2877 assert_eq!(report.deleted_mutations, 1);
2878 assert!(report.dry_run);
2879 assert_eq!(report.before_timestamp, None);
2880
2881 let conn = sqlite::open_connection(db.path()).expect("conn");
2882 let remaining_count: i64 = conn
2883 .query_row(
2884 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2885 [],
2886 |row| row.get(0),
2887 )
2888 .expect("remaining count");
2889 assert_eq!(remaining_count, 3);
2890 let provenance_count: i64 = conn
2891 .query_row(
2892 "SELECT count(*) FROM provenance_events \
2893 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
2894 [],
2895 |row| row.get(0),
2896 )
2897 .expect("provenance count");
2898 assert_eq!(provenance_count, 0);
2899 }
2900
2901 #[test]
2902 fn compact_operational_collection_keep_last_deletes_oldest_rows() {
2903 let (db, service) = setup();
2904 {
2905 let conn = sqlite::open_connection(db.path()).expect("conn");
2906 conn.execute(
2907 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2908 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2909 [],
2910 )
2911 .expect("seed collection");
2912 for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2913 conn.execute(
2914 "INSERT INTO operational_mutations \
2915 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2916 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2917 rusqlite::params![
2918 format!("evt-{index}"),
2919 format!("{{\"seq\":{index}}}"),
2920 created_at,
2921 index,
2922 ],
2923 )
2924 .expect("seed event");
2925 }
2926 }
2927
2928 let report = service
2929 .compact_operational_collection("audit_log", false)
2930 .expect("compact collection");
2931 assert_eq!(report.deleted_mutations, 1);
2932 assert!(!report.dry_run);
2933
2934 let conn = sqlite::open_connection(db.path()).expect("conn");
2935 let remaining: Vec<String> = {
2936 let mut stmt = conn
2937 .prepare(
2938 "SELECT id FROM operational_mutations \
2939 WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2940 )
2941 .expect("stmt");
2942 stmt.query_map([], |row| row.get(0))
2943 .expect("rows")
2944 .collect::<Result<_, _>>()
2945 .expect("collect")
2946 };
2947 assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
2948 let provenance_count: i64 = conn
2949 .query_row(
2950 "SELECT count(*) FROM provenance_events \
2951 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
2952 [],
2953 |row| row.get(0),
2954 )
2955 .expect("provenance count");
2956 assert_eq!(provenance_count, 1);
2957 }
2958
2959 #[test]
2960 fn plan_and_run_operational_retention_keep_last() {
2961 let (db, service) = setup();
2962 {
2963 let conn = sqlite::open_connection(db.path()).expect("conn");
2964 conn.execute(
2965 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2966 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2967 [],
2968 )
2969 .expect("seed collection");
2970 for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2971 conn.execute(
2972 "INSERT INTO operational_mutations \
2973 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2974 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2975 rusqlite::params![
2976 format!("evt-{index}"),
2977 format!("{{\"seq\":{index}}}"),
2978 created_at,
2979 index,
2980 ],
2981 )
2982 .expect("seed event");
2983 }
2984 }
2985
2986 let plan = service
2987 .plan_operational_retention(1_000, None, Some(10))
2988 .expect("plan retention");
2989 assert_eq!(plan.collections_examined, 1);
2990 assert_eq!(plan.items[0].collection_name, "audit_log");
2991 assert_eq!(
2992 plan.items[0].action_kind,
2993 crate::operational::OperationalRetentionActionKind::KeepLast
2994 );
2995 assert_eq!(plan.items[0].candidate_deletions, 1);
2996 assert_eq!(plan.items[0].max_rows, Some(2));
2997 assert_eq!(plan.items[0].last_run_at, None);
2998
2999 let dry_run = service
3000 .run_operational_retention(1_000, None, Some(10), true)
3001 .expect("dry-run retention");
3002 assert!(dry_run.dry_run);
3003 assert_eq!(dry_run.collections_acted_on, 1);
3004 assert_eq!(dry_run.items[0].deleted_mutations, 1);
3005 assert_eq!(dry_run.items[0].rows_remaining, 2);
3006
3007 let conn = sqlite::open_connection(db.path()).expect("conn");
3008 let remaining_count: i64 = conn
3009 .query_row(
3010 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
3011 [],
3012 |row| row.get(0),
3013 )
3014 .expect("remaining count after dry run");
3015 assert_eq!(remaining_count, 3);
3016 let retention_run_count: i64 = conn
3017 .query_row(
3018 "SELECT count(*) FROM operational_retention_runs WHERE collection_name = 'audit_log'",
3019 [],
3020 |row| row.get(0),
3021 )
3022 .expect("retention run count");
3023 assert_eq!(retention_run_count, 0);
3024 drop(conn);
3025
3026 let executed = service
3027 .run_operational_retention(1_000, None, Some(10), false)
3028 .expect("execute retention");
3029 assert_eq!(executed.collections_acted_on, 1);
3030 assert_eq!(executed.items[0].deleted_mutations, 1);
3031 assert_eq!(executed.items[0].rows_remaining, 2);
3032
3033 let conn = sqlite::open_connection(db.path()).expect("conn");
3034 let remaining: Vec<String> = {
3035 let mut stmt = conn
3036 .prepare(
3037 "SELECT id FROM operational_mutations \
3038 WHERE collection_name = 'audit_log' ORDER BY mutation_order",
3039 )
3040 .expect("stmt");
3041 stmt.query_map([], |row| row.get(0))
3042 .expect("rows")
3043 .collect::<Result<_, _>>()
3044 .expect("collect")
3045 };
3046 assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
3047 let last_run_at: i64 = conn
3048 .query_row(
3049 "SELECT executed_at FROM operational_retention_runs \
3050 WHERE collection_name = 'audit_log' ORDER BY executed_at DESC LIMIT 1",
3051 [],
3052 |row| row.get(0),
3053 )
3054 .expect("last run at");
3055 assert_eq!(last_run_at, 1_000);
3056 }
3057
3058 #[test]
3059 fn dry_run_operational_retention_does_not_mark_noop_collection_as_acted_on() {
3060 let (db, service) = setup();
3061 let conn = sqlite::open_connection(db.path()).expect("conn");
3062 conn.execute(
3063 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
3064 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
3065 [],
3066 )
3067 .expect("seed collection");
3068 for (index, created_at) in [(1_i64, 100_i64), (2, 200)] {
3069 conn.execute(
3070 "INSERT INTO operational_mutations \
3071 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
3072 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
3073 rusqlite::params![
3074 format!("evt-{index}"),
3075 format!("{{\"seq\":{index}}}"),
3076 created_at,
3077 index,
3078 ],
3079 )
3080 .expect("seed event");
3081 }
3082 drop(conn);
3083
3084 let dry_run = service
3085 .run_operational_retention(1_000, None, Some(10), true)
3086 .expect("dry-run retention");
3087 assert!(dry_run.dry_run);
3088 assert_eq!(dry_run.collections_acted_on, 0);
3089 assert_eq!(dry_run.items[0].deleted_mutations, 0);
3090 assert_eq!(dry_run.items[0].rows_remaining, 2);
3091 }
3092
3093 #[test]
3094 fn compact_operational_collection_rejects_latest_state() {
3095 let (_db, service) = setup();
3096 service
3097 .register_operational_collection(&OperationalRegisterRequest {
3098 name: "connector_health".to_owned(),
3099 kind: OperationalCollectionKind::LatestState,
3100 schema_json: "{}".to_owned(),
3101 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3102 filter_fields_json: "[]".to_owned(),
3103 validation_json: String::new(),
3104 secondary_indexes_json: "[]".to_owned(),
3105 format_version: 1,
3106 })
3107 .expect("register collection");
3108
3109 let error = service
3110 .compact_operational_collection("connector_health", false)
3111 .expect_err("latest_state compaction should be rejected");
3112 assert!(matches!(error, EngineError::InvalidWrite(_)));
3113 assert!(error.to_string().contains("append_only_log"));
3114 }
3115
3116 #[test]
3117 fn register_operational_collection_persists_filter_fields_json() {
3118 let (_db, service) = setup();
3119
3120 let record = service
3121 .register_operational_collection(&OperationalRegisterRequest {
3122 name: "audit_log".to_owned(),
3123 kind: OperationalCollectionKind::AppendOnlyLog,
3124 schema_json: "{}".to_owned(),
3125 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3126 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3127 validation_json: String::new(),
3128 secondary_indexes_json: "[]".to_owned(),
3129 format_version: 1,
3130 })
3131 .expect("register collection");
3132
3133 assert_eq!(
3134 record.filter_fields_json,
3135 r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#
3136 );
3137 }
3138
3139 #[test]
3140 fn read_operational_collection_filters_append_only_rows_by_declared_fields() {
3141 let (db, service) = setup();
3142 service
3143 .register_operational_collection(&OperationalRegisterRequest {
3144 name: "audit_log".to_owned(),
3145 kind: OperationalCollectionKind::AppendOnlyLog,
3146 schema_json: "{}".to_owned(),
3147 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3148 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"seq","type":"integer","modes":["exact","range"]},{"name":"ts","type":"timestamp","modes":["exact","range"]}]"#.to_owned(),
3149 validation_json: String::new(),
3150 secondary_indexes_json: "[]".to_owned(),
3151 format_version: 1,
3152 })
3153 .expect("register collection");
3154 {
3155 let writer = crate::WriterActor::start(
3156 db.path(),
3157 Arc::new(SchemaManager::new()),
3158 crate::ProvenanceMode::Warn,
3159 Arc::new(crate::TelemetryCounters::default()),
3160 )
3161 .expect("writer");
3162 writer
3163 .submit(crate::WriteRequest {
3164 label: "operational".to_owned(),
3165 nodes: vec![],
3166 node_retires: vec![],
3167 edges: vec![],
3168 edge_retires: vec![],
3169 chunks: vec![],
3170 runs: vec![],
3171 steps: vec![],
3172 actions: vec![],
3173 optional_backfills: vec![],
3174 vec_inserts: vec![],
3175 operational_writes: vec![
3176 crate::OperationalWrite::Append {
3177 collection: "audit_log".to_owned(),
3178 record_key: "evt-1".to_owned(),
3179 payload_json: r#"{"actor":"alice","seq":1,"ts":100}"#.to_owned(),
3180 source_ref: Some("src-1".to_owned()),
3181 },
3182 crate::OperationalWrite::Append {
3183 collection: "audit_log".to_owned(),
3184 record_key: "evt-2".to_owned(),
3185 payload_json: r#"{"actor":"alice-admin","seq":2,"ts":200}"#.to_owned(),
3186 source_ref: Some("src-2".to_owned()),
3187 },
3188 crate::OperationalWrite::Append {
3189 collection: "audit_log".to_owned(),
3190 record_key: "evt-3".to_owned(),
3191 payload_json: r#"{"actor":"bob","seq":3,"ts":300}"#.to_owned(),
3192 source_ref: Some("src-3".to_owned()),
3193 },
3194 ],
3195 })
3196 .expect("write");
3197 }
3198
3199 let report = service
3200 .read_operational_collection(&crate::operational::OperationalReadRequest {
3201 collection_name: "audit_log".to_owned(),
3202 filters: vec![
3203 crate::operational::OperationalFilterClause::Prefix {
3204 field: "actor".to_owned(),
3205 value: "alice".to_owned(),
3206 },
3207 crate::operational::OperationalFilterClause::Range {
3208 field: "ts".to_owned(),
3209 lower: Some(150),
3210 upper: Some(250),
3211 },
3212 ],
3213 limit: Some(10),
3214 })
3215 .expect("filtered read");
3216
3217 assert_eq!(report.collection_name, "audit_log");
3218 assert_eq!(report.row_count, 1);
3219 assert!(!report.was_limited);
3220 assert_eq!(report.rows.len(), 1);
3221 assert_eq!(report.rows[0].record_key, "evt-2");
3222 assert_eq!(
3223 report.rows[0].payload_json,
3224 r#"{"actor":"alice-admin","seq":2,"ts":200}"#
3225 );
3226 }
3227
3228 #[test]
3229 fn read_operational_collection_uses_secondary_index_when_filter_values_are_missing() {
3230 let (db, service) = setup();
3231 service
3232 .register_operational_collection(&OperationalRegisterRequest {
3233 name: "audit_log".to_owned(),
3234 kind: OperationalCollectionKind::AppendOnlyLog,
3235 schema_json: "{}".to_owned(),
3236 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3237 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3238 validation_json: String::new(),
3239 secondary_indexes_json: r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#.to_owned(),
3240 format_version: 1,
3241 })
3242 .expect("register collection");
3243 {
3244 let writer = crate::WriterActor::start(
3245 db.path(),
3246 Arc::new(SchemaManager::new()),
3247 crate::ProvenanceMode::Warn,
3248 Arc::new(crate::TelemetryCounters::default()),
3249 )
3250 .expect("writer");
3251 writer
3252 .submit(crate::WriteRequest {
3253 label: "operational".to_owned(),
3254 nodes: vec![],
3255 node_retires: vec![],
3256 edges: vec![],
3257 edge_retires: vec![],
3258 chunks: vec![],
3259 runs: vec![],
3260 steps: vec![],
3261 actions: vec![],
3262 optional_backfills: vec![],
3263 vec_inserts: vec![],
3264 operational_writes: vec![
3265 crate::OperationalWrite::Append {
3266 collection: "audit_log".to_owned(),
3267 record_key: "evt-1".to_owned(),
3268 payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
3269 source_ref: Some("src-1".to_owned()),
3270 },
3271 crate::OperationalWrite::Append {
3272 collection: "audit_log".to_owned(),
3273 record_key: "evt-2".to_owned(),
3274 payload_json: r#"{"actor":"alice-admin","ts":200}"#.to_owned(),
3275 source_ref: Some("src-2".to_owned()),
3276 },
3277 ],
3278 })
3279 .expect("write");
3280 }
3281 let conn = sqlite::open_connection(db.path()).expect("conn");
3282 conn.execute(
3283 "DELETE FROM operational_filter_values WHERE collection_name = 'audit_log'",
3284 [],
3285 )
3286 .expect("clear filter values");
3287 drop(conn);
3288
3289 let report = service
3290 .read_operational_collection(&crate::operational::OperationalReadRequest {
3291 collection_name: "audit_log".to_owned(),
3292 filters: vec![
3293 crate::operational::OperationalFilterClause::Prefix {
3294 field: "actor".to_owned(),
3295 value: "alice".to_owned(),
3296 },
3297 crate::operational::OperationalFilterClause::Range {
3298 field: "ts".to_owned(),
3299 lower: Some(150),
3300 upper: Some(250),
3301 },
3302 ],
3303 limit: Some(10),
3304 })
3305 .expect("secondary-index read");
3306
3307 assert_eq!(report.row_count, 1);
3308 assert_eq!(report.rows[0].record_key, "evt-2");
3309 }
3310
3311 #[test]
3312 fn read_operational_collection_rejects_undeclared_fields_and_latest_state_collections() {
3313 let (_db, service) = setup();
3314 service
3315 .register_operational_collection(&OperationalRegisterRequest {
3316 name: "connector_health".to_owned(),
3317 kind: OperationalCollectionKind::LatestState,
3318 schema_json: "{}".to_owned(),
3319 retention_json: "{}".to_owned(),
3320 filter_fields_json: r#"[{"name":"status","type":"string","modes":["exact"]}]"#
3321 .to_owned(),
3322 validation_json: String::new(),
3323 secondary_indexes_json: "[]".to_owned(),
3324 format_version: 1,
3325 })
3326 .expect("register collection");
3327
3328 let latest_state_error = service
3329 .read_operational_collection(&crate::operational::OperationalReadRequest {
3330 collection_name: "connector_health".to_owned(),
3331 filters: vec![crate::operational::OperationalFilterClause::Exact {
3332 field: "status".to_owned(),
3333 value: crate::operational::OperationalFilterValue::String("ok".to_owned()),
3334 }],
3335 limit: Some(10),
3336 })
3337 .expect_err("latest_state filtered reads should be rejected");
3338 assert!(latest_state_error.to_string().contains("append_only_log"));
3339
3340 service
3341 .register_operational_collection(&OperationalRegisterRequest {
3342 name: "audit_log".to_owned(),
3343 kind: OperationalCollectionKind::AppendOnlyLog,
3344 schema_json: "{}".to_owned(),
3345 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3346 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact"]}]"#
3347 .to_owned(),
3348 validation_json: String::new(),
3349 secondary_indexes_json: "[]".to_owned(),
3350 format_version: 1,
3351 })
3352 .expect("register append-only collection");
3353
3354 let undeclared_error = service
3355 .read_operational_collection(&crate::operational::OperationalReadRequest {
3356 collection_name: "audit_log".to_owned(),
3357 filters: vec![crate::operational::OperationalFilterClause::Exact {
3358 field: "missing".to_owned(),
3359 value: crate::operational::OperationalFilterValue::String("x".to_owned()),
3360 }],
3361 limit: Some(10),
3362 })
3363 .expect_err("undeclared field should be rejected");
3364 assert!(undeclared_error.to_string().contains("undeclared"));
3365 }
3366
3367 #[test]
3368 fn read_operational_collection_applies_limit_and_reports_truncation() {
3369 let (db, service) = setup();
3370 service
3371 .register_operational_collection(&OperationalRegisterRequest {
3372 name: "audit_log".to_owned(),
3373 kind: OperationalCollectionKind::AppendOnlyLog,
3374 schema_json: "{}".to_owned(),
3375 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3376 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["prefix"]}]"#
3377 .to_owned(),
3378 validation_json: String::new(),
3379 secondary_indexes_json: "[]".to_owned(),
3380 format_version: 1,
3381 })
3382 .expect("register collection");
3383 {
3384 let writer = crate::WriterActor::start(
3385 db.path(),
3386 Arc::new(SchemaManager::new()),
3387 crate::ProvenanceMode::Warn,
3388 Arc::new(crate::TelemetryCounters::default()),
3389 )
3390 .expect("writer");
3391 writer
3392 .submit(crate::WriteRequest {
3393 label: "operational".to_owned(),
3394 nodes: vec![],
3395 node_retires: vec![],
3396 edges: vec![],
3397 edge_retires: vec![],
3398 chunks: vec![],
3399 runs: vec![],
3400 steps: vec![],
3401 actions: vec![],
3402 optional_backfills: vec![],
3403 vec_inserts: vec![],
3404 operational_writes: vec![
3405 crate::OperationalWrite::Append {
3406 collection: "audit_log".to_owned(),
3407 record_key: "evt-1".to_owned(),
3408 payload_json: r#"{"actor":"alice-1"}"#.to_owned(),
3409 source_ref: Some("src-1".to_owned()),
3410 },
3411 crate::OperationalWrite::Append {
3412 collection: "audit_log".to_owned(),
3413 record_key: "evt-2".to_owned(),
3414 payload_json: r#"{"actor":"alice-2"}"#.to_owned(),
3415 source_ref: Some("src-2".to_owned()),
3416 },
3417 ],
3418 })
3419 .expect("write");
3420 }
3421
3422 let report = service
3423 .read_operational_collection(&crate::operational::OperationalReadRequest {
3424 collection_name: "audit_log".to_owned(),
3425 filters: vec![crate::operational::OperationalFilterClause::Prefix {
3426 field: "actor".to_owned(),
3427 value: "alice".to_owned(),
3428 }],
3429 limit: Some(1),
3430 })
3431 .expect("limited read");
3432
3433 assert_eq!(report.row_count, 1);
3434 assert_eq!(report.applied_limit, 1);
3435 assert!(report.was_limited);
3436 assert_eq!(report.rows[0].record_key, "evt-2");
3437 }
3438
3439 #[test]
3440 fn preexisting_operational_collection_can_gain_filter_contract_after_upgrade() {
3441 let db = NamedTempFile::new().expect("temp db");
3442 let conn = sqlite::open_connection(db.path()).expect("conn");
3443 conn.execute_batch(
3444 r#"
3445 CREATE TABLE operational_collections (
3446 name TEXT PRIMARY KEY,
3447 kind TEXT NOT NULL,
3448 schema_json TEXT NOT NULL,
3449 retention_json TEXT NOT NULL,
3450 format_version INTEGER NOT NULL DEFAULT 1,
3451 created_at INTEGER NOT NULL DEFAULT 100,
3452 disabled_at INTEGER
3453 );
3454 CREATE TABLE operational_mutations (
3455 id TEXT PRIMARY KEY,
3456 collection_name TEXT NOT NULL,
3457 record_key TEXT NOT NULL,
3458 op_kind TEXT NOT NULL,
3459 payload_json TEXT NOT NULL,
3460 source_ref TEXT,
3461 created_at INTEGER NOT NULL DEFAULT 100,
3462 mutation_order INTEGER NOT NULL DEFAULT 1
3463 );
3464 INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at)
3465 VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}', 1, 100);
3466 INSERT INTO operational_mutations
3467 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order)
3468 VALUES
3469 ('evt-1', 'audit_log', 'evt-1', 'append', '{"actor":"alice","ts":0}', 'src-1', 100, 1);
3470 "#,
3471 )
3472 .expect("seed pre-v10 schema");
3473 drop(conn);
3474
3475 let service = AdminService::new(db.path(), Arc::new(SchemaManager::new()));
3476 let pre_update = service
3477 .read_operational_collection(&crate::operational::OperationalReadRequest {
3478 collection_name: "audit_log".to_owned(),
3479 filters: vec![crate::operational::OperationalFilterClause::Exact {
3480 field: "actor".to_owned(),
3481 value: crate::operational::OperationalFilterValue::String("alice".to_owned()),
3482 }],
3483 limit: Some(10),
3484 })
3485 .expect_err("read should reject undeclared fields before migration update");
3486 assert!(pre_update.to_string().contains("undeclared"));
3487
3488 let updated = service
3489 .update_operational_collection_filters(
3490 "audit_log",
3491 r#"[{"name":"actor","type":"string","modes":["exact"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#,
3492 )
3493 .expect("update filter contract");
3494 assert!(updated.filter_fields_json.contains("\"actor\""));
3495
3496 let report = service
3497 .read_operational_collection(&crate::operational::OperationalReadRequest {
3498 collection_name: "audit_log".to_owned(),
3499 filters: vec![crate::operational::OperationalFilterClause::Range {
3500 field: "ts".to_owned(),
3501 lower: Some(0),
3502 upper: Some(0),
3503 }],
3504 limit: Some(10),
3505 })
3506 .expect("read after explicit filter update");
3507 assert_eq!(report.row_count, 1);
3508 assert_eq!(report.rows[0].record_key, "evt-1");
3509 }
3510
3511 #[cfg(feature = "sqlite-vec")]
3512 #[test]
3513 fn check_semantics_detects_stale_vec_rows() {
3514 use crate::sqlite::open_connection_with_vec;
3515
3516 let db = NamedTempFile::new().expect("temp file");
3517 let schema = Arc::new(SchemaManager::new());
3518 {
3519 let conn = open_connection_with_vec(db.path()).expect("vec conn");
3520 schema.bootstrap(&conn).expect("bootstrap");
3521 schema
3522 .ensure_vec_kind_profile(&conn, "Doc", 3)
3523 .expect("vec kind profile");
3524 let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
3526 .iter()
3527 .flat_map(|f| f.to_le_bytes())
3528 .collect();
3529 conn.execute(
3530 "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('ghost-chunk', ?1)",
3531 rusqlite::params![bytes],
3532 )
3533 .expect("insert stale vec row");
3534 }
3535 let service = AdminService::new(db.path(), Arc::clone(&schema));
3536 let report = service.check_semantics().expect("semantics check");
3537 assert_eq!(report.stale_vec_rows, 1);
3538 assert!(
3539 report.warnings.iter().any(|w| w.contains("stale vec")),
3540 "warning must mention stale vec"
3541 );
3542 }
3543
3544 #[cfg(feature = "sqlite-vec")]
3545 #[test]
3546 fn restore_vector_profiles_recreates_vec_table_from_metadata() {
3547 let db = NamedTempFile::new().expect("temp file");
3548 let schema = Arc::new(SchemaManager::new());
3549 {
3550 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3551 schema.bootstrap(&conn).expect("bootstrap");
3552 conn.execute(
3553 "INSERT INTO vector_profiles (profile, table_name, dimension, enabled) \
3554 VALUES ('default', 'vec_nodes_active', 3, 1)",
3555 [],
3556 )
3557 .expect("insert vector profile");
3558 }
3559
3560 let service = AdminService::new(db.path(), Arc::clone(&schema));
3561 let report = service
3562 .restore_vector_profiles()
3563 .expect("restore vector profiles");
3564 assert_eq!(
3565 report.targets,
3566 vec![crate::projection::ProjectionTarget::Vec]
3567 );
3568 assert_eq!(report.rebuilt_rows, 1);
3569
3570 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3571 let count: i64 = conn
3572 .query_row(
3573 "SELECT count(*) FROM sqlite_schema WHERE name = 'vec_nodes_active'",
3574 [],
3575 |row| row.get(0),
3576 )
3577 .expect("vec schema count");
3578 assert_eq!(count, 1, "vec table should exist after restore");
3579 }
3580
3581 #[cfg(feature = "sqlite-vec")]
3582 #[test]
3583 fn load_vector_regeneration_config_supports_json_and_toml() {
3584 let dir = tempfile::tempdir().expect("temp dir");
3585 let json_path = dir.path().join("regen.json");
3586 let toml_path = dir.path().join("regen.toml");
3587
3588 let config = VectorRegenerationConfig {
3589 kind: "Document".to_owned(),
3590 profile: "default".to_owned(),
3591 chunking_policy: "per_chunk".to_owned(),
3592 preprocessing_policy: "trim".to_owned(),
3593 };
3594
3595 fs::write(&json_path, serde_json::to_string(&config).expect("json")).expect("write json");
3596 fs::write(&toml_path, toml::to_string(&config).expect("toml")).expect("write toml");
3597
3598 let parsed_json = load_vector_regeneration_config(&json_path).expect("json parse");
3599 let parsed_toml = load_vector_regeneration_config(&toml_path).expect("toml parse");
3600
3601 assert_eq!(parsed_json, config);
3602 assert_eq!(parsed_toml, config);
3603 }
3604
3605 #[test]
3610 fn regenerate_vector_embeddings_config_rejects_old_identity_fields() {
3611 let legacy_json = r#"{
3614 "kind": "Document",
3615 "profile": "default",
3616 "table_name": "vec_nodes_active",
3617 "model_identity": "old-model",
3618 "model_version": "1.0",
3619 "dimension": 4,
3620 "normalization_policy": "l2",
3621 "chunking_policy": "per_chunk",
3622 "preprocessing_policy": "trim",
3623 "generator_command": ["/bin/echo"]
3624 }"#;
3625 let result: Result<VectorRegenerationConfig, _> = serde_json::from_str(legacy_json);
3626 assert!(
3627 result.is_err(),
3628 "legacy identity fields must be rejected at deserialization"
3629 );
3630 }
3631
3632 #[cfg(all(not(feature = "sqlite-vec"), unix))]
3633 #[test]
3634 fn regenerate_vector_embeddings_unsupported_vec_capability_writes_request_and_failed_audit() {
3635 let db = NamedTempFile::new().expect("temp file");
3636 let schema = Arc::new(SchemaManager::new());
3637
3638 {
3639 let conn = sqlite::open_connection(db.path()).expect("connection");
3640 schema.bootstrap(&conn).expect("bootstrap");
3641 conn.execute(
3642 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3643 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3644 [],
3645 )
3646 .expect("insert node");
3647 conn.execute(
3648 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3649 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3650 [],
3651 )
3652 .expect("insert chunk");
3653 }
3654
3655 let service = AdminService::new(db.path(), Arc::clone(&schema));
3656 let embedder = TestEmbedder::new("test-model", 4);
3657 let error = service
3658 .regenerate_vector_embeddings(
3659 &embedder,
3660 &VectorRegenerationConfig {
3661 kind: "Document".to_owned(),
3662 profile: "default".to_owned(),
3663 chunking_policy: "per_chunk".to_owned(),
3664 preprocessing_policy: "trim".to_owned(),
3665 },
3666 )
3667 .expect_err("sqlite-vec capability should be required");
3668
3669 assert!(error.to_string().contains("unsupported vec capability"));
3670
3671 let conn = sqlite::open_connection(db.path()).expect("connection");
3672 let request_count: i64 = conn
3673 .query_row(
3674 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
3675 [],
3676 |row| row.get(0),
3677 )
3678 .expect("request count");
3679 assert_eq!(request_count, 1);
3680 let failed_count: i64 = conn
3681 .query_row(
3682 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3683 [],
3684 |row| row.get(0),
3685 )
3686 .expect("failed count");
3687 assert_eq!(failed_count, 1);
3688 let metadata_json: String = conn
3689 .query_row(
3690 "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3691 [],
3692 |row| row.get(0),
3693 )
3694 .expect("failed metadata");
3695 assert!(metadata_json.contains("\"failure_class\":\"unsupported vec capability\""));
3696 }
3697
3698 #[cfg(feature = "sqlite-vec")]
3699 #[test]
3700 #[allow(clippy::too_many_lines)]
3701 fn regenerate_vector_embeddings_rebuilds_embeddings_via_embedder() {
3702 let db = NamedTempFile::new().expect("temp file");
3703 let schema = Arc::new(SchemaManager::new());
3704
3705 {
3706 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3707 schema.bootstrap(&conn).expect("bootstrap");
3708 conn.execute(
3709 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3710 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3711 [],
3712 )
3713 .expect("insert node");
3714 conn.execute(
3715 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3716 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3717 [],
3718 )
3719 .expect("insert chunk 1");
3720 conn.execute(
3721 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3722 VALUES ('chunk-2', 'doc-1', 'travel plan', 101)",
3723 [],
3724 )
3725 .expect("insert chunk 2");
3726 }
3727
3728 let service = AdminService::new(db.path(), Arc::clone(&schema));
3729 let embedder = TestEmbedder::new("test-model", 4);
3730 let report = service
3731 .regenerate_vector_embeddings(
3732 &embedder,
3733 &VectorRegenerationConfig {
3734 kind: "Document".to_owned(),
3735 profile: "default".to_owned(),
3736 chunking_policy: "per_chunk".to_owned(),
3737 preprocessing_policy: "trim".to_owned(),
3738 },
3739 )
3740 .expect("regenerate vectors");
3741
3742 assert_eq!(report.profile, "default");
3743 assert_eq!(report.table_name, "vec_document");
3744 assert_eq!(report.dimension, 4);
3745 assert_eq!(report.total_chunks, 2);
3746 assert_eq!(report.regenerated_rows, 2);
3747 assert!(report.contract_persisted);
3748
3749 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3750 let vec_count: i64 = conn
3751 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
3752 .expect("vec count");
3753 assert_eq!(vec_count, 2);
3754
3755 let (model_identity, model_version, dimension, normalization_policy): (
3759 String,
3760 String,
3761 i64,
3762 String,
3763 ) = conn
3764 .query_row(
3765 "SELECT model_identity, model_version, dimension, normalization_policy \
3766 FROM vector_embedding_contracts WHERE profile = 'default'",
3767 [],
3768 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3769 )
3770 .expect("contract row");
3771 assert_eq!(model_identity, "test-model");
3772 assert_eq!(model_version, "1.0.0");
3773 assert_eq!(dimension, 4);
3774 assert_eq!(normalization_policy, "l2");
3775
3776 let contract_format_version: i64 = conn
3777 .query_row(
3778 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
3779 [],
3780 |row| row.get(0),
3781 )
3782 .expect("contract_format_version");
3783 assert_eq!(contract_format_version, 1);
3784 let request_count: i64 = conn
3785 .query_row(
3786 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
3787 [],
3788 |row| row.get(0),
3789 )
3790 .expect("request audit count");
3791 assert_eq!(request_count, 1);
3792 let apply_count: i64 = conn
3793 .query_row(
3794 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
3795 [],
3796 |row| row.get(0),
3797 )
3798 .expect("apply audit count");
3799 assert_eq!(apply_count, 1);
3800 let apply_metadata: String = conn
3801 .query_row(
3802 "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
3803 [],
3804 |row| row.get(0),
3805 )
3806 .expect("apply metadata");
3807 assert!(apply_metadata.contains("\"profile\":\"default\""));
3808 assert!(apply_metadata.contains("\"snapshot_hash\":"));
3809 assert!(apply_metadata.contains("\"model_identity\":\"test-model\""));
3810 }
3811
3812 #[cfg(feature = "sqlite-vec")]
3813 #[test]
3814 #[allow(clippy::too_many_lines)]
3815 fn regenerate_vector_embeddings_embedder_failure_leaves_contract_and_vec_rows_unchanged() {
3816 let db = NamedTempFile::new().expect("temp file");
3817 let schema = Arc::new(SchemaManager::new());
3818
3819 {
3820 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3821 schema.bootstrap(&conn).expect("bootstrap");
3822 conn.execute(
3823 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3824 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3825 [],
3826 )
3827 .expect("insert node");
3828 conn.execute(
3829 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3830 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3831 [],
3832 )
3833 .expect("insert chunk");
3834 schema
3835 .ensure_vec_kind_profile(&conn, "Document", 4)
3836 .expect("ensure vec kind profile");
3837 conn.execute(
3838 r"
3839 INSERT INTO vector_embedding_contracts (
3840 profile,
3841 table_name,
3842 model_identity,
3843 model_version,
3844 dimension,
3845 normalization_policy,
3846 chunking_policy,
3847 preprocessing_policy,
3848 generator_command_json,
3849 applied_at,
3850 snapshot_hash
3851 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3852 ",
3853 rusqlite::params![
3854 "default",
3855 "vec_document",
3856 "old-model",
3857 "0.9.0",
3858 4,
3859 "l2",
3860 "per_chunk",
3861 "trim",
3862 "[]",
3863 111,
3864 "old-snapshot"
3865 ],
3866 )
3867 .expect("seed contract");
3868 conn.execute(
3869 "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
3870 [],
3871 )
3872 .expect("seed vec row");
3873 }
3874
3875 let service = AdminService::new(db.path(), Arc::clone(&schema));
3876 let failing = FailingEmbedder {
3877 identity: QueryEmbedderIdentity {
3878 model_identity: "new-model".to_owned(),
3879 model_version: "1.0.0".to_owned(),
3880 dimension: 4,
3881 normalization_policy: "l2".to_owned(),
3882 },
3883 };
3884 let error = service
3885 .regenerate_vector_embeddings(
3886 &failing,
3887 &VectorRegenerationConfig {
3888 kind: "Document".to_owned(),
3889 profile: "default".to_owned(),
3890 chunking_policy: "per_chunk".to_owned(),
3891 preprocessing_policy: "trim".to_owned(),
3892 },
3893 )
3894 .expect_err("embedder should fail");
3895
3896 assert!(error.to_string().contains("embedder failure"));
3897
3898 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3899 let model_identity: String = conn
3900 .query_row(
3901 "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
3902 [],
3903 |row| row.get(0),
3904 )
3905 .expect("model identity");
3906 assert_eq!(model_identity, "old-model");
3907 let snapshot_hash: String = conn
3908 .query_row(
3909 "SELECT snapshot_hash FROM vector_embedding_contracts WHERE profile = 'default'",
3910 [],
3911 |row| row.get(0),
3912 )
3913 .expect("snapshot hash");
3914 assert_eq!(snapshot_hash, "old-snapshot");
3915 let vec_count: i64 = conn
3916 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
3917 .expect("vec count");
3918 assert_eq!(vec_count, 1);
3919 let failure_count: i64 = conn
3920 .query_row(
3921 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3922 [],
3923 |row| row.get(0),
3924 )
3925 .expect("failure count");
3926 assert_eq!(failure_count, 1);
3927 let failure_metadata: String = conn
3928 .query_row(
3929 "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3930 [],
3931 |row| row.get(0),
3932 )
3933 .expect("failure metadata");
3934 assert!(failure_metadata.contains("\"failure_class\":\"embedder failure\""));
3935 }
3936
3937 #[cfg(feature = "sqlite-vec")]
3948 #[test]
3949 fn regenerate_vector_embeddings_rejects_whitespace_only_profile_before_mutation() {
3950 let db = NamedTempFile::new().expect("temp file");
3951 let schema = Arc::new(SchemaManager::new());
3952 {
3953 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3954 schema.bootstrap(&conn).expect("bootstrap");
3955 conn.execute(
3956 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3957 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3958 [],
3959 )
3960 .expect("insert node");
3961 conn.execute(
3962 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3963 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3964 [],
3965 )
3966 .expect("insert chunk");
3967 }
3968
3969 let service = AdminService::new(db.path(), Arc::clone(&schema));
3970 let embedder = TestEmbedder::new("test-model", 4);
3971 let error = service
3972 .regenerate_vector_embeddings(
3973 &embedder,
3974 &VectorRegenerationConfig {
3975 kind: "Document".to_owned(),
3976 profile: " ".to_owned(),
3977 chunking_policy: "per_chunk".to_owned(),
3978 preprocessing_policy: "trim".to_owned(),
3979 },
3980 )
3981 .expect_err("whitespace profile should be rejected");
3982
3983 assert!(error.to_string().contains("invalid contract"));
3984 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3985 let contract_count: i64 = conn
3986 .query_row(
3987 "SELECT count(*) FROM vector_embedding_contracts",
3988 [],
3989 |row| row.get(0),
3990 )
3991 .expect("contract count");
3992 assert_eq!(contract_count, 0);
3993 let provenance_count: i64 = conn
3994 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
3995 row.get(0)
3996 })
3997 .expect("provenance count");
3998 assert_eq!(provenance_count, 0);
3999 }
4000
4001 #[cfg(feature = "sqlite-vec")]
4002 #[test]
4003 fn regenerate_vector_embeddings_rejects_future_contract_format_version() {
4004 let db = NamedTempFile::new().expect("temp file");
4005 let schema = Arc::new(SchemaManager::new());
4006 {
4007 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4008 schema.bootstrap(&conn).expect("bootstrap");
4009 conn.execute(
4010 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4011 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
4012 [],
4013 )
4014 .expect("insert node");
4015 conn.execute(
4016 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4017 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
4018 [],
4019 )
4020 .expect("insert chunk");
4021 conn.execute(
4022 r"
4023 INSERT INTO vector_embedding_contracts (
4024 profile,
4025 table_name,
4026 model_identity,
4027 model_version,
4028 dimension,
4029 normalization_policy,
4030 chunking_policy,
4031 preprocessing_policy,
4032 generator_command_json,
4033 applied_at,
4034 snapshot_hash,
4035 contract_format_version,
4036 updated_at
4037 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
4038 ",
4039 rusqlite::params![
4040 "default",
4041 "vec_nodes_active",
4042 "old-model",
4043 "0.9.0",
4044 4,
4045 "l2",
4046 "per_chunk",
4047 "trim",
4048 "[]",
4049 111,
4050 "old-snapshot",
4051 99,
4052 111,
4053 ],
4054 )
4055 .expect("seed future contract");
4056 }
4057
4058 let service = AdminService::new(db.path(), Arc::clone(&schema));
4059 let embedder = TestEmbedder::new("test-model", 4);
4060 let error = service
4061 .regenerate_vector_embeddings(
4062 &embedder,
4063 &VectorRegenerationConfig {
4064 kind: "Document".to_owned(),
4065 profile: "default".to_owned(),
4066 chunking_policy: "per_chunk".to_owned(),
4067 preprocessing_policy: "trim".to_owned(),
4068 },
4069 )
4070 .expect_err("future contract version should be rejected");
4071
4072 assert!(error.to_string().contains("unsupported"));
4073 assert!(error.to_string().contains("format version"));
4074 }
4075
4076 #[test]
4077 fn check_semantics_detects_orphaned_chunk() {
4078 let (db, service) = setup();
4079 {
4080 let conn = sqlite::open_connection(db.path()).expect("conn");
4082 conn.execute(
4083 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4084 VALUES ('c1', 'ghost-node', 'text', 100)",
4085 [],
4086 )
4087 .expect("insert orphaned chunk");
4088 }
4089 let report = service.check_semantics().expect("semantics check");
4090 assert_eq!(report.orphaned_chunks, 1);
4091 }
4092
4093 #[test]
4094 fn check_semantics_detects_null_source_ref() {
4095 let (db, service) = setup();
4096 {
4097 let conn = sqlite::open_connection(db.path()).expect("conn");
4098 conn.execute(
4099 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4100 VALUES ('r1', 'lg1', 'Meeting', '{}', 100)",
4101 [],
4102 )
4103 .expect("insert node with null source_ref");
4104 }
4105 let report = service.check_semantics().expect("semantics check");
4106 assert_eq!(report.null_source_ref_nodes, 1);
4107 }
4108
4109 #[test]
4110 fn check_semantics_detects_broken_step_fk() {
4111 let (db, service) = setup();
4112 {
4113 let conn = sqlite::open_connection(db.path()).expect("conn");
4116 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4117 .expect("disable FK");
4118 conn.execute(
4119 "INSERT INTO steps (id, run_id, kind, status, properties, created_at) \
4120 VALUES ('s1', 'ghost-run', 'llm', 'completed', '{}', 100)",
4121 [],
4122 )
4123 .expect("insert step with ghost run_id");
4124 }
4125 let report = service.check_semantics().expect("semantics check");
4126 assert_eq!(report.broken_step_fk, 1);
4127 }
4128
4129 #[test]
4130 fn check_semantics_detects_broken_action_fk() {
4131 let (db, service) = setup();
4132 {
4133 let conn = sqlite::open_connection(db.path()).expect("conn");
4134 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4135 .expect("disable FK");
4136 conn.execute(
4137 "INSERT INTO actions (id, step_id, kind, status, properties, created_at) \
4138 VALUES ('a1', 'ghost-step', 'emit', 'completed', '{}', 100)",
4139 [],
4140 )
4141 .expect("insert action with ghost step_id");
4142 }
4143 let report = service.check_semantics().expect("semantics check");
4144 assert_eq!(report.broken_action_fk, 1);
4145 }
4146
4147 #[test]
4148 fn check_semantics_detects_stale_fts_rows() {
4149 let (db, service) = setup();
4150 {
4151 let conn = sqlite::open_connection(db.path()).expect("conn");
4152 conn.execute(
4155 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4156 VALUES ('ghost-chunk', 'any-node', 'Meeting', 'stale content')",
4157 [],
4158 )
4159 .expect("insert stale FTS row");
4160 }
4161 let report = service.check_semantics().expect("semantics check");
4162 assert_eq!(report.stale_fts_rows, 1);
4163 }
4164
4165 #[test]
4166 fn check_semantics_detects_fts_rows_for_superseded_nodes() {
4167 let (db, service) = setup();
4168 {
4169 let conn = sqlite::open_connection(db.path()).expect("conn");
4170 conn.execute(
4172 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4173 VALUES ('r1', 'lg-sup', 'Meeting', '{}', 100, 200, 'src-1')",
4174 [],
4175 )
4176 .expect("insert superseded node");
4177 conn.execute(
4179 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4180 VALUES ('ck-x', 'lg-sup', 'Meeting', 'superseded content')",
4181 [],
4182 )
4183 .expect("insert FTS row for superseded node");
4184 }
4185 let report = service.check_semantics().expect("semantics check");
4186 assert_eq!(report.fts_rows_for_superseded_nodes, 1);
4187 }
4188
4189 #[test]
4190 fn check_semantics_detects_dangling_edges() {
4191 let (db, service) = setup();
4192 {
4193 let conn = sqlite::open_connection(db.path()).expect("conn");
4194 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4195 .expect("disable FK");
4196 conn.execute(
4198 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4199 VALUES ('r1', 'lg-src', 'Meeting', '{}', 100, 'src-1')",
4200 [],
4201 )
4202 .expect("insert source node");
4203 conn.execute(
4204 "INSERT INTO edges \
4205 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4206 VALUES ('e1', 'edge-1', 'lg-src', 'ghost-target', 'LINKS', '{}', 100, 'src-1')",
4207 [],
4208 )
4209 .expect("insert dangling edge");
4210 }
4211 let report = service.check_semantics().expect("semantics check");
4212 assert_eq!(report.dangling_edges, 1);
4213 }
4214
4215 #[test]
4216 fn check_semantics_detects_orphaned_supersession_chains() {
4217 let (db, service) = setup();
4218 {
4219 let conn = sqlite::open_connection(db.path()).expect("conn");
4220 conn.execute(
4222 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4223 VALUES ('r1', 'lg-orphaned', 'Meeting', '{}', 100, 200, 'src-1')",
4224 [],
4225 )
4226 .expect("insert fully superseded node");
4227 }
4228 let report = service.check_semantics().expect("semantics check");
4229 assert_eq!(report.orphaned_supersession_chains, 1);
4230 }
4231
4232 #[test]
4233 fn check_semantics_detects_mismatched_kind_property_fts_rows() {
4234 let (db, service) = setup();
4240 {
4241 let conn = sqlite::open_connection(db.path()).expect("conn");
4242 conn.execute(
4243 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4244 VALUES ('Goal', '[\"$.name\"]', ' ')",
4245 [],
4246 )
4247 .expect("register schema");
4248 conn.execute(
4249 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4250 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4251 [],
4252 )
4253 .expect("insert node");
4254 let table = fathomdb_schema::fts_kind_table_name("Goal");
4256 conn.execute_batch(&format!(
4257 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4258 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4259 ))
4260 .expect("create per-kind table");
4261 conn.execute(
4262 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4263 [],
4264 )
4265 .expect("insert per-kind FTS row");
4266 }
4267 let report = service.check_semantics().expect("semantics check");
4268 assert_eq!(report.mismatched_kind_property_fts_rows, 0);
4270 }
4271
4272 #[test]
4273 fn check_semantics_detects_duplicate_property_fts_rows() {
4274 let (db, service) = setup();
4275 {
4276 let conn = sqlite::open_connection(db.path()).expect("conn");
4277 conn.execute(
4278 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4279 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4280 [],
4281 )
4282 .expect("insert node");
4283 let table = fathomdb_schema::fts_kind_table_name("Goal");
4285 conn.execute_batch(&format!(
4286 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4287 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4288 ))
4289 .expect("create per-kind table");
4290 conn.execute(
4291 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4292 [],
4293 )
4294 .expect("insert first property FTS row");
4295 conn.execute(
4296 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2 duplicate')"),
4297 [],
4298 )
4299 .expect("insert duplicate property FTS row");
4300 }
4301 let report = service.check_semantics().expect("semantics check");
4302 assert_eq!(report.duplicate_property_fts_rows, 1);
4303 }
4304
4305 #[test]
4306 fn check_semantics_detects_drifted_property_fts_text() {
4307 let (db, service) = setup();
4308 {
4309 let conn = sqlite::open_connection(db.path()).expect("conn");
4310 conn.execute(
4311 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4312 VALUES ('Goal', '[\"$.name\"]', ' ')",
4313 [],
4314 )
4315 .expect("register schema");
4316 conn.execute(
4317 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4318 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Current name\"}', 100, 'src-1')",
4319 [],
4320 )
4321 .expect("insert node");
4322 let table = fathomdb_schema::fts_kind_table_name("Goal");
4324 conn.execute_batch(&format!(
4325 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4326 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4327 ))
4328 .expect("create per-kind table");
4329 conn.execute(
4330 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Old stale name')"),
4331 [],
4332 )
4333 .expect("insert stale property FTS row");
4334 }
4335 let report = service.check_semantics().expect("semantics check");
4336 assert_eq!(report.drifted_property_fts_rows, 1);
4337 }
4338
4339 #[test]
4340 fn check_semantics_detects_property_fts_row_that_should_not_exist() {
4341 let (db, service) = setup();
4342 {
4343 let conn = sqlite::open_connection(db.path()).expect("conn");
4344 conn.execute(
4345 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4346 VALUES ('Goal', '[\"$.searchable\"]', ' ')",
4347 [],
4348 )
4349 .expect("register schema");
4350 conn.execute(
4352 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4353 VALUES ('r1', 'goal-1', 'Goal', '{\"other\":\"field\"}', 100, 'src-1')",
4354 [],
4355 )
4356 .expect("insert node");
4357 let table = fathomdb_schema::fts_kind_table_name("Goal");
4359 conn.execute_batch(&format!(
4360 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4361 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4362 ))
4363 .expect("create per-kind table");
4364 conn.execute(
4365 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'phantom text')"),
4366 [],
4367 )
4368 .expect("insert phantom property FTS row");
4369 }
4370 let report = service.check_semantics().expect("semantics check");
4371 assert_eq!(
4372 report.drifted_property_fts_rows, 1,
4373 "row that should not exist must be counted as drifted"
4374 );
4375 }
4376
4377 #[test]
4383 fn check_semantics_clean_on_weighted_fts_schema_does_not_panic() {
4384 let (db, service) = setup();
4385 let entries = vec![
4391 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4392 FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4393 ];
4394 service
4395 .register_fts_property_schema_with_entries(
4396 "Article",
4397 &entries,
4398 Some(" "),
4399 &[],
4400 crate::rebuild_actor::RebuildMode::Eager,
4401 )
4402 .expect("register weighted schema");
4403
4404 {
4407 let conn = sqlite::open_connection(db.path()).expect("conn");
4408 let properties = r#"{"title":"Hello","body":{"text":"world"}}"#;
4409 conn.execute(
4410 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4411 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4412 [properties],
4413 )
4414 .expect("insert node");
4415
4416 let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4417 let (_kind, schema) = schemas
4418 .iter()
4419 .find(|(k, _)| k == "Article")
4420 .expect("weighted schema present");
4421 let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4422 let cols = crate::writer::extract_property_fts_columns(&props, schema);
4423
4424 let table = fathomdb_schema::fts_kind_table_name("Article");
4425 let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4426 let placeholders: Vec<String> =
4427 (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4428 let sql = format!(
4429 "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4430 cols = col_names.join(", "),
4431 placeholders = placeholders.join(", "),
4432 );
4433 let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4434 let params: Vec<&dyn rusqlite::ToSql> =
4435 std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4436 .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4437 .collect();
4438 conn.execute(&sql, params.as_slice())
4439 .expect("insert weighted FTS row");
4440 }
4441
4442 let report = service
4443 .check_semantics()
4444 .expect("semantics check must not crash on weighted schema");
4445 assert_eq!(report.drifted_property_fts_rows, 0);
4446 }
4447
4448 #[test]
4452 fn check_semantics_detects_drifted_property_fts_text_weighted() {
4453 let (db, service) = setup();
4454 let entries = vec![
4457 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4458 FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4459 ];
4460 service
4461 .register_fts_property_schema_with_entries(
4462 "Article",
4463 &entries,
4464 Some(" "),
4465 &[],
4466 crate::rebuild_actor::RebuildMode::Eager,
4467 )
4468 .expect("register weighted schema");
4469
4470 let title_col = fathomdb_schema::fts_column_name("$.title", false);
4471
4472 {
4473 let conn = sqlite::open_connection(db.path()).expect("conn");
4474 let properties = r#"{"title":"Current","body":{"text":"body"}}"#;
4475 conn.execute(
4476 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4477 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4478 [properties],
4479 )
4480 .expect("insert node");
4481
4482 let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4483 let (_kind, schema) = schemas
4484 .iter()
4485 .find(|(k, _)| k == "Article")
4486 .expect("weighted schema present");
4487 let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4488 let cols = crate::writer::extract_property_fts_columns(&props, schema);
4489
4490 let table = fathomdb_schema::fts_kind_table_name("Article");
4491 let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4492 let placeholders: Vec<String> =
4493 (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4494 let sql = format!(
4495 "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4496 cols = col_names.join(", "),
4497 placeholders = placeholders.join(", "),
4498 );
4499 let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4500 let params: Vec<&dyn rusqlite::ToSql> =
4501 std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4502 .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4503 .collect();
4504 conn.execute(&sql, params.as_slice())
4505 .expect("insert weighted FTS row");
4506
4507 conn.execute(
4509 &format!("UPDATE {table} SET {title_col} = 'tampered' WHERE node_logical_id = 'article-1'"),
4510 [],
4511 )
4512 .expect("tamper weighted FTS row");
4513 }
4514
4515 let report = service.check_semantics().expect("semantics check");
4516 assert_eq!(report.drifted_property_fts_rows, 1);
4517 }
4518
4519 #[test]
4524 fn check_semantics_mixed_weighted_and_non_weighted_schemas() {
4525 let (db, service) = setup();
4526
4527 let weighted_entries = vec![
4529 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4530 FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4531 ];
4532 service
4533 .register_fts_property_schema_with_entries(
4534 "Article",
4535 &weighted_entries,
4536 Some(" "),
4537 &[],
4538 crate::rebuild_actor::RebuildMode::Eager,
4539 )
4540 .expect("register weighted schema");
4541
4542 {
4545 let conn = sqlite::open_connection(db.path()).expect("conn");
4546 conn.execute(
4547 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4548 VALUES ('Goal', '[\"$.name\"]', ' ')",
4549 [],
4550 )
4551 .expect("register non-weighted schema");
4552 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
4553 conn.execute_batch(&format!(
4554 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
4555 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4556 ))
4557 .expect("create non-weighted per-kind table");
4558
4559 let article_props = r#"{"title":"Hello","body":{"text":"world"}}"#;
4561 conn.execute(
4562 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4563 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4564 [article_props],
4565 )
4566 .expect("insert article");
4567
4568 let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4569 let (_k, article_schema) = schemas
4570 .iter()
4571 .find(|(k, _)| k == "Article")
4572 .expect("Article schema present");
4573 let props: serde_json::Value =
4574 serde_json::from_str(article_props).expect("parse article props");
4575 let cols = crate::writer::extract_property_fts_columns(&props, article_schema);
4576 let article_table = fathomdb_schema::fts_kind_table_name("Article");
4577 let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4578 let placeholders: Vec<String> =
4579 (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4580 let sql = format!(
4581 "INSERT INTO {article_table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4582 cols = col_names.join(", "),
4583 placeholders = placeholders.join(", "),
4584 );
4585 let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4586 let params: Vec<&dyn rusqlite::ToSql> =
4587 std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4588 .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4589 .collect();
4590 conn.execute(&sql, params.as_slice())
4591 .expect("insert weighted FTS row");
4592
4593 conn.execute(
4597 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4598 VALUES ('r2', 'goal-1', 'Goal', '{\"name\":\"Goal One\"}', 100, 'src-2')",
4599 [],
4600 )
4601 .expect("insert goal node");
4602 conn.execute(
4603 &format!("INSERT INTO {goal_table} (node_logical_id, text_content) VALUES ('goal-1', 'Goal One')"),
4604 [],
4605 )
4606 .expect("insert non-weighted FTS row");
4607 }
4608
4609 let report = service
4610 .check_semantics()
4611 .expect("semantics check must handle both shapes");
4612 assert_eq!(
4613 report.drifted_property_fts_rows, 0,
4614 "clean mixed weighted + non-weighted DB must report 0 drift"
4615 );
4616 }
4617
4618 #[test]
4634 fn check_semantics_weighted_schema_with_text_content_path() {
4635 let (db, service) = setup();
4636 let entries = vec![
4637 FtsPropertyPathSpec::scalar("$.text_content").with_weight(2.0),
4638 FtsPropertyPathSpec::scalar("$.title").with_weight(1.0),
4639 ];
4640 service
4641 .register_fts_property_schema_with_entries(
4642 "Article",
4643 &entries,
4644 Some(" "),
4645 &[],
4646 crate::rebuild_actor::RebuildMode::Eager,
4647 )
4648 .expect("register weighted schema with $.text_content path");
4649
4650 {
4651 let conn = sqlite::open_connection(db.path()).expect("conn");
4652 let properties = r#"{"text_content":"canonical body","title":"Hello"}"#;
4659 conn.execute(
4660 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4661 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4662 [properties],
4663 )
4664 .expect("insert node");
4665
4666 let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4667 let (_kind, schema) = schemas
4668 .iter()
4669 .find(|(k, _)| k == "Article")
4670 .expect("weighted schema present");
4671 let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4672 let cols = crate::writer::extract_property_fts_columns(&props, schema);
4673
4674 let table = fathomdb_schema::fts_kind_table_name("Article");
4675 let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4676 let placeholders: Vec<String> =
4677 (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4678 let sql = format!(
4679 "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4680 cols = col_names.join(", "),
4681 placeholders = placeholders.join(", "),
4682 );
4683 let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4684 let params: Vec<&dyn rusqlite::ToSql> =
4685 std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4686 .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4687 .collect();
4688 conn.execute(&sql, params.as_slice())
4689 .expect("insert weighted FTS row");
4690 }
4691
4692 let report = service.check_semantics().expect("semantics check");
4693 assert_eq!(
4694 report.drifted_property_fts_rows, 0,
4695 "weighted schema whose path collapses to `text_content` must be \
4696 dispatched as weighted (per-column comparator); a clean DB \
4697 must report 0 drift"
4698 );
4699 }
4700
4701 #[test]
4702 fn safe_export_writes_manifest_with_sha256() {
4703 let (_db, service) = setup();
4704 let export_dir = tempfile::TempDir::new().expect("temp dir");
4705 let export_path = export_dir.path().join("backup.db");
4706
4707 let manifest = service
4708 .safe_export(
4709 &export_path,
4710 SafeExportOptions {
4711 force_checkpoint: false,
4712 },
4713 )
4714 .expect("export");
4715
4716 assert!(export_path.exists(), "exported db should exist");
4717 let manifest_path = export_dir.path().join("backup.db.export-manifest.json");
4718 assert!(
4719 manifest_path.exists(),
4720 "manifest file should exist at {}",
4721 manifest_path.display()
4722 );
4723 assert_eq!(manifest.sha256.len(), 64, "sha256 should be 64 hex chars");
4724 assert!(
4725 manifest.exported_at > 0,
4726 "exported_at should be a unix timestamp"
4727 );
4728 assert_eq!(
4729 manifest.schema_version,
4730 SchemaManager::new().current_version().0,
4731 "schema_version should match the live schema version"
4732 );
4733 assert_eq!(manifest.protocol_version, 1, "protocol_version should be 1");
4734 assert!(manifest.page_count > 0, "page_count should be positive");
4735 }
4736
4737 #[test]
4738 fn safe_export_preserves_operational_validation_contracts() {
4739 let (_db, service) = setup();
4740 let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
4741 service
4742 .register_operational_collection(&OperationalRegisterRequest {
4743 name: "connector_health".to_owned(),
4744 kind: OperationalCollectionKind::LatestState,
4745 schema_json: "{}".to_owned(),
4746 retention_json: "{}".to_owned(),
4747 filter_fields_json: "[]".to_owned(),
4748 validation_json: validation_json.to_owned(),
4749 secondary_indexes_json: "[]".to_owned(),
4750 format_version: 1,
4751 })
4752 .expect("register collection");
4753
4754 let export_dir = tempfile::TempDir::new().expect("temp dir");
4755 let export_path = export_dir.path().join("backup.db");
4756 service
4757 .safe_export(
4758 &export_path,
4759 SafeExportOptions {
4760 force_checkpoint: false,
4761 },
4762 )
4763 .expect("export");
4764
4765 let exported = sqlite::open_connection(&export_path).expect("exported conn");
4766 let exported_validation_json: String = exported
4767 .query_row(
4768 "SELECT validation_json FROM operational_collections WHERE name = 'connector_health'",
4769 [],
4770 |row| row.get(0),
4771 )
4772 .expect("validation_json");
4773 assert_eq!(exported_validation_json, validation_json);
4774 }
4775
4776 #[test]
4777 fn safe_export_force_checkpoint_false_skips_wal_pragma() {
4778 let (_db, service) = setup();
4779 let export_dir = tempfile::TempDir::new().expect("temp dir");
4780 let export_path = export_dir.path().join("no-wal.db");
4781
4782 let manifest = service
4784 .safe_export(
4785 &export_path,
4786 SafeExportOptions {
4787 force_checkpoint: false,
4788 },
4789 )
4790 .expect("export with no checkpoint");
4791
4792 assert!(
4793 manifest.page_count > 0,
4794 "page_count must be populated regardless of checkpoint mode"
4795 );
4796 assert_eq!(
4797 manifest.schema_version,
4798 SchemaManager::new().current_version().0
4799 );
4800 assert_eq!(manifest.protocol_version, 1);
4801 }
4802
4803 #[test]
4804 fn safe_export_force_checkpoint_false_still_captures_wal_backed_changes() {
4805 let (db, service) = setup();
4806 let conn = sqlite::open_connection(db.path()).expect("conn");
4807 let journal_mode: String = conn
4808 .query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))
4809 .expect("enable wal");
4810 assert_eq!(journal_mode.to_lowercase(), "wal");
4811 let auto_checkpoint_pages: i64 = conn
4812 .query_row("PRAGMA wal_autocheckpoint=0", [], |row| row.get(0))
4813 .expect("disable auto checkpoint");
4814 assert_eq!(auto_checkpoint_pages, 0);
4815 conn.execute(
4816 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4817 VALUES ('r-wal', 'lg-wal', 'Meeting', '{}', 100, 'src-wal')",
4818 [],
4819 )
4820 .expect("insert wal-backed node");
4821
4822 let export_dir = tempfile::TempDir::new().expect("temp dir");
4823 let export_path = export_dir.path().join("wal-backed.db");
4824 service
4825 .safe_export(
4826 &export_path,
4827 SafeExportOptions {
4828 force_checkpoint: false,
4829 },
4830 )
4831 .expect("export wal-backed db");
4832
4833 let exported = sqlite::open_connection(&export_path).expect("open exported db");
4834 let exported_count: i64 = exported
4835 .query_row(
4836 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-wal'",
4837 [],
4838 |row| row.get(0),
4839 )
4840 .expect("count exported nodes");
4841 assert_eq!(
4842 exported_count, 1,
4843 "safe_export must include committed rows that are still resident in the WAL"
4844 );
4845 }
4846
4847 #[test]
4848 fn excise_source_removes_searchable_content_after_excision() {
4849 let (db, service) = setup();
4850 {
4851 let conn = sqlite::open_connection(db.path()).expect("conn");
4852 conn.execute(
4853 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4854 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
4855 [],
4856 )
4857 .expect("insert v1");
4858 conn.execute(
4859 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4860 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
4861 [],
4862 )
4863 .expect("insert v2");
4864 conn.execute(
4865 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4866 VALUES ('ck1', 'lg1', 'hello world', 100)",
4867 [],
4868 )
4869 .expect("insert chunk");
4870 }
4871 service.excise_source("source-2").expect("excise");
4872 {
4873 let conn = sqlite::open_connection(db.path()).expect("conn");
4874 let fts_count: i64 = conn
4875 .query_row(
4876 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'ck1'",
4877 [],
4878 |row| row.get(0),
4879 )
4880 .expect("fts count");
4881 assert_eq!(
4882 fts_count, 0,
4883 "excised content should not remain searchable after excise"
4884 );
4885 }
4886 }
4887
4888 #[cfg(feature = "sqlite-vec")]
4889 #[test]
4890 fn excise_source_cleans_chunks_and_vec_rows_for_excised_version() {
4891 let (db, service) = setup();
4892 {
4893 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4894 service
4895 .schema_manager
4896 .ensure_vec_kind_profile(&conn, "Meeting", 4)
4897 .expect("ensure vec kind profile");
4898 conn.execute(
4899 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4900 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
4901 [],
4902 )
4903 .expect("insert v1");
4904 conn.execute(
4905 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4906 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
4907 [],
4908 )
4909 .expect("insert v2");
4910 conn.execute(
4911 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4912 VALUES ('ck1', 'lg1', 'new content', 200)",
4913 [],
4914 )
4915 .expect("insert chunk");
4916 conn.execute(
4917 "INSERT INTO vec_meeting (chunk_id, embedding) VALUES ('ck1', zeroblob(16))",
4918 [],
4919 )
4920 .expect("insert vec row");
4921 }
4922
4923 service.excise_source("source-2").expect("excise");
4924
4925 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4926 let active_row: String = conn
4927 .query_row(
4928 "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
4929 [],
4930 |row| row.get(0),
4931 )
4932 .expect("restored active row");
4933 assert_eq!(active_row, "r1");
4934 let chunk_count: i64 = conn
4935 .query_row(
4936 "SELECT count(*) FROM chunks WHERE node_logical_id = 'lg1'",
4937 [],
4938 |row| row.get(0),
4939 )
4940 .expect("chunk count");
4941 assert_eq!(
4942 chunk_count, 0,
4943 "excised source content must not survive as chunks"
4944 );
4945 let vec_count: i64 = conn
4946 .query_row("SELECT count(*) FROM vec_meeting", [], |row| row.get(0))
4947 .expect("vec count");
4948 assert_eq!(vec_count, 0, "excised source vec rows must be removed");
4949 let fts_count: i64 = conn
4950 .query_row(
4951 "SELECT count(*) FROM fts_nodes WHERE node_logical_id = 'lg1'",
4952 [],
4953 |row| row.get(0),
4954 )
4955 .expect("fts count");
4956 assert_eq!(
4957 fts_count, 0,
4958 "excised source content must not remain searchable"
4959 );
4960 }
4961
4962 #[test]
4963 fn export_page_count_matches_exported_file() {
4964 let (_db, service) = setup();
4965 let export_dir = tempfile::TempDir::new().expect("temp dir");
4966 let export_path = export_dir.path().join("page-count.db");
4967
4968 let manifest = service
4969 .safe_export(
4970 &export_path,
4971 SafeExportOptions {
4972 force_checkpoint: false,
4973 },
4974 )
4975 .expect("export");
4976
4977 let exported = sqlite::open_connection(&export_path).expect("open exported db");
4978 let actual_page_count: u64 = exported
4979 .query_row("PRAGMA page_count", [], |row| row.get(0))
4980 .expect("page_count from exported file");
4981
4982 assert_eq!(
4983 manifest.page_count, actual_page_count,
4984 "manifest page_count must match the exported file's PRAGMA page_count"
4985 );
4986 }
4987
4988 #[test]
4989 fn no_temp_file_after_successful_export() {
4990 let (_db, service) = setup();
4991 let export_dir = tempfile::TempDir::new().expect("temp dir");
4992 let export_path = export_dir.path().join("no-tmp.db");
4993
4994 service
4995 .safe_export(
4996 &export_path,
4997 SafeExportOptions {
4998 force_checkpoint: false,
4999 },
5000 )
5001 .expect("export");
5002
5003 let tmp_files: Vec<_> = fs::read_dir(export_dir.path())
5004 .expect("read export dir")
5005 .filter_map(Result::ok)
5006 .filter(|e| e.path().extension().is_some_and(|ext| ext == "tmp"))
5007 .collect();
5008
5009 assert!(
5010 tmp_files.is_empty(),
5011 "no .tmp files should remain after a successful export, found: {tmp_files:?}"
5012 );
5013 }
5014
5015 #[test]
5016 fn export_manifest_is_valid_json() {
5017 let (_db, service) = setup();
5018 let export_dir = tempfile::TempDir::new().expect("temp dir");
5019 let export_path = export_dir.path().join("valid-json.db");
5020
5021 service
5022 .safe_export(
5023 &export_path,
5024 SafeExportOptions {
5025 force_checkpoint: false,
5026 },
5027 )
5028 .expect("export");
5029
5030 let manifest_path = export_dir.path().join("valid-json.db.export-manifest.json");
5031 let manifest_contents = fs::read_to_string(&manifest_path).expect("read manifest");
5032 let parsed: serde_json::Value =
5033 serde_json::from_str(&manifest_contents).expect("manifest must be valid JSON");
5034
5035 assert!(
5036 parsed.get("exported_at").is_some(),
5037 "manifest must contain exported_at"
5038 );
5039 assert!(
5040 parsed.get("sha256").is_some(),
5041 "manifest must contain sha256"
5042 );
5043 assert!(
5044 parsed.get("schema_version").is_some(),
5045 "manifest must contain schema_version"
5046 );
5047 assert!(
5048 parsed.get("protocol_version").is_some(),
5049 "manifest must contain protocol_version"
5050 );
5051 assert!(
5052 parsed.get("page_count").is_some(),
5053 "manifest must contain page_count"
5054 );
5055 }
5056
5057 #[test]
5058 fn provenance_purge_dry_run_reports_counts() {
5059 let (db, service) = setup();
5060 {
5061 let conn = sqlite::open_connection(db.path()).expect("conn");
5062 conn.execute(
5063 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5064 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5065 [],
5066 )
5067 .expect("insert p1");
5068 conn.execute(
5069 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5070 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5071 [],
5072 )
5073 .expect("insert p2");
5074 conn.execute(
5075 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5076 VALUES ('p3', 'excise', 'lg3', 'src-1', 300)",
5077 [],
5078 )
5079 .expect("insert p3");
5080 }
5081
5082 let options = super::ProvenancePurgeOptions {
5083 dry_run: true,
5084 preserve_event_types: Vec::new(),
5085 };
5086 let report = service
5087 .purge_provenance_events(250, &options)
5088 .expect("dry run purge");
5089
5090 assert_eq!(report.events_deleted, 2);
5091 assert_eq!(report.events_preserved, 1);
5092 assert!(report.oldest_remaining.is_some());
5093
5094 let conn = sqlite::open_connection(db.path()).expect("conn");
5095 let total: i64 = conn
5096 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5097 row.get(0)
5098 })
5099 .expect("count");
5100 assert_eq!(total, 3, "dry_run must not delete any events");
5101 }
5102
5103 #[test]
5104 fn provenance_purge_deletes_old_events() {
5105 let (db, service) = setup();
5106 {
5107 let conn = sqlite::open_connection(db.path()).expect("conn");
5108 conn.execute(
5109 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5110 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5111 [],
5112 )
5113 .expect("insert p1");
5114 conn.execute(
5115 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5116 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5117 [],
5118 )
5119 .expect("insert p2");
5120 }
5121
5122 let options = super::ProvenancePurgeOptions {
5123 dry_run: false,
5124 preserve_event_types: Vec::new(),
5125 };
5126 let report = service
5127 .purge_provenance_events(150, &options)
5128 .expect("purge");
5129
5130 assert_eq!(report.events_deleted, 1);
5131 assert_eq!(report.events_preserved, 1);
5132 assert_eq!(report.oldest_remaining, Some(200));
5133
5134 let conn = sqlite::open_connection(db.path()).expect("conn");
5135 let remaining: i64 = conn
5136 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5137 row.get(0)
5138 })
5139 .expect("count");
5140 assert_eq!(remaining, 1);
5141 }
5142
5143 #[test]
5144 fn provenance_purge_preserves_specified_types() {
5145 let (db, service) = setup();
5146 {
5147 let conn = sqlite::open_connection(db.path()).expect("conn");
5148 conn.execute(
5149 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5150 VALUES ('p1', 'excise', 'lg1', 'src-1', 100)",
5151 [],
5152 )
5153 .expect("insert p1");
5154 conn.execute(
5155 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5156 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 100)",
5157 [],
5158 )
5159 .expect("insert p2");
5160 conn.execute(
5161 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5162 VALUES ('p3', 'node_insert', 'lg3', 'src-1', 100)",
5163 [],
5164 )
5165 .expect("insert p3");
5166 }
5167
5168 let options = super::ProvenancePurgeOptions {
5169 dry_run: false,
5170 preserve_event_types: Vec::new(),
5171 };
5172 let report = service
5173 .purge_provenance_events(500, &options)
5174 .expect("purge");
5175
5176 assert_eq!(report.events_deleted, 2);
5177 assert_eq!(report.events_preserved, 1);
5178
5179 let conn = sqlite::open_connection(db.path()).expect("conn");
5180 let remaining_type: String = conn
5181 .query_row("SELECT event_type FROM provenance_events", [], |row| {
5182 row.get(0)
5183 })
5184 .expect("remaining event type");
5185 assert_eq!(remaining_type, "excise");
5186 }
5187
5188 #[test]
5189 fn provenance_purge_noop_with_zero_timestamp() {
5190 let (db, service) = setup();
5191 {
5192 let conn = sqlite::open_connection(db.path()).expect("conn");
5193 conn.execute(
5194 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5195 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5196 [],
5197 )
5198 .expect("insert p1");
5199 }
5200
5201 let options = super::ProvenancePurgeOptions {
5202 dry_run: false,
5203 preserve_event_types: Vec::new(),
5204 };
5205 let report = service.purge_provenance_events(0, &options).expect("purge");
5206
5207 assert_eq!(report.events_deleted, 0);
5208 assert_eq!(report.events_preserved, 1);
5209 assert_eq!(report.oldest_remaining, Some(100));
5210 }
5211
5212 #[test]
5213 fn restore_skips_edge_when_counterpart_purged() {
5214 let (db, service) = setup();
5215 {
5216 let conn = sqlite::open_connection(db.path()).expect("conn");
5217 conn.execute(
5219 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5220 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
5221 [],
5222 )
5223 .expect("insert node A");
5224 conn.execute(
5225 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5226 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
5227 [],
5228 )
5229 .expect("insert node B");
5230 conn.execute(
5232 "INSERT INTO edges \
5233 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
5234 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
5235 [],
5236 )
5237 .expect("insert edge");
5238 conn.execute(
5240 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5241 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
5242 [],
5243 )
5244 .expect("insert retire event A");
5245 conn.execute(
5246 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5247 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
5248 [],
5249 )
5250 .expect("insert edge retire event");
5251 conn.execute(
5252 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
5253 [],
5254 )
5255 .expect("retire node A");
5256 conn.execute(
5257 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
5258 [],
5259 )
5260 .expect("retire node B");
5261 conn.execute(
5262 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
5263 [],
5264 )
5265 .expect("retire edge");
5266 conn.execute("DELETE FROM nodes WHERE logical_id = 'doc-2'", [])
5269 .expect("purge node B rows");
5270 }
5271
5272 let report = service.restore_logical_id("doc-1").expect("restore A");
5274 assert!(!report.was_noop);
5275 assert_eq!(report.restored_node_rows, 1);
5276 assert_eq!(report.restored_edge_rows, 0, "edge should not be restored");
5277 assert_eq!(report.skipped_edges.len(), 1);
5278 assert_eq!(report.skipped_edges[0].edge_logical_id, "edge-1");
5279 assert_eq!(report.skipped_edges[0].missing_endpoint, "doc-2");
5280
5281 let conn = sqlite::open_connection(db.path()).expect("conn");
5283 let active_edge_count: i64 = conn
5284 .query_row(
5285 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
5286 [],
5287 |row| row.get(0),
5288 )
5289 .expect("active edge count");
5290 assert_eq!(active_edge_count, 0, "edge must remain retired");
5291 }
5292
5293 #[test]
5294 fn restore_restores_edges_to_active_nodes() {
5295 let (db, service) = setup();
5296 {
5297 let conn = sqlite::open_connection(db.path()).expect("conn");
5298 conn.execute(
5300 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5301 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
5302 [],
5303 )
5304 .expect("insert node A");
5305 conn.execute(
5306 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5307 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
5308 [],
5309 )
5310 .expect("insert node B");
5311 conn.execute(
5313 "INSERT INTO edges \
5314 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
5315 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
5316 [],
5317 )
5318 .expect("insert edge");
5319 conn.execute(
5321 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5322 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
5323 [],
5324 )
5325 .expect("insert retire event A");
5326 conn.execute(
5327 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5328 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
5329 [],
5330 )
5331 .expect("insert edge retire event");
5332 conn.execute(
5333 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
5334 [],
5335 )
5336 .expect("retire node A");
5337 conn.execute(
5338 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
5339 [],
5340 )
5341 .expect("retire edge");
5342 }
5343
5344 let report = service.restore_logical_id("doc-1").expect("restore A");
5346 assert!(!report.was_noop);
5347 assert_eq!(report.restored_node_rows, 1);
5348 assert!(report.restored_edge_rows > 0, "edge should be restored");
5349 assert!(
5350 report.skipped_edges.is_empty(),
5351 "no edges should be skipped"
5352 );
5353
5354 let conn = sqlite::open_connection(db.path()).expect("conn");
5355 let active_edge_count: i64 = conn
5356 .query_row(
5357 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
5358 [],
5359 |row| row.get(0),
5360 )
5361 .expect("active edge count");
5362 assert_eq!(active_edge_count, 1, "edge must be active");
5363 }
5364
5365 #[test]
5366 fn restore_restores_edges_when_both_restored() {
5367 let (db, service) = setup();
5368 {
5369 let conn = sqlite::open_connection(db.path()).expect("conn");
5370 conn.execute(
5372 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5373 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
5374 [],
5375 )
5376 .expect("insert node A");
5377 conn.execute(
5378 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5379 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
5380 [],
5381 )
5382 .expect("insert node B");
5383 conn.execute(
5385 "INSERT INTO edges \
5386 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
5387 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
5388 [],
5389 )
5390 .expect("insert edge");
5391 conn.execute(
5393 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5394 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
5395 [],
5396 )
5397 .expect("insert retire event A");
5398 conn.execute(
5399 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5400 VALUES ('evt-retire-b', 'node_retire', 'doc-2', 'forget-1', 200, '')",
5401 [],
5402 )
5403 .expect("insert retire event B");
5404 conn.execute(
5405 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5406 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
5407 [],
5408 )
5409 .expect("insert edge retire event");
5410 conn.execute(
5411 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
5412 [],
5413 )
5414 .expect("retire node A");
5415 conn.execute(
5416 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
5417 [],
5418 )
5419 .expect("retire node B");
5420 conn.execute(
5421 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
5422 [],
5423 )
5424 .expect("retire edge");
5425 }
5426
5427 let report_b = service.restore_logical_id("doc-2").expect("restore B");
5429 assert!(!report_b.was_noop);
5430
5431 let report_a = service.restore_logical_id("doc-1").expect("restore A");
5433 assert!(!report_a.was_noop);
5434 assert_eq!(report_a.restored_node_rows, 1);
5435 assert!(
5436 report_a.restored_edge_rows > 0,
5437 "edge should be restored when both endpoints active"
5438 );
5439 assert!(
5440 report_a.skipped_edges.is_empty(),
5441 "no edges should be skipped"
5442 );
5443
5444 let conn = sqlite::open_connection(db.path()).expect("conn");
5445 let active_edge_count: i64 = conn
5446 .query_row(
5447 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
5448 [],
5449 |row| row.get(0),
5450 )
5451 .expect("active edge count");
5452 assert_eq!(
5453 active_edge_count, 1,
5454 "edge must be active after both endpoints restored"
5455 );
5456 }
5457
5458 #[test]
5461 fn fts_property_schema_crud_round_trip() {
5462 let (_db, service) = setup();
5463
5464 let record = service
5466 .register_fts_property_schema(
5467 "Meeting",
5468 &["$.title".to_owned(), "$.summary".to_owned()],
5469 None,
5470 )
5471 .expect("register");
5472 assert_eq!(record.kind, "Meeting");
5473 assert_eq!(record.property_paths, vec!["$.title", "$.summary"]);
5474 assert_eq!(record.separator, " ");
5475 assert_eq!(record.format_version, 1);
5476
5477 let described = service
5479 .describe_fts_property_schema("Meeting")
5480 .expect("describe")
5481 .expect("should exist");
5482 assert_eq!(described, record);
5483
5484 let missing = service
5486 .describe_fts_property_schema("NoSuchKind")
5487 .expect("describe missing");
5488 assert!(missing.is_none());
5489
5490 let list = service.list_fts_property_schemas().expect("list");
5492 assert_eq!(list.len(), 1);
5493 assert_eq!(list[0].kind, "Meeting");
5494
5495 let updated = service
5497 .register_fts_property_schema(
5498 "Meeting",
5499 &["$.title".to_owned(), "$.notes".to_owned()],
5500 Some("\n"),
5501 )
5502 .expect("update");
5503 assert_eq!(updated.property_paths, vec!["$.title", "$.notes"]);
5504 assert_eq!(updated.separator, "\n");
5505
5506 service
5508 .remove_fts_property_schema("Meeting")
5509 .expect("remove");
5510 let after_remove = service
5511 .describe_fts_property_schema("Meeting")
5512 .expect("describe after remove");
5513 assert!(after_remove.is_none());
5514
5515 let err = service.remove_fts_property_schema("Meeting");
5517 assert!(err.is_err());
5518 }
5519
5520 #[test]
5521 fn describe_fts_property_schema_round_trips_recursive_entries() {
5522 let (_db, service) = setup();
5523
5524 let entries = vec![
5525 FtsPropertyPathSpec::scalar("$.title"),
5526 FtsPropertyPathSpec::recursive("$.payload"),
5527 ];
5528 let exclude = vec!["$.payload.private".to_owned()];
5529 let registered = service
5530 .register_fts_property_schema_with_entries(
5531 "KnowledgeItem",
5532 &entries,
5533 Some(" "),
5534 &exclude,
5535 crate::rebuild_actor::RebuildMode::Eager,
5536 )
5537 .expect("register recursive");
5538
5539 assert_eq!(registered.entries, entries);
5542 assert_eq!(registered.exclude_paths, exclude);
5543 assert_eq!(registered.property_paths, vec!["$.title", "$.payload"]);
5544
5545 let described = service
5546 .describe_fts_property_schema("KnowledgeItem")
5547 .expect("describe")
5548 .expect("should exist");
5549 assert_eq!(described.kind, "KnowledgeItem");
5550 assert_eq!(described.entries, entries);
5551 assert_eq!(described.exclude_paths, exclude);
5552 assert_eq!(described.property_paths, vec!["$.title", "$.payload"]);
5553 assert_eq!(described.separator, " ");
5554 assert_eq!(described.format_version, 1);
5555 }
5556
5557 #[test]
5558 fn list_fts_property_schemas_round_trips_recursive_entries() {
5559 let (_db, service) = setup();
5560
5561 let entries = vec![
5562 FtsPropertyPathSpec::scalar("$.title"),
5563 FtsPropertyPathSpec::recursive("$.payload"),
5564 ];
5565 let exclude = vec!["$.payload.secret".to_owned()];
5566 service
5567 .register_fts_property_schema_with_entries(
5568 "KnowledgeItem",
5569 &entries,
5570 Some(" "),
5571 &exclude,
5572 crate::rebuild_actor::RebuildMode::Eager,
5573 )
5574 .expect("register recursive");
5575
5576 let listed = service.list_fts_property_schemas().expect("list");
5577 assert_eq!(listed.len(), 1);
5578 let record = &listed[0];
5579 assert_eq!(record.kind, "KnowledgeItem");
5580 assert_eq!(record.entries, entries);
5581 assert_eq!(record.exclude_paths, exclude);
5582 assert_eq!(record.property_paths, vec!["$.title", "$.payload"]);
5583 }
5584
5585 #[test]
5586 fn describe_fts_property_schema_round_trips_scalar_only_entries() {
5587 let (_db, service) = setup();
5588
5589 service
5590 .register_fts_property_schema(
5591 "Meeting",
5592 &["$.title".to_owned(), "$.summary".to_owned()],
5593 None,
5594 )
5595 .expect("register scalar");
5596
5597 let described = service
5598 .describe_fts_property_schema("Meeting")
5599 .expect("describe")
5600 .expect("should exist");
5601 assert_eq!(described.property_paths, vec!["$.title", "$.summary"]);
5602 assert_eq!(described.entries.len(), 2);
5603 for entry in &described.entries {
5604 assert_eq!(
5605 entry.mode,
5606 FtsPropertyPathMode::Scalar,
5607 "scalar-only schema should deserialize every entry as Scalar"
5608 );
5609 }
5610 assert!(described.exclude_paths.is_empty());
5611 }
5612
5613 #[test]
5614 fn restore_reestablishes_property_fts_visibility() {
5615 let (db, service) = setup();
5616 let doc_table = fathomdb_schema::fts_kind_table_name("Document");
5617 {
5618 let conn = sqlite::open_connection(db.path()).expect("conn");
5619 conn.execute(
5621 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5622 VALUES ('Document', '[\"$.title\", \"$.body\"]', ' ')",
5623 [],
5624 )
5625 .expect("register schema");
5626 conn.execute_batch(&format!(
5628 "CREATE VIRTUAL TABLE IF NOT EXISTS {doc_table} USING fts5(\
5629 node_logical_id UNINDEXED, text_content, \
5630 tokenize = 'porter unicode61 remove_diacritics 2'\
5631 )"
5632 ))
5633 .expect("create per-kind table");
5634 conn.execute(
5636 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5637 VALUES ('row-1', 'doc-1', 'Document', '{\"title\":\"Budget\",\"body\":\"Q3 forecast\"}', 100, 'seed')",
5638 [],
5639 )
5640 .expect("insert node");
5641 conn.execute(
5643 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5644 VALUES ('chunk-1', 'doc-1', 'budget text', 100)",
5645 [],
5646 )
5647 .expect("insert chunk");
5648 conn.execute(
5650 &format!(
5651 "INSERT INTO {doc_table} (node_logical_id, text_content) \
5652 VALUES ('doc-1', 'Budget Q3 forecast')"
5653 ),
5654 [],
5655 )
5656 .expect("insert property fts");
5657 conn.execute(
5659 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5660 VALUES ('evt-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
5661 [],
5662 )
5663 .expect("retire event");
5664 conn.execute(
5665 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
5666 [],
5667 )
5668 .expect("supersede");
5669 conn.execute("DELETE FROM fts_nodes", [])
5670 .expect("clear chunk fts");
5671 conn.execute(&format!("DELETE FROM {doc_table}"), [])
5672 .expect("clear property fts");
5673 }
5674
5675 let report = service.restore_logical_id("doc-1").expect("restore");
5676 assert_eq!(report.restored_property_fts_rows, 1);
5677
5678 let conn = sqlite::open_connection(db.path()).expect("conn");
5680 let prop_fts_count: i64 = conn
5681 .query_row(
5682 &format!("SELECT count(*) FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
5683 [],
5684 |row| row.get(0),
5685 )
5686 .expect("prop fts count");
5687 assert_eq!(prop_fts_count, 1, "property FTS must be restored");
5688
5689 let text: String = conn
5690 .query_row(
5691 &format!("SELECT text_content FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
5692 [],
5693 |row| row.get(0),
5694 )
5695 .expect("prop fts text");
5696 assert_eq!(text, "Budget Q3 forecast");
5697 }
5698
5699 #[test]
5700 fn safe_export_preserves_fts_property_schemas() {
5701 let (_db, service) = setup();
5702 service
5703 .register_fts_property_schema(
5704 "Goal",
5705 &["$.name".to_owned(), "$.rationale".to_owned()],
5706 None,
5707 )
5708 .expect("register schema");
5709
5710 let export_dir = tempfile::TempDir::new().expect("temp dir");
5711 let export_path = export_dir.path().join("backup.db");
5712 service
5713 .safe_export(
5714 &export_path,
5715 SafeExportOptions {
5716 force_checkpoint: false,
5717 },
5718 )
5719 .expect("export");
5720
5721 let exported_conn = rusqlite::Connection::open(&export_path).expect("open exported db");
5723 let kind: String = exported_conn
5724 .query_row(
5725 "SELECT kind FROM fts_property_schemas WHERE kind = 'Goal'",
5726 [],
5727 |row| row.get(0),
5728 )
5729 .expect("schema must exist in export");
5730 assert_eq!(kind, "Goal");
5731 let paths_json: String = exported_conn
5732 .query_row(
5733 "SELECT property_paths_json FROM fts_property_schemas WHERE kind = 'Goal'",
5734 [],
5735 |row| row.get(0),
5736 )
5737 .expect("paths must exist");
5738 let paths: Vec<String> = serde_json::from_str(&paths_json).expect("valid json");
5739 assert_eq!(paths, vec!["$.name", "$.rationale"]);
5740 }
5741
5742 #[test]
5743 #[allow(clippy::too_many_lines)]
5744 fn export_recovery_rebuilds_property_fts_from_canonical_state() {
5745 let (db, service) = setup();
5746 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5747 service
5749 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
5750 .expect("register");
5751 {
5752 let conn = sqlite::open_connection(db.path()).expect("conn");
5753 conn.execute(
5754 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5755 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5756 [],
5757 )
5758 .expect("insert node 1");
5759 conn.execute(
5760 &format!(
5761 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5762 VALUES ('goal-1', 'Ship v2')"
5763 ),
5764 [],
5765 )
5766 .expect("insert property FTS row 1");
5767 conn.execute(
5768 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5769 VALUES ('row-2', 'goal-2', 'Goal', '{\"name\":\"Launch redesign\"}', 100, 'seed')",
5770 [],
5771 )
5772 .expect("insert node 2");
5773 conn.execute(
5774 &format!(
5775 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5776 VALUES ('goal-2', 'Launch redesign')"
5777 ),
5778 [],
5779 )
5780 .expect("insert property FTS row 2");
5781 }
5782
5783 let export_dir = tempfile::TempDir::new().expect("temp dir");
5785 let export_path = export_dir.path().join("backup.db");
5786 service
5787 .safe_export(
5788 &export_path,
5789 SafeExportOptions {
5790 force_checkpoint: false,
5791 },
5792 )
5793 .expect("export");
5794
5795 {
5799 let conn = rusqlite::Connection::open(&export_path).expect("open export");
5800 SchemaManager::new()
5802 .bootstrap(&conn)
5803 .expect("bootstrap export");
5804 conn.execute(
5805 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5806 [],
5807 )
5808 .expect("delete old row");
5809 conn.execute(
5810 &format!(
5811 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5812 VALUES ('goal-1', 'completely wrong stale text')"
5813 ),
5814 [],
5815 )
5816 .expect("insert corrupted row");
5817 conn.execute(
5818 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
5819 [],
5820 )
5821 .expect("delete goal-2 row");
5822 }
5823
5824 let schema = Arc::new(SchemaManager::new());
5826 let exported_service = AdminService::new(&export_path, Arc::clone(&schema));
5827 exported_service
5828 .rebuild_projections(ProjectionTarget::Fts)
5829 .expect("rebuild");
5830
5831 let conn = rusqlite::Connection::open(&export_path).expect("open export for verify");
5833 let goal1_text: String = conn
5834 .query_row(
5835 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5836 [],
5837 |r| r.get(0),
5838 )
5839 .expect("goal-1 text after rebuild");
5840 assert_eq!(
5841 goal1_text, "Ship v2",
5842 "goal-1 text must be corrected by rebuild"
5843 );
5844
5845 let goal2_count: i64 = conn
5846 .query_row(
5847 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
5848 [],
5849 |r| r.get(0),
5850 )
5851 .expect("goal-2 count");
5852 assert_eq!(goal2_count, 1, "goal-2 row must be restored by rebuild");
5853
5854 let stale_count: i64 = conn
5855 .query_row(
5856 &format!("SELECT count(*) FROM {goal_table} WHERE text_content = 'completely wrong stale text'"),
5857 [],
5858 |r| r.get(0),
5859 )
5860 .expect("stale count");
5861 assert_eq!(stale_count, 0, "corrupted text must be gone after rebuild");
5862
5863 let integrity = exported_service.check_integrity().expect("integrity");
5865 assert_eq!(integrity.missing_property_fts_rows, 0);
5866 let semantics = exported_service.check_semantics().expect("semantics");
5867 assert_eq!(semantics.drifted_property_fts_rows, 0);
5868 assert_eq!(semantics.orphaned_property_fts_rows, 0);
5869 assert_eq!(semantics.duplicate_property_fts_rows, 0);
5870 }
5871
5872 #[test]
5873 fn check_integrity_no_false_positives_for_empty_extraction() {
5874 let (db, service) = setup();
5875 {
5876 let conn = sqlite::open_connection(db.path()).expect("conn");
5877 conn.execute(
5879 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5880 VALUES ('Ticket', '[\"$.searchable\"]', ' ')",
5881 [],
5882 )
5883 .expect("register schema");
5884 conn.execute(
5887 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5888 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"status\":\"open\"}', 100, 'seed')",
5889 [],
5890 )
5891 .expect("insert node");
5892 }
5893
5894 let report = service.check_integrity().expect("integrity");
5895 assert_eq!(
5896 report.missing_property_fts_rows, 0,
5897 "node with no extractable values must not be counted as missing"
5898 );
5899 }
5900
5901 #[test]
5902 fn check_integrity_detects_genuinely_missing_property_fts_rows() {
5903 let (db, service) = setup();
5904 {
5905 let conn = sqlite::open_connection(db.path()).expect("conn");
5906 conn.execute(
5907 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5908 VALUES ('Ticket', '[\"$.title\"]', ' ')",
5909 [],
5910 )
5911 .expect("register schema");
5912 conn.execute(
5914 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5915 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"title\":\"fix login bug\"}', 100, 'seed')",
5916 [],
5917 )
5918 .expect("insert node");
5919 }
5920
5921 let report = service.check_integrity().expect("integrity");
5922 assert_eq!(
5923 report.missing_property_fts_rows, 1,
5924 "node with extractable values but no property FTS row must be detected"
5925 );
5926 }
5927
5928 #[test]
5929 fn rebuild_projections_fts_restores_missing_property_fts_rows() {
5930 let (db, service) = setup();
5931 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5932 {
5933 let conn = sqlite::open_connection(db.path()).expect("conn");
5934 conn.execute(
5935 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5936 VALUES ('Goal', '[\"$.name\"]', ' ')",
5937 [],
5938 )
5939 .expect("register schema");
5940 conn.execute(
5941 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5942 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5943 [],
5944 )
5945 .expect("insert node");
5946 }
5948
5949 let report = service
5950 .rebuild_projections(ProjectionTarget::Fts)
5951 .expect("rebuild");
5952 assert!(
5953 report.rebuilt_rows >= 1,
5954 "rebuild must insert at least one property FTS row"
5955 );
5956
5957 let conn = sqlite::open_connection(db.path()).expect("conn");
5958 let text: String = conn
5959 .query_row(
5960 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5961 [],
5962 |row| row.get(0),
5963 )
5964 .expect("property FTS row must exist after rebuild");
5965 assert_eq!(text, "Ship v2");
5966 }
5967
5968 #[test]
5969 fn rebuild_missing_projections_fills_gap_for_deleted_property_fts_row() {
5970 let (db, service) = setup();
5971 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5972 {
5973 let conn = sqlite::open_connection(db.path()).expect("conn");
5974 conn.execute(
5975 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5976 VALUES ('Goal', '[\"$.name\"]', ' ')",
5977 [],
5978 )
5979 .expect("register schema");
5980 conn.execute(
5981 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5982 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5983 [],
5984 )
5985 .expect("insert node");
5986 conn.execute_batch(&format!(
5988 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
5989 node_logical_id UNINDEXED, text_content, \
5990 tokenize = 'porter unicode61 remove_diacritics 2'\
5991 )"
5992 ))
5993 .expect("create per-kind table");
5994 conn.execute(
5995 &format!(
5996 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5997 VALUES ('goal-1', 'Ship v2')"
5998 ),
5999 [],
6000 )
6001 .expect("insert property fts");
6002 conn.execute(
6003 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6004 [],
6005 )
6006 .expect("delete property fts");
6007 }
6008
6009 let report = service
6010 .rebuild_missing_projections()
6011 .expect("rebuild missing");
6012 assert!(
6013 report.rebuilt_rows >= 1,
6014 "missing rebuild must insert the gap-fill row"
6015 );
6016
6017 let conn = sqlite::open_connection(db.path()).expect("conn");
6018 let count: i64 = conn
6019 .query_row(
6020 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6021 [],
6022 |row| row.get(0),
6023 )
6024 .expect("count");
6025 assert_eq!(
6026 count, 1,
6027 "gap-fill must restore exactly one property FTS row"
6028 );
6029 }
6030
6031 #[test]
6032 fn remove_schema_then_rebuild_cleans_stale_property_fts_rows() {
6033 let (db, service) = setup();
6039 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6040 {
6041 let conn = sqlite::open_connection(db.path()).expect("conn");
6042 conn.execute(
6043 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6044 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6045 [],
6046 )
6047 .expect("insert node");
6048 conn.execute_batch(&format!(
6051 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
6052 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
6053 ))
6054 .expect("create per-kind table");
6055 conn.execute(
6056 &format!(
6057 "INSERT INTO {goal_table} (node_logical_id, text_content) \
6058 VALUES ('goal-1', 'Ship v2')"
6059 ),
6060 [],
6061 )
6062 .expect("insert property fts");
6063 }
6064
6065 let semantics = service.check_semantics().expect("semantics");
6067 assert_eq!(
6068 semantics.orphaned_property_fts_rows, 1,
6069 "orphaned property FTS rows must be detected with no registered schema"
6070 );
6071
6072 service
6074 .rebuild_projections(ProjectionTarget::Fts)
6075 .expect("rebuild");
6076
6077 let conn = sqlite::open_connection(db.path()).expect("conn");
6078 let count: i64 = conn
6079 .query_row(
6080 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6081 [],
6082 |row| row.get(0),
6083 )
6084 .expect("count");
6085 assert_eq!(
6086 count, 0,
6087 "rebuild must delete rows from per-kind tables with no registered schema"
6088 );
6089 }
6090
6091 mod validate_fts_property_paths_tests {
6092 use super::super::validate_fts_property_paths;
6093
6094 #[test]
6095 fn valid_simple_path() {
6096 assert!(validate_fts_property_paths(&["$.name".to_owned()]).is_ok());
6097 }
6098
6099 #[test]
6100 fn valid_nested_path() {
6101 assert!(validate_fts_property_paths(&["$.address.city".to_owned()]).is_ok());
6102 }
6103
6104 #[test]
6105 fn valid_underscore_segment() {
6106 assert!(validate_fts_property_paths(&["$.a_b".to_owned()]).is_ok());
6107 }
6108
6109 #[test]
6110 fn rejects_bare_prefix() {
6111 let result = validate_fts_property_paths(&["$.".to_owned()]);
6112 assert!(result.is_err(), "path '$.' must be rejected");
6113 }
6114
6115 #[test]
6116 fn rejects_double_dot() {
6117 let result = validate_fts_property_paths(&["$..x".to_owned()]);
6118 assert!(result.is_err(), "path '$..x' must be rejected");
6119 }
6120
6121 #[test]
6122 fn rejects_trailing_dot() {
6123 let result = validate_fts_property_paths(&["$.foo.".to_owned()]);
6124 assert!(result.is_err(), "path '$.foo.' must be rejected");
6125 }
6126
6127 #[test]
6128 fn rejects_space_in_segment() {
6129 let result = validate_fts_property_paths(&["$.foo bar".to_owned()]);
6130 assert!(result.is_err(), "path '$.foo bar' must be rejected");
6131 }
6132
6133 #[test]
6134 fn rejects_bracket_syntax() {
6135 let result = validate_fts_property_paths(&["$.foo[0]".to_owned()]);
6136 assert!(result.is_err(), "path '$.foo[0]' must be rejected");
6137 }
6138
6139 #[test]
6140 fn rejects_duplicates() {
6141 let result = validate_fts_property_paths(&["$.name".to_owned(), "$.name".to_owned()]);
6142 assert!(result.is_err(), "duplicate paths must be rejected");
6143 }
6144
6145 #[test]
6146 fn rejects_empty_list() {
6147 let result = validate_fts_property_paths(&[]);
6148 assert!(result.is_err(), "empty path list must be rejected");
6149 }
6150 }
6151
6152 #[test]
6155 fn register_fts_schema_writes_to_per_kind_table() {
6156 let (db, service) = setup();
6159 {
6160 let conn = sqlite::open_connection(db.path()).expect("conn");
6161 conn.execute(
6163 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6164 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6165 [],
6166 )
6167 .expect("insert node");
6168 }
6169
6170 service
6172 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6173 .expect("register schema");
6174
6175 let conn = sqlite::open_connection(db.path()).expect("conn");
6176 let table = fathomdb_schema::fts_kind_table_name("Goal");
6177 let per_kind_count: i64 = conn
6179 .query_row(
6180 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6181 [],
6182 |row| row.get(0),
6183 )
6184 .expect("per-kind count");
6185 assert_eq!(
6186 per_kind_count, 1,
6187 "per-kind table must have the row after registration"
6188 );
6189 }
6190
6191 #[test]
6192 fn remove_fts_schema_deletes_from_per_kind_table() {
6193 let (db, service) = setup();
6195 {
6196 let conn = sqlite::open_connection(db.path()).expect("conn");
6197 conn.execute(
6198 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6199 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6200 [],
6201 )
6202 .expect("insert node");
6203 }
6204
6205 service
6206 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6207 .expect("register schema");
6208 service
6209 .remove_fts_property_schema("Goal")
6210 .expect("remove schema");
6211
6212 let conn = sqlite::open_connection(db.path()).expect("conn");
6213 let table = fathomdb_schema::fts_kind_table_name("Goal");
6214 let per_kind_count: i64 = conn
6215 .query_row(
6216 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6217 [],
6218 |row| row.get(0),
6219 )
6220 .expect("per-kind count");
6221 assert_eq!(
6222 per_kind_count, 0,
6223 "per-kind table must be empty after schema removal"
6224 );
6225 }
6226
6227 #[test]
6230 fn fts_path_spec_with_weight_builder() {
6231 let spec = FtsPropertyPathSpec::scalar("$.title").with_weight(5.0);
6232 assert_eq!(spec.weight, Some(5.0));
6233 assert_eq!(spec.path, "$.title");
6234 assert_eq!(spec.mode, FtsPropertyPathMode::Scalar);
6235 }
6236
6237 #[test]
6238 fn fts_path_spec_serialize_with_weight() {
6239 use super::serialize_property_paths_json;
6240 let entries = vec![
6241 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6242 FtsPropertyPathSpec::scalar("$.body"),
6243 ];
6244 let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6245 let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6247 let paths = v
6248 .get("paths")
6249 .expect("paths key")
6250 .as_array()
6251 .expect("array");
6252 assert_eq!(paths.len(), 2);
6253 assert_eq!(
6255 paths[0].get("path").and_then(serde_json::Value::as_str),
6256 Some("$.title")
6257 );
6258 assert_eq!(
6259 paths[0].get("weight").and_then(serde_json::Value::as_f64),
6260 Some(2.0)
6261 );
6262 assert!(
6264 paths[1].get("weight").is_none(),
6265 "unweighted spec must omit weight field"
6266 );
6267 }
6268
6269 #[test]
6270 fn fts_path_spec_serialize_no_weights() {
6271 use super::serialize_property_paths_json;
6272 let entries = vec![
6273 FtsPropertyPathSpec::scalar("$.title"),
6274 FtsPropertyPathSpec::scalar("$.payload"),
6275 ];
6276 let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6277 let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6279 assert!(
6280 v.is_array(),
6281 "all-scalar no-weight schema must serialize as bare string array"
6282 );
6283 let arr = v.as_array().expect("array");
6284 assert_eq!(arr.len(), 2);
6285 assert_eq!(arr[0].as_str(), Some("$.title"));
6286 assert_eq!(arr[1].as_str(), Some("$.payload"));
6287 }
6288
6289 #[test]
6290 fn fts_weight_validation_out_of_range() {
6291 let (_db, service) = setup();
6292 let entries_zero = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(0.0)];
6294 let result = service.register_fts_property_schema_with_entries(
6295 "Article",
6296 &entries_zero,
6297 None,
6298 &[],
6299 crate::rebuild_actor::RebuildMode::Eager,
6300 );
6301 assert!(result.is_err(), "weight 0.0 must be rejected");
6302 let err_msg = result.expect_err("weight 0.0 must be rejected").to_string();
6303 assert!(
6304 err_msg.contains("weight"),
6305 "error must mention weight: {err_msg}"
6306 );
6307
6308 let entries_big = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(1001.0)];
6310 let result = service.register_fts_property_schema_with_entries(
6311 "Article",
6312 &entries_big,
6313 None,
6314 &[],
6315 crate::rebuild_actor::RebuildMode::Eager,
6316 );
6317 assert!(result.is_err(), "weight 1001.0 must be rejected");
6318 }
6319
6320 #[test]
6321 fn fts_weight_validation_valid() {
6322 let (_db, service) = setup();
6323 let entries = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(10.0)];
6324 let result = service.register_fts_property_schema_with_entries(
6325 "Article",
6326 &entries,
6327 None,
6328 &[],
6329 crate::rebuild_actor::RebuildMode::Eager,
6330 );
6331 assert!(
6332 result.is_ok(),
6333 "weight 10.0 must be accepted: {:?}",
6334 result.err()
6335 );
6336 }
6337
6338 #[test]
6341 fn create_or_replace_creates_multi_column_table() {
6342 use super::create_or_replace_fts_kind_table;
6343 let (db, _service) = setup();
6344 let conn = sqlite::open_connection(db.path()).expect("conn");
6345 let specs = vec![
6346 FtsPropertyPathSpec::scalar("$.title"),
6347 FtsPropertyPathSpec::recursive("$.payload"),
6348 ];
6349 create_or_replace_fts_kind_table(
6350 &conn,
6351 "Article",
6352 &specs,
6353 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6354 )
6355 .expect("create table");
6356
6357 let table = fathomdb_schema::fts_kind_table_name("Article");
6359 let count: i64 = conn
6361 .query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))
6362 .expect("count");
6363 assert_eq!(count, 0, "new table must be empty");
6364
6365 let title_col = fathomdb_schema::fts_column_name("$.title", false);
6367 let payload_col = fathomdb_schema::fts_column_name("$.payload", true);
6368 conn.execute(
6369 &format!(
6370 "INSERT INTO {table} (node_logical_id, {title_col}, {payload_col}) VALUES ('id1', 'hello', 'world')"
6371 ),
6372 [],
6373 )
6374 .expect("insert with per-spec columns must succeed");
6375 }
6376
6377 #[test]
6378 fn create_or_replace_drops_and_recreates() {
6379 use super::create_or_replace_fts_kind_table;
6380 let (db, _service) = setup();
6381 let conn = sqlite::open_connection(db.path()).expect("conn");
6382
6383 let specs_v1 = vec![FtsPropertyPathSpec::scalar("$.title")];
6385 create_or_replace_fts_kind_table(
6386 &conn,
6387 "Post",
6388 &specs_v1,
6389 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6390 )
6391 .expect("create v1");
6392
6393 let specs_v2 = vec![
6395 FtsPropertyPathSpec::scalar("$.title"),
6396 FtsPropertyPathSpec::scalar("$.summary"),
6397 ];
6398 create_or_replace_fts_kind_table(
6399 &conn,
6400 "Post",
6401 &specs_v2,
6402 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6403 )
6404 .expect("create v2");
6405
6406 let table = fathomdb_schema::fts_kind_table_name("Post");
6408 let summary_col = fathomdb_schema::fts_column_name("$.summary", false);
6409 conn.execute(
6410 &format!("INSERT INTO {table} (node_logical_id, {summary_col}) VALUES ('id1', 'text')"),
6411 [],
6412 )
6413 .expect("second layout must allow summary column");
6414 }
6415
6416 #[test]
6417 fn create_or_replace_invalid_tokenizer() {
6418 use super::create_or_replace_fts_kind_table;
6419 let (db, _service) = setup();
6420 let conn = sqlite::open_connection(db.path()).expect("conn");
6421 let specs = vec![FtsPropertyPathSpec::scalar("$.title")];
6422 let result = create_or_replace_fts_kind_table(&conn, "Post", &specs, "'; DROP TABLE --");
6423 assert!(result.is_err(), "invalid tokenizer must be rejected");
6424 let err_msg = result
6425 .expect_err("invalid tokenizer must be rejected")
6426 .to_string();
6427 assert!(
6428 err_msg.contains("tokenizer"),
6429 "error must mention tokenizer: {err_msg}"
6430 );
6431 }
6432
6433 #[test]
6434 fn register_with_weights_creates_per_column_table() {
6435 let (db, service) = setup();
6436 let entries = vec![
6437 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6438 FtsPropertyPathSpec::scalar("$.body"),
6439 ];
6440 service
6441 .register_fts_property_schema_with_entries(
6442 "Article",
6443 &entries,
6444 None,
6445 &[],
6446 crate::rebuild_actor::RebuildMode::Eager,
6447 )
6448 .expect("register");
6449
6450 let conn = sqlite::open_connection(db.path()).expect("conn");
6452 let table = fathomdb_schema::fts_kind_table_name("Article");
6453 let title_col = fathomdb_schema::fts_column_name("$.title", false);
6454 let body_col = fathomdb_schema::fts_column_name("$.body", false);
6455 conn.execute(
6457 &format!(
6458 "INSERT INTO {table} (node_logical_id, {title_col}, {body_col}) VALUES ('art-1', 'hello', 'world')"
6459 ),
6460 [],
6461 )
6462 .expect("per-spec columns must exist after registration with weights");
6463 }
6464
6465 #[test]
6466 fn weighted_to_unweighted_downgrade_recreates_table() {
6467 let (db, service) = setup();
6468
6469 let weighted_entries = vec![
6471 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6472 FtsPropertyPathSpec::scalar("$.body"),
6473 ];
6474 service
6475 .register_fts_property_schema_with_entries(
6476 "Article",
6477 &weighted_entries,
6478 None,
6479 &[],
6480 crate::rebuild_actor::RebuildMode::Eager,
6481 )
6482 .expect("register weighted");
6483
6484 let unweighted_entries = vec![
6486 FtsPropertyPathSpec::scalar("$.title"),
6487 FtsPropertyPathSpec::scalar("$.body"),
6488 ];
6489 service
6490 .register_fts_property_schema_with_entries(
6491 "Article",
6492 &unweighted_entries,
6493 None,
6494 &[],
6495 crate::rebuild_actor::RebuildMode::Eager,
6496 )
6497 .expect("re-register unweighted");
6498
6499 let conn = sqlite::open_connection(db.path()).expect("conn");
6502 let table = fathomdb_schema::fts_kind_table_name("Article");
6503 let result = conn.execute(
6504 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('art-1', 'hello world')"),
6505 [],
6506 );
6507 assert!(
6508 result.is_ok(),
6509 "text_content column must exist after weighted-to-unweighted downgrade"
6510 );
6511 }
6512
6513 #[test]
6516 fn set_get_fts_profile_roundtrip() {
6517 let (_db, service) = setup();
6518 let profile = service
6519 .set_fts_profile("book", "unicode61")
6520 .expect("set_fts_profile");
6521 assert_eq!(profile.kind, "book");
6522 assert_eq!(profile.tokenizer, "unicode61");
6523
6524 let got = service
6525 .get_fts_profile("book")
6526 .expect("get_fts_profile")
6527 .expect("should be Some");
6528 assert_eq!(got.kind, "book");
6529 assert_eq!(got.tokenizer, "unicode61");
6530 }
6531
6532 #[test]
6533 fn fts_profile_upsert() {
6534 let (_db, service) = setup();
6535 service
6536 .set_fts_profile("article", "unicode61")
6537 .expect("first set");
6538 service
6539 .set_fts_profile("article", "porter unicode61 remove_diacritics 2")
6540 .expect("second set");
6541 let got = service
6542 .get_fts_profile("article")
6543 .expect("get")
6544 .expect("Some");
6545 assert_eq!(got.tokenizer, "porter unicode61 remove_diacritics 2");
6546 }
6547
6548 #[test]
6549 fn invalid_tokenizer_rejected() {
6550 let (_db, service) = setup();
6551 let result = service.set_fts_profile("book", "'; DROP TABLE nodes --");
6552 assert!(result.is_err(), "invalid tokenizer must be rejected");
6553 let msg = result.expect_err("must be Err").to_string();
6554 assert!(
6555 msg.contains("tokenizer") || msg.contains("invalid"),
6556 "error must mention tokenizer or invalid: {msg}"
6557 );
6558 }
6559
6560 #[test]
6561 fn preset_recall_optimized_english() {
6562 assert_eq!(
6563 super::resolve_tokenizer_preset("recall-optimized-english"),
6564 "porter unicode61 remove_diacritics 2"
6565 );
6566 }
6567
6568 #[test]
6569 fn preset_precision_optimized() {
6570 assert_eq!(
6571 super::resolve_tokenizer_preset("precision-optimized"),
6572 "unicode61 remove_diacritics 2"
6573 );
6574 }
6575
6576 #[test]
6577 fn preset_global_cjk() {
6578 assert_eq!(super::resolve_tokenizer_preset("global-cjk"), "icu");
6579 }
6580
6581 #[test]
6582 fn preset_substring_trigram() {
6583 assert_eq!(
6584 super::resolve_tokenizer_preset("substring-trigram"),
6585 "trigram"
6586 );
6587 }
6588
6589 #[test]
6590 fn preset_source_code() {
6591 assert_eq!(
6592 super::resolve_tokenizer_preset("source-code"),
6593 "unicode61 tokenchars '._-$@'"
6594 );
6595 }
6596
6597 #[test]
6598 fn preview_fts_row_count() {
6599 let (db, service) = setup();
6600 {
6601 let conn = sqlite::open_connection(db.path()).expect("conn");
6602 for i in 0..5u32 {
6603 conn.execute(
6604 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6605 VALUES (?1, ?2, 'book', '{}', 100, 'src')",
6606 rusqlite::params![format!("r{i}"), format!("lg{i}")],
6607 )
6608 .expect("insert node");
6609 }
6610 conn.execute(
6612 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, superseded_at) \
6613 VALUES ('r99', 'lg99', 'book', '{}', 100, 'src', 200)",
6614 [],
6615 )
6616 .expect("insert superseded");
6617 }
6618 let impact = service
6619 .preview_projection_impact("book", "fts")
6620 .expect("preview");
6621 assert_eq!(impact.rows_to_rebuild, 5);
6622 }
6623
6624 #[test]
6625 fn preview_populates_current_tokenizer() {
6626 let (_db, service) = setup();
6627 service
6628 .set_fts_profile("doc", "trigram")
6629 .expect("set profile");
6630 let impact = service
6631 .preview_projection_impact("doc", "fts")
6632 .expect("preview");
6633 assert_eq!(impact.current_tokenizer, Some("trigram".to_owned()));
6634 assert_eq!(impact.target_tokenizer, None);
6635 }
6636
6637 #[test]
6640 fn create_or_replace_source_code_tokenizer_is_accepted() {
6641 use super::create_or_replace_fts_kind_table;
6645 let (db, _service) = setup();
6646 let conn = sqlite::open_connection(db.path()).expect("conn");
6647 let specs = vec![FtsPropertyPathSpec::scalar("$.symbol")];
6648 let source_code_tokenizer = "unicode61 tokenchars '._-$@'";
6649 let result =
6650 create_or_replace_fts_kind_table(&conn, "Symbol", &specs, source_code_tokenizer);
6651 assert!(
6652 result.is_ok(),
6653 "source-code tokenizer string must be accepted by create_or_replace_fts_kind_table: {:?}",
6654 result.err()
6655 );
6656 }
6657
6658 #[test]
6659 fn source_code_profile_round_trip_through_register_fts_schema() {
6660 let db = tempfile::NamedTempFile::new().expect("temp file");
6665 let schema = Arc::new(fathomdb_schema::SchemaManager::new());
6666
6667 {
6669 let _coord = crate::ExecutionCoordinator::open(
6670 db.path(),
6671 Arc::clone(&schema),
6672 None,
6673 1,
6674 Arc::new(crate::TelemetryCounters::default()),
6675 None,
6676 )
6677 .expect("coordinator opens for bootstrap");
6678 }
6679
6680 let service = AdminService::new(db.path(), Arc::clone(&schema));
6681
6682 service
6684 .set_fts_profile("Symbol", "source-code")
6685 .expect("set_fts_profile with source-code preset must succeed");
6686
6687 let result = service.register_fts_property_schema("Symbol", &["$.name".to_owned()], None);
6690 assert!(
6691 result.is_ok(),
6692 "register_fts_property_schema must succeed when source-code profile is active: {:?}",
6693 result.err()
6694 );
6695 }
6696
6697 #[cfg(feature = "sqlite-vec")]
6706 #[test]
6707 fn embedder_max_tokens_8192_handles_chunk_exceeding_512_words() {
6708 let long_text = (0..600u32)
6709 .map(|i| format!("word{i}"))
6710 .collect::<Vec<_>>()
6711 .join(" ");
6712
6713 let db = NamedTempFile::new().expect("temp file");
6714 let schema = Arc::new(SchemaManager::new());
6715
6716 {
6717 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6718 schema.bootstrap(&conn).expect("bootstrap");
6719 conn.execute(
6720 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6721 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'src-1')",
6722 [],
6723 )
6724 .expect("insert node");
6725 conn.execute(
6726 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6727 VALUES (?1, 'doc-1', ?2, 100)",
6728 rusqlite::params!["chunk-long", long_text],
6729 )
6730 .expect("insert long chunk");
6731 }
6732
6733 let embedder = LargeContextTestEmbedder::new("long-context-model", 4, 8192);
6734 let service = AdminService::new(db.path(), Arc::clone(&schema));
6735 let report = service
6736 .regenerate_vector_embeddings(
6737 &embedder,
6738 &VectorRegenerationConfig {
6739 kind: "Document".to_owned(),
6740 profile: "default".to_owned(),
6741 chunking_policy: "per_chunk".to_owned(),
6742 preprocessing_policy: "trim".to_owned(),
6743 },
6744 )
6745 .expect("regenerate with long-context embedder");
6746
6747 assert_eq!(
6748 report.total_chunks, 1,
6749 "600-word text pre-written as one chunk must result in exactly one embedded row"
6750 );
6751 assert_eq!(report.regenerated_rows, 1);
6752 assert_eq!(
6753 embedder.max_tokens(),
6754 8192,
6755 "embedder must advertise 8192 token capacity"
6756 );
6757 }
6758
6759 #[cfg(feature = "sqlite-vec")]
6761 #[derive(Debug)]
6762 struct LargeContextTestEmbedder {
6763 identity: QueryEmbedderIdentity,
6764 vector: Vec<f32>,
6765 max_tokens: usize,
6766 }
6767
6768 #[cfg(feature = "sqlite-vec")]
6769 impl LargeContextTestEmbedder {
6770 fn new(model: &str, dimension: usize, max_tokens: usize) -> Self {
6771 Self {
6772 identity: QueryEmbedderIdentity {
6773 model_identity: model.to_owned(),
6774 model_version: "1.0.0".to_owned(),
6775 dimension,
6776 normalization_policy: "l2".to_owned(),
6777 },
6778 vector: vec![1.0; dimension],
6779 max_tokens,
6780 }
6781 }
6782 }
6783
6784 #[cfg(feature = "sqlite-vec")]
6785 impl QueryEmbedder for LargeContextTestEmbedder {
6786 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
6787 Ok(self.vector.clone())
6788 }
6789 fn identity(&self) -> QueryEmbedderIdentity {
6790 self.identity.clone()
6791 }
6792 fn max_tokens(&self) -> usize {
6793 self.max_tokens
6794 }
6795 }
6796
6797 #[cfg(feature = "sqlite-vec")]
6801 #[test]
6802 #[allow(clippy::too_many_lines)]
6803 fn regenerate_vector_embeddings_in_process_writes_contract_and_vec_rows() {
6804 let db = NamedTempFile::new().expect("temp file");
6805 let schema = Arc::new(SchemaManager::new());
6806
6807 {
6808 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6809 schema.bootstrap(&conn).expect("bootstrap");
6810 for (row_id, logical_id, created_at, src) in [
6811 ("r1", "node-1", 100, "src1"),
6812 ("r2", "node-2", 101, "src2"),
6813 ("r3", "node-3", 102, "src3"),
6814 ] {
6815 conn.execute(
6816 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6817 VALUES (?1, ?2, 'Doc', '{}', ?3, ?4)",
6818 rusqlite::params![row_id, logical_id, created_at, src],
6819 )
6820 .expect("insert node");
6821 }
6822 for (chunk_id, node_id, text, created_at) in [
6823 ("c1", "node-1", "first document text", 100),
6824 ("c2", "node-2", "second document text", 101),
6825 ("c3", "node-3", "third document text", 102),
6826 ] {
6827 conn.execute(
6828 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6829 VALUES (?1, ?2, ?3, ?4)",
6830 rusqlite::params![chunk_id, node_id, text, created_at],
6831 )
6832 .expect("insert chunk");
6833 }
6834 }
6835
6836 let service = AdminService::new(db.path(), Arc::clone(&schema));
6837 let embedder = TestEmbedder::new("batch-test-model", 4);
6838 let config = VectorRegenerationConfig {
6839 kind: "Doc".to_owned(),
6840 profile: "default".to_owned(),
6841 chunking_policy: "per_chunk".to_owned(),
6842 preprocessing_policy: "trim".to_owned(),
6843 };
6844 let report = service
6845 .regenerate_vector_embeddings_in_process(&embedder, &config)
6846 .expect("in-process regen must succeed");
6847
6848 assert_eq!(report.total_chunks, 3);
6849 assert_eq!(report.regenerated_rows, 3);
6850 assert!(report.contract_persisted);
6851
6852 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6853 let vec_count: i64 = conn
6854 .query_row("SELECT count(*) FROM vec_doc", [], |row| row.get(0))
6855 .expect("vec_doc count");
6856 assert_eq!(vec_count, 3, "one vec row per chunk");
6857
6858 let model_identity: String = conn
6859 .query_row(
6860 "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
6861 [],
6862 |row| row.get(0),
6863 )
6864 .expect("contract row");
6865 assert_eq!(model_identity, "batch-test-model");
6866 }
6867
6868 #[cfg(feature = "sqlite-vec")]
6871 #[test]
6872 #[allow(clippy::too_many_lines)]
6873 fn regenerate_vector_embeddings_targets_per_kind_table() {
6874 let db = NamedTempFile::new().expect("temp file");
6875 let schema = Arc::new(SchemaManager::new());
6876
6877 {
6878 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6879 schema.bootstrap(&conn).expect("bootstrap");
6880 conn.execute(
6881 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6882 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
6883 [],
6884 )
6885 .expect("insert node");
6886 conn.execute(
6887 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6888 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
6889 [],
6890 )
6891 .expect("insert chunk");
6892 }
6893
6894 let service = AdminService::new(db.path(), Arc::clone(&schema));
6895 let embedder = TestEmbedder::new("test-model", 4);
6896 let report = service
6897 .regenerate_vector_embeddings(
6898 &embedder,
6899 &VectorRegenerationConfig {
6900 kind: "Document".to_owned(),
6901 profile: "default".to_owned(),
6902 chunking_policy: "per_chunk".to_owned(),
6903 preprocessing_policy: "trim".to_owned(),
6904 },
6905 )
6906 .expect("regenerate vectors");
6907
6908 assert_eq!(report.table_name, "vec_document");
6909 assert_eq!(report.regenerated_rows, 1);
6910
6911 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6912 let vec_count: i64 = conn
6913 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
6914 .expect("vec_document count");
6915 assert_eq!(vec_count, 1, "rows must be in vec_document");
6916
6917 let old_count: i64 = conn
6918 .query_row(
6919 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
6920 [],
6921 |r| r.get(0),
6922 )
6923 .expect("sqlite_master check");
6924 assert_eq!(
6925 old_count, 0,
6926 "vec_nodes_active must NOT be created for per-kind regen"
6927 );
6928 }
6929
6930 #[test]
6933 fn get_vec_profile_returns_none_when_no_profile_exists() {
6934 let (db, service) = setup();
6935 let _ = db;
6936 let result = service.get_vec_profile("MyKind").expect("should not error");
6937 assert!(
6938 result.is_none(),
6939 "must return None when no profile registered"
6940 );
6941 }
6942
6943 #[cfg(feature = "sqlite-vec")]
6944 #[test]
6945 fn get_vec_profile_returns_profile_for_registered_kind() {
6946 let db = NamedTempFile::new().expect("temp file");
6947 let schema = Arc::new(SchemaManager::new());
6948 {
6949 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6950 schema.bootstrap(&conn).expect("bootstrap");
6951 schema
6952 .ensure_vec_kind_profile(&conn, "MyKind", 128)
6953 .expect("ensure_vec_kind_profile");
6954 }
6955
6956 let service = AdminService::new(db.path(), Arc::clone(&schema));
6957 let profile = service.get_vec_profile("MyKind").expect("should not error");
6958 assert!(profile.is_some(), "must return profile after registration");
6959 assert_eq!(profile.unwrap().dimensions, 128);
6960 }
6961
6962 #[test]
6963 fn get_vec_profile_does_not_return_global_sentinel_row() {
6964 let (db, service) = setup();
6965 {
6966 let conn = sqlite::open_connection(db.path()).expect("conn");
6967 conn.execute(
6968 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
6969 VALUES ('*', 'vec', '{\"model_identity\":\"old-model\",\"dimensions\":384}', 0, 0)",
6970 [],
6971 )
6972 .expect("insert global sentinel");
6973 }
6974 let result = service
6975 .get_vec_profile("SomeKind")
6976 .expect("should not error");
6977 assert!(
6978 result.is_none(),
6979 "per-kind query must not return global ('*', 'vec') row"
6980 );
6981 }
6982}