1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::mpsc::SyncSender;
4
5use fathomdb_schema::SchemaManager;
6use serde::{Deserialize, Serialize};
7
8use crate::rebuild_actor::{RebuildMode, RebuildRequest};
9
10use crate::{
11 EngineError, ProjectionRepairReport, ProjectionService, ids::new_id,
12 projection::ProjectionTarget, sqlite,
13};
14
15mod fts;
16mod operational;
17mod provenance;
18mod vector;
19
20pub use vector::load_vector_regeneration_config;
21
22#[cfg(test)]
23use fts::{
24 create_or_replace_fts_kind_table, serialize_property_paths_json, validate_fts_property_paths,
25};
26
27#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
29pub struct IntegrityReport {
30 pub physical_ok: bool,
31 pub foreign_keys_ok: bool,
32 pub missing_fts_rows: usize,
33 pub missing_property_fts_rows: usize,
34 pub duplicate_active_logical_ids: usize,
35 pub operational_missing_collections: usize,
36 pub operational_missing_last_mutations: usize,
37 pub warnings: Vec<String>,
38}
39
40#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
42pub struct FtsPropertySchemaRecord {
43 pub kind: String,
45 pub property_paths: Vec<String>,
50 pub entries: Vec<FtsPropertyPathSpec>,
55 pub exclude_paths: Vec<String>,
58 pub separator: String,
60 pub format_version: i64,
62}
63
64#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
66#[serde(rename_all = "snake_case")]
67pub enum FtsPropertyPathMode {
68 #[default]
71 Scalar,
72 Recursive,
75}
76
77#[non_exhaustive]
79#[derive(Clone, Debug, PartialEq, Serialize)]
80pub struct FtsPropertyPathSpec {
81 pub path: String,
83 pub mode: FtsPropertyPathMode,
85 pub weight: Option<f32>,
88}
89
90impl Eq for FtsPropertyPathSpec {}
93
94impl FtsPropertyPathSpec {
95 #[must_use]
96 pub fn scalar(path: impl Into<String>) -> Self {
97 Self {
98 path: path.into(),
99 mode: FtsPropertyPathMode::Scalar,
100 weight: None,
101 }
102 }
103
104 #[must_use]
105 pub fn recursive(path: impl Into<String>) -> Self {
106 Self {
107 path: path.into(),
108 mode: FtsPropertyPathMode::Recursive,
109 weight: None,
110 }
111 }
112
113 #[must_use]
119 pub fn with_weight(mut self, weight: f32) -> Self {
120 self.weight = Some(weight);
121 self
122 }
123}
124
125#[derive(Clone, Copy, Debug)]
127pub struct SafeExportOptions {
128 pub force_checkpoint: bool,
132}
133
134impl Default for SafeExportOptions {
135 fn default() -> Self {
136 Self {
137 force_checkpoint: true,
138 }
139 }
140}
141
142const EXPORT_PROTOCOL_VERSION: u32 = 1;
144
145#[derive(Clone, Debug, Serialize)]
147pub struct SafeExportManifest {
148 pub exported_at: u64,
150 pub sha256: String,
152 pub schema_version: u32,
154 pub protocol_version: u32,
156 pub page_count: u64,
158}
159
160#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
162pub struct TraceReport {
163 pub source_ref: String,
164 pub node_rows: usize,
165 pub edge_rows: usize,
166 pub action_rows: usize,
167 pub operational_mutation_rows: usize,
168 pub node_logical_ids: Vec<String>,
169 pub action_ids: Vec<String>,
170 pub operational_mutation_ids: Vec<String>,
171}
172
173#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
175pub struct SkippedEdge {
176 pub edge_logical_id: String,
177 pub missing_endpoint: String,
178}
179
180#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
182pub struct LogicalRestoreReport {
183 pub logical_id: String,
184 pub was_noop: bool,
185 pub restored_node_rows: usize,
186 pub restored_edge_rows: usize,
187 pub restored_chunk_rows: usize,
188 pub restored_fts_rows: usize,
189 pub restored_property_fts_rows: usize,
190 pub restored_vec_rows: usize,
191 pub skipped_edges: Vec<SkippedEdge>,
192 pub notes: Vec<String>,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
197pub struct LogicalPurgeReport {
198 pub logical_id: String,
199 pub was_noop: bool,
200 pub deleted_node_rows: usize,
201 pub deleted_edge_rows: usize,
202 pub deleted_chunk_rows: usize,
203 pub deleted_fts_rows: usize,
204 pub deleted_vec_rows: usize,
205 pub notes: Vec<String>,
206}
207
208#[derive(Clone, Debug, Serialize, Deserialize)]
210pub struct ProvenancePurgeOptions {
211 pub dry_run: bool,
212 #[serde(default)]
213 pub preserve_event_types: Vec<String>,
214}
215
216#[derive(Clone, Debug, Serialize)]
218pub struct ProvenancePurgeReport {
219 pub events_deleted: u64,
220 pub events_preserved: u64,
221 pub oldest_remaining: Option<i64>,
222}
223
224#[derive(Debug)]
226pub struct AdminService {
227 pub(super) database_path: PathBuf,
228 pub(super) schema_manager: Arc<SchemaManager>,
229 pub(super) projections: ProjectionService,
230 pub(super) rebuild_sender: Option<SyncSender<RebuildRequest>>,
234}
235
236#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
238pub struct SemanticReport {
239 pub orphaned_chunks: usize,
241 pub null_source_ref_nodes: usize,
243 pub broken_step_fk: usize,
245 pub broken_action_fk: usize,
247 pub stale_fts_rows: usize,
249 pub fts_rows_for_superseded_nodes: usize,
251 pub stale_property_fts_rows: usize,
253 pub orphaned_property_fts_rows: usize,
255 pub mismatched_kind_property_fts_rows: usize,
257 pub duplicate_property_fts_rows: usize,
259 pub drifted_property_fts_rows: usize,
261 pub dangling_edges: usize,
263 pub orphaned_supersession_chains: usize,
265 pub stale_vec_rows: usize,
267 pub vec_rows_for_superseded_nodes: usize,
269 pub missing_operational_current_rows: usize,
271 pub stale_operational_current_rows: usize,
273 pub disabled_collection_mutations: usize,
275 pub orphaned_last_access_metadata_rows: usize,
277 pub warnings: Vec<String>,
278}
279
280#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
293#[serde(rename_all = "snake_case", deny_unknown_fields)]
294pub struct VectorRegenerationConfig {
295 pub kind: String,
296 pub profile: String,
297 pub chunking_policy: String,
298 pub preprocessing_policy: String,
299}
300
301#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
303pub struct VectorRegenerationReport {
304 pub profile: String,
305 pub table_name: String,
306 pub dimension: usize,
307 pub total_chunks: usize,
308 pub regenerated_rows: usize,
309 pub contract_persisted: bool,
310 pub notes: Vec<String>,
311}
312
313#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
317pub struct FtsProfile {
318 pub kind: String,
320 pub tokenizer: String,
322 pub active_at: Option<i64>,
324 pub created_at: i64,
326}
327
328#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
332pub struct VecProfile {
333 pub model_identity: String,
335 pub model_version: Option<String>,
337 pub dimensions: u32,
339 pub active_at: Option<i64>,
341 pub created_at: i64,
343}
344
345#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
349pub struct ProjectionImpact {
350 pub rows_to_rebuild: u64,
352 pub estimated_seconds: u64,
354 pub temp_db_size_bytes: u64,
356 pub current_tokenizer: Option<String>,
358 pub target_tokenizer: Option<String>,
360}
361
362pub const TOKENIZER_PRESETS: &[(&str, &str)] = &[
364 (
365 "recall-optimized-english",
366 "porter unicode61 remove_diacritics 2",
367 ),
368 ("precision-optimized", "unicode61 remove_diacritics 2"),
369 ("global-cjk", "icu"),
370 ("substring-trigram", "trigram"),
371 ("source-code", "unicode61 tokenchars '._-$@'"),
372];
373
374pub fn resolve_tokenizer_preset(input: &str) -> &str {
379 for (name, value) in TOKENIZER_PRESETS {
380 if *name == input {
381 return value;
382 }
383 }
384 input
385}
386
387pub(super) const CURRENT_VECTOR_CONTRACT_FORMAT_VERSION: i64 = 1;
388pub(super) const MAX_PROFILE_LEN: usize = 128;
389pub(super) const MAX_POLICY_LEN: usize = 128;
390pub(super) const MAX_CONTRACT_JSON_BYTES: usize = 32 * 1024;
391pub(super) const MAX_AUDIT_METADATA_BYTES: usize = 2048;
392const DEFAULT_OPERATIONAL_READ_LIMIT: usize = 100;
393const MAX_OPERATIONAL_READ_LIMIT: usize = 1000;
394
395#[derive(Clone, Debug)]
397pub struct AdminHandle {
398 inner: Arc<AdminService>,
399}
400
401impl AdminHandle {
402 #[must_use]
404 pub fn new(service: AdminService) -> Self {
405 Self {
406 inner: Arc::new(service),
407 }
408 }
409
410 #[must_use]
412 pub fn service(&self) -> Arc<AdminService> {
413 Arc::clone(&self.inner)
414 }
415}
416
417impl AdminService {
418 #[must_use]
420 pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
421 let database_path = path.as_ref().to_path_buf();
422 let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
423 Self {
424 database_path,
425 schema_manager,
426 projections,
427 rebuild_sender: None,
428 }
429 }
430
431 #[must_use]
433 pub fn new_with_rebuild(
434 path: impl AsRef<Path>,
435 schema_manager: Arc<SchemaManager>,
436 rebuild_sender: SyncSender<RebuildRequest>,
437 ) -> Self {
438 let database_path = path.as_ref().to_path_buf();
439 let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
440 Self {
441 database_path,
442 schema_manager,
443 projections,
444 rebuild_sender: Some(rebuild_sender),
445 }
446 }
447
448 pub(super) fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
449 #[cfg(feature = "sqlite-vec")]
450 let conn = sqlite::open_connection_with_vec(&self.database_path)?;
451 #[cfg(not(feature = "sqlite-vec"))]
452 let conn = sqlite::open_connection(&self.database_path)?;
453 self.schema_manager.bootstrap(&conn)?;
454 Ok(conn)
455 }
456
457 pub fn check_integrity(&self) -> Result<IntegrityReport, EngineError> {
460 let conn = self.connect()?;
461
462 let physical_result: String =
463 conn.query_row("PRAGMA integrity_check", [], |row| row.get(0))?;
464 let foreign_key_count: i64 =
465 conn.query_row("SELECT count(*) FROM pragma_foreign_key_check", [], |row| {
466 row.get(0)
467 })?;
468 let missing_fts_rows: i64 = conn.query_row(
469 r"
470 SELECT count(*)
471 FROM chunks c
472 JOIN nodes n
473 ON n.logical_id = c.node_logical_id
474 AND n.superseded_at IS NULL
475 WHERE NOT EXISTS (
476 SELECT 1
477 FROM fts_nodes f
478 WHERE f.chunk_id = c.id
479 )
480 ",
481 [],
482 |row| row.get(0),
483 )?;
484 let duplicate_active: i64 = conn.query_row(
485 r"
486 SELECT count(*)
487 FROM (
488 SELECT logical_id
489 FROM nodes
490 WHERE superseded_at IS NULL
491 GROUP BY logical_id
492 HAVING count(*) > 1
493 )
494 ",
495 [],
496 |row| row.get(0),
497 )?;
498 let operational_missing_collections: i64 = conn.query_row(
499 r"
500 SELECT (
501 SELECT count(*)
502 FROM operational_mutations m
503 LEFT JOIN operational_collections c ON c.name = m.collection_name
504 WHERE c.name IS NULL
505 ) + (
506 SELECT count(*)
507 FROM operational_current oc
508 LEFT JOIN operational_collections c ON c.name = oc.collection_name
509 WHERE c.name IS NULL
510 )
511 ",
512 [],
513 |row| row.get(0),
514 )?;
515 let operational_missing_last_mutations: i64 = conn.query_row(
516 r"
517 SELECT count(*)
518 FROM operational_current oc
519 LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
520 WHERE m.id IS NULL
521 ",
522 [],
523 |row| row.get(0),
524 )?;
525
526 let missing_property_fts_rows = count_missing_property_fts_rows(&conn)?;
530
531 let mut warnings = Vec::new();
532 if missing_fts_rows > 0 {
533 warnings.push("missing FTS projections detected".to_owned());
534 }
535 if missing_property_fts_rows > 0 {
536 warnings.push("missing property FTS projections detected".to_owned());
537 }
538 if duplicate_active > 0 {
539 warnings.push("duplicate active logical_ids detected".to_owned());
540 }
541 if operational_missing_collections > 0 {
542 warnings.push("operational rows reference missing collections".to_owned());
543 }
544 if operational_missing_last_mutations > 0 {
545 warnings.push("operational current rows reference missing last mutations".to_owned());
546 }
547
548 Ok(IntegrityReport {
553 physical_ok: physical_result == "ok",
554 foreign_keys_ok: foreign_key_count == 0,
555 missing_fts_rows: i64_to_usize(missing_fts_rows),
556 missing_property_fts_rows: i64_to_usize(missing_property_fts_rows),
557 duplicate_active_logical_ids: i64_to_usize(duplicate_active),
558 operational_missing_collections: i64_to_usize(operational_missing_collections),
559 operational_missing_last_mutations: i64_to_usize(operational_missing_last_mutations),
560 warnings,
561 })
562 }
563
564 #[allow(clippy::too_many_lines)]
567 pub fn check_semantics(&self) -> Result<SemanticReport, EngineError> {
568 let conn = self.connect()?;
569
570 let orphaned_chunks: i64 = conn.query_row(
571 r"
572 SELECT count(*)
573 FROM chunks c
574 WHERE NOT EXISTS (
575 SELECT 1 FROM nodes n
576 WHERE n.logical_id = c.node_logical_id
577 )
578 ",
579 [],
580 |row| row.get(0),
581 )?;
582
583 let null_source_ref_nodes: i64 = conn.query_row(
584 "SELECT count(*) FROM nodes WHERE source_ref IS NULL AND superseded_at IS NULL",
585 [],
586 |row| row.get(0),
587 )?;
588
589 let broken_step_fk: i64 = conn.query_row(
590 r"
591 SELECT count(*) FROM steps s
592 WHERE NOT EXISTS (SELECT 1 FROM runs r WHERE r.id = s.run_id)
593 ",
594 [],
595 |row| row.get(0),
596 )?;
597
598 let broken_action_fk: i64 = conn.query_row(
599 r"
600 SELECT count(*) FROM actions a
601 WHERE NOT EXISTS (SELECT 1 FROM steps s WHERE s.id = a.step_id)
602 ",
603 [],
604 |row| row.get(0),
605 )?;
606
607 let stale_fts_rows: i64 = conn.query_row(
608 r"
609 SELECT count(*) FROM fts_nodes f
610 WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = f.chunk_id)
611 ",
612 [],
613 |row| row.get(0),
614 )?;
615
616 let fts_rows_for_superseded_nodes: i64 = conn.query_row(
617 r"
618 SELECT count(*) FROM fts_nodes f
619 WHERE NOT EXISTS (
620 SELECT 1 FROM nodes n
621 WHERE n.logical_id = f.node_logical_id AND n.superseded_at IS NULL
622 )
623 ",
624 [],
625 |row| row.get(0),
626 )?;
627
628 let (
629 stale_property_fts_rows,
630 orphaned_property_fts_rows,
631 mismatched_kind_property_fts_rows,
632 duplicate_property_fts_rows,
633 ) = count_per_kind_property_fts_issues(&conn)?;
634
635 let drifted_property_fts_rows = count_drifted_property_fts_rows(&conn)?;
636
637 let dangling_edges: i64 = conn.query_row(
638 r"
639 SELECT count(*) FROM edges e
640 WHERE e.superseded_at IS NULL AND (
641 NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.source_logical_id AND n.superseded_at IS NULL)
642 OR
643 NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.target_logical_id AND n.superseded_at IS NULL)
644 )
645 ",
646 [],
647 |row| row.get(0),
648 )?;
649
650 let orphaned_supersession_chains: i64 = conn.query_row(
651 r"
652 SELECT count(*) FROM (
653 SELECT logical_id FROM nodes
654 GROUP BY logical_id
655 HAVING count(*) > 0 AND sum(CASE WHEN superseded_at IS NULL THEN 1 ELSE 0 END) = 0
656 )
657 ",
658 [],
659 |row| row.get(0),
660 )?;
661
662 #[cfg(feature = "sqlite-vec")]
664 let (stale_vec_rows, vec_rows_for_superseded_nodes): (i64, i64) = {
665 let kinds: Vec<String> =
666 match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
667 Ok(mut stmt) => stmt
668 .query_map([], |row| row.get(0))
669 .map_err(EngineError::Sqlite)?
670 .collect::<Result<Vec<_>, _>>()
671 .map_err(EngineError::Sqlite)?,
672 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
673 if msg.contains("no such table: projection_profiles") =>
674 {
675 vec![]
676 }
677 Err(e) => return Err(EngineError::Sqlite(e)),
678 };
679 let mut stale = 0i64;
680 let mut superseded = 0i64;
681 for kind in &kinds {
682 let table = fathomdb_schema::vec_kind_table_name(kind);
683 let stale_sql = format!(
684 "SELECT count(*) FROM {table} v \
685 WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = v.chunk_id)"
686 );
687 let superseded_sql = format!(
688 "SELECT count(*) FROM {table} v \
689 JOIN chunks c ON c.id = v.chunk_id \
690 WHERE NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = c.node_logical_id)"
691 );
692 stale += match conn.query_row(&stale_sql, [], |row| row.get(0)) {
693 Ok(n) => n,
694 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
695 if msg.contains("no such table:")
696 || msg.contains("no such module: vec0") =>
697 {
698 0
699 }
700 Err(e) => return Err(EngineError::Sqlite(e)),
701 };
702 superseded += match conn.query_row(&superseded_sql, [], |row| row.get(0)) {
703 Ok(n) => n,
704 Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
705 if msg.contains("no such table:")
706 || msg.contains("no such module: vec0") =>
707 {
708 0
709 }
710 Err(e) => return Err(EngineError::Sqlite(e)),
711 };
712 }
713 (stale, superseded)
714 };
715 #[cfg(not(feature = "sqlite-vec"))]
716 let stale_vec_rows: i64 = 0;
717 #[cfg(not(feature = "sqlite-vec"))]
718 let vec_rows_for_superseded_nodes: i64 = 0;
719 let missing_operational_current_rows: i64 = conn.query_row(
720 r"
721 SELECT count(*)
722 FROM operational_mutations m
723 JOIN operational_collections c
724 ON c.name = m.collection_name
725 AND c.kind = 'latest_state'
726 WHERE m.op_kind = 'put'
727 AND NOT EXISTS (
728 SELECT 1
729 FROM operational_mutations newer
730 WHERE newer.collection_name = m.collection_name
731 AND newer.record_key = m.record_key
732 AND newer.mutation_order > m.mutation_order
733 )
734 AND NOT EXISTS (
735 SELECT 1
736 FROM operational_current oc
737 WHERE oc.collection_name = m.collection_name
738 AND oc.record_key = m.record_key
739 )
740 ",
741 [],
742 |row| row.get(0),
743 )?;
744 let stale_operational_current_rows: i64 = conn.query_row(
745 r"
746 SELECT count(*)
747 FROM operational_current oc
748 JOIN operational_collections c
749 ON c.name = oc.collection_name
750 AND c.kind = 'latest_state'
751 LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
752 WHERE m.id IS NULL
753 OR m.collection_name != oc.collection_name
754 OR m.record_key != oc.record_key
755 OR m.op_kind != 'put'
756 OR m.payload_json != oc.payload_json
757 OR EXISTS (
758 SELECT 1
759 FROM operational_mutations newer
760 WHERE newer.collection_name = oc.collection_name
761 AND newer.record_key = oc.record_key
762 AND newer.mutation_order > m.mutation_order
763 )
764 ",
765 [],
766 |row| row.get(0),
767 )?;
768 let disabled_collection_mutations: i64 = conn.query_row(
769 r"
770 SELECT count(*)
771 FROM operational_mutations m
772 JOIN operational_collections c ON c.name = m.collection_name
773 WHERE c.disabled_at IS NOT NULL AND m.created_at > c.disabled_at
774 ",
775 [],
776 |row| row.get(0),
777 )?;
778 let orphaned_last_access_metadata_rows: i64 = conn.query_row(
779 r"
780 SELECT count(*)
781 FROM node_access_metadata am
782 WHERE NOT EXISTS (
783 SELECT 1 FROM nodes n WHERE n.logical_id = am.logical_id
784 )
785 ",
786 [],
787 |row| row.get(0),
788 )?;
789
790 let mut warnings = Vec::new();
791 if orphaned_chunks > 0 {
792 warnings.push(format!(
793 "{orphaned_chunks} orphaned chunk(s) with no surviving node history"
794 ));
795 }
796 if null_source_ref_nodes > 0 {
797 warnings.push(format!(
798 "{null_source_ref_nodes} active node(s) with null source_ref"
799 ));
800 }
801 if broken_step_fk > 0 {
802 warnings.push(format!(
803 "{broken_step_fk} step(s) referencing non-existent run"
804 ));
805 }
806 if broken_action_fk > 0 {
807 warnings.push(format!(
808 "{broken_action_fk} action(s) referencing non-existent step"
809 ));
810 }
811 if stale_fts_rows > 0 {
812 warnings.push(format!(
813 "{stale_fts_rows} stale FTS row(s) referencing missing chunk"
814 ));
815 }
816 if fts_rows_for_superseded_nodes > 0 {
817 warnings.push(format!(
818 "{fts_rows_for_superseded_nodes} FTS row(s) for superseded node(s)"
819 ));
820 }
821 if stale_property_fts_rows > 0 {
822 warnings.push(format!(
823 "{stale_property_fts_rows} stale property FTS row(s) for superseded/missing node(s)"
824 ));
825 }
826 if orphaned_property_fts_rows > 0 {
827 warnings.push(format!(
828 "{orphaned_property_fts_rows} orphaned property FTS row(s) for unregistered kind(s)"
829 ));
830 }
831 if mismatched_kind_property_fts_rows > 0 {
832 warnings.push(format!(
833 "{mismatched_kind_property_fts_rows} property FTS row(s) whose kind does not match the active node"
834 ));
835 }
836 if duplicate_property_fts_rows > 0 {
837 warnings.push(format!(
838 "{duplicate_property_fts_rows} active logical ID(s) with duplicate property FTS rows"
839 ));
840 }
841 if drifted_property_fts_rows > 0 {
842 warnings.push(format!(
843 "{drifted_property_fts_rows} property FTS row(s) with stale text_content"
844 ));
845 }
846 if dangling_edges > 0 {
847 warnings.push(format!(
848 "{dangling_edges} active edge(s) with missing endpoint node"
849 ));
850 }
851 if orphaned_supersession_chains > 0 {
852 warnings.push(format!(
853 "{orphaned_supersession_chains} logical_id(s) with all versions superseded"
854 ));
855 }
856 if stale_vec_rows > 0 {
857 warnings.push(format!(
858 "{stale_vec_rows} stale vec row(s) referencing missing chunk"
859 ));
860 }
861 if vec_rows_for_superseded_nodes > 0 {
862 warnings.push(format!(
863 "{vec_rows_for_superseded_nodes} vec row(s) whose node history is missing"
864 ));
865 }
866 if missing_operational_current_rows > 0 {
867 warnings.push(format!(
868 "{missing_operational_current_rows} latest-state key(s) missing operational_current rows"
869 ));
870 }
871 if stale_operational_current_rows > 0 {
872 warnings.push(format!(
873 "{stale_operational_current_rows} stale operational_current row(s)"
874 ));
875 }
876 if disabled_collection_mutations > 0 {
877 warnings.push(format!(
878 "{disabled_collection_mutations} mutation(s) were written after collection disable"
879 ));
880 }
881 if orphaned_last_access_metadata_rows > 0 {
882 warnings.push(format!(
883 "{orphaned_last_access_metadata_rows} last_access metadata row(s) reference missing node history"
884 ));
885 }
886
887 Ok(SemanticReport {
888 orphaned_chunks: i64_to_usize(orphaned_chunks),
889 null_source_ref_nodes: i64_to_usize(null_source_ref_nodes),
890 broken_step_fk: i64_to_usize(broken_step_fk),
891 broken_action_fk: i64_to_usize(broken_action_fk),
892 stale_fts_rows: i64_to_usize(stale_fts_rows),
893 fts_rows_for_superseded_nodes: i64_to_usize(fts_rows_for_superseded_nodes),
894 stale_property_fts_rows: i64_to_usize(stale_property_fts_rows),
895 orphaned_property_fts_rows: i64_to_usize(orphaned_property_fts_rows),
896 mismatched_kind_property_fts_rows: i64_to_usize(mismatched_kind_property_fts_rows),
897 duplicate_property_fts_rows: i64_to_usize(duplicate_property_fts_rows),
898 drifted_property_fts_rows: i64_to_usize(drifted_property_fts_rows),
899 dangling_edges: i64_to_usize(dangling_edges),
900 orphaned_supersession_chains: i64_to_usize(orphaned_supersession_chains),
901 stale_vec_rows: i64_to_usize(stale_vec_rows),
902 vec_rows_for_superseded_nodes: i64_to_usize(vec_rows_for_superseded_nodes),
903 missing_operational_current_rows: i64_to_usize(missing_operational_current_rows),
904 stale_operational_current_rows: i64_to_usize(stale_operational_current_rows),
905 disabled_collection_mutations: i64_to_usize(disabled_collection_mutations),
906 orphaned_last_access_metadata_rows: i64_to_usize(orphaned_last_access_metadata_rows),
907 warnings,
908 })
909 }
910}
911
912fn count_per_kind_property_fts_issues(
920 conn: &rusqlite::Connection,
921) -> Result<(i64, i64, i64, i64), EngineError> {
922 let per_kind_tables: Vec<String> = {
926 let mut stmt = conn.prepare(
927 "SELECT name FROM sqlite_master \
928 WHERE type='table' AND name LIKE 'fts_props_%' \
929 AND sql LIKE 'CREATE VIRTUAL TABLE%'",
930 )?;
931 stmt.query_map([], |r| r.get::<_, String>(0))?
932 .collect::<Result<Vec<_>, _>>()?
933 };
934
935 let registered_kinds: std::collections::HashSet<String> = {
936 let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
937 stmt.query_map([], |r| r.get::<_, String>(0))?
938 .collect::<Result<std::collections::HashSet<_>, _>>()?
939 };
940
941 let mut stale = 0i64;
942 let mut orphaned = 0i64;
943 let mut duplicate = 0i64;
944
945 for table in &per_kind_tables {
946 let kind_stale: i64 = conn.query_row(
948 &format!(
949 "SELECT count(*) FROM {table} fp \
950 WHERE NOT EXISTS (\
951 SELECT 1 FROM nodes n \
952 WHERE n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL\
953 )"
954 ),
955 [],
956 |r| r.get(0),
957 )?;
958 stale += kind_stale;
959
960 let kind_dup: i64 = conn.query_row(
962 &format!(
963 "SELECT count(*) FROM (\
964 SELECT node_logical_id FROM {table} \
965 GROUP BY node_logical_id HAVING count(*) > 1\
966 )"
967 ),
968 [],
969 |r| r.get(0),
970 )?;
971 duplicate += kind_dup;
972
973 let table_has_schema = registered_kinds
976 .iter()
977 .any(|k| fathomdb_schema::fts_kind_table_name(k) == *table);
978 if !table_has_schema {
979 let table_rows: i64 =
980 conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))?;
981 orphaned += table_rows;
982 }
983 }
984
985 Ok((stale, orphaned, 0, duplicate))
987}
988
989fn count_missing_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
993 let schemas = crate::writer::load_fts_property_schemas(conn)?;
994 if schemas.is_empty() {
995 return Ok(0);
996 }
997
998 let mut missing = 0i64;
999 for (kind, schema) in &schemas {
1000 let table = fathomdb_schema::fts_kind_table_name(kind);
1001 let table_exists: bool = conn
1003 .query_row(
1004 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1005 [table.as_str()],
1006 |r| r.get::<_, i64>(0),
1007 )
1008 .unwrap_or(0)
1009 > 0;
1010
1011 if table_exists {
1012 let mut stmt = conn.prepare(&format!(
1013 "SELECT n.logical_id, n.properties FROM nodes n \
1014 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
1015 AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
1016 ))?;
1017 let rows = stmt.query_map([kind.as_str()], |row| {
1018 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1019 })?;
1020 for row in rows {
1021 let (_logical_id, properties_str) = row?;
1022 let props: serde_json::Value =
1023 serde_json::from_str(&properties_str).unwrap_or_default();
1024 if crate::writer::extract_property_fts(&props, schema)
1025 .0
1026 .is_some()
1027 {
1028 missing += 1;
1029 }
1030 }
1031 } else {
1032 let mut stmt = conn.prepare(
1034 "SELECT n.logical_id, n.properties FROM nodes n \
1035 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
1036 )?;
1037 let rows = stmt.query_map([kind.as_str()], |row| {
1038 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1039 })?;
1040 for row in rows {
1041 let (_logical_id, properties_str) = row?;
1042 let props: serde_json::Value =
1043 serde_json::from_str(&properties_str).unwrap_or_default();
1044 if crate::writer::extract_property_fts(&props, schema)
1045 .0
1046 .is_some()
1047 {
1048 missing += 1;
1049 }
1050 }
1051 }
1052 }
1053 Ok(missing)
1054}
1055
1056fn count_drifted_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1061 let schemas = crate::writer::load_fts_property_schemas(conn)?;
1062 if schemas.is_empty() {
1063 return Ok(0);
1064 }
1065
1066 let mut drifted = 0i64;
1067 for (kind, schema) in &schemas {
1068 let table = fathomdb_schema::fts_kind_table_name(kind);
1069 let table_exists: bool = conn
1071 .query_row(
1072 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1073 [table.as_str()],
1074 |r| r.get::<_, i64>(0),
1075 )
1076 .unwrap_or(0)
1077 > 0;
1078 if !table_exists {
1079 continue;
1080 }
1081 let mut stmt = conn.prepare(&format!(
1082 "SELECT fp.node_logical_id, fp.text_content, n.properties \
1083 FROM {table} fp \
1084 JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1085 WHERE n.kind = ?1"
1086 ))?;
1087 let rows = stmt.query_map([kind.as_str()], |row| {
1088 Ok((
1089 row.get::<_, String>(0)?,
1090 row.get::<_, String>(1)?,
1091 row.get::<_, String>(2)?,
1092 ))
1093 })?;
1094 for row in rows {
1095 let (_logical_id, stored_text, properties_str) = row?;
1096 let props: serde_json::Value =
1097 serde_json::from_str(&properties_str).unwrap_or_default();
1098 let (expected, _positions, _stats) =
1099 crate::writer::extract_property_fts(&props, schema);
1100 match expected {
1101 Some(text) if text == stored_text => {}
1102 _ => drifted += 1,
1103 }
1104 }
1105 }
1106 Ok(drifted)
1107}
1108
1109#[allow(clippy::expect_used)]
1112pub(super) fn i64_to_usize(val: i64) -> usize {
1113 usize::try_from(val).expect("count(*) must be non-negative")
1114}
1115
1116pub(super) fn persist_simple_provenance_event(
1117 conn: &rusqlite::Connection,
1118 event_type: &str,
1119 subject: &str,
1120 metadata: Option<serde_json::Value>,
1121) -> Result<(), EngineError> {
1122 let metadata_json = metadata.map(|value| value.to_string()).unwrap_or_default();
1123 conn.execute(
1124 "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1125 rusqlite::params![new_id(), event_type, subject, metadata_json],
1126 )?;
1127 Ok(())
1128}
1129
1130pub(super) fn rebuild_operational_current_rows(
1131 tx: &rusqlite::Transaction<'_>,
1132 collections: &[String],
1133) -> Result<usize, EngineError> {
1134 let mut rebuilt_rows = 0usize;
1135 clear_operational_current_rows(tx, collections)?;
1136 let mut ins_current = tx.prepare_cached(
1137 "INSERT INTO operational_current \
1138 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1139 VALUES (?1, ?2, ?3, ?4, ?5)",
1140 )?;
1141
1142 for collection in collections {
1143 let mut stmt = tx.prepare(
1144 "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
1145 FROM operational_mutations \
1146 WHERE collection_name = ?1 \
1147 ORDER BY record_key, mutation_order",
1148 )?;
1149 let mut latest_by_key: std::collections::HashMap<String, Option<(String, i64, String)>> =
1150 std::collections::HashMap::new();
1151 let rows = stmt.query_map([collection], operational::map_operational_mutation_row)?;
1152 for row in rows {
1153 let mutation = row?;
1154 match mutation.op_kind.as_str() {
1155 "put" => {
1156 latest_by_key.insert(
1157 mutation.record_key,
1158 Some((mutation.payload_json, mutation.created_at, mutation.id)),
1159 );
1160 }
1161 "delete" => {
1162 latest_by_key.insert(mutation.record_key, None);
1163 }
1164 _ => {}
1165 }
1166 }
1167
1168 for (record_key, state) in latest_by_key {
1169 if let Some((payload_json, updated_at, last_mutation_id)) = state {
1170 ins_current.execute(rusqlite::params![
1171 collection,
1172 record_key,
1173 payload_json,
1174 updated_at,
1175 last_mutation_id,
1176 ])?;
1177 rebuilt_rows += 1;
1178 }
1179 }
1180 }
1181
1182 drop(ins_current);
1183 Ok(rebuilt_rows)
1184}
1185
1186pub(super) fn clear_operational_current_rows(
1187 tx: &rusqlite::Transaction<'_>,
1188 collections: &[String],
1189) -> Result<(), EngineError> {
1190 let mut delete_current =
1191 tx.prepare_cached("DELETE FROM operational_current WHERE collection_name = ?1")?;
1192 let mut delete_secondary_current = tx.prepare_cached(
1193 "DELETE FROM operational_secondary_index_entries \
1194 WHERE collection_name = ?1 AND subject_kind = 'current'",
1195 )?;
1196 for collection in collections {
1197 delete_secondary_current.execute([collection])?;
1198 delete_current.execute([collection])?;
1199 }
1200 drop(delete_secondary_current);
1201 drop(delete_current);
1202 Ok(())
1203}
1204
1205#[cfg(test)]
1206#[allow(clippy::expect_used)]
1207mod tests {
1208 use std::fs;
1209 use std::sync::Arc;
1210
1211 use fathomdb_schema::SchemaManager;
1212 use tempfile::NamedTempFile;
1213
1214 use super::{
1215 AdminService, FtsPropertyPathMode, FtsPropertyPathSpec, SafeExportOptions,
1216 VectorRegenerationConfig,
1217 };
1218 use crate::embedder::{BatchEmbedder, EmbedderError, QueryEmbedder, QueryEmbedderIdentity};
1219 use crate::projection::ProjectionTarget;
1220 use crate::sqlite;
1221 use crate::{EngineError, OperationalCollectionKind, OperationalRegisterRequest};
1222
1223 #[cfg(feature = "sqlite-vec")]
1224 use crate::{ExecutionCoordinator, TelemetryCounters};
1225
1226 #[cfg(feature = "sqlite-vec")]
1227 use fathomdb_query::QueryBuilder;
1228
1229 #[cfg(feature = "sqlite-vec")]
1230 use super::load_vector_regeneration_config;
1231
1232 #[derive(Debug)]
1236 #[allow(dead_code)]
1237 struct TestEmbedder {
1238 identity: QueryEmbedderIdentity,
1239 vector: Vec<f32>,
1240 }
1241
1242 #[allow(dead_code)]
1243 impl TestEmbedder {
1244 fn new(model: &str, dimension: usize) -> Self {
1245 Self {
1246 identity: QueryEmbedderIdentity {
1247 model_identity: model.to_owned(),
1248 model_version: "1.0.0".to_owned(),
1249 dimension,
1250 normalization_policy: "l2".to_owned(),
1251 },
1252 vector: vec![1.0; dimension],
1253 }
1254 }
1255 }
1256
1257 impl QueryEmbedder for TestEmbedder {
1258 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
1259 Ok(self.vector.clone())
1260 }
1261 fn identity(&self) -> QueryEmbedderIdentity {
1262 self.identity.clone()
1263 }
1264 fn max_tokens(&self) -> usize {
1265 512
1266 }
1267 }
1268
1269 impl BatchEmbedder for TestEmbedder {
1270 fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, EmbedderError> {
1271 Ok(texts.iter().map(|_| self.vector.clone()).collect())
1272 }
1273 fn identity(&self) -> QueryEmbedderIdentity {
1274 self.identity.clone()
1275 }
1276 fn max_tokens(&self) -> usize {
1277 512
1278 }
1279 }
1280
1281 #[derive(Debug)]
1284 #[allow(dead_code)]
1285 struct FailingEmbedder {
1286 identity: QueryEmbedderIdentity,
1287 }
1288
1289 impl QueryEmbedder for FailingEmbedder {
1290 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
1291 Err(EmbedderError::Failed("test failure".to_owned()))
1292 }
1293 fn identity(&self) -> QueryEmbedderIdentity {
1294 self.identity.clone()
1295 }
1296 fn max_tokens(&self) -> usize {
1297 512
1298 }
1299 }
1300
1301 #[allow(dead_code)]
1302 #[cfg(unix)]
1303 fn set_file_mode(path: &std::path::Path, mode: u32) {
1304 use std::os::unix::fs::PermissionsExt;
1305
1306 let mut permissions = fs::metadata(path).expect("script metadata").permissions();
1307 permissions.set_mode(mode);
1308 fs::set_permissions(path, permissions).expect("chmod");
1309 }
1310
1311 #[allow(dead_code)]
1312 #[cfg(not(unix))]
1313 fn set_file_mode(_path: &std::path::Path, _mode: u32) {}
1314 fn setup() -> (NamedTempFile, AdminService) {
1315 let db = NamedTempFile::new().expect("temp file");
1316 let schema = Arc::new(SchemaManager::new());
1317 {
1318 let conn = sqlite::open_connection(db.path()).expect("connection");
1319 schema.bootstrap(&conn).expect("bootstrap");
1320 }
1321 let service = AdminService::new(db.path(), Arc::clone(&schema));
1322 (db, service)
1323 }
1324
1325 #[test]
1326 fn check_integrity_includes_active_uniqueness_count() {
1327 let (_db, service) = setup();
1328 let report = service.check_integrity().expect("integrity check");
1329 assert_eq!(report.duplicate_active_logical_ids, 0);
1330 assert_eq!(report.operational_missing_collections, 0);
1331 assert_eq!(report.operational_missing_last_mutations, 0);
1332 }
1333
1334 #[test]
1335 fn trace_source_returns_node_logical_ids() {
1336 let (db, service) = setup();
1337 {
1338 let conn = sqlite::open_connection(db.path()).expect("conn");
1339 conn.execute(
1340 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1341 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 'source-1')",
1342 [],
1343 )
1344 .expect("insert node");
1345 }
1346 let report = service.trace_source("source-1").expect("trace");
1347 assert_eq!(report.node_rows, 1);
1348 assert_eq!(report.node_logical_ids, vec!["lg1"]);
1349 }
1350
1351 #[test]
1352 fn trace_source_includes_operational_mutations() {
1353 let (db, service) = setup();
1354 {
1355 let conn = sqlite::open_connection(db.path()).expect("conn");
1356 conn.execute(
1357 "INSERT INTO operational_collections \
1358 (name, kind, schema_json, retention_json, format_version, created_at) \
1359 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
1360 [],
1361 )
1362 .expect("insert collection");
1363 conn.execute(
1364 "INSERT INTO operational_mutations \
1365 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1366 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"ok\"}', 'source-1', 100, 1)",
1367 [],
1368 )
1369 .expect("insert mutation");
1370 }
1371
1372 let report = service.trace_source("source-1").expect("trace");
1373 assert_eq!(report.operational_mutation_rows, 1);
1374 assert_eq!(report.operational_mutation_ids, vec!["m1"]);
1375 }
1376
1377 #[test]
1378 fn excise_source_restores_prior_active_node() {
1379 let (db, service) = setup();
1380 {
1381 let conn = sqlite::open_connection(db.path()).expect("conn");
1382 conn.execute(
1383 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1384 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
1385 [],
1386 )
1387 .expect("insert v1 superseded");
1388 conn.execute(
1389 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1390 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
1391 [],
1392 )
1393 .expect("insert v2 active");
1394 }
1395 service.excise_source("source-2").expect("excise");
1396 {
1397 let conn = sqlite::open_connection(db.path()).expect("conn");
1398 let active_row_id: String = conn
1399 .query_row(
1400 "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
1401 [],
1402 |row| row.get(0),
1403 )
1404 .expect("active row exists after excise");
1405 assert_eq!(active_row_id, "r1");
1406 }
1407 }
1408
1409 #[test]
1410 fn excise_source_deletes_operational_mutations_and_repairs_latest_state_current() {
1411 let (db, service) = setup();
1412 {
1413 let conn = sqlite::open_connection(db.path()).expect("conn");
1414 conn.execute(
1415 "INSERT INTO operational_collections \
1416 (name, kind, schema_json, retention_json, format_version, created_at) \
1417 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
1418 [],
1419 )
1420 .expect("insert collection");
1421 conn.execute(
1422 "INSERT INTO operational_mutations \
1423 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1424 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'source-1', 100, 1)",
1425 [],
1426 )
1427 .expect("insert prior mutation");
1428 conn.execute(
1429 "INSERT INTO operational_mutations \
1430 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1431 VALUES ('m2', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'source-2', 200, 2)",
1432 [],
1433 )
1434 .expect("insert excised mutation");
1435 conn.execute(
1436 "INSERT INTO operational_current \
1437 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1438 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 200, 'm2')",
1439 [],
1440 )
1441 .expect("insert current row");
1442 }
1443
1444 let traced = service
1445 .trace_source("source-2")
1446 .expect("trace before excise");
1447 assert_eq!(traced.operational_mutation_rows, 1);
1448 assert_eq!(traced.operational_mutation_ids, vec!["m2"]);
1449
1450 let excised = service.excise_source("source-2").expect("excise");
1451 assert_eq!(excised.operational_mutation_rows, 0);
1452 assert!(excised.operational_mutation_ids.is_empty());
1453
1454 {
1455 let conn = sqlite::open_connection(db.path()).expect("conn");
1456 let remaining: i64 = conn
1457 .query_row(
1458 "SELECT count(*) FROM operational_mutations WHERE source_ref = 'source-2'",
1459 [],
1460 |row| row.get(0),
1461 )
1462 .expect("remaining count");
1463 assert_eq!(remaining, 0);
1464
1465 let current: (String, String) = conn
1466 .query_row(
1467 "SELECT payload_json, last_mutation_id FROM operational_current \
1468 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
1469 [],
1470 |row| Ok((row.get(0)?, row.get(1)?)),
1471 )
1472 .expect("rebuilt current row");
1473 assert_eq!(current.0, "{\"status\":\"old\"}");
1474 assert_eq!(current.1, "m1");
1475 }
1476 }
1477
1478 #[test]
1479 fn restore_logical_id_reestablishes_last_pre_retire_content_and_attached_edges() {
1480 let (db, service) = setup();
1481 {
1482 let conn = sqlite::open_connection(db.path()).expect("conn");
1483 conn.execute(
1484 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1485 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
1486 [],
1487 )
1488 .expect("insert node");
1489 conn.execute(
1490 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1491 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
1492 [],
1493 )
1494 .expect("insert target node");
1495 conn.execute(
1496 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1497 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1498 [],
1499 )
1500 .expect("insert chunk");
1501 conn.execute(
1502 "INSERT INTO edges \
1503 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1504 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
1505 [],
1506 )
1507 .expect("insert edge");
1508 conn.execute(
1509 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1510 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1511 [],
1512 )
1513 .expect("insert node retire event");
1514 conn.execute(
1515 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1516 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
1517 [],
1518 )
1519 .expect("insert edge retire event");
1520 conn.execute(
1521 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
1522 [],
1523 )
1524 .expect("retire node");
1525 conn.execute(
1526 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
1527 [],
1528 )
1529 .expect("retire edge");
1530 conn.execute("DELETE FROM fts_nodes", [])
1531 .expect("clear fts");
1532 }
1533
1534 let report = service.restore_logical_id("doc-1").expect("restore");
1535 assert_eq!(report.logical_id, "doc-1");
1536 assert!(!report.was_noop);
1537 assert_eq!(report.restored_node_rows, 1);
1538 assert_eq!(report.restored_edge_rows, 1);
1539 assert_eq!(report.restored_chunk_rows, 1);
1540 assert_eq!(report.restored_fts_rows, 1);
1541
1542 let conn = sqlite::open_connection(db.path()).expect("conn");
1543 let active_node_count: i64 = conn
1544 .query_row(
1545 "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
1546 [],
1547 |row| row.get(0),
1548 )
1549 .expect("active node count");
1550 assert_eq!(active_node_count, 1);
1551 let active_edge_count: i64 = conn
1552 .query_row(
1553 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
1554 [],
1555 |row| row.get(0),
1556 )
1557 .expect("active edge count");
1558 assert_eq!(active_edge_count, 1);
1559 let fts_count: i64 = conn
1560 .query_row(
1561 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
1562 [],
1563 |row| row.get(0),
1564 )
1565 .expect("fts count");
1566 assert_eq!(fts_count, 1);
1567 }
1568
1569 #[test]
1570 fn restore_logical_id_restores_edges_retired_after_the_node_retire_event() {
1571 let (db, service) = setup();
1572 {
1573 let conn = sqlite::open_connection(db.path()).expect("conn");
1574 conn.execute(
1575 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1576 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
1577 [],
1578 )
1579 .expect("insert node");
1580 conn.execute(
1581 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1582 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
1583 [],
1584 )
1585 .expect("insert target node");
1586 conn.execute(
1587 "INSERT INTO edges \
1588 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1589 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
1590 [],
1591 )
1592 .expect("insert edge");
1593 conn.execute(
1594 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1595 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1596 [],
1597 )
1598 .expect("insert node retire event");
1599 conn.execute(
1600 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1601 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 201, '')",
1602 [],
1603 )
1604 .expect("insert edge retire event");
1605 conn.execute(
1606 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
1607 [],
1608 )
1609 .expect("retire node");
1610 conn.execute(
1611 "UPDATE edges SET superseded_at = 201 WHERE logical_id = 'edge-1'",
1612 [],
1613 )
1614 .expect("retire edge");
1615 }
1616
1617 let report = service.restore_logical_id("doc-1").expect("restore");
1618 assert_eq!(report.restored_edge_rows, 1);
1619
1620 let conn = sqlite::open_connection(db.path()).expect("conn");
1621 let active_edge_count: i64 = conn
1622 .query_row(
1623 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
1624 [],
1625 |row| row.get(0),
1626 )
1627 .expect("active edge count");
1628 assert_eq!(active_edge_count, 1);
1629 }
1630
1631 #[test]
1632 fn restore_logical_id_prefers_latest_retired_revision_when_timestamps_tie() {
1633 let (db, service) = setup();
1634 {
1635 let conn = sqlite::open_connection(db.path()).expect("conn");
1636 conn.execute(
1637 "INSERT INTO nodes \
1638 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1639 VALUES ('node-row-older', 'doc-1', 'Document', '{\"title\":\"older\"}', 100, 200, 'forget-1')",
1640 [],
1641 )
1642 .expect("insert older retired node");
1643 conn.execute(
1644 "INSERT INTO nodes \
1645 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1646 VALUES ('node-row-newer', 'doc-1', 'Document', '{\"title\":\"newer\"}', 100, 200, 'forget-1')",
1647 [],
1648 )
1649 .expect("insert newer retired node");
1650 conn.execute(
1651 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1652 VALUES ('evt-retire-older', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1653 [],
1654 )
1655 .expect("insert older retire event");
1656 conn.execute(
1657 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1658 VALUES ('evt-retire-newer', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1659 [],
1660 )
1661 .expect("insert newer retire event");
1662 }
1663
1664 let report = service.restore_logical_id("doc-1").expect("restore");
1665
1666 assert!(!report.was_noop);
1667 let conn = sqlite::open_connection(db.path()).expect("conn");
1668 let active_row: (String, String) = conn
1669 .query_row(
1670 "SELECT row_id, properties FROM nodes \
1671 WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
1672 [],
1673 |row| Ok((row.get(0)?, row.get(1)?)),
1674 )
1675 .expect("restored active row");
1676 assert_eq!(active_row.0, "node-row-newer");
1677 assert_eq!(active_row.1, "{\"title\":\"newer\"}");
1678 }
1679
1680 #[test]
1681 fn purge_logical_id_removes_retired_content_and_records_tombstone() {
1682 let (db, service) = setup();
1683 {
1684 let conn = sqlite::open_connection(db.path()).expect("conn");
1685 conn.execute(
1686 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1687 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
1688 [],
1689 )
1690 .expect("insert retired node");
1691 conn.execute(
1692 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1693 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1694 [],
1695 )
1696 .expect("insert chunk");
1697 conn.execute(
1698 "INSERT INTO edges \
1699 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, superseded_at, source_ref) \
1700 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 200, 'seed')",
1701 [],
1702 )
1703 .expect("insert retired edge");
1704 conn.execute(
1705 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
1706 VALUES ('chunk-1', 'doc-1', 'Document', 'budget narrative')",
1707 [],
1708 )
1709 .expect("insert fts");
1710 }
1711
1712 let report = service.purge_logical_id("doc-1").expect("purge");
1713 assert_eq!(report.logical_id, "doc-1");
1714 assert!(!report.was_noop);
1715 assert_eq!(report.deleted_node_rows, 1);
1716 assert_eq!(report.deleted_edge_rows, 1);
1717 assert_eq!(report.deleted_chunk_rows, 1);
1718 assert_eq!(report.deleted_fts_rows, 1);
1719
1720 let conn = sqlite::open_connection(db.path()).expect("conn");
1721 let remaining_nodes: i64 = conn
1722 .query_row(
1723 "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1'",
1724 [],
1725 |row| row.get(0),
1726 )
1727 .expect("remaining nodes");
1728 assert_eq!(remaining_nodes, 0);
1729 let remaining_edges: i64 = conn
1730 .query_row(
1731 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1'",
1732 [],
1733 |row| row.get(0),
1734 )
1735 .expect("remaining edges");
1736 assert_eq!(remaining_edges, 0);
1737 let remaining_chunks: i64 = conn
1738 .query_row(
1739 "SELECT count(*) FROM chunks WHERE id = 'chunk-1'",
1740 [],
1741 |row| row.get(0),
1742 )
1743 .expect("remaining chunks");
1744 assert_eq!(remaining_chunks, 0);
1745 let purge_events: i64 = conn
1746 .query_row(
1747 "SELECT count(*) FROM provenance_events WHERE event_type = 'purge_logical_id' AND subject = 'doc-1'",
1748 [],
1749 |row| row.get(0),
1750 )
1751 .expect("purge events");
1752 assert_eq!(purge_events, 1);
1753 }
1754
1755 #[test]
1756 fn check_semantics_accepts_preserved_retired_chunks() {
1757 let (db, service) = setup();
1758 {
1759 let conn = sqlite::open_connection(db.path()).expect("conn");
1760 conn.execute(
1761 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1762 VALUES ('node-row-1', 'doc-1', 'Document', '{}', 100, 200, 'seed')",
1763 [],
1764 )
1765 .expect("insert retired node");
1766 conn.execute(
1767 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1768 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1769 [],
1770 )
1771 .expect("insert chunk");
1772 }
1773
1774 let report = service.check_semantics().expect("semantics");
1775 assert_eq!(report.orphaned_chunks, 0);
1776 }
1777
1778 #[test]
1779 fn check_semantics_detects_missing_retired_node_history_for_preserved_chunks() {
1780 let (db, service) = setup();
1781 {
1782 let conn = sqlite::open_connection(db.path()).expect("conn");
1783 conn.execute(
1784 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1785 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
1786 [],
1787 )
1788 .expect("insert orphaned chunk");
1789 }
1790
1791 let report = service.check_semantics().expect("semantics");
1792 assert_eq!(report.orphaned_chunks, 1);
1793 }
1794
1795 #[cfg(feature = "sqlite-vec")]
1796 #[test]
1797 fn check_semantics_detects_missing_retired_node_history_for_preserved_vec_rows() {
1798 let (db, service) = setup();
1799 {
1800 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1801 service
1802 .schema_manager
1803 .ensure_vec_kind_profile(&conn, "Doc", 4)
1804 .expect("ensure vec kind profile");
1805 conn.execute(
1806 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1807 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
1808 [],
1809 )
1810 .expect("insert orphaned chunk");
1811 conn.execute(
1812 "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
1813 [],
1814 )
1815 .expect("insert vec row");
1816 }
1817
1818 let report = service.check_semantics().expect("semantics");
1819 assert_eq!(report.orphaned_chunks, 1);
1820 assert_eq!(report.vec_rows_for_superseded_nodes, 1);
1821 }
1822
1823 #[cfg(feature = "sqlite-vec")]
1824 #[test]
1825 fn restore_logical_id_reestablishes_vector_search_without_reingest() {
1826 let (db, service) = setup();
1827 {
1828 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1829 service
1830 .schema_manager
1831 .ensure_vec_kind_profile(&conn, "Document", 4)
1832 .expect("ensure vec kind profile");
1833 conn.execute(
1834 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1835 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
1836 [],
1837 )
1838 .expect("insert retired node");
1839 conn.execute(
1840 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1841 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1842 [],
1843 )
1844 .expect("insert chunk");
1845 conn.execute(
1846 "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
1847 [],
1848 )
1849 .expect("insert vec row");
1850 conn.execute(
1851 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1852 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1853 [],
1854 )
1855 .expect("insert retire event");
1856 }
1857
1858 let report = service.restore_logical_id("doc-1").expect("restore");
1859 assert_eq!(report.restored_vec_rows, 1);
1860
1861 let coordinator = ExecutionCoordinator::open(
1862 db.path(),
1863 Arc::new(SchemaManager::new()),
1864 Some(4),
1865 1,
1866 Arc::new(TelemetryCounters::default()),
1867 None,
1868 )
1869 .expect("coordinator");
1870 let compiled = QueryBuilder::nodes("Document")
1871 .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
1872 .compile()
1873 .expect("compile");
1874 let rows = coordinator
1875 .execute_compiled_read(&compiled)
1876 .expect("vector read");
1877 assert!(
1878 rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
1879 "restore should make the preserved vec row visible again without re-ingest"
1880 );
1881 }
1882
1883 #[cfg(feature = "sqlite-vec")]
1884 #[test]
1885 fn purge_logical_id_deletes_vec_rows_for_retired_content() {
1886 let (db, service) = setup();
1887 {
1888 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1889 service
1890 .schema_manager
1891 .ensure_vec_kind_profile(&conn, "Document", 4)
1892 .expect("ensure vec kind profile");
1893 conn.execute(
1894 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1895 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
1896 [],
1897 )
1898 .expect("insert retired node");
1899 conn.execute(
1900 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1901 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1902 [],
1903 )
1904 .expect("insert chunk");
1905 conn.execute(
1906 "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
1907 [],
1908 )
1909 .expect("insert vec row");
1910 }
1911
1912 let report = service.purge_logical_id("doc-1").expect("purge");
1913 assert_eq!(report.deleted_vec_rows, 1);
1914
1915 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1916 let vec_count: i64 = conn
1917 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
1918 .expect("vec count");
1919 assert_eq!(vec_count, 0);
1920 }
1921
1922 #[cfg(feature = "sqlite-vec")]
1923 #[test]
1924 fn restore_logical_id_restores_visibility_of_regenerated_vectors() {
1925 let (db, service) = setup();
1926
1927 {
1928 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1929 conn.execute(
1930 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1931 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
1932 [],
1933 )
1934 .expect("insert node");
1935 conn.execute(
1936 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1937 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1938 [],
1939 )
1940 .expect("insert chunk");
1941 }
1942
1943 let embedder = TestEmbedder::new("test-model", 4);
1944 service
1945 .regenerate_vector_embeddings(
1946 &embedder,
1947 &VectorRegenerationConfig {
1948 kind: "Document".to_owned(),
1949 profile: "default".to_owned(),
1950 chunking_policy: "per_chunk".to_owned(),
1951 preprocessing_policy: "trim".to_owned(),
1952 },
1953 )
1954 .expect("regenerate");
1955
1956 {
1957 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
1958 conn.execute(
1959 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1960 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1961 [],
1962 )
1963 .expect("insert retire event");
1964 conn.execute(
1965 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
1966 [],
1967 )
1968 .expect("retire node");
1969 }
1970
1971 let report = service.restore_logical_id("doc-1").expect("restore");
1972 assert_eq!(report.restored_vec_rows, 1);
1973
1974 let coordinator = ExecutionCoordinator::open(
1975 db.path(),
1976 Arc::new(SchemaManager::new()),
1977 Some(4),
1978 1,
1979 Arc::new(TelemetryCounters::default()),
1980 None,
1981 )
1982 .expect("coordinator");
1983 let compiled = QueryBuilder::nodes("Document")
1984 .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
1985 .compile()
1986 .expect("compile");
1987 let rows = coordinator
1988 .execute_compiled_read(&compiled)
1989 .expect("vector read");
1990 assert!(
1991 rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
1992 "restored logical_id should become visible through regenerated vectors"
1993 );
1994 }
1995
1996 #[test]
1997 fn check_semantics_clean_db_returns_zeros() {
1998 let (_db, service) = setup();
1999 let report = service.check_semantics().expect("semantics check");
2000 assert_eq!(report.orphaned_chunks, 0);
2001 assert_eq!(report.null_source_ref_nodes, 0);
2002 assert_eq!(report.broken_step_fk, 0);
2003 assert_eq!(report.broken_action_fk, 0);
2004 assert_eq!(report.stale_fts_rows, 0);
2005 assert_eq!(report.fts_rows_for_superseded_nodes, 0);
2006 assert_eq!(report.dangling_edges, 0);
2007 assert_eq!(report.orphaned_supersession_chains, 0);
2008 assert_eq!(report.stale_vec_rows, 0);
2009 assert_eq!(report.vec_rows_for_superseded_nodes, 0);
2010 assert_eq!(report.missing_operational_current_rows, 0);
2011 assert_eq!(report.stale_operational_current_rows, 0);
2012 assert_eq!(report.disabled_collection_mutations, 0);
2013 assert_eq!(report.mismatched_kind_property_fts_rows, 0);
2014 assert_eq!(report.duplicate_property_fts_rows, 0);
2015 assert_eq!(report.drifted_property_fts_rows, 0);
2016 assert!(report.warnings.is_empty());
2017 }
2018
2019 #[test]
2020 fn register_operational_collection_persists_and_emits_provenance() {
2021 let (db, service) = setup();
2022 let record = service
2023 .register_operational_collection(&OperationalRegisterRequest {
2024 name: "connector_health".to_owned(),
2025 kind: OperationalCollectionKind::LatestState,
2026 schema_json: "{}".to_owned(),
2027 retention_json: "{}".to_owned(),
2028 filter_fields_json: "[]".to_owned(),
2029 validation_json: String::new(),
2030 secondary_indexes_json: "[]".to_owned(),
2031 format_version: 1,
2032 })
2033 .expect("register collection");
2034
2035 assert_eq!(record.name, "connector_health");
2036 assert_eq!(record.kind, OperationalCollectionKind::LatestState);
2037 assert_eq!(record.schema_json, "{}");
2038 assert_eq!(record.retention_json, "{}");
2039 assert_eq!(record.filter_fields_json, "[]");
2040 assert!(record.created_at > 0);
2041 assert_eq!(record.disabled_at, None);
2042
2043 let described = service
2044 .describe_operational_collection("connector_health")
2045 .expect("describe collection")
2046 .expect("collection exists");
2047 assert_eq!(described, record);
2048
2049 let conn = sqlite::open_connection(db.path()).expect("conn");
2050 let provenance_count: i64 = conn
2051 .query_row(
2052 "SELECT count(*) FROM provenance_events \
2053 WHERE event_type = 'operational_collection_registered' AND subject = 'connector_health'",
2054 [],
2055 |row| row.get(0),
2056 )
2057 .expect("provenance count");
2058 assert_eq!(provenance_count, 1);
2059 }
2060
2061 #[test]
2062 fn register_and_update_operational_collection_validation_round_trip() {
2063 let (db, service) = setup();
2064 let record = service
2065 .register_operational_collection(&OperationalRegisterRequest {
2066 name: "connector_health".to_owned(),
2067 kind: OperationalCollectionKind::LatestState,
2068 schema_json: "{}".to_owned(),
2069 retention_json: "{}".to_owned(),
2070 filter_fields_json: "[]".to_owned(),
2071 validation_json: String::new(),
2072 secondary_indexes_json: "[]".to_owned(),
2073 format_version: 1,
2074 })
2075 .expect("register collection");
2076 assert_eq!(record.validation_json, "");
2077
2078 let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
2079 let updated = service
2080 .update_operational_collection_validation("connector_health", validation_json)
2081 .expect("update validation");
2082 assert_eq!(updated.validation_json, validation_json);
2083
2084 let described = service
2085 .describe_operational_collection("connector_health")
2086 .expect("describe collection")
2087 .expect("collection exists");
2088 assert_eq!(described.validation_json, validation_json);
2089
2090 let conn = sqlite::open_connection(db.path()).expect("conn");
2091 let provenance_count: i64 = conn
2092 .query_row(
2093 "SELECT count(*) FROM provenance_events \
2094 WHERE event_type = 'operational_collection_validation_updated' \
2095 AND subject = 'connector_health'",
2096 [],
2097 |row| row.get(0),
2098 )
2099 .expect("provenance count");
2100 assert_eq!(provenance_count, 1);
2101 }
2102
2103 #[test]
2104 fn register_update_and_rebuild_operational_secondary_indexes_round_trip() {
2105 let (db, service) = setup();
2106 let record = service
2107 .register_operational_collection(&OperationalRegisterRequest {
2108 name: "audit_log".to_owned(),
2109 kind: OperationalCollectionKind::AppendOnlyLog,
2110 schema_json: "{}".to_owned(),
2111 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2112 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
2113 validation_json: String::new(),
2114 secondary_indexes_json: "[]".to_owned(),
2115 format_version: 1,
2116 })
2117 .expect("register collection");
2118 assert_eq!(record.secondary_indexes_json, "[]");
2119
2120 {
2121 let writer = crate::WriterActor::start(
2122 db.path(),
2123 Arc::new(SchemaManager::new()),
2124 crate::ProvenanceMode::Warn,
2125 Arc::new(crate::TelemetryCounters::default()),
2126 )
2127 .expect("writer");
2128 writer
2129 .submit(crate::WriteRequest {
2130 label: "secondary-index-seed".to_owned(),
2131 nodes: vec![],
2132 node_retires: vec![],
2133 edges: vec![],
2134 edge_retires: vec![],
2135 chunks: vec![],
2136 runs: vec![],
2137 steps: vec![],
2138 actions: vec![],
2139 optional_backfills: vec![],
2140 vec_inserts: vec![],
2141 operational_writes: vec![
2142 crate::OperationalWrite::Append {
2143 collection: "audit_log".to_owned(),
2144 record_key: "evt-1".to_owned(),
2145 payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
2146 source_ref: Some("src-1".to_owned()),
2147 },
2148 crate::OperationalWrite::Append {
2149 collection: "audit_log".to_owned(),
2150 record_key: "evt-2".to_owned(),
2151 payload_json: r#"{"actor":"bob","ts":200}"#.to_owned(),
2152 source_ref: Some("src-2".to_owned()),
2153 },
2154 ],
2155 })
2156 .expect("seed writes");
2157 }
2158
2159 let secondary_indexes_json = r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#;
2160 let updated = service
2161 .update_operational_collection_secondary_indexes("audit_log", secondary_indexes_json)
2162 .expect("update secondary indexes");
2163 assert_eq!(updated.secondary_indexes_json, secondary_indexes_json);
2164
2165 let conn = sqlite::open_connection(db.path()).expect("conn");
2166 let entry_count: i64 = conn
2167 .query_row(
2168 "SELECT count(*) FROM operational_secondary_index_entries \
2169 WHERE collection_name = 'audit_log' AND index_name = 'actor_ts'",
2170 [],
2171 |row| row.get(0),
2172 )
2173 .expect("secondary index count");
2174 assert_eq!(entry_count, 2);
2175 conn.execute(
2176 "DELETE FROM operational_secondary_index_entries WHERE collection_name = 'audit_log'",
2177 [],
2178 )
2179 .expect("clear index entries");
2180 drop(conn);
2181
2182 let rebuild = service
2183 .rebuild_operational_secondary_indexes("audit_log")
2184 .expect("rebuild secondary indexes");
2185 assert_eq!(rebuild.collection_name, "audit_log");
2186 assert_eq!(rebuild.mutation_entries_rebuilt, 2);
2187 assert_eq!(rebuild.current_entries_rebuilt, 0);
2188 }
2189
2190 #[test]
2191 fn register_operational_collection_rejects_invalid_validation_contract() {
2192 let (_db, service) = setup();
2193
2194 let error = service
2195 .register_operational_collection(&OperationalRegisterRequest {
2196 name: "connector_health".to_owned(),
2197 kind: OperationalCollectionKind::LatestState,
2198 schema_json: "{}".to_owned(),
2199 retention_json: "{}".to_owned(),
2200 filter_fields_json: "[]".to_owned(),
2201 validation_json: r#"{"format_version":1,"mode":"enforce","fields":[{"name":"status","type":"string","minimum":0}]}"#
2202 .to_owned(),
2203 secondary_indexes_json: "[]".to_owned(),
2204 format_version: 1,
2205 })
2206 .expect_err("invalid validation contract should reject");
2207
2208 assert!(matches!(error, EngineError::InvalidWrite(_)));
2209 assert!(error.to_string().contains("minimum/maximum"));
2210 }
2211
2212 #[test]
2213 fn validate_operational_collection_history_reports_invalid_rows_without_mutation() {
2214 let (db, service) = setup();
2215 service
2216 .register_operational_collection(&OperationalRegisterRequest {
2217 name: "audit_log".to_owned(),
2218 kind: OperationalCollectionKind::AppendOnlyLog,
2219 schema_json: "{}".to_owned(),
2220 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2221 filter_fields_json: "[]".to_owned(),
2222 validation_json: r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#
2223 .to_owned(),
2224 secondary_indexes_json: "[]".to_owned(),
2225 format_version: 1,
2226 })
2227 .expect("register collection");
2228 {
2229 let writer = crate::WriterActor::start(
2230 db.path(),
2231 Arc::new(SchemaManager::new()),
2232 crate::ProvenanceMode::Warn,
2233 Arc::new(crate::TelemetryCounters::default()),
2234 )
2235 .expect("writer");
2236 writer
2237 .submit(crate::WriteRequest {
2238 label: "history-validation".to_owned(),
2239 nodes: vec![],
2240 node_retires: vec![],
2241 edges: vec![],
2242 edge_retires: vec![],
2243 chunks: vec![],
2244 runs: vec![],
2245 steps: vec![],
2246 actions: vec![],
2247 optional_backfills: vec![],
2248 vec_inserts: vec![],
2249 operational_writes: vec![
2250 crate::OperationalWrite::Append {
2251 collection: "audit_log".to_owned(),
2252 record_key: "evt-1".to_owned(),
2253 payload_json: r#"{"status":"ok"}"#.to_owned(),
2254 source_ref: Some("src-1".to_owned()),
2255 },
2256 crate::OperationalWrite::Append {
2257 collection: "audit_log".to_owned(),
2258 record_key: "evt-2".to_owned(),
2259 payload_json: r#"{"status":"bogus"}"#.to_owned(),
2260 source_ref: Some("src-2".to_owned()),
2261 },
2262 ],
2263 })
2264 .expect("write");
2265 }
2266
2267 let report = service
2268 .validate_operational_collection_history("audit_log")
2269 .expect("validate history");
2270 assert_eq!(report.collection_name, "audit_log");
2271 assert_eq!(report.checked_rows, 2);
2272 assert_eq!(report.invalid_row_count, 1);
2273 assert_eq!(report.issues.len(), 1);
2274 assert_eq!(report.issues[0].record_key, "evt-2");
2275 assert!(report.issues[0].message.contains("must be one of"));
2276
2277 let trace = service
2278 .trace_operational_collection("audit_log", None)
2279 .expect("trace");
2280 assert_eq!(trace.mutation_count, 2);
2281
2282 let conn = sqlite::open_connection(db.path()).expect("conn");
2283 let provenance_count: i64 = conn
2284 .query_row(
2285 "SELECT count(*) FROM provenance_events \
2286 WHERE event_type = 'operational_collection_history_validated' \
2287 AND subject = 'audit_log'",
2288 [],
2289 |row| row.get(0),
2290 )
2291 .expect("provenance count");
2292 assert_eq!(provenance_count, 0);
2293 }
2294
2295 #[test]
2296 fn trace_operational_collection_returns_mutations_and_current_rows() {
2297 let (db, service) = setup();
2298 service
2299 .register_operational_collection(&OperationalRegisterRequest {
2300 name: "connector_health".to_owned(),
2301 kind: OperationalCollectionKind::LatestState,
2302 schema_json: "{}".to_owned(),
2303 retention_json: "{}".to_owned(),
2304 filter_fields_json: "[]".to_owned(),
2305 validation_json: String::new(),
2306 secondary_indexes_json: "[]".to_owned(),
2307 format_version: 1,
2308 })
2309 .expect("register collection");
2310 {
2311 let writer = crate::WriterActor::start(
2312 db.path(),
2313 Arc::new(SchemaManager::new()),
2314 crate::ProvenanceMode::Warn,
2315 Arc::new(crate::TelemetryCounters::default()),
2316 )
2317 .expect("writer");
2318 writer
2319 .submit(crate::WriteRequest {
2320 label: "operational".to_owned(),
2321 nodes: vec![],
2322 node_retires: vec![],
2323 edges: vec![],
2324 edge_retires: vec![],
2325 chunks: vec![],
2326 runs: vec![],
2327 steps: vec![],
2328 actions: vec![],
2329 optional_backfills: vec![],
2330 vec_inserts: vec![],
2331 operational_writes: vec![crate::OperationalWrite::Put {
2332 collection: "connector_health".to_owned(),
2333 record_key: "gmail".to_owned(),
2334 payload_json: r#"{"status":"ok"}"#.to_owned(),
2335 source_ref: Some("src-1".to_owned()),
2336 }],
2337 })
2338 .expect("write");
2339 }
2340
2341 let report = service
2342 .trace_operational_collection("connector_health", Some("gmail"))
2343 .expect("trace");
2344 assert_eq!(report.collection_name, "connector_health");
2345 assert_eq!(report.record_key.as_deref(), Some("gmail"));
2346 assert_eq!(report.mutation_count, 1);
2347 assert_eq!(report.current_count, 1);
2348 assert_eq!(report.mutations[0].op_kind, "put");
2349 assert_eq!(report.current_rows[0].payload_json, r#"{"status":"ok"}"#);
2350 }
2351
2352 #[test]
2353 fn trace_operational_collection_rejects_unknown_collection() {
2354 let (_db, service) = setup();
2355
2356 let error = service
2357 .trace_operational_collection("missing_collection", None)
2358 .expect_err("unknown collection should fail");
2359
2360 assert!(matches!(error, EngineError::InvalidWrite(_)));
2361 assert!(error.to_string().contains("is not registered"));
2362 }
2363
2364 #[test]
2365 fn rebuild_operational_current_repairs_missing_latest_state_rows() {
2366 let (db, service) = setup();
2367 service
2368 .register_operational_collection(&OperationalRegisterRequest {
2369 name: "connector_health".to_owned(),
2370 kind: OperationalCollectionKind::LatestState,
2371 schema_json: "{}".to_owned(),
2372 retention_json: "{}".to_owned(),
2373 filter_fields_json: "[]".to_owned(),
2374 validation_json: String::new(),
2375 secondary_indexes_json: "[]".to_owned(),
2376 format_version: 1,
2377 })
2378 .expect("register collection");
2379 {
2380 let writer = crate::WriterActor::start(
2381 db.path(),
2382 Arc::new(SchemaManager::new()),
2383 crate::ProvenanceMode::Warn,
2384 Arc::new(crate::TelemetryCounters::default()),
2385 )
2386 .expect("writer");
2387 writer
2388 .submit(crate::WriteRequest {
2389 label: "operational".to_owned(),
2390 nodes: vec![],
2391 node_retires: vec![],
2392 edges: vec![],
2393 edge_retires: vec![],
2394 chunks: vec![],
2395 runs: vec![],
2396 steps: vec![],
2397 actions: vec![],
2398 optional_backfills: vec![],
2399 vec_inserts: vec![],
2400 operational_writes: vec![crate::OperationalWrite::Put {
2401 collection: "connector_health".to_owned(),
2402 record_key: "gmail".to_owned(),
2403 payload_json: r#"{"status":"ok"}"#.to_owned(),
2404 source_ref: Some("src-1".to_owned()),
2405 }],
2406 })
2407 .expect("write");
2408 }
2409 {
2410 let conn = sqlite::open_connection(db.path()).expect("conn");
2411 conn.execute(
2412 "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2413 [],
2414 )
2415 .expect("delete current row");
2416 }
2417
2418 let before = service.check_semantics().expect("semantics before rebuild");
2419 assert_eq!(before.missing_operational_current_rows, 1);
2420
2421 let repair = service
2422 .rebuild_operational_current(Some("connector_health"))
2423 .expect("rebuild current");
2424 assert_eq!(repair.collections_rebuilt, 1);
2425 assert_eq!(repair.current_rows_rebuilt, 1);
2426
2427 let after = service.check_semantics().expect("semantics after rebuild");
2428 assert_eq!(after.missing_operational_current_rows, 0);
2429
2430 let conn = sqlite::open_connection(db.path()).expect("conn");
2431 let payload: String = conn
2432 .query_row(
2433 "SELECT payload_json FROM operational_current \
2434 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2435 [],
2436 |row| row.get(0),
2437 )
2438 .expect("restored payload");
2439 assert_eq!(payload, r#"{"status":"ok"}"#);
2440 }
2441
2442 #[test]
2443 fn rebuild_operational_current_restores_latest_state_secondary_index_entries() {
2444 let (db, service) = setup();
2445 service
2446 .register_operational_collection(&OperationalRegisterRequest {
2447 name: "connector_health".to_owned(),
2448 kind: OperationalCollectionKind::LatestState,
2449 schema_json: "{}".to_owned(),
2450 retention_json: "{}".to_owned(),
2451 filter_fields_json: "[]".to_owned(),
2452 validation_json: String::new(),
2453 secondary_indexes_json: r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"}]"#.to_owned(),
2454 format_version: 1,
2455 })
2456 .expect("register collection");
2457 {
2458 let writer = crate::WriterActor::start(
2459 db.path(),
2460 Arc::new(SchemaManager::new()),
2461 crate::ProvenanceMode::Warn,
2462 Arc::new(crate::TelemetryCounters::default()),
2463 )
2464 .expect("writer");
2465 writer
2466 .submit(crate::WriteRequest {
2467 label: "operational".to_owned(),
2468 nodes: vec![],
2469 node_retires: vec![],
2470 edges: vec![],
2471 edge_retires: vec![],
2472 chunks: vec![],
2473 runs: vec![],
2474 steps: vec![],
2475 actions: vec![],
2476 optional_backfills: vec![],
2477 vec_inserts: vec![],
2478 operational_writes: vec![crate::OperationalWrite::Put {
2479 collection: "connector_health".to_owned(),
2480 record_key: "gmail".to_owned(),
2481 payload_json: r#"{"status":"ok"}"#.to_owned(),
2482 source_ref: Some("src-1".to_owned()),
2483 }],
2484 })
2485 .expect("write");
2486 }
2487 {
2488 let conn = sqlite::open_connection(db.path()).expect("conn");
2489 let entry_count: i64 = conn
2490 .query_row(
2491 "SELECT count(*) FROM operational_secondary_index_entries \
2492 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2493 [],
2494 |row| row.get(0),
2495 )
2496 .expect("secondary index count before repair");
2497 assert_eq!(entry_count, 1);
2498 conn.execute(
2499 "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2500 [],
2501 )
2502 .expect("delete current row");
2503 }
2504
2505 service
2506 .rebuild_operational_current(Some("connector_health"))
2507 .expect("rebuild current");
2508
2509 let conn = sqlite::open_connection(db.path()).expect("conn");
2510 let entry_count: i64 = conn
2511 .query_row(
2512 "SELECT count(*) FROM operational_secondary_index_entries \
2513 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2514 [],
2515 |row| row.get(0),
2516 )
2517 .expect("secondary index count after repair");
2518 assert_eq!(entry_count, 1);
2519 }
2520
2521 #[test]
2522 fn operational_current_semantics_and_rebuild_follow_mutation_order() {
2523 let (db, service) = setup();
2524 {
2525 let conn = sqlite::open_connection(db.path()).expect("conn");
2526 conn.execute(
2527 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2528 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
2529 [],
2530 )
2531 .expect("seed collection");
2532 conn.execute(
2533 "INSERT INTO operational_mutations \
2534 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2535 VALUES ('m3', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'src-1', 100, 1)",
2536 [],
2537 )
2538 .expect("seed first put");
2539 conn.execute(
2540 "INSERT INTO operational_mutations \
2541 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2542 VALUES ('m2', 'connector_health', 'gmail', 'delete', '', 'src-2', 100, 2)",
2543 [],
2544 )
2545 .expect("seed delete");
2546 conn.execute(
2547 "INSERT INTO operational_mutations \
2548 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2549 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'src-3', 100, 3)",
2550 [],
2551 )
2552 .expect("seed final put");
2553 conn.execute(
2554 "INSERT INTO operational_current \
2555 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
2556 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 100, 'm1')",
2557 [],
2558 )
2559 .expect("seed current");
2560 }
2561
2562 let before = service.check_semantics().expect("semantics before rebuild");
2563 assert_eq!(before.missing_operational_current_rows, 0);
2564 assert_eq!(before.stale_operational_current_rows, 0);
2565
2566 {
2567 let conn = sqlite::open_connection(db.path()).expect("conn");
2568 conn.execute(
2569 "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2570 [],
2571 )
2572 .expect("delete current row");
2573 }
2574
2575 let missing = service.check_semantics().expect("semantics after delete");
2576 assert_eq!(missing.missing_operational_current_rows, 1);
2577 assert_eq!(missing.stale_operational_current_rows, 0);
2578
2579 service
2580 .rebuild_operational_current(Some("connector_health"))
2581 .expect("rebuild current");
2582
2583 let after = service.check_semantics().expect("semantics after rebuild");
2584 assert_eq!(after.missing_operational_current_rows, 0);
2585 assert_eq!(after.stale_operational_current_rows, 0);
2586
2587 let conn = sqlite::open_connection(db.path()).expect("conn");
2588 let payload: String = conn
2589 .query_row(
2590 "SELECT payload_json FROM operational_current \
2591 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2592 [],
2593 |row| row.get(0),
2594 )
2595 .expect("restored payload");
2596 assert_eq!(payload, r#"{"status":"new"}"#);
2597 }
2598
2599 #[test]
2600 fn disable_operational_collection_sets_disabled_at_and_emits_provenance() {
2601 let (db, service) = setup();
2602 service
2603 .register_operational_collection(&OperationalRegisterRequest {
2604 name: "audit_log".to_owned(),
2605 kind: OperationalCollectionKind::AppendOnlyLog,
2606 schema_json: "{}".to_owned(),
2607 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2608 filter_fields_json: "[]".to_owned(),
2609 validation_json: String::new(),
2610 secondary_indexes_json: "[]".to_owned(),
2611 format_version: 1,
2612 })
2613 .expect("register collection");
2614
2615 let record = service
2616 .disable_operational_collection("audit_log")
2617 .expect("disable collection");
2618 assert_eq!(record.name, "audit_log");
2619 assert!(record.disabled_at.is_some());
2620
2621 let disabled_at = record.disabled_at.expect("disabled_at");
2622 let described = service
2623 .describe_operational_collection("audit_log")
2624 .expect("describe collection")
2625 .expect("collection exists");
2626 assert_eq!(described.disabled_at, Some(disabled_at));
2627
2628 let writer = crate::WriterActor::start(
2629 db.path(),
2630 Arc::new(SchemaManager::new()),
2631 crate::ProvenanceMode::Warn,
2632 Arc::new(crate::TelemetryCounters::default()),
2633 )
2634 .expect("writer");
2635 let error = writer
2636 .submit(crate::WriteRequest {
2637 label: "disabled-operational".to_owned(),
2638 nodes: vec![],
2639 node_retires: vec![],
2640 edges: vec![],
2641 edge_retires: vec![],
2642 chunks: vec![],
2643 runs: vec![],
2644 steps: vec![],
2645 actions: vec![],
2646 optional_backfills: vec![],
2647 vec_inserts: vec![],
2648 operational_writes: vec![crate::OperationalWrite::Append {
2649 collection: "audit_log".to_owned(),
2650 record_key: "evt-1".to_owned(),
2651 payload_json: r#"{"type":"sync"}"#.to_owned(),
2652 source_ref: Some("src-1".to_owned()),
2653 }],
2654 })
2655 .expect_err("disabled collection should reject writes");
2656 assert!(matches!(error, EngineError::InvalidWrite(_)));
2657 assert!(error.to_string().contains("is disabled"));
2658
2659 let conn = sqlite::open_connection(db.path()).expect("conn");
2660 let provenance_count: i64 = conn
2661 .query_row(
2662 "SELECT count(*) FROM provenance_events \
2663 WHERE event_type = 'operational_collection_disabled' AND subject = 'audit_log'",
2664 [],
2665 |row| row.get(0),
2666 )
2667 .expect("provenance count");
2668 assert_eq!(provenance_count, 1);
2669 }
2670
2671 #[test]
2672 fn purge_operational_collection_deletes_append_only_rows_before_cutoff() {
2673 let (db, service) = setup();
2674 {
2675 let conn = sqlite::open_connection(db.path()).expect("conn");
2676 conn.execute(
2677 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2678 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_all\"}', 1, 100)",
2679 [],
2680 )
2681 .expect("seed collection");
2682 conn.execute(
2683 "INSERT INTO operational_mutations \
2684 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2685 VALUES ('evt-1', 'audit_log', 'evt-1', 'append', '{\"seq\":1}', 'src-1', 100, 1)",
2686 [],
2687 )
2688 .expect("seed event 1");
2689 conn.execute(
2690 "INSERT INTO operational_mutations \
2691 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2692 VALUES ('evt-2', 'audit_log', 'evt-2', 'append', '{\"seq\":2}', 'src-2', 200, 2)",
2693 [],
2694 )
2695 .expect("seed event 2");
2696 conn.execute(
2697 "INSERT INTO operational_mutations \
2698 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2699 VALUES ('evt-3', 'audit_log', 'evt-3', 'append', '{\"seq\":3}', 'src-3', 300, 3)",
2700 [],
2701 )
2702 .expect("seed event 3");
2703 }
2704
2705 let report = service
2706 .purge_operational_collection("audit_log", 250)
2707 .expect("purge collection");
2708 assert_eq!(report.collection_name, "audit_log");
2709 assert_eq!(report.deleted_mutations, 2);
2710 assert_eq!(report.before_timestamp, 250);
2711
2712 let conn = sqlite::open_connection(db.path()).expect("conn");
2713 let remaining: Vec<String> = {
2714 let mut stmt = conn
2715 .prepare(
2716 "SELECT id FROM operational_mutations \
2717 WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2718 )
2719 .expect("stmt");
2720 stmt.query_map([], |row| row.get(0))
2721 .expect("rows")
2722 .collect::<Result<_, _>>()
2723 .expect("collect")
2724 };
2725 assert_eq!(remaining, vec!["evt-3".to_owned()]);
2726 let provenance_count: i64 = conn
2727 .query_row(
2728 "SELECT count(*) FROM provenance_events \
2729 WHERE event_type = 'operational_collection_purged' AND subject = 'audit_log'",
2730 [],
2731 |row| row.get(0),
2732 )
2733 .expect("provenance count");
2734 assert_eq!(provenance_count, 1);
2735 }
2736
2737 #[test]
2738 fn compact_operational_collection_dry_run_reports_without_mutation() {
2739 let (db, service) = setup();
2740 {
2741 let conn = sqlite::open_connection(db.path()).expect("conn");
2742 conn.execute(
2743 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2744 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2745 [],
2746 )
2747 .expect("seed collection");
2748 for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2749 conn.execute(
2750 "INSERT INTO operational_mutations \
2751 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2752 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2753 rusqlite::params![
2754 format!("evt-{index}"),
2755 format!("{{\"seq\":{index}}}"),
2756 created_at,
2757 index,
2758 ],
2759 )
2760 .expect("seed event");
2761 }
2762 }
2763
2764 let report = service
2765 .compact_operational_collection("audit_log", true)
2766 .expect("compact collection");
2767 assert_eq!(report.collection_name, "audit_log");
2768 assert_eq!(report.deleted_mutations, 1);
2769 assert!(report.dry_run);
2770 assert_eq!(report.before_timestamp, None);
2771
2772 let conn = sqlite::open_connection(db.path()).expect("conn");
2773 let remaining_count: i64 = conn
2774 .query_row(
2775 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2776 [],
2777 |row| row.get(0),
2778 )
2779 .expect("remaining count");
2780 assert_eq!(remaining_count, 3);
2781 let provenance_count: i64 = conn
2782 .query_row(
2783 "SELECT count(*) FROM provenance_events \
2784 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
2785 [],
2786 |row| row.get(0),
2787 )
2788 .expect("provenance count");
2789 assert_eq!(provenance_count, 0);
2790 }
2791
2792 #[test]
2793 fn compact_operational_collection_keep_last_deletes_oldest_rows() {
2794 let (db, service) = setup();
2795 {
2796 let conn = sqlite::open_connection(db.path()).expect("conn");
2797 conn.execute(
2798 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2799 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2800 [],
2801 )
2802 .expect("seed collection");
2803 for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2804 conn.execute(
2805 "INSERT INTO operational_mutations \
2806 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2807 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2808 rusqlite::params![
2809 format!("evt-{index}"),
2810 format!("{{\"seq\":{index}}}"),
2811 created_at,
2812 index,
2813 ],
2814 )
2815 .expect("seed event");
2816 }
2817 }
2818
2819 let report = service
2820 .compact_operational_collection("audit_log", false)
2821 .expect("compact collection");
2822 assert_eq!(report.deleted_mutations, 1);
2823 assert!(!report.dry_run);
2824
2825 let conn = sqlite::open_connection(db.path()).expect("conn");
2826 let remaining: Vec<String> = {
2827 let mut stmt = conn
2828 .prepare(
2829 "SELECT id FROM operational_mutations \
2830 WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2831 )
2832 .expect("stmt");
2833 stmt.query_map([], |row| row.get(0))
2834 .expect("rows")
2835 .collect::<Result<_, _>>()
2836 .expect("collect")
2837 };
2838 assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
2839 let provenance_count: i64 = conn
2840 .query_row(
2841 "SELECT count(*) FROM provenance_events \
2842 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
2843 [],
2844 |row| row.get(0),
2845 )
2846 .expect("provenance count");
2847 assert_eq!(provenance_count, 1);
2848 }
2849
2850 #[test]
2851 fn plan_and_run_operational_retention_keep_last() {
2852 let (db, service) = setup();
2853 {
2854 let conn = sqlite::open_connection(db.path()).expect("conn");
2855 conn.execute(
2856 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2857 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2858 [],
2859 )
2860 .expect("seed collection");
2861 for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2862 conn.execute(
2863 "INSERT INTO operational_mutations \
2864 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2865 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2866 rusqlite::params![
2867 format!("evt-{index}"),
2868 format!("{{\"seq\":{index}}}"),
2869 created_at,
2870 index,
2871 ],
2872 )
2873 .expect("seed event");
2874 }
2875 }
2876
2877 let plan = service
2878 .plan_operational_retention(1_000, None, Some(10))
2879 .expect("plan retention");
2880 assert_eq!(plan.collections_examined, 1);
2881 assert_eq!(plan.items[0].collection_name, "audit_log");
2882 assert_eq!(
2883 plan.items[0].action_kind,
2884 crate::operational::OperationalRetentionActionKind::KeepLast
2885 );
2886 assert_eq!(plan.items[0].candidate_deletions, 1);
2887 assert_eq!(plan.items[0].max_rows, Some(2));
2888 assert_eq!(plan.items[0].last_run_at, None);
2889
2890 let dry_run = service
2891 .run_operational_retention(1_000, None, Some(10), true)
2892 .expect("dry-run retention");
2893 assert!(dry_run.dry_run);
2894 assert_eq!(dry_run.collections_acted_on, 1);
2895 assert_eq!(dry_run.items[0].deleted_mutations, 1);
2896 assert_eq!(dry_run.items[0].rows_remaining, 2);
2897
2898 let conn = sqlite::open_connection(db.path()).expect("conn");
2899 let remaining_count: i64 = conn
2900 .query_row(
2901 "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2902 [],
2903 |row| row.get(0),
2904 )
2905 .expect("remaining count after dry run");
2906 assert_eq!(remaining_count, 3);
2907 let retention_run_count: i64 = conn
2908 .query_row(
2909 "SELECT count(*) FROM operational_retention_runs WHERE collection_name = 'audit_log'",
2910 [],
2911 |row| row.get(0),
2912 )
2913 .expect("retention run count");
2914 assert_eq!(retention_run_count, 0);
2915 drop(conn);
2916
2917 let executed = service
2918 .run_operational_retention(1_000, None, Some(10), false)
2919 .expect("execute retention");
2920 assert_eq!(executed.collections_acted_on, 1);
2921 assert_eq!(executed.items[0].deleted_mutations, 1);
2922 assert_eq!(executed.items[0].rows_remaining, 2);
2923
2924 let conn = sqlite::open_connection(db.path()).expect("conn");
2925 let remaining: Vec<String> = {
2926 let mut stmt = conn
2927 .prepare(
2928 "SELECT id FROM operational_mutations \
2929 WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2930 )
2931 .expect("stmt");
2932 stmt.query_map([], |row| row.get(0))
2933 .expect("rows")
2934 .collect::<Result<_, _>>()
2935 .expect("collect")
2936 };
2937 assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
2938 let last_run_at: i64 = conn
2939 .query_row(
2940 "SELECT executed_at FROM operational_retention_runs \
2941 WHERE collection_name = 'audit_log' ORDER BY executed_at DESC LIMIT 1",
2942 [],
2943 |row| row.get(0),
2944 )
2945 .expect("last run at");
2946 assert_eq!(last_run_at, 1_000);
2947 }
2948
2949 #[test]
2950 fn dry_run_operational_retention_does_not_mark_noop_collection_as_acted_on() {
2951 let (db, service) = setup();
2952 let conn = sqlite::open_connection(db.path()).expect("conn");
2953 conn.execute(
2954 "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2955 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2956 [],
2957 )
2958 .expect("seed collection");
2959 for (index, created_at) in [(1_i64, 100_i64), (2, 200)] {
2960 conn.execute(
2961 "INSERT INTO operational_mutations \
2962 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2963 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2964 rusqlite::params![
2965 format!("evt-{index}"),
2966 format!("{{\"seq\":{index}}}"),
2967 created_at,
2968 index,
2969 ],
2970 )
2971 .expect("seed event");
2972 }
2973 drop(conn);
2974
2975 let dry_run = service
2976 .run_operational_retention(1_000, None, Some(10), true)
2977 .expect("dry-run retention");
2978 assert!(dry_run.dry_run);
2979 assert_eq!(dry_run.collections_acted_on, 0);
2980 assert_eq!(dry_run.items[0].deleted_mutations, 0);
2981 assert_eq!(dry_run.items[0].rows_remaining, 2);
2982 }
2983
2984 #[test]
2985 fn compact_operational_collection_rejects_latest_state() {
2986 let (_db, service) = setup();
2987 service
2988 .register_operational_collection(&OperationalRegisterRequest {
2989 name: "connector_health".to_owned(),
2990 kind: OperationalCollectionKind::LatestState,
2991 schema_json: "{}".to_owned(),
2992 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2993 filter_fields_json: "[]".to_owned(),
2994 validation_json: String::new(),
2995 secondary_indexes_json: "[]".to_owned(),
2996 format_version: 1,
2997 })
2998 .expect("register collection");
2999
3000 let error = service
3001 .compact_operational_collection("connector_health", false)
3002 .expect_err("latest_state compaction should be rejected");
3003 assert!(matches!(error, EngineError::InvalidWrite(_)));
3004 assert!(error.to_string().contains("append_only_log"));
3005 }
3006
3007 #[test]
3008 fn register_operational_collection_persists_filter_fields_json() {
3009 let (_db, service) = setup();
3010
3011 let record = service
3012 .register_operational_collection(&OperationalRegisterRequest {
3013 name: "audit_log".to_owned(),
3014 kind: OperationalCollectionKind::AppendOnlyLog,
3015 schema_json: "{}".to_owned(),
3016 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3017 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3018 validation_json: String::new(),
3019 secondary_indexes_json: "[]".to_owned(),
3020 format_version: 1,
3021 })
3022 .expect("register collection");
3023
3024 assert_eq!(
3025 record.filter_fields_json,
3026 r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#
3027 );
3028 }
3029
3030 #[test]
3031 fn read_operational_collection_filters_append_only_rows_by_declared_fields() {
3032 let (db, service) = setup();
3033 service
3034 .register_operational_collection(&OperationalRegisterRequest {
3035 name: "audit_log".to_owned(),
3036 kind: OperationalCollectionKind::AppendOnlyLog,
3037 schema_json: "{}".to_owned(),
3038 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3039 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"seq","type":"integer","modes":["exact","range"]},{"name":"ts","type":"timestamp","modes":["exact","range"]}]"#.to_owned(),
3040 validation_json: String::new(),
3041 secondary_indexes_json: "[]".to_owned(),
3042 format_version: 1,
3043 })
3044 .expect("register collection");
3045 {
3046 let writer = crate::WriterActor::start(
3047 db.path(),
3048 Arc::new(SchemaManager::new()),
3049 crate::ProvenanceMode::Warn,
3050 Arc::new(crate::TelemetryCounters::default()),
3051 )
3052 .expect("writer");
3053 writer
3054 .submit(crate::WriteRequest {
3055 label: "operational".to_owned(),
3056 nodes: vec![],
3057 node_retires: vec![],
3058 edges: vec![],
3059 edge_retires: vec![],
3060 chunks: vec![],
3061 runs: vec![],
3062 steps: vec![],
3063 actions: vec![],
3064 optional_backfills: vec![],
3065 vec_inserts: vec![],
3066 operational_writes: vec![
3067 crate::OperationalWrite::Append {
3068 collection: "audit_log".to_owned(),
3069 record_key: "evt-1".to_owned(),
3070 payload_json: r#"{"actor":"alice","seq":1,"ts":100}"#.to_owned(),
3071 source_ref: Some("src-1".to_owned()),
3072 },
3073 crate::OperationalWrite::Append {
3074 collection: "audit_log".to_owned(),
3075 record_key: "evt-2".to_owned(),
3076 payload_json: r#"{"actor":"alice-admin","seq":2,"ts":200}"#.to_owned(),
3077 source_ref: Some("src-2".to_owned()),
3078 },
3079 crate::OperationalWrite::Append {
3080 collection: "audit_log".to_owned(),
3081 record_key: "evt-3".to_owned(),
3082 payload_json: r#"{"actor":"bob","seq":3,"ts":300}"#.to_owned(),
3083 source_ref: Some("src-3".to_owned()),
3084 },
3085 ],
3086 })
3087 .expect("write");
3088 }
3089
3090 let report = service
3091 .read_operational_collection(&crate::operational::OperationalReadRequest {
3092 collection_name: "audit_log".to_owned(),
3093 filters: vec![
3094 crate::operational::OperationalFilterClause::Prefix {
3095 field: "actor".to_owned(),
3096 value: "alice".to_owned(),
3097 },
3098 crate::operational::OperationalFilterClause::Range {
3099 field: "ts".to_owned(),
3100 lower: Some(150),
3101 upper: Some(250),
3102 },
3103 ],
3104 limit: Some(10),
3105 })
3106 .expect("filtered read");
3107
3108 assert_eq!(report.collection_name, "audit_log");
3109 assert_eq!(report.row_count, 1);
3110 assert!(!report.was_limited);
3111 assert_eq!(report.rows.len(), 1);
3112 assert_eq!(report.rows[0].record_key, "evt-2");
3113 assert_eq!(
3114 report.rows[0].payload_json,
3115 r#"{"actor":"alice-admin","seq":2,"ts":200}"#
3116 );
3117 }
3118
3119 #[test]
3120 fn read_operational_collection_uses_secondary_index_when_filter_values_are_missing() {
3121 let (db, service) = setup();
3122 service
3123 .register_operational_collection(&OperationalRegisterRequest {
3124 name: "audit_log".to_owned(),
3125 kind: OperationalCollectionKind::AppendOnlyLog,
3126 schema_json: "{}".to_owned(),
3127 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3128 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3129 validation_json: String::new(),
3130 secondary_indexes_json: r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#.to_owned(),
3131 format_version: 1,
3132 })
3133 .expect("register collection");
3134 {
3135 let writer = crate::WriterActor::start(
3136 db.path(),
3137 Arc::new(SchemaManager::new()),
3138 crate::ProvenanceMode::Warn,
3139 Arc::new(crate::TelemetryCounters::default()),
3140 )
3141 .expect("writer");
3142 writer
3143 .submit(crate::WriteRequest {
3144 label: "operational".to_owned(),
3145 nodes: vec![],
3146 node_retires: vec![],
3147 edges: vec![],
3148 edge_retires: vec![],
3149 chunks: vec![],
3150 runs: vec![],
3151 steps: vec![],
3152 actions: vec![],
3153 optional_backfills: vec![],
3154 vec_inserts: vec![],
3155 operational_writes: vec![
3156 crate::OperationalWrite::Append {
3157 collection: "audit_log".to_owned(),
3158 record_key: "evt-1".to_owned(),
3159 payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
3160 source_ref: Some("src-1".to_owned()),
3161 },
3162 crate::OperationalWrite::Append {
3163 collection: "audit_log".to_owned(),
3164 record_key: "evt-2".to_owned(),
3165 payload_json: r#"{"actor":"alice-admin","ts":200}"#.to_owned(),
3166 source_ref: Some("src-2".to_owned()),
3167 },
3168 ],
3169 })
3170 .expect("write");
3171 }
3172 let conn = sqlite::open_connection(db.path()).expect("conn");
3173 conn.execute(
3174 "DELETE FROM operational_filter_values WHERE collection_name = 'audit_log'",
3175 [],
3176 )
3177 .expect("clear filter values");
3178 drop(conn);
3179
3180 let report = service
3181 .read_operational_collection(&crate::operational::OperationalReadRequest {
3182 collection_name: "audit_log".to_owned(),
3183 filters: vec![
3184 crate::operational::OperationalFilterClause::Prefix {
3185 field: "actor".to_owned(),
3186 value: "alice".to_owned(),
3187 },
3188 crate::operational::OperationalFilterClause::Range {
3189 field: "ts".to_owned(),
3190 lower: Some(150),
3191 upper: Some(250),
3192 },
3193 ],
3194 limit: Some(10),
3195 })
3196 .expect("secondary-index read");
3197
3198 assert_eq!(report.row_count, 1);
3199 assert_eq!(report.rows[0].record_key, "evt-2");
3200 }
3201
3202 #[test]
3203 fn read_operational_collection_rejects_undeclared_fields_and_latest_state_collections() {
3204 let (_db, service) = setup();
3205 service
3206 .register_operational_collection(&OperationalRegisterRequest {
3207 name: "connector_health".to_owned(),
3208 kind: OperationalCollectionKind::LatestState,
3209 schema_json: "{}".to_owned(),
3210 retention_json: "{}".to_owned(),
3211 filter_fields_json: r#"[{"name":"status","type":"string","modes":["exact"]}]"#
3212 .to_owned(),
3213 validation_json: String::new(),
3214 secondary_indexes_json: "[]".to_owned(),
3215 format_version: 1,
3216 })
3217 .expect("register collection");
3218
3219 let latest_state_error = service
3220 .read_operational_collection(&crate::operational::OperationalReadRequest {
3221 collection_name: "connector_health".to_owned(),
3222 filters: vec![crate::operational::OperationalFilterClause::Exact {
3223 field: "status".to_owned(),
3224 value: crate::operational::OperationalFilterValue::String("ok".to_owned()),
3225 }],
3226 limit: Some(10),
3227 })
3228 .expect_err("latest_state filtered reads should be rejected");
3229 assert!(latest_state_error.to_string().contains("append_only_log"));
3230
3231 service
3232 .register_operational_collection(&OperationalRegisterRequest {
3233 name: "audit_log".to_owned(),
3234 kind: OperationalCollectionKind::AppendOnlyLog,
3235 schema_json: "{}".to_owned(),
3236 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3237 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact"]}]"#
3238 .to_owned(),
3239 validation_json: String::new(),
3240 secondary_indexes_json: "[]".to_owned(),
3241 format_version: 1,
3242 })
3243 .expect("register append-only collection");
3244
3245 let undeclared_error = service
3246 .read_operational_collection(&crate::operational::OperationalReadRequest {
3247 collection_name: "audit_log".to_owned(),
3248 filters: vec![crate::operational::OperationalFilterClause::Exact {
3249 field: "missing".to_owned(),
3250 value: crate::operational::OperationalFilterValue::String("x".to_owned()),
3251 }],
3252 limit: Some(10),
3253 })
3254 .expect_err("undeclared field should be rejected");
3255 assert!(undeclared_error.to_string().contains("undeclared"));
3256 }
3257
3258 #[test]
3259 fn read_operational_collection_applies_limit_and_reports_truncation() {
3260 let (db, service) = setup();
3261 service
3262 .register_operational_collection(&OperationalRegisterRequest {
3263 name: "audit_log".to_owned(),
3264 kind: OperationalCollectionKind::AppendOnlyLog,
3265 schema_json: "{}".to_owned(),
3266 retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3267 filter_fields_json: r#"[{"name":"actor","type":"string","modes":["prefix"]}]"#
3268 .to_owned(),
3269 validation_json: String::new(),
3270 secondary_indexes_json: "[]".to_owned(),
3271 format_version: 1,
3272 })
3273 .expect("register collection");
3274 {
3275 let writer = crate::WriterActor::start(
3276 db.path(),
3277 Arc::new(SchemaManager::new()),
3278 crate::ProvenanceMode::Warn,
3279 Arc::new(crate::TelemetryCounters::default()),
3280 )
3281 .expect("writer");
3282 writer
3283 .submit(crate::WriteRequest {
3284 label: "operational".to_owned(),
3285 nodes: vec![],
3286 node_retires: vec![],
3287 edges: vec![],
3288 edge_retires: vec![],
3289 chunks: vec![],
3290 runs: vec![],
3291 steps: vec![],
3292 actions: vec![],
3293 optional_backfills: vec![],
3294 vec_inserts: vec![],
3295 operational_writes: vec![
3296 crate::OperationalWrite::Append {
3297 collection: "audit_log".to_owned(),
3298 record_key: "evt-1".to_owned(),
3299 payload_json: r#"{"actor":"alice-1"}"#.to_owned(),
3300 source_ref: Some("src-1".to_owned()),
3301 },
3302 crate::OperationalWrite::Append {
3303 collection: "audit_log".to_owned(),
3304 record_key: "evt-2".to_owned(),
3305 payload_json: r#"{"actor":"alice-2"}"#.to_owned(),
3306 source_ref: Some("src-2".to_owned()),
3307 },
3308 ],
3309 })
3310 .expect("write");
3311 }
3312
3313 let report = service
3314 .read_operational_collection(&crate::operational::OperationalReadRequest {
3315 collection_name: "audit_log".to_owned(),
3316 filters: vec![crate::operational::OperationalFilterClause::Prefix {
3317 field: "actor".to_owned(),
3318 value: "alice".to_owned(),
3319 }],
3320 limit: Some(1),
3321 })
3322 .expect("limited read");
3323
3324 assert_eq!(report.row_count, 1);
3325 assert_eq!(report.applied_limit, 1);
3326 assert!(report.was_limited);
3327 assert_eq!(report.rows[0].record_key, "evt-2");
3328 }
3329
3330 #[test]
3331 fn preexisting_operational_collection_can_gain_filter_contract_after_upgrade() {
3332 let db = NamedTempFile::new().expect("temp db");
3333 let conn = sqlite::open_connection(db.path()).expect("conn");
3334 conn.execute_batch(
3335 r#"
3336 CREATE TABLE operational_collections (
3337 name TEXT PRIMARY KEY,
3338 kind TEXT NOT NULL,
3339 schema_json TEXT NOT NULL,
3340 retention_json TEXT NOT NULL,
3341 format_version INTEGER NOT NULL DEFAULT 1,
3342 created_at INTEGER NOT NULL DEFAULT 100,
3343 disabled_at INTEGER
3344 );
3345 CREATE TABLE operational_mutations (
3346 id TEXT PRIMARY KEY,
3347 collection_name TEXT NOT NULL,
3348 record_key TEXT NOT NULL,
3349 op_kind TEXT NOT NULL,
3350 payload_json TEXT NOT NULL,
3351 source_ref TEXT,
3352 created_at INTEGER NOT NULL DEFAULT 100,
3353 mutation_order INTEGER NOT NULL DEFAULT 1
3354 );
3355 INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at)
3356 VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}', 1, 100);
3357 INSERT INTO operational_mutations
3358 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order)
3359 VALUES
3360 ('evt-1', 'audit_log', 'evt-1', 'append', '{"actor":"alice","ts":0}', 'src-1', 100, 1);
3361 "#,
3362 )
3363 .expect("seed pre-v10 schema");
3364 drop(conn);
3365
3366 let service = AdminService::new(db.path(), Arc::new(SchemaManager::new()));
3367 let pre_update = service
3368 .read_operational_collection(&crate::operational::OperationalReadRequest {
3369 collection_name: "audit_log".to_owned(),
3370 filters: vec![crate::operational::OperationalFilterClause::Exact {
3371 field: "actor".to_owned(),
3372 value: crate::operational::OperationalFilterValue::String("alice".to_owned()),
3373 }],
3374 limit: Some(10),
3375 })
3376 .expect_err("read should reject undeclared fields before migration update");
3377 assert!(pre_update.to_string().contains("undeclared"));
3378
3379 let updated = service
3380 .update_operational_collection_filters(
3381 "audit_log",
3382 r#"[{"name":"actor","type":"string","modes":["exact"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#,
3383 )
3384 .expect("update filter contract");
3385 assert!(updated.filter_fields_json.contains("\"actor\""));
3386
3387 let report = service
3388 .read_operational_collection(&crate::operational::OperationalReadRequest {
3389 collection_name: "audit_log".to_owned(),
3390 filters: vec![crate::operational::OperationalFilterClause::Range {
3391 field: "ts".to_owned(),
3392 lower: Some(0),
3393 upper: Some(0),
3394 }],
3395 limit: Some(10),
3396 })
3397 .expect("read after explicit filter update");
3398 assert_eq!(report.row_count, 1);
3399 assert_eq!(report.rows[0].record_key, "evt-1");
3400 }
3401
3402 #[cfg(feature = "sqlite-vec")]
3403 #[test]
3404 fn check_semantics_detects_stale_vec_rows() {
3405 use crate::sqlite::open_connection_with_vec;
3406
3407 let db = NamedTempFile::new().expect("temp file");
3408 let schema = Arc::new(SchemaManager::new());
3409 {
3410 let conn = open_connection_with_vec(db.path()).expect("vec conn");
3411 schema.bootstrap(&conn).expect("bootstrap");
3412 schema
3413 .ensure_vec_kind_profile(&conn, "Doc", 3)
3414 .expect("vec kind profile");
3415 let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
3417 .iter()
3418 .flat_map(|f| f.to_le_bytes())
3419 .collect();
3420 conn.execute(
3421 "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('ghost-chunk', ?1)",
3422 rusqlite::params![bytes],
3423 )
3424 .expect("insert stale vec row");
3425 }
3426 let service = AdminService::new(db.path(), Arc::clone(&schema));
3427 let report = service.check_semantics().expect("semantics check");
3428 assert_eq!(report.stale_vec_rows, 1);
3429 assert!(
3430 report.warnings.iter().any(|w| w.contains("stale vec")),
3431 "warning must mention stale vec"
3432 );
3433 }
3434
3435 #[cfg(feature = "sqlite-vec")]
3436 #[test]
3437 fn restore_vector_profiles_recreates_vec_table_from_metadata() {
3438 let db = NamedTempFile::new().expect("temp file");
3439 let schema = Arc::new(SchemaManager::new());
3440 {
3441 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3442 schema.bootstrap(&conn).expect("bootstrap");
3443 conn.execute(
3444 "INSERT INTO vector_profiles (profile, table_name, dimension, enabled) \
3445 VALUES ('default', 'vec_nodes_active', 3, 1)",
3446 [],
3447 )
3448 .expect("insert vector profile");
3449 }
3450
3451 let service = AdminService::new(db.path(), Arc::clone(&schema));
3452 let report = service
3453 .restore_vector_profiles()
3454 .expect("restore vector profiles");
3455 assert_eq!(
3456 report.targets,
3457 vec![crate::projection::ProjectionTarget::Vec]
3458 );
3459 assert_eq!(report.rebuilt_rows, 1);
3460
3461 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3462 let count: i64 = conn
3463 .query_row(
3464 "SELECT count(*) FROM sqlite_schema WHERE name = 'vec_nodes_active'",
3465 [],
3466 |row| row.get(0),
3467 )
3468 .expect("vec schema count");
3469 assert_eq!(count, 1, "vec table should exist after restore");
3470 }
3471
3472 #[cfg(feature = "sqlite-vec")]
3473 #[test]
3474 fn load_vector_regeneration_config_supports_json_and_toml() {
3475 let dir = tempfile::tempdir().expect("temp dir");
3476 let json_path = dir.path().join("regen.json");
3477 let toml_path = dir.path().join("regen.toml");
3478
3479 let config = VectorRegenerationConfig {
3480 kind: "Document".to_owned(),
3481 profile: "default".to_owned(),
3482 chunking_policy: "per_chunk".to_owned(),
3483 preprocessing_policy: "trim".to_owned(),
3484 };
3485
3486 fs::write(&json_path, serde_json::to_string(&config).expect("json")).expect("write json");
3487 fs::write(&toml_path, toml::to_string(&config).expect("toml")).expect("write toml");
3488
3489 let parsed_json = load_vector_regeneration_config(&json_path).expect("json parse");
3490 let parsed_toml = load_vector_regeneration_config(&toml_path).expect("toml parse");
3491
3492 assert_eq!(parsed_json, config);
3493 assert_eq!(parsed_toml, config);
3494 }
3495
3496 #[test]
3501 fn regenerate_vector_embeddings_config_rejects_old_identity_fields() {
3502 let legacy_json = r#"{
3505 "kind": "Document",
3506 "profile": "default",
3507 "table_name": "vec_nodes_active",
3508 "model_identity": "old-model",
3509 "model_version": "1.0",
3510 "dimension": 4,
3511 "normalization_policy": "l2",
3512 "chunking_policy": "per_chunk",
3513 "preprocessing_policy": "trim",
3514 "generator_command": ["/bin/echo"]
3515 }"#;
3516 let result: Result<VectorRegenerationConfig, _> = serde_json::from_str(legacy_json);
3517 assert!(
3518 result.is_err(),
3519 "legacy identity fields must be rejected at deserialization"
3520 );
3521 }
3522
3523 #[cfg(all(not(feature = "sqlite-vec"), unix))]
3524 #[test]
3525 fn regenerate_vector_embeddings_unsupported_vec_capability_writes_request_and_failed_audit() {
3526 let db = NamedTempFile::new().expect("temp file");
3527 let schema = Arc::new(SchemaManager::new());
3528
3529 {
3530 let conn = sqlite::open_connection(db.path()).expect("connection");
3531 schema.bootstrap(&conn).expect("bootstrap");
3532 conn.execute(
3533 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3534 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3535 [],
3536 )
3537 .expect("insert node");
3538 conn.execute(
3539 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3540 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3541 [],
3542 )
3543 .expect("insert chunk");
3544 }
3545
3546 let service = AdminService::new(db.path(), Arc::clone(&schema));
3547 let embedder = TestEmbedder::new("test-model", 4);
3548 let error = service
3549 .regenerate_vector_embeddings(
3550 &embedder,
3551 &VectorRegenerationConfig {
3552 kind: "Document".to_owned(),
3553 profile: "default".to_owned(),
3554 chunking_policy: "per_chunk".to_owned(),
3555 preprocessing_policy: "trim".to_owned(),
3556 },
3557 )
3558 .expect_err("sqlite-vec capability should be required");
3559
3560 assert!(error.to_string().contains("unsupported vec capability"));
3561
3562 let conn = sqlite::open_connection(db.path()).expect("connection");
3563 let request_count: i64 = conn
3564 .query_row(
3565 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
3566 [],
3567 |row| row.get(0),
3568 )
3569 .expect("request count");
3570 assert_eq!(request_count, 1);
3571 let failed_count: i64 = conn
3572 .query_row(
3573 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3574 [],
3575 |row| row.get(0),
3576 )
3577 .expect("failed count");
3578 assert_eq!(failed_count, 1);
3579 let metadata_json: String = conn
3580 .query_row(
3581 "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3582 [],
3583 |row| row.get(0),
3584 )
3585 .expect("failed metadata");
3586 assert!(metadata_json.contains("\"failure_class\":\"unsupported vec capability\""));
3587 }
3588
3589 #[cfg(feature = "sqlite-vec")]
3590 #[test]
3591 #[allow(clippy::too_many_lines)]
3592 fn regenerate_vector_embeddings_rebuilds_embeddings_via_embedder() {
3593 let db = NamedTempFile::new().expect("temp file");
3594 let schema = Arc::new(SchemaManager::new());
3595
3596 {
3597 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3598 schema.bootstrap(&conn).expect("bootstrap");
3599 conn.execute(
3600 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3601 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3602 [],
3603 )
3604 .expect("insert node");
3605 conn.execute(
3606 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3607 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3608 [],
3609 )
3610 .expect("insert chunk 1");
3611 conn.execute(
3612 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3613 VALUES ('chunk-2', 'doc-1', 'travel plan', 101)",
3614 [],
3615 )
3616 .expect("insert chunk 2");
3617 }
3618
3619 let service = AdminService::new(db.path(), Arc::clone(&schema));
3620 let embedder = TestEmbedder::new("test-model", 4);
3621 let report = service
3622 .regenerate_vector_embeddings(
3623 &embedder,
3624 &VectorRegenerationConfig {
3625 kind: "Document".to_owned(),
3626 profile: "default".to_owned(),
3627 chunking_policy: "per_chunk".to_owned(),
3628 preprocessing_policy: "trim".to_owned(),
3629 },
3630 )
3631 .expect("regenerate vectors");
3632
3633 assert_eq!(report.profile, "default");
3634 assert_eq!(report.table_name, "vec_document");
3635 assert_eq!(report.dimension, 4);
3636 assert_eq!(report.total_chunks, 2);
3637 assert_eq!(report.regenerated_rows, 2);
3638 assert!(report.contract_persisted);
3639
3640 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3641 let vec_count: i64 = conn
3642 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
3643 .expect("vec count");
3644 assert_eq!(vec_count, 2);
3645
3646 let (model_identity, model_version, dimension, normalization_policy): (
3650 String,
3651 String,
3652 i64,
3653 String,
3654 ) = conn
3655 .query_row(
3656 "SELECT model_identity, model_version, dimension, normalization_policy \
3657 FROM vector_embedding_contracts WHERE profile = 'default'",
3658 [],
3659 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3660 )
3661 .expect("contract row");
3662 assert_eq!(model_identity, "test-model");
3663 assert_eq!(model_version, "1.0.0");
3664 assert_eq!(dimension, 4);
3665 assert_eq!(normalization_policy, "l2");
3666
3667 let contract_format_version: i64 = conn
3668 .query_row(
3669 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
3670 [],
3671 |row| row.get(0),
3672 )
3673 .expect("contract_format_version");
3674 assert_eq!(contract_format_version, 1);
3675 let request_count: i64 = conn
3676 .query_row(
3677 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
3678 [],
3679 |row| row.get(0),
3680 )
3681 .expect("request audit count");
3682 assert_eq!(request_count, 1);
3683 let apply_count: i64 = conn
3684 .query_row(
3685 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
3686 [],
3687 |row| row.get(0),
3688 )
3689 .expect("apply audit count");
3690 assert_eq!(apply_count, 1);
3691 let apply_metadata: String = conn
3692 .query_row(
3693 "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
3694 [],
3695 |row| row.get(0),
3696 )
3697 .expect("apply metadata");
3698 assert!(apply_metadata.contains("\"profile\":\"default\""));
3699 assert!(apply_metadata.contains("\"snapshot_hash\":"));
3700 assert!(apply_metadata.contains("\"model_identity\":\"test-model\""));
3701 }
3702
3703 #[cfg(feature = "sqlite-vec")]
3704 #[test]
3705 #[allow(clippy::too_many_lines)]
3706 fn regenerate_vector_embeddings_embedder_failure_leaves_contract_and_vec_rows_unchanged() {
3707 let db = NamedTempFile::new().expect("temp file");
3708 let schema = Arc::new(SchemaManager::new());
3709
3710 {
3711 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3712 schema.bootstrap(&conn).expect("bootstrap");
3713 conn.execute(
3714 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3715 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3716 [],
3717 )
3718 .expect("insert node");
3719 conn.execute(
3720 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3721 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3722 [],
3723 )
3724 .expect("insert chunk");
3725 schema
3726 .ensure_vec_kind_profile(&conn, "Document", 4)
3727 .expect("ensure vec kind profile");
3728 conn.execute(
3729 r"
3730 INSERT INTO vector_embedding_contracts (
3731 profile,
3732 table_name,
3733 model_identity,
3734 model_version,
3735 dimension,
3736 normalization_policy,
3737 chunking_policy,
3738 preprocessing_policy,
3739 generator_command_json,
3740 applied_at,
3741 snapshot_hash
3742 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3743 ",
3744 rusqlite::params![
3745 "default",
3746 "vec_document",
3747 "old-model",
3748 "0.9.0",
3749 4,
3750 "l2",
3751 "per_chunk",
3752 "trim",
3753 "[]",
3754 111,
3755 "old-snapshot"
3756 ],
3757 )
3758 .expect("seed contract");
3759 conn.execute(
3760 "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
3761 [],
3762 )
3763 .expect("seed vec row");
3764 }
3765
3766 let service = AdminService::new(db.path(), Arc::clone(&schema));
3767 let failing = FailingEmbedder {
3768 identity: QueryEmbedderIdentity {
3769 model_identity: "new-model".to_owned(),
3770 model_version: "1.0.0".to_owned(),
3771 dimension: 4,
3772 normalization_policy: "l2".to_owned(),
3773 },
3774 };
3775 let error = service
3776 .regenerate_vector_embeddings(
3777 &failing,
3778 &VectorRegenerationConfig {
3779 kind: "Document".to_owned(),
3780 profile: "default".to_owned(),
3781 chunking_policy: "per_chunk".to_owned(),
3782 preprocessing_policy: "trim".to_owned(),
3783 },
3784 )
3785 .expect_err("embedder should fail");
3786
3787 assert!(error.to_string().contains("embedder failure"));
3788
3789 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3790 let model_identity: String = conn
3791 .query_row(
3792 "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
3793 [],
3794 |row| row.get(0),
3795 )
3796 .expect("model identity");
3797 assert_eq!(model_identity, "old-model");
3798 let snapshot_hash: String = conn
3799 .query_row(
3800 "SELECT snapshot_hash FROM vector_embedding_contracts WHERE profile = 'default'",
3801 [],
3802 |row| row.get(0),
3803 )
3804 .expect("snapshot hash");
3805 assert_eq!(snapshot_hash, "old-snapshot");
3806 let vec_count: i64 = conn
3807 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
3808 .expect("vec count");
3809 assert_eq!(vec_count, 1);
3810 let failure_count: i64 = conn
3811 .query_row(
3812 "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3813 [],
3814 |row| row.get(0),
3815 )
3816 .expect("failure count");
3817 assert_eq!(failure_count, 1);
3818 let failure_metadata: String = conn
3819 .query_row(
3820 "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3821 [],
3822 |row| row.get(0),
3823 )
3824 .expect("failure metadata");
3825 assert!(failure_metadata.contains("\"failure_class\":\"embedder failure\""));
3826 }
3827
3828 #[cfg(feature = "sqlite-vec")]
3839 #[test]
3840 fn regenerate_vector_embeddings_rejects_whitespace_only_profile_before_mutation() {
3841 let db = NamedTempFile::new().expect("temp file");
3842 let schema = Arc::new(SchemaManager::new());
3843 {
3844 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3845 schema.bootstrap(&conn).expect("bootstrap");
3846 conn.execute(
3847 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3848 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3849 [],
3850 )
3851 .expect("insert node");
3852 conn.execute(
3853 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3854 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3855 [],
3856 )
3857 .expect("insert chunk");
3858 }
3859
3860 let service = AdminService::new(db.path(), Arc::clone(&schema));
3861 let embedder = TestEmbedder::new("test-model", 4);
3862 let error = service
3863 .regenerate_vector_embeddings(
3864 &embedder,
3865 &VectorRegenerationConfig {
3866 kind: "Document".to_owned(),
3867 profile: " ".to_owned(),
3868 chunking_policy: "per_chunk".to_owned(),
3869 preprocessing_policy: "trim".to_owned(),
3870 },
3871 )
3872 .expect_err("whitespace profile should be rejected");
3873
3874 assert!(error.to_string().contains("invalid contract"));
3875 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3876 let contract_count: i64 = conn
3877 .query_row(
3878 "SELECT count(*) FROM vector_embedding_contracts",
3879 [],
3880 |row| row.get(0),
3881 )
3882 .expect("contract count");
3883 assert_eq!(contract_count, 0);
3884 let provenance_count: i64 = conn
3885 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
3886 row.get(0)
3887 })
3888 .expect("provenance count");
3889 assert_eq!(provenance_count, 0);
3890 }
3891
3892 #[cfg(feature = "sqlite-vec")]
3893 #[test]
3894 fn regenerate_vector_embeddings_rejects_future_contract_format_version() {
3895 let db = NamedTempFile::new().expect("temp file");
3896 let schema = Arc::new(SchemaManager::new());
3897 {
3898 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3899 schema.bootstrap(&conn).expect("bootstrap");
3900 conn.execute(
3901 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3902 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3903 [],
3904 )
3905 .expect("insert node");
3906 conn.execute(
3907 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3908 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3909 [],
3910 )
3911 .expect("insert chunk");
3912 conn.execute(
3913 r"
3914 INSERT INTO vector_embedding_contracts (
3915 profile,
3916 table_name,
3917 model_identity,
3918 model_version,
3919 dimension,
3920 normalization_policy,
3921 chunking_policy,
3922 preprocessing_policy,
3923 generator_command_json,
3924 applied_at,
3925 snapshot_hash,
3926 contract_format_version,
3927 updated_at
3928 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
3929 ",
3930 rusqlite::params![
3931 "default",
3932 "vec_nodes_active",
3933 "old-model",
3934 "0.9.0",
3935 4,
3936 "l2",
3937 "per_chunk",
3938 "trim",
3939 "[]",
3940 111,
3941 "old-snapshot",
3942 99,
3943 111,
3944 ],
3945 )
3946 .expect("seed future contract");
3947 }
3948
3949 let service = AdminService::new(db.path(), Arc::clone(&schema));
3950 let embedder = TestEmbedder::new("test-model", 4);
3951 let error = service
3952 .regenerate_vector_embeddings(
3953 &embedder,
3954 &VectorRegenerationConfig {
3955 kind: "Document".to_owned(),
3956 profile: "default".to_owned(),
3957 chunking_policy: "per_chunk".to_owned(),
3958 preprocessing_policy: "trim".to_owned(),
3959 },
3960 )
3961 .expect_err("future contract version should be rejected");
3962
3963 assert!(error.to_string().contains("unsupported"));
3964 assert!(error.to_string().contains("format version"));
3965 }
3966
3967 #[test]
3968 fn check_semantics_detects_orphaned_chunk() {
3969 let (db, service) = setup();
3970 {
3971 let conn = sqlite::open_connection(db.path()).expect("conn");
3973 conn.execute(
3974 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3975 VALUES ('c1', 'ghost-node', 'text', 100)",
3976 [],
3977 )
3978 .expect("insert orphaned chunk");
3979 }
3980 let report = service.check_semantics().expect("semantics check");
3981 assert_eq!(report.orphaned_chunks, 1);
3982 }
3983
3984 #[test]
3985 fn check_semantics_detects_null_source_ref() {
3986 let (db, service) = setup();
3987 {
3988 let conn = sqlite::open_connection(db.path()).expect("conn");
3989 conn.execute(
3990 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
3991 VALUES ('r1', 'lg1', 'Meeting', '{}', 100)",
3992 [],
3993 )
3994 .expect("insert node with null source_ref");
3995 }
3996 let report = service.check_semantics().expect("semantics check");
3997 assert_eq!(report.null_source_ref_nodes, 1);
3998 }
3999
4000 #[test]
4001 fn check_semantics_detects_broken_step_fk() {
4002 let (db, service) = setup();
4003 {
4004 let conn = sqlite::open_connection(db.path()).expect("conn");
4007 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4008 .expect("disable FK");
4009 conn.execute(
4010 "INSERT INTO steps (id, run_id, kind, status, properties, created_at) \
4011 VALUES ('s1', 'ghost-run', 'llm', 'completed', '{}', 100)",
4012 [],
4013 )
4014 .expect("insert step with ghost run_id");
4015 }
4016 let report = service.check_semantics().expect("semantics check");
4017 assert_eq!(report.broken_step_fk, 1);
4018 }
4019
4020 #[test]
4021 fn check_semantics_detects_broken_action_fk() {
4022 let (db, service) = setup();
4023 {
4024 let conn = sqlite::open_connection(db.path()).expect("conn");
4025 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4026 .expect("disable FK");
4027 conn.execute(
4028 "INSERT INTO actions (id, step_id, kind, status, properties, created_at) \
4029 VALUES ('a1', 'ghost-step', 'emit', 'completed', '{}', 100)",
4030 [],
4031 )
4032 .expect("insert action with ghost step_id");
4033 }
4034 let report = service.check_semantics().expect("semantics check");
4035 assert_eq!(report.broken_action_fk, 1);
4036 }
4037
4038 #[test]
4039 fn check_semantics_detects_stale_fts_rows() {
4040 let (db, service) = setup();
4041 {
4042 let conn = sqlite::open_connection(db.path()).expect("conn");
4043 conn.execute(
4046 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4047 VALUES ('ghost-chunk', 'any-node', 'Meeting', 'stale content')",
4048 [],
4049 )
4050 .expect("insert stale FTS row");
4051 }
4052 let report = service.check_semantics().expect("semantics check");
4053 assert_eq!(report.stale_fts_rows, 1);
4054 }
4055
4056 #[test]
4057 fn check_semantics_detects_fts_rows_for_superseded_nodes() {
4058 let (db, service) = setup();
4059 {
4060 let conn = sqlite::open_connection(db.path()).expect("conn");
4061 conn.execute(
4063 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4064 VALUES ('r1', 'lg-sup', 'Meeting', '{}', 100, 200, 'src-1')",
4065 [],
4066 )
4067 .expect("insert superseded node");
4068 conn.execute(
4070 "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4071 VALUES ('ck-x', 'lg-sup', 'Meeting', 'superseded content')",
4072 [],
4073 )
4074 .expect("insert FTS row for superseded node");
4075 }
4076 let report = service.check_semantics().expect("semantics check");
4077 assert_eq!(report.fts_rows_for_superseded_nodes, 1);
4078 }
4079
4080 #[test]
4081 fn check_semantics_detects_dangling_edges() {
4082 let (db, service) = setup();
4083 {
4084 let conn = sqlite::open_connection(db.path()).expect("conn");
4085 conn.execute_batch("PRAGMA foreign_keys = OFF;")
4086 .expect("disable FK");
4087 conn.execute(
4089 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4090 VALUES ('r1', 'lg-src', 'Meeting', '{}', 100, 'src-1')",
4091 [],
4092 )
4093 .expect("insert source node");
4094 conn.execute(
4095 "INSERT INTO edges \
4096 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4097 VALUES ('e1', 'edge-1', 'lg-src', 'ghost-target', 'LINKS', '{}', 100, 'src-1')",
4098 [],
4099 )
4100 .expect("insert dangling edge");
4101 }
4102 let report = service.check_semantics().expect("semantics check");
4103 assert_eq!(report.dangling_edges, 1);
4104 }
4105
4106 #[test]
4107 fn check_semantics_detects_orphaned_supersession_chains() {
4108 let (db, service) = setup();
4109 {
4110 let conn = sqlite::open_connection(db.path()).expect("conn");
4111 conn.execute(
4113 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4114 VALUES ('r1', 'lg-orphaned', 'Meeting', '{}', 100, 200, 'src-1')",
4115 [],
4116 )
4117 .expect("insert fully superseded node");
4118 }
4119 let report = service.check_semantics().expect("semantics check");
4120 assert_eq!(report.orphaned_supersession_chains, 1);
4121 }
4122
4123 #[test]
4124 fn check_semantics_detects_mismatched_kind_property_fts_rows() {
4125 let (db, service) = setup();
4131 {
4132 let conn = sqlite::open_connection(db.path()).expect("conn");
4133 conn.execute(
4134 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4135 VALUES ('Goal', '[\"$.name\"]', ' ')",
4136 [],
4137 )
4138 .expect("register schema");
4139 conn.execute(
4140 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4141 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4142 [],
4143 )
4144 .expect("insert node");
4145 let table = fathomdb_schema::fts_kind_table_name("Goal");
4147 conn.execute_batch(&format!(
4148 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4149 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4150 ))
4151 .expect("create per-kind table");
4152 conn.execute(
4153 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4154 [],
4155 )
4156 .expect("insert per-kind FTS row");
4157 }
4158 let report = service.check_semantics().expect("semantics check");
4159 assert_eq!(report.mismatched_kind_property_fts_rows, 0);
4161 }
4162
4163 #[test]
4164 fn check_semantics_detects_duplicate_property_fts_rows() {
4165 let (db, service) = setup();
4166 {
4167 let conn = sqlite::open_connection(db.path()).expect("conn");
4168 conn.execute(
4169 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4170 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4171 [],
4172 )
4173 .expect("insert node");
4174 let table = fathomdb_schema::fts_kind_table_name("Goal");
4176 conn.execute_batch(&format!(
4177 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4178 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4179 ))
4180 .expect("create per-kind table");
4181 conn.execute(
4182 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4183 [],
4184 )
4185 .expect("insert first property FTS row");
4186 conn.execute(
4187 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2 duplicate')"),
4188 [],
4189 )
4190 .expect("insert duplicate property FTS row");
4191 }
4192 let report = service.check_semantics().expect("semantics check");
4193 assert_eq!(report.duplicate_property_fts_rows, 1);
4194 }
4195
4196 #[test]
4197 fn check_semantics_detects_drifted_property_fts_text() {
4198 let (db, service) = setup();
4199 {
4200 let conn = sqlite::open_connection(db.path()).expect("conn");
4201 conn.execute(
4202 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4203 VALUES ('Goal', '[\"$.name\"]', ' ')",
4204 [],
4205 )
4206 .expect("register schema");
4207 conn.execute(
4208 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4209 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Current name\"}', 100, 'src-1')",
4210 [],
4211 )
4212 .expect("insert node");
4213 let table = fathomdb_schema::fts_kind_table_name("Goal");
4215 conn.execute_batch(&format!(
4216 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4217 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4218 ))
4219 .expect("create per-kind table");
4220 conn.execute(
4221 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Old stale name')"),
4222 [],
4223 )
4224 .expect("insert stale property FTS row");
4225 }
4226 let report = service.check_semantics().expect("semantics check");
4227 assert_eq!(report.drifted_property_fts_rows, 1);
4228 }
4229
4230 #[test]
4231 fn check_semantics_detects_property_fts_row_that_should_not_exist() {
4232 let (db, service) = setup();
4233 {
4234 let conn = sqlite::open_connection(db.path()).expect("conn");
4235 conn.execute(
4236 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4237 VALUES ('Goal', '[\"$.searchable\"]', ' ')",
4238 [],
4239 )
4240 .expect("register schema");
4241 conn.execute(
4243 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4244 VALUES ('r1', 'goal-1', 'Goal', '{\"other\":\"field\"}', 100, 'src-1')",
4245 [],
4246 )
4247 .expect("insert node");
4248 let table = fathomdb_schema::fts_kind_table_name("Goal");
4250 conn.execute_batch(&format!(
4251 "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4252 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4253 ))
4254 .expect("create per-kind table");
4255 conn.execute(
4256 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'phantom text')"),
4257 [],
4258 )
4259 .expect("insert phantom property FTS row");
4260 }
4261 let report = service.check_semantics().expect("semantics check");
4262 assert_eq!(
4263 report.drifted_property_fts_rows, 1,
4264 "row that should not exist must be counted as drifted"
4265 );
4266 }
4267
4268 #[test]
4269 fn safe_export_writes_manifest_with_sha256() {
4270 let (_db, service) = setup();
4271 let export_dir = tempfile::TempDir::new().expect("temp dir");
4272 let export_path = export_dir.path().join("backup.db");
4273
4274 let manifest = service
4275 .safe_export(
4276 &export_path,
4277 SafeExportOptions {
4278 force_checkpoint: false,
4279 },
4280 )
4281 .expect("export");
4282
4283 assert!(export_path.exists(), "exported db should exist");
4284 let manifest_path = export_dir.path().join("backup.db.export-manifest.json");
4285 assert!(
4286 manifest_path.exists(),
4287 "manifest file should exist at {}",
4288 manifest_path.display()
4289 );
4290 assert_eq!(manifest.sha256.len(), 64, "sha256 should be 64 hex chars");
4291 assert!(
4292 manifest.exported_at > 0,
4293 "exported_at should be a unix timestamp"
4294 );
4295 assert_eq!(
4296 manifest.schema_version,
4297 SchemaManager::new().current_version().0,
4298 "schema_version should match the live schema version"
4299 );
4300 assert_eq!(manifest.protocol_version, 1, "protocol_version should be 1");
4301 assert!(manifest.page_count > 0, "page_count should be positive");
4302 }
4303
4304 #[test]
4305 fn safe_export_preserves_operational_validation_contracts() {
4306 let (_db, service) = setup();
4307 let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
4308 service
4309 .register_operational_collection(&OperationalRegisterRequest {
4310 name: "connector_health".to_owned(),
4311 kind: OperationalCollectionKind::LatestState,
4312 schema_json: "{}".to_owned(),
4313 retention_json: "{}".to_owned(),
4314 filter_fields_json: "[]".to_owned(),
4315 validation_json: validation_json.to_owned(),
4316 secondary_indexes_json: "[]".to_owned(),
4317 format_version: 1,
4318 })
4319 .expect("register collection");
4320
4321 let export_dir = tempfile::TempDir::new().expect("temp dir");
4322 let export_path = export_dir.path().join("backup.db");
4323 service
4324 .safe_export(
4325 &export_path,
4326 SafeExportOptions {
4327 force_checkpoint: false,
4328 },
4329 )
4330 .expect("export");
4331
4332 let exported = sqlite::open_connection(&export_path).expect("exported conn");
4333 let exported_validation_json: String = exported
4334 .query_row(
4335 "SELECT validation_json FROM operational_collections WHERE name = 'connector_health'",
4336 [],
4337 |row| row.get(0),
4338 )
4339 .expect("validation_json");
4340 assert_eq!(exported_validation_json, validation_json);
4341 }
4342
4343 #[test]
4344 fn safe_export_force_checkpoint_false_skips_wal_pragma() {
4345 let (_db, service) = setup();
4346 let export_dir = tempfile::TempDir::new().expect("temp dir");
4347 let export_path = export_dir.path().join("no-wal.db");
4348
4349 let manifest = service
4351 .safe_export(
4352 &export_path,
4353 SafeExportOptions {
4354 force_checkpoint: false,
4355 },
4356 )
4357 .expect("export with no checkpoint");
4358
4359 assert!(
4360 manifest.page_count > 0,
4361 "page_count must be populated regardless of checkpoint mode"
4362 );
4363 assert_eq!(
4364 manifest.schema_version,
4365 SchemaManager::new().current_version().0
4366 );
4367 assert_eq!(manifest.protocol_version, 1);
4368 }
4369
4370 #[test]
4371 fn safe_export_force_checkpoint_false_still_captures_wal_backed_changes() {
4372 let (db, service) = setup();
4373 let conn = sqlite::open_connection(db.path()).expect("conn");
4374 let journal_mode: String = conn
4375 .query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))
4376 .expect("enable wal");
4377 assert_eq!(journal_mode.to_lowercase(), "wal");
4378 let auto_checkpoint_pages: i64 = conn
4379 .query_row("PRAGMA wal_autocheckpoint=0", [], |row| row.get(0))
4380 .expect("disable auto checkpoint");
4381 assert_eq!(auto_checkpoint_pages, 0);
4382 conn.execute(
4383 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4384 VALUES ('r-wal', 'lg-wal', 'Meeting', '{}', 100, 'src-wal')",
4385 [],
4386 )
4387 .expect("insert wal-backed node");
4388
4389 let export_dir = tempfile::TempDir::new().expect("temp dir");
4390 let export_path = export_dir.path().join("wal-backed.db");
4391 service
4392 .safe_export(
4393 &export_path,
4394 SafeExportOptions {
4395 force_checkpoint: false,
4396 },
4397 )
4398 .expect("export wal-backed db");
4399
4400 let exported = sqlite::open_connection(&export_path).expect("open exported db");
4401 let exported_count: i64 = exported
4402 .query_row(
4403 "SELECT count(*) FROM nodes WHERE logical_id = 'lg-wal'",
4404 [],
4405 |row| row.get(0),
4406 )
4407 .expect("count exported nodes");
4408 assert_eq!(
4409 exported_count, 1,
4410 "safe_export must include committed rows that are still resident in the WAL"
4411 );
4412 }
4413
4414 #[test]
4415 fn excise_source_removes_searchable_content_after_excision() {
4416 let (db, service) = setup();
4417 {
4418 let conn = sqlite::open_connection(db.path()).expect("conn");
4419 conn.execute(
4420 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4421 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
4422 [],
4423 )
4424 .expect("insert v1");
4425 conn.execute(
4426 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4427 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
4428 [],
4429 )
4430 .expect("insert v2");
4431 conn.execute(
4432 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4433 VALUES ('ck1', 'lg1', 'hello world', 100)",
4434 [],
4435 )
4436 .expect("insert chunk");
4437 }
4438 service.excise_source("source-2").expect("excise");
4439 {
4440 let conn = sqlite::open_connection(db.path()).expect("conn");
4441 let fts_count: i64 = conn
4442 .query_row(
4443 "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'ck1'",
4444 [],
4445 |row| row.get(0),
4446 )
4447 .expect("fts count");
4448 assert_eq!(
4449 fts_count, 0,
4450 "excised content should not remain searchable after excise"
4451 );
4452 }
4453 }
4454
4455 #[cfg(feature = "sqlite-vec")]
4456 #[test]
4457 fn excise_source_cleans_chunks_and_vec_rows_for_excised_version() {
4458 let (db, service) = setup();
4459 {
4460 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4461 service
4462 .schema_manager
4463 .ensure_vec_kind_profile(&conn, "Meeting", 4)
4464 .expect("ensure vec kind profile");
4465 conn.execute(
4466 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4467 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
4468 [],
4469 )
4470 .expect("insert v1");
4471 conn.execute(
4472 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4473 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
4474 [],
4475 )
4476 .expect("insert v2");
4477 conn.execute(
4478 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4479 VALUES ('ck1', 'lg1', 'new content', 200)",
4480 [],
4481 )
4482 .expect("insert chunk");
4483 conn.execute(
4484 "INSERT INTO vec_meeting (chunk_id, embedding) VALUES ('ck1', zeroblob(16))",
4485 [],
4486 )
4487 .expect("insert vec row");
4488 }
4489
4490 service.excise_source("source-2").expect("excise");
4491
4492 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4493 let active_row: String = conn
4494 .query_row(
4495 "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
4496 [],
4497 |row| row.get(0),
4498 )
4499 .expect("restored active row");
4500 assert_eq!(active_row, "r1");
4501 let chunk_count: i64 = conn
4502 .query_row(
4503 "SELECT count(*) FROM chunks WHERE node_logical_id = 'lg1'",
4504 [],
4505 |row| row.get(0),
4506 )
4507 .expect("chunk count");
4508 assert_eq!(
4509 chunk_count, 0,
4510 "excised source content must not survive as chunks"
4511 );
4512 let vec_count: i64 = conn
4513 .query_row("SELECT count(*) FROM vec_meeting", [], |row| row.get(0))
4514 .expect("vec count");
4515 assert_eq!(vec_count, 0, "excised source vec rows must be removed");
4516 let fts_count: i64 = conn
4517 .query_row(
4518 "SELECT count(*) FROM fts_nodes WHERE node_logical_id = 'lg1'",
4519 [],
4520 |row| row.get(0),
4521 )
4522 .expect("fts count");
4523 assert_eq!(
4524 fts_count, 0,
4525 "excised source content must not remain searchable"
4526 );
4527 }
4528
4529 #[test]
4530 fn export_page_count_matches_exported_file() {
4531 let (_db, service) = setup();
4532 let export_dir = tempfile::TempDir::new().expect("temp dir");
4533 let export_path = export_dir.path().join("page-count.db");
4534
4535 let manifest = service
4536 .safe_export(
4537 &export_path,
4538 SafeExportOptions {
4539 force_checkpoint: false,
4540 },
4541 )
4542 .expect("export");
4543
4544 let exported = sqlite::open_connection(&export_path).expect("open exported db");
4545 let actual_page_count: u64 = exported
4546 .query_row("PRAGMA page_count", [], |row| row.get(0))
4547 .expect("page_count from exported file");
4548
4549 assert_eq!(
4550 manifest.page_count, actual_page_count,
4551 "manifest page_count must match the exported file's PRAGMA page_count"
4552 );
4553 }
4554
4555 #[test]
4556 fn no_temp_file_after_successful_export() {
4557 let (_db, service) = setup();
4558 let export_dir = tempfile::TempDir::new().expect("temp dir");
4559 let export_path = export_dir.path().join("no-tmp.db");
4560
4561 service
4562 .safe_export(
4563 &export_path,
4564 SafeExportOptions {
4565 force_checkpoint: false,
4566 },
4567 )
4568 .expect("export");
4569
4570 let tmp_files: Vec<_> = fs::read_dir(export_dir.path())
4571 .expect("read export dir")
4572 .filter_map(Result::ok)
4573 .filter(|e| e.path().extension().is_some_and(|ext| ext == "tmp"))
4574 .collect();
4575
4576 assert!(
4577 tmp_files.is_empty(),
4578 "no .tmp files should remain after a successful export, found: {tmp_files:?}"
4579 );
4580 }
4581
4582 #[test]
4583 fn export_manifest_is_valid_json() {
4584 let (_db, service) = setup();
4585 let export_dir = tempfile::TempDir::new().expect("temp dir");
4586 let export_path = export_dir.path().join("valid-json.db");
4587
4588 service
4589 .safe_export(
4590 &export_path,
4591 SafeExportOptions {
4592 force_checkpoint: false,
4593 },
4594 )
4595 .expect("export");
4596
4597 let manifest_path = export_dir.path().join("valid-json.db.export-manifest.json");
4598 let manifest_contents = fs::read_to_string(&manifest_path).expect("read manifest");
4599 let parsed: serde_json::Value =
4600 serde_json::from_str(&manifest_contents).expect("manifest must be valid JSON");
4601
4602 assert!(
4603 parsed.get("exported_at").is_some(),
4604 "manifest must contain exported_at"
4605 );
4606 assert!(
4607 parsed.get("sha256").is_some(),
4608 "manifest must contain sha256"
4609 );
4610 assert!(
4611 parsed.get("schema_version").is_some(),
4612 "manifest must contain schema_version"
4613 );
4614 assert!(
4615 parsed.get("protocol_version").is_some(),
4616 "manifest must contain protocol_version"
4617 );
4618 assert!(
4619 parsed.get("page_count").is_some(),
4620 "manifest must contain page_count"
4621 );
4622 }
4623
4624 #[test]
4625 fn provenance_purge_dry_run_reports_counts() {
4626 let (db, service) = setup();
4627 {
4628 let conn = sqlite::open_connection(db.path()).expect("conn");
4629 conn.execute(
4630 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4631 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
4632 [],
4633 )
4634 .expect("insert p1");
4635 conn.execute(
4636 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4637 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
4638 [],
4639 )
4640 .expect("insert p2");
4641 conn.execute(
4642 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4643 VALUES ('p3', 'excise', 'lg3', 'src-1', 300)",
4644 [],
4645 )
4646 .expect("insert p3");
4647 }
4648
4649 let options = super::ProvenancePurgeOptions {
4650 dry_run: true,
4651 preserve_event_types: Vec::new(),
4652 };
4653 let report = service
4654 .purge_provenance_events(250, &options)
4655 .expect("dry run purge");
4656
4657 assert_eq!(report.events_deleted, 2);
4658 assert_eq!(report.events_preserved, 1);
4659 assert!(report.oldest_remaining.is_some());
4660
4661 let conn = sqlite::open_connection(db.path()).expect("conn");
4662 let total: i64 = conn
4663 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
4664 row.get(0)
4665 })
4666 .expect("count");
4667 assert_eq!(total, 3, "dry_run must not delete any events");
4668 }
4669
4670 #[test]
4671 fn provenance_purge_deletes_old_events() {
4672 let (db, service) = setup();
4673 {
4674 let conn = sqlite::open_connection(db.path()).expect("conn");
4675 conn.execute(
4676 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4677 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
4678 [],
4679 )
4680 .expect("insert p1");
4681 conn.execute(
4682 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4683 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
4684 [],
4685 )
4686 .expect("insert p2");
4687 }
4688
4689 let options = super::ProvenancePurgeOptions {
4690 dry_run: false,
4691 preserve_event_types: Vec::new(),
4692 };
4693 let report = service
4694 .purge_provenance_events(150, &options)
4695 .expect("purge");
4696
4697 assert_eq!(report.events_deleted, 1);
4698 assert_eq!(report.events_preserved, 1);
4699 assert_eq!(report.oldest_remaining, Some(200));
4700
4701 let conn = sqlite::open_connection(db.path()).expect("conn");
4702 let remaining: i64 = conn
4703 .query_row("SELECT count(*) FROM provenance_events", [], |row| {
4704 row.get(0)
4705 })
4706 .expect("count");
4707 assert_eq!(remaining, 1);
4708 }
4709
4710 #[test]
4711 fn provenance_purge_preserves_specified_types() {
4712 let (db, service) = setup();
4713 {
4714 let conn = sqlite::open_connection(db.path()).expect("conn");
4715 conn.execute(
4716 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4717 VALUES ('p1', 'excise', 'lg1', 'src-1', 100)",
4718 [],
4719 )
4720 .expect("insert p1");
4721 conn.execute(
4722 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4723 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 100)",
4724 [],
4725 )
4726 .expect("insert p2");
4727 conn.execute(
4728 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4729 VALUES ('p3', 'node_insert', 'lg3', 'src-1', 100)",
4730 [],
4731 )
4732 .expect("insert p3");
4733 }
4734
4735 let options = super::ProvenancePurgeOptions {
4736 dry_run: false,
4737 preserve_event_types: Vec::new(),
4738 };
4739 let report = service
4740 .purge_provenance_events(500, &options)
4741 .expect("purge");
4742
4743 assert_eq!(report.events_deleted, 2);
4744 assert_eq!(report.events_preserved, 1);
4745
4746 let conn = sqlite::open_connection(db.path()).expect("conn");
4747 let remaining_type: String = conn
4748 .query_row("SELECT event_type FROM provenance_events", [], |row| {
4749 row.get(0)
4750 })
4751 .expect("remaining event type");
4752 assert_eq!(remaining_type, "excise");
4753 }
4754
4755 #[test]
4756 fn provenance_purge_noop_with_zero_timestamp() {
4757 let (db, service) = setup();
4758 {
4759 let conn = sqlite::open_connection(db.path()).expect("conn");
4760 conn.execute(
4761 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4762 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
4763 [],
4764 )
4765 .expect("insert p1");
4766 }
4767
4768 let options = super::ProvenancePurgeOptions {
4769 dry_run: false,
4770 preserve_event_types: Vec::new(),
4771 };
4772 let report = service.purge_provenance_events(0, &options).expect("purge");
4773
4774 assert_eq!(report.events_deleted, 0);
4775 assert_eq!(report.events_preserved, 1);
4776 assert_eq!(report.oldest_remaining, Some(100));
4777 }
4778
4779 #[test]
4780 fn restore_skips_edge_when_counterpart_purged() {
4781 let (db, service) = setup();
4782 {
4783 let conn = sqlite::open_connection(db.path()).expect("conn");
4784 conn.execute(
4786 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4787 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
4788 [],
4789 )
4790 .expect("insert node A");
4791 conn.execute(
4792 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4793 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
4794 [],
4795 )
4796 .expect("insert node B");
4797 conn.execute(
4799 "INSERT INTO edges \
4800 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4801 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
4802 [],
4803 )
4804 .expect("insert edge");
4805 conn.execute(
4807 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4808 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
4809 [],
4810 )
4811 .expect("insert retire event A");
4812 conn.execute(
4813 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4814 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
4815 [],
4816 )
4817 .expect("insert edge retire event");
4818 conn.execute(
4819 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
4820 [],
4821 )
4822 .expect("retire node A");
4823 conn.execute(
4824 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
4825 [],
4826 )
4827 .expect("retire node B");
4828 conn.execute(
4829 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
4830 [],
4831 )
4832 .expect("retire edge");
4833 conn.execute("DELETE FROM nodes WHERE logical_id = 'doc-2'", [])
4836 .expect("purge node B rows");
4837 }
4838
4839 let report = service.restore_logical_id("doc-1").expect("restore A");
4841 assert!(!report.was_noop);
4842 assert_eq!(report.restored_node_rows, 1);
4843 assert_eq!(report.restored_edge_rows, 0, "edge should not be restored");
4844 assert_eq!(report.skipped_edges.len(), 1);
4845 assert_eq!(report.skipped_edges[0].edge_logical_id, "edge-1");
4846 assert_eq!(report.skipped_edges[0].missing_endpoint, "doc-2");
4847
4848 let conn = sqlite::open_connection(db.path()).expect("conn");
4850 let active_edge_count: i64 = conn
4851 .query_row(
4852 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
4853 [],
4854 |row| row.get(0),
4855 )
4856 .expect("active edge count");
4857 assert_eq!(active_edge_count, 0, "edge must remain retired");
4858 }
4859
4860 #[test]
4861 fn restore_restores_edges_to_active_nodes() {
4862 let (db, service) = setup();
4863 {
4864 let conn = sqlite::open_connection(db.path()).expect("conn");
4865 conn.execute(
4867 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4868 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
4869 [],
4870 )
4871 .expect("insert node A");
4872 conn.execute(
4873 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4874 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
4875 [],
4876 )
4877 .expect("insert node B");
4878 conn.execute(
4880 "INSERT INTO edges \
4881 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4882 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
4883 [],
4884 )
4885 .expect("insert edge");
4886 conn.execute(
4888 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4889 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
4890 [],
4891 )
4892 .expect("insert retire event A");
4893 conn.execute(
4894 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4895 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
4896 [],
4897 )
4898 .expect("insert edge retire event");
4899 conn.execute(
4900 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
4901 [],
4902 )
4903 .expect("retire node A");
4904 conn.execute(
4905 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
4906 [],
4907 )
4908 .expect("retire edge");
4909 }
4910
4911 let report = service.restore_logical_id("doc-1").expect("restore A");
4913 assert!(!report.was_noop);
4914 assert_eq!(report.restored_node_rows, 1);
4915 assert!(report.restored_edge_rows > 0, "edge should be restored");
4916 assert!(
4917 report.skipped_edges.is_empty(),
4918 "no edges should be skipped"
4919 );
4920
4921 let conn = sqlite::open_connection(db.path()).expect("conn");
4922 let active_edge_count: i64 = conn
4923 .query_row(
4924 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
4925 [],
4926 |row| row.get(0),
4927 )
4928 .expect("active edge count");
4929 assert_eq!(active_edge_count, 1, "edge must be active");
4930 }
4931
4932 #[test]
4933 fn restore_restores_edges_when_both_restored() {
4934 let (db, service) = setup();
4935 {
4936 let conn = sqlite::open_connection(db.path()).expect("conn");
4937 conn.execute(
4939 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4940 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
4941 [],
4942 )
4943 .expect("insert node A");
4944 conn.execute(
4945 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4946 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
4947 [],
4948 )
4949 .expect("insert node B");
4950 conn.execute(
4952 "INSERT INTO edges \
4953 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4954 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
4955 [],
4956 )
4957 .expect("insert edge");
4958 conn.execute(
4960 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4961 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
4962 [],
4963 )
4964 .expect("insert retire event A");
4965 conn.execute(
4966 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4967 VALUES ('evt-retire-b', 'node_retire', 'doc-2', 'forget-1', 200, '')",
4968 [],
4969 )
4970 .expect("insert retire event B");
4971 conn.execute(
4972 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
4973 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
4974 [],
4975 )
4976 .expect("insert edge retire event");
4977 conn.execute(
4978 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
4979 [],
4980 )
4981 .expect("retire node A");
4982 conn.execute(
4983 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
4984 [],
4985 )
4986 .expect("retire node B");
4987 conn.execute(
4988 "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
4989 [],
4990 )
4991 .expect("retire edge");
4992 }
4993
4994 let report_b = service.restore_logical_id("doc-2").expect("restore B");
4996 assert!(!report_b.was_noop);
4997
4998 let report_a = service.restore_logical_id("doc-1").expect("restore A");
5000 assert!(!report_a.was_noop);
5001 assert_eq!(report_a.restored_node_rows, 1);
5002 assert!(
5003 report_a.restored_edge_rows > 0,
5004 "edge should be restored when both endpoints active"
5005 );
5006 assert!(
5007 report_a.skipped_edges.is_empty(),
5008 "no edges should be skipped"
5009 );
5010
5011 let conn = sqlite::open_connection(db.path()).expect("conn");
5012 let active_edge_count: i64 = conn
5013 .query_row(
5014 "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
5015 [],
5016 |row| row.get(0),
5017 )
5018 .expect("active edge count");
5019 assert_eq!(
5020 active_edge_count, 1,
5021 "edge must be active after both endpoints restored"
5022 );
5023 }
5024
5025 #[test]
5028 fn fts_property_schema_crud_round_trip() {
5029 let (_db, service) = setup();
5030
5031 let record = service
5033 .register_fts_property_schema(
5034 "Meeting",
5035 &["$.title".to_owned(), "$.summary".to_owned()],
5036 None,
5037 )
5038 .expect("register");
5039 assert_eq!(record.kind, "Meeting");
5040 assert_eq!(record.property_paths, vec!["$.title", "$.summary"]);
5041 assert_eq!(record.separator, " ");
5042 assert_eq!(record.format_version, 1);
5043
5044 let described = service
5046 .describe_fts_property_schema("Meeting")
5047 .expect("describe")
5048 .expect("should exist");
5049 assert_eq!(described, record);
5050
5051 let missing = service
5053 .describe_fts_property_schema("NoSuchKind")
5054 .expect("describe missing");
5055 assert!(missing.is_none());
5056
5057 let list = service.list_fts_property_schemas().expect("list");
5059 assert_eq!(list.len(), 1);
5060 assert_eq!(list[0].kind, "Meeting");
5061
5062 let updated = service
5064 .register_fts_property_schema(
5065 "Meeting",
5066 &["$.title".to_owned(), "$.notes".to_owned()],
5067 Some("\n"),
5068 )
5069 .expect("update");
5070 assert_eq!(updated.property_paths, vec!["$.title", "$.notes"]);
5071 assert_eq!(updated.separator, "\n");
5072
5073 service
5075 .remove_fts_property_schema("Meeting")
5076 .expect("remove");
5077 let after_remove = service
5078 .describe_fts_property_schema("Meeting")
5079 .expect("describe after remove");
5080 assert!(after_remove.is_none());
5081
5082 let err = service.remove_fts_property_schema("Meeting");
5084 assert!(err.is_err());
5085 }
5086
5087 #[test]
5088 fn describe_fts_property_schema_round_trips_recursive_entries() {
5089 let (_db, service) = setup();
5090
5091 let entries = vec![
5092 FtsPropertyPathSpec::scalar("$.title"),
5093 FtsPropertyPathSpec::recursive("$.payload"),
5094 ];
5095 let exclude = vec!["$.payload.private".to_owned()];
5096 let registered = service
5097 .register_fts_property_schema_with_entries(
5098 "KnowledgeItem",
5099 &entries,
5100 Some(" "),
5101 &exclude,
5102 crate::rebuild_actor::RebuildMode::Eager,
5103 )
5104 .expect("register recursive");
5105
5106 assert_eq!(registered.entries, entries);
5109 assert_eq!(registered.exclude_paths, exclude);
5110 assert_eq!(registered.property_paths, vec!["$.title", "$.payload"]);
5111
5112 let described = service
5113 .describe_fts_property_schema("KnowledgeItem")
5114 .expect("describe")
5115 .expect("should exist");
5116 assert_eq!(described.kind, "KnowledgeItem");
5117 assert_eq!(described.entries, entries);
5118 assert_eq!(described.exclude_paths, exclude);
5119 assert_eq!(described.property_paths, vec!["$.title", "$.payload"]);
5120 assert_eq!(described.separator, " ");
5121 assert_eq!(described.format_version, 1);
5122 }
5123
5124 #[test]
5125 fn list_fts_property_schemas_round_trips_recursive_entries() {
5126 let (_db, service) = setup();
5127
5128 let entries = vec![
5129 FtsPropertyPathSpec::scalar("$.title"),
5130 FtsPropertyPathSpec::recursive("$.payload"),
5131 ];
5132 let exclude = vec!["$.payload.secret".to_owned()];
5133 service
5134 .register_fts_property_schema_with_entries(
5135 "KnowledgeItem",
5136 &entries,
5137 Some(" "),
5138 &exclude,
5139 crate::rebuild_actor::RebuildMode::Eager,
5140 )
5141 .expect("register recursive");
5142
5143 let listed = service.list_fts_property_schemas().expect("list");
5144 assert_eq!(listed.len(), 1);
5145 let record = &listed[0];
5146 assert_eq!(record.kind, "KnowledgeItem");
5147 assert_eq!(record.entries, entries);
5148 assert_eq!(record.exclude_paths, exclude);
5149 assert_eq!(record.property_paths, vec!["$.title", "$.payload"]);
5150 }
5151
5152 #[test]
5153 fn describe_fts_property_schema_round_trips_scalar_only_entries() {
5154 let (_db, service) = setup();
5155
5156 service
5157 .register_fts_property_schema(
5158 "Meeting",
5159 &["$.title".to_owned(), "$.summary".to_owned()],
5160 None,
5161 )
5162 .expect("register scalar");
5163
5164 let described = service
5165 .describe_fts_property_schema("Meeting")
5166 .expect("describe")
5167 .expect("should exist");
5168 assert_eq!(described.property_paths, vec!["$.title", "$.summary"]);
5169 assert_eq!(described.entries.len(), 2);
5170 for entry in &described.entries {
5171 assert_eq!(
5172 entry.mode,
5173 FtsPropertyPathMode::Scalar,
5174 "scalar-only schema should deserialize every entry as Scalar"
5175 );
5176 }
5177 assert!(described.exclude_paths.is_empty());
5178 }
5179
5180 #[test]
5181 fn restore_reestablishes_property_fts_visibility() {
5182 let (db, service) = setup();
5183 let doc_table = fathomdb_schema::fts_kind_table_name("Document");
5184 {
5185 let conn = sqlite::open_connection(db.path()).expect("conn");
5186 conn.execute(
5188 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5189 VALUES ('Document', '[\"$.title\", \"$.body\"]', ' ')",
5190 [],
5191 )
5192 .expect("register schema");
5193 conn.execute_batch(&format!(
5195 "CREATE VIRTUAL TABLE IF NOT EXISTS {doc_table} USING fts5(\
5196 node_logical_id UNINDEXED, text_content, \
5197 tokenize = 'porter unicode61 remove_diacritics 2'\
5198 )"
5199 ))
5200 .expect("create per-kind table");
5201 conn.execute(
5203 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5204 VALUES ('row-1', 'doc-1', 'Document', '{\"title\":\"Budget\",\"body\":\"Q3 forecast\"}', 100, 'seed')",
5205 [],
5206 )
5207 .expect("insert node");
5208 conn.execute(
5210 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5211 VALUES ('chunk-1', 'doc-1', 'budget text', 100)",
5212 [],
5213 )
5214 .expect("insert chunk");
5215 conn.execute(
5217 &format!(
5218 "INSERT INTO {doc_table} (node_logical_id, text_content) \
5219 VALUES ('doc-1', 'Budget Q3 forecast')"
5220 ),
5221 [],
5222 )
5223 .expect("insert property fts");
5224 conn.execute(
5226 "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
5227 VALUES ('evt-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
5228 [],
5229 )
5230 .expect("retire event");
5231 conn.execute(
5232 "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
5233 [],
5234 )
5235 .expect("supersede");
5236 conn.execute("DELETE FROM fts_nodes", [])
5237 .expect("clear chunk fts");
5238 conn.execute(&format!("DELETE FROM {doc_table}"), [])
5239 .expect("clear property fts");
5240 }
5241
5242 let report = service.restore_logical_id("doc-1").expect("restore");
5243 assert_eq!(report.restored_property_fts_rows, 1);
5244
5245 let conn = sqlite::open_connection(db.path()).expect("conn");
5247 let prop_fts_count: i64 = conn
5248 .query_row(
5249 &format!("SELECT count(*) FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
5250 [],
5251 |row| row.get(0),
5252 )
5253 .expect("prop fts count");
5254 assert_eq!(prop_fts_count, 1, "property FTS must be restored");
5255
5256 let text: String = conn
5257 .query_row(
5258 &format!("SELECT text_content FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
5259 [],
5260 |row| row.get(0),
5261 )
5262 .expect("prop fts text");
5263 assert_eq!(text, "Budget Q3 forecast");
5264 }
5265
5266 #[test]
5267 fn safe_export_preserves_fts_property_schemas() {
5268 let (_db, service) = setup();
5269 service
5270 .register_fts_property_schema(
5271 "Goal",
5272 &["$.name".to_owned(), "$.rationale".to_owned()],
5273 None,
5274 )
5275 .expect("register schema");
5276
5277 let export_dir = tempfile::TempDir::new().expect("temp dir");
5278 let export_path = export_dir.path().join("backup.db");
5279 service
5280 .safe_export(
5281 &export_path,
5282 SafeExportOptions {
5283 force_checkpoint: false,
5284 },
5285 )
5286 .expect("export");
5287
5288 let exported_conn = rusqlite::Connection::open(&export_path).expect("open exported db");
5290 let kind: String = exported_conn
5291 .query_row(
5292 "SELECT kind FROM fts_property_schemas WHERE kind = 'Goal'",
5293 [],
5294 |row| row.get(0),
5295 )
5296 .expect("schema must exist in export");
5297 assert_eq!(kind, "Goal");
5298 let paths_json: String = exported_conn
5299 .query_row(
5300 "SELECT property_paths_json FROM fts_property_schemas WHERE kind = 'Goal'",
5301 [],
5302 |row| row.get(0),
5303 )
5304 .expect("paths must exist");
5305 let paths: Vec<String> = serde_json::from_str(&paths_json).expect("valid json");
5306 assert_eq!(paths, vec!["$.name", "$.rationale"]);
5307 }
5308
5309 #[test]
5310 #[allow(clippy::too_many_lines)]
5311 fn export_recovery_rebuilds_property_fts_from_canonical_state() {
5312 let (db, service) = setup();
5313 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5314 service
5316 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
5317 .expect("register");
5318 {
5319 let conn = sqlite::open_connection(db.path()).expect("conn");
5320 conn.execute(
5321 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5322 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5323 [],
5324 )
5325 .expect("insert node 1");
5326 conn.execute(
5327 &format!(
5328 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5329 VALUES ('goal-1', 'Ship v2')"
5330 ),
5331 [],
5332 )
5333 .expect("insert property FTS row 1");
5334 conn.execute(
5335 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5336 VALUES ('row-2', 'goal-2', 'Goal', '{\"name\":\"Launch redesign\"}', 100, 'seed')",
5337 [],
5338 )
5339 .expect("insert node 2");
5340 conn.execute(
5341 &format!(
5342 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5343 VALUES ('goal-2', 'Launch redesign')"
5344 ),
5345 [],
5346 )
5347 .expect("insert property FTS row 2");
5348 }
5349
5350 let export_dir = tempfile::TempDir::new().expect("temp dir");
5352 let export_path = export_dir.path().join("backup.db");
5353 service
5354 .safe_export(
5355 &export_path,
5356 SafeExportOptions {
5357 force_checkpoint: false,
5358 },
5359 )
5360 .expect("export");
5361
5362 {
5366 let conn = rusqlite::Connection::open(&export_path).expect("open export");
5367 SchemaManager::new()
5369 .bootstrap(&conn)
5370 .expect("bootstrap export");
5371 conn.execute(
5372 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5373 [],
5374 )
5375 .expect("delete old row");
5376 conn.execute(
5377 &format!(
5378 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5379 VALUES ('goal-1', 'completely wrong stale text')"
5380 ),
5381 [],
5382 )
5383 .expect("insert corrupted row");
5384 conn.execute(
5385 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
5386 [],
5387 )
5388 .expect("delete goal-2 row");
5389 }
5390
5391 let schema = Arc::new(SchemaManager::new());
5393 let exported_service = AdminService::new(&export_path, Arc::clone(&schema));
5394 exported_service
5395 .rebuild_projections(ProjectionTarget::Fts)
5396 .expect("rebuild");
5397
5398 let conn = rusqlite::Connection::open(&export_path).expect("open export for verify");
5400 let goal1_text: String = conn
5401 .query_row(
5402 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5403 [],
5404 |r| r.get(0),
5405 )
5406 .expect("goal-1 text after rebuild");
5407 assert_eq!(
5408 goal1_text, "Ship v2",
5409 "goal-1 text must be corrected by rebuild"
5410 );
5411
5412 let goal2_count: i64 = conn
5413 .query_row(
5414 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
5415 [],
5416 |r| r.get(0),
5417 )
5418 .expect("goal-2 count");
5419 assert_eq!(goal2_count, 1, "goal-2 row must be restored by rebuild");
5420
5421 let stale_count: i64 = conn
5422 .query_row(
5423 &format!("SELECT count(*) FROM {goal_table} WHERE text_content = 'completely wrong stale text'"),
5424 [],
5425 |r| r.get(0),
5426 )
5427 .expect("stale count");
5428 assert_eq!(stale_count, 0, "corrupted text must be gone after rebuild");
5429
5430 let integrity = exported_service.check_integrity().expect("integrity");
5432 assert_eq!(integrity.missing_property_fts_rows, 0);
5433 let semantics = exported_service.check_semantics().expect("semantics");
5434 assert_eq!(semantics.drifted_property_fts_rows, 0);
5435 assert_eq!(semantics.orphaned_property_fts_rows, 0);
5436 assert_eq!(semantics.duplicate_property_fts_rows, 0);
5437 }
5438
5439 #[test]
5440 fn check_integrity_no_false_positives_for_empty_extraction() {
5441 let (db, service) = setup();
5442 {
5443 let conn = sqlite::open_connection(db.path()).expect("conn");
5444 conn.execute(
5446 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5447 VALUES ('Ticket', '[\"$.searchable\"]', ' ')",
5448 [],
5449 )
5450 .expect("register schema");
5451 conn.execute(
5454 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5455 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"status\":\"open\"}', 100, 'seed')",
5456 [],
5457 )
5458 .expect("insert node");
5459 }
5460
5461 let report = service.check_integrity().expect("integrity");
5462 assert_eq!(
5463 report.missing_property_fts_rows, 0,
5464 "node with no extractable values must not be counted as missing"
5465 );
5466 }
5467
5468 #[test]
5469 fn check_integrity_detects_genuinely_missing_property_fts_rows() {
5470 let (db, service) = setup();
5471 {
5472 let conn = sqlite::open_connection(db.path()).expect("conn");
5473 conn.execute(
5474 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5475 VALUES ('Ticket', '[\"$.title\"]', ' ')",
5476 [],
5477 )
5478 .expect("register schema");
5479 conn.execute(
5481 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5482 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"title\":\"fix login bug\"}', 100, 'seed')",
5483 [],
5484 )
5485 .expect("insert node");
5486 }
5487
5488 let report = service.check_integrity().expect("integrity");
5489 assert_eq!(
5490 report.missing_property_fts_rows, 1,
5491 "node with extractable values but no property FTS row must be detected"
5492 );
5493 }
5494
5495 #[test]
5496 fn rebuild_projections_fts_restores_missing_property_fts_rows() {
5497 let (db, service) = setup();
5498 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5499 {
5500 let conn = sqlite::open_connection(db.path()).expect("conn");
5501 conn.execute(
5502 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5503 VALUES ('Goal', '[\"$.name\"]', ' ')",
5504 [],
5505 )
5506 .expect("register schema");
5507 conn.execute(
5508 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5509 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5510 [],
5511 )
5512 .expect("insert node");
5513 }
5515
5516 let report = service
5517 .rebuild_projections(ProjectionTarget::Fts)
5518 .expect("rebuild");
5519 assert!(
5520 report.rebuilt_rows >= 1,
5521 "rebuild must insert at least one property FTS row"
5522 );
5523
5524 let conn = sqlite::open_connection(db.path()).expect("conn");
5525 let text: String = conn
5526 .query_row(
5527 &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5528 [],
5529 |row| row.get(0),
5530 )
5531 .expect("property FTS row must exist after rebuild");
5532 assert_eq!(text, "Ship v2");
5533 }
5534
5535 #[test]
5536 fn rebuild_missing_projections_fills_gap_for_deleted_property_fts_row() {
5537 let (db, service) = setup();
5538 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5539 {
5540 let conn = sqlite::open_connection(db.path()).expect("conn");
5541 conn.execute(
5542 "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5543 VALUES ('Goal', '[\"$.name\"]', ' ')",
5544 [],
5545 )
5546 .expect("register schema");
5547 conn.execute(
5548 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5549 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5550 [],
5551 )
5552 .expect("insert node");
5553 conn.execute_batch(&format!(
5555 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
5556 node_logical_id UNINDEXED, text_content, \
5557 tokenize = 'porter unicode61 remove_diacritics 2'\
5558 )"
5559 ))
5560 .expect("create per-kind table");
5561 conn.execute(
5562 &format!(
5563 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5564 VALUES ('goal-1', 'Ship v2')"
5565 ),
5566 [],
5567 )
5568 .expect("insert property fts");
5569 conn.execute(
5570 &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5571 [],
5572 )
5573 .expect("delete property fts");
5574 }
5575
5576 let report = service
5577 .rebuild_missing_projections()
5578 .expect("rebuild missing");
5579 assert!(
5580 report.rebuilt_rows >= 1,
5581 "missing rebuild must insert the gap-fill row"
5582 );
5583
5584 let conn = sqlite::open_connection(db.path()).expect("conn");
5585 let count: i64 = conn
5586 .query_row(
5587 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5588 [],
5589 |row| row.get(0),
5590 )
5591 .expect("count");
5592 assert_eq!(
5593 count, 1,
5594 "gap-fill must restore exactly one property FTS row"
5595 );
5596 }
5597
5598 #[test]
5599 fn remove_schema_then_rebuild_cleans_stale_property_fts_rows() {
5600 let (db, service) = setup();
5606 let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5607 {
5608 let conn = sqlite::open_connection(db.path()).expect("conn");
5609 conn.execute(
5610 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5611 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5612 [],
5613 )
5614 .expect("insert node");
5615 conn.execute_batch(&format!(
5618 "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
5619 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
5620 ))
5621 .expect("create per-kind table");
5622 conn.execute(
5623 &format!(
5624 "INSERT INTO {goal_table} (node_logical_id, text_content) \
5625 VALUES ('goal-1', 'Ship v2')"
5626 ),
5627 [],
5628 )
5629 .expect("insert property fts");
5630 }
5631
5632 let semantics = service.check_semantics().expect("semantics");
5634 assert_eq!(
5635 semantics.orphaned_property_fts_rows, 1,
5636 "orphaned property FTS rows must be detected with no registered schema"
5637 );
5638
5639 service
5641 .rebuild_projections(ProjectionTarget::Fts)
5642 .expect("rebuild");
5643
5644 let conn = sqlite::open_connection(db.path()).expect("conn");
5645 let count: i64 = conn
5646 .query_row(
5647 &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5648 [],
5649 |row| row.get(0),
5650 )
5651 .expect("count");
5652 assert_eq!(
5653 count, 0,
5654 "rebuild must delete rows from per-kind tables with no registered schema"
5655 );
5656 }
5657
5658 mod validate_fts_property_paths_tests {
5659 use super::super::validate_fts_property_paths;
5660
5661 #[test]
5662 fn valid_simple_path() {
5663 assert!(validate_fts_property_paths(&["$.name".to_owned()]).is_ok());
5664 }
5665
5666 #[test]
5667 fn valid_nested_path() {
5668 assert!(validate_fts_property_paths(&["$.address.city".to_owned()]).is_ok());
5669 }
5670
5671 #[test]
5672 fn valid_underscore_segment() {
5673 assert!(validate_fts_property_paths(&["$.a_b".to_owned()]).is_ok());
5674 }
5675
5676 #[test]
5677 fn rejects_bare_prefix() {
5678 let result = validate_fts_property_paths(&["$.".to_owned()]);
5679 assert!(result.is_err(), "path '$.' must be rejected");
5680 }
5681
5682 #[test]
5683 fn rejects_double_dot() {
5684 let result = validate_fts_property_paths(&["$..x".to_owned()]);
5685 assert!(result.is_err(), "path '$..x' must be rejected");
5686 }
5687
5688 #[test]
5689 fn rejects_trailing_dot() {
5690 let result = validate_fts_property_paths(&["$.foo.".to_owned()]);
5691 assert!(result.is_err(), "path '$.foo.' must be rejected");
5692 }
5693
5694 #[test]
5695 fn rejects_space_in_segment() {
5696 let result = validate_fts_property_paths(&["$.foo bar".to_owned()]);
5697 assert!(result.is_err(), "path '$.foo bar' must be rejected");
5698 }
5699
5700 #[test]
5701 fn rejects_bracket_syntax() {
5702 let result = validate_fts_property_paths(&["$.foo[0]".to_owned()]);
5703 assert!(result.is_err(), "path '$.foo[0]' must be rejected");
5704 }
5705
5706 #[test]
5707 fn rejects_duplicates() {
5708 let result = validate_fts_property_paths(&["$.name".to_owned(), "$.name".to_owned()]);
5709 assert!(result.is_err(), "duplicate paths must be rejected");
5710 }
5711
5712 #[test]
5713 fn rejects_empty_list() {
5714 let result = validate_fts_property_paths(&[]);
5715 assert!(result.is_err(), "empty path list must be rejected");
5716 }
5717 }
5718
5719 #[test]
5722 fn register_fts_schema_writes_to_per_kind_table() {
5723 let (db, service) = setup();
5726 {
5727 let conn = sqlite::open_connection(db.path()).expect("conn");
5728 conn.execute(
5730 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5731 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5732 [],
5733 )
5734 .expect("insert node");
5735 }
5736
5737 service
5739 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
5740 .expect("register schema");
5741
5742 let conn = sqlite::open_connection(db.path()).expect("conn");
5743 let table = fathomdb_schema::fts_kind_table_name("Goal");
5744 let per_kind_count: i64 = conn
5746 .query_row(
5747 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
5748 [],
5749 |row| row.get(0),
5750 )
5751 .expect("per-kind count");
5752 assert_eq!(
5753 per_kind_count, 1,
5754 "per-kind table must have the row after registration"
5755 );
5756 }
5757
5758 #[test]
5759 fn remove_fts_schema_deletes_from_per_kind_table() {
5760 let (db, service) = setup();
5762 {
5763 let conn = sqlite::open_connection(db.path()).expect("conn");
5764 conn.execute(
5765 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5766 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5767 [],
5768 )
5769 .expect("insert node");
5770 }
5771
5772 service
5773 .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
5774 .expect("register schema");
5775 service
5776 .remove_fts_property_schema("Goal")
5777 .expect("remove schema");
5778
5779 let conn = sqlite::open_connection(db.path()).expect("conn");
5780 let table = fathomdb_schema::fts_kind_table_name("Goal");
5781 let per_kind_count: i64 = conn
5782 .query_row(
5783 &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
5784 [],
5785 |row| row.get(0),
5786 )
5787 .expect("per-kind count");
5788 assert_eq!(
5789 per_kind_count, 0,
5790 "per-kind table must be empty after schema removal"
5791 );
5792 }
5793
5794 #[test]
5797 fn fts_path_spec_with_weight_builder() {
5798 let spec = FtsPropertyPathSpec::scalar("$.title").with_weight(5.0);
5799 assert_eq!(spec.weight, Some(5.0));
5800 assert_eq!(spec.path, "$.title");
5801 assert_eq!(spec.mode, FtsPropertyPathMode::Scalar);
5802 }
5803
5804 #[test]
5805 fn fts_path_spec_serialize_with_weight() {
5806 use super::serialize_property_paths_json;
5807 let entries = vec![
5808 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
5809 FtsPropertyPathSpec::scalar("$.body"),
5810 ];
5811 let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
5812 let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
5814 let paths = v
5815 .get("paths")
5816 .expect("paths key")
5817 .as_array()
5818 .expect("array");
5819 assert_eq!(paths.len(), 2);
5820 assert_eq!(
5822 paths[0].get("path").and_then(serde_json::Value::as_str),
5823 Some("$.title")
5824 );
5825 assert_eq!(
5826 paths[0].get("weight").and_then(serde_json::Value::as_f64),
5827 Some(2.0)
5828 );
5829 assert!(
5831 paths[1].get("weight").is_none(),
5832 "unweighted spec must omit weight field"
5833 );
5834 }
5835
5836 #[test]
5837 fn fts_path_spec_serialize_no_weights() {
5838 use super::serialize_property_paths_json;
5839 let entries = vec![
5840 FtsPropertyPathSpec::scalar("$.title"),
5841 FtsPropertyPathSpec::scalar("$.payload"),
5842 ];
5843 let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
5844 let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
5846 assert!(
5847 v.is_array(),
5848 "all-scalar no-weight schema must serialize as bare string array"
5849 );
5850 let arr = v.as_array().expect("array");
5851 assert_eq!(arr.len(), 2);
5852 assert_eq!(arr[0].as_str(), Some("$.title"));
5853 assert_eq!(arr[1].as_str(), Some("$.payload"));
5854 }
5855
5856 #[test]
5857 fn fts_weight_validation_out_of_range() {
5858 let (_db, service) = setup();
5859 let entries_zero = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(0.0)];
5861 let result = service.register_fts_property_schema_with_entries(
5862 "Article",
5863 &entries_zero,
5864 None,
5865 &[],
5866 crate::rebuild_actor::RebuildMode::Eager,
5867 );
5868 assert!(result.is_err(), "weight 0.0 must be rejected");
5869 let err_msg = result.expect_err("weight 0.0 must be rejected").to_string();
5870 assert!(
5871 err_msg.contains("weight"),
5872 "error must mention weight: {err_msg}"
5873 );
5874
5875 let entries_big = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(1001.0)];
5877 let result = service.register_fts_property_schema_with_entries(
5878 "Article",
5879 &entries_big,
5880 None,
5881 &[],
5882 crate::rebuild_actor::RebuildMode::Eager,
5883 );
5884 assert!(result.is_err(), "weight 1001.0 must be rejected");
5885 }
5886
5887 #[test]
5888 fn fts_weight_validation_valid() {
5889 let (_db, service) = setup();
5890 let entries = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(10.0)];
5891 let result = service.register_fts_property_schema_with_entries(
5892 "Article",
5893 &entries,
5894 None,
5895 &[],
5896 crate::rebuild_actor::RebuildMode::Eager,
5897 );
5898 assert!(
5899 result.is_ok(),
5900 "weight 10.0 must be accepted: {:?}",
5901 result.err()
5902 );
5903 }
5904
5905 #[test]
5908 fn create_or_replace_creates_multi_column_table() {
5909 use super::create_or_replace_fts_kind_table;
5910 let (db, _service) = setup();
5911 let conn = sqlite::open_connection(db.path()).expect("conn");
5912 let specs = vec![
5913 FtsPropertyPathSpec::scalar("$.title"),
5914 FtsPropertyPathSpec::recursive("$.payload"),
5915 ];
5916 create_or_replace_fts_kind_table(
5917 &conn,
5918 "Article",
5919 &specs,
5920 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
5921 )
5922 .expect("create table");
5923
5924 let table = fathomdb_schema::fts_kind_table_name("Article");
5926 let count: i64 = conn
5928 .query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))
5929 .expect("count");
5930 assert_eq!(count, 0, "new table must be empty");
5931
5932 let title_col = fathomdb_schema::fts_column_name("$.title", false);
5934 let payload_col = fathomdb_schema::fts_column_name("$.payload", true);
5935 conn.execute(
5936 &format!(
5937 "INSERT INTO {table} (node_logical_id, {title_col}, {payload_col}) VALUES ('id1', 'hello', 'world')"
5938 ),
5939 [],
5940 )
5941 .expect("insert with per-spec columns must succeed");
5942 }
5943
5944 #[test]
5945 fn create_or_replace_drops_and_recreates() {
5946 use super::create_or_replace_fts_kind_table;
5947 let (db, _service) = setup();
5948 let conn = sqlite::open_connection(db.path()).expect("conn");
5949
5950 let specs_v1 = vec![FtsPropertyPathSpec::scalar("$.title")];
5952 create_or_replace_fts_kind_table(
5953 &conn,
5954 "Post",
5955 &specs_v1,
5956 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
5957 )
5958 .expect("create v1");
5959
5960 let specs_v2 = vec![
5962 FtsPropertyPathSpec::scalar("$.title"),
5963 FtsPropertyPathSpec::scalar("$.summary"),
5964 ];
5965 create_or_replace_fts_kind_table(
5966 &conn,
5967 "Post",
5968 &specs_v2,
5969 fathomdb_schema::DEFAULT_FTS_TOKENIZER,
5970 )
5971 .expect("create v2");
5972
5973 let table = fathomdb_schema::fts_kind_table_name("Post");
5975 let summary_col = fathomdb_schema::fts_column_name("$.summary", false);
5976 conn.execute(
5977 &format!("INSERT INTO {table} (node_logical_id, {summary_col}) VALUES ('id1', 'text')"),
5978 [],
5979 )
5980 .expect("second layout must allow summary column");
5981 }
5982
5983 #[test]
5984 fn create_or_replace_invalid_tokenizer() {
5985 use super::create_or_replace_fts_kind_table;
5986 let (db, _service) = setup();
5987 let conn = sqlite::open_connection(db.path()).expect("conn");
5988 let specs = vec![FtsPropertyPathSpec::scalar("$.title")];
5989 let result = create_or_replace_fts_kind_table(&conn, "Post", &specs, "'; DROP TABLE --");
5990 assert!(result.is_err(), "invalid tokenizer must be rejected");
5991 let err_msg = result
5992 .expect_err("invalid tokenizer must be rejected")
5993 .to_string();
5994 assert!(
5995 err_msg.contains("tokenizer"),
5996 "error must mention tokenizer: {err_msg}"
5997 );
5998 }
5999
6000 #[test]
6001 fn register_with_weights_creates_per_column_table() {
6002 let (db, service) = setup();
6003 let entries = vec![
6004 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6005 FtsPropertyPathSpec::scalar("$.body"),
6006 ];
6007 service
6008 .register_fts_property_schema_with_entries(
6009 "Article",
6010 &entries,
6011 None,
6012 &[],
6013 crate::rebuild_actor::RebuildMode::Eager,
6014 )
6015 .expect("register");
6016
6017 let conn = sqlite::open_connection(db.path()).expect("conn");
6019 let table = fathomdb_schema::fts_kind_table_name("Article");
6020 let title_col = fathomdb_schema::fts_column_name("$.title", false);
6021 let body_col = fathomdb_schema::fts_column_name("$.body", false);
6022 conn.execute(
6024 &format!(
6025 "INSERT INTO {table} (node_logical_id, {title_col}, {body_col}) VALUES ('art-1', 'hello', 'world')"
6026 ),
6027 [],
6028 )
6029 .expect("per-spec columns must exist after registration with weights");
6030 }
6031
6032 #[test]
6033 fn weighted_to_unweighted_downgrade_recreates_table() {
6034 let (db, service) = setup();
6035
6036 let weighted_entries = vec![
6038 FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6039 FtsPropertyPathSpec::scalar("$.body"),
6040 ];
6041 service
6042 .register_fts_property_schema_with_entries(
6043 "Article",
6044 &weighted_entries,
6045 None,
6046 &[],
6047 crate::rebuild_actor::RebuildMode::Eager,
6048 )
6049 .expect("register weighted");
6050
6051 let unweighted_entries = vec![
6053 FtsPropertyPathSpec::scalar("$.title"),
6054 FtsPropertyPathSpec::scalar("$.body"),
6055 ];
6056 service
6057 .register_fts_property_schema_with_entries(
6058 "Article",
6059 &unweighted_entries,
6060 None,
6061 &[],
6062 crate::rebuild_actor::RebuildMode::Eager,
6063 )
6064 .expect("re-register unweighted");
6065
6066 let conn = sqlite::open_connection(db.path()).expect("conn");
6069 let table = fathomdb_schema::fts_kind_table_name("Article");
6070 let result = conn.execute(
6071 &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('art-1', 'hello world')"),
6072 [],
6073 );
6074 assert!(
6075 result.is_ok(),
6076 "text_content column must exist after weighted-to-unweighted downgrade"
6077 );
6078 }
6079
6080 #[test]
6083 fn set_get_fts_profile_roundtrip() {
6084 let (_db, service) = setup();
6085 let profile = service
6086 .set_fts_profile("book", "unicode61")
6087 .expect("set_fts_profile");
6088 assert_eq!(profile.kind, "book");
6089 assert_eq!(profile.tokenizer, "unicode61");
6090
6091 let got = service
6092 .get_fts_profile("book")
6093 .expect("get_fts_profile")
6094 .expect("should be Some");
6095 assert_eq!(got.kind, "book");
6096 assert_eq!(got.tokenizer, "unicode61");
6097 }
6098
6099 #[test]
6100 fn fts_profile_upsert() {
6101 let (_db, service) = setup();
6102 service
6103 .set_fts_profile("article", "unicode61")
6104 .expect("first set");
6105 service
6106 .set_fts_profile("article", "porter unicode61 remove_diacritics 2")
6107 .expect("second set");
6108 let got = service
6109 .get_fts_profile("article")
6110 .expect("get")
6111 .expect("Some");
6112 assert_eq!(got.tokenizer, "porter unicode61 remove_diacritics 2");
6113 }
6114
6115 #[test]
6116 fn invalid_tokenizer_rejected() {
6117 let (_db, service) = setup();
6118 let result = service.set_fts_profile("book", "'; DROP TABLE nodes --");
6119 assert!(result.is_err(), "invalid tokenizer must be rejected");
6120 let msg = result.expect_err("must be Err").to_string();
6121 assert!(
6122 msg.contains("tokenizer") || msg.contains("invalid"),
6123 "error must mention tokenizer or invalid: {msg}"
6124 );
6125 }
6126
6127 #[test]
6128 fn preset_recall_optimized_english() {
6129 assert_eq!(
6130 super::resolve_tokenizer_preset("recall-optimized-english"),
6131 "porter unicode61 remove_diacritics 2"
6132 );
6133 }
6134
6135 #[test]
6136 fn preset_precision_optimized() {
6137 assert_eq!(
6138 super::resolve_tokenizer_preset("precision-optimized"),
6139 "unicode61 remove_diacritics 2"
6140 );
6141 }
6142
6143 #[test]
6144 fn preset_global_cjk() {
6145 assert_eq!(super::resolve_tokenizer_preset("global-cjk"), "icu");
6146 }
6147
6148 #[test]
6149 fn preset_substring_trigram() {
6150 assert_eq!(
6151 super::resolve_tokenizer_preset("substring-trigram"),
6152 "trigram"
6153 );
6154 }
6155
6156 #[test]
6157 fn preset_source_code() {
6158 assert_eq!(
6159 super::resolve_tokenizer_preset("source-code"),
6160 "unicode61 tokenchars '._-$@'"
6161 );
6162 }
6163
6164 #[test]
6165 fn preview_fts_row_count() {
6166 let (db, service) = setup();
6167 {
6168 let conn = sqlite::open_connection(db.path()).expect("conn");
6169 for i in 0..5u32 {
6170 conn.execute(
6171 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6172 VALUES (?1, ?2, 'book', '{}', 100, 'src')",
6173 rusqlite::params![format!("r{i}"), format!("lg{i}")],
6174 )
6175 .expect("insert node");
6176 }
6177 conn.execute(
6179 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, superseded_at) \
6180 VALUES ('r99', 'lg99', 'book', '{}', 100, 'src', 200)",
6181 [],
6182 )
6183 .expect("insert superseded");
6184 }
6185 let impact = service
6186 .preview_projection_impact("book", "fts")
6187 .expect("preview");
6188 assert_eq!(impact.rows_to_rebuild, 5);
6189 }
6190
6191 #[test]
6192 fn preview_populates_current_tokenizer() {
6193 let (_db, service) = setup();
6194 service
6195 .set_fts_profile("doc", "trigram")
6196 .expect("set profile");
6197 let impact = service
6198 .preview_projection_impact("doc", "fts")
6199 .expect("preview");
6200 assert_eq!(impact.current_tokenizer, Some("trigram".to_owned()));
6201 assert_eq!(impact.target_tokenizer, None);
6202 }
6203
6204 #[test]
6207 fn create_or_replace_source_code_tokenizer_is_accepted() {
6208 use super::create_or_replace_fts_kind_table;
6212 let (db, _service) = setup();
6213 let conn = sqlite::open_connection(db.path()).expect("conn");
6214 let specs = vec![FtsPropertyPathSpec::scalar("$.symbol")];
6215 let source_code_tokenizer = "unicode61 tokenchars '._-$@'";
6216 let result =
6217 create_or_replace_fts_kind_table(&conn, "Symbol", &specs, source_code_tokenizer);
6218 assert!(
6219 result.is_ok(),
6220 "source-code tokenizer string must be accepted by create_or_replace_fts_kind_table: {:?}",
6221 result.err()
6222 );
6223 }
6224
6225 #[test]
6226 fn source_code_profile_round_trip_through_register_fts_schema() {
6227 let db = tempfile::NamedTempFile::new().expect("temp file");
6232 let schema = Arc::new(fathomdb_schema::SchemaManager::new());
6233
6234 {
6236 let _coord = crate::ExecutionCoordinator::open(
6237 db.path(),
6238 Arc::clone(&schema),
6239 None,
6240 1,
6241 Arc::new(crate::TelemetryCounters::default()),
6242 None,
6243 )
6244 .expect("coordinator opens for bootstrap");
6245 }
6246
6247 let service = AdminService::new(db.path(), Arc::clone(&schema));
6248
6249 service
6251 .set_fts_profile("Symbol", "source-code")
6252 .expect("set_fts_profile with source-code preset must succeed");
6253
6254 let result = service.register_fts_property_schema("Symbol", &["$.name".to_owned()], None);
6257 assert!(
6258 result.is_ok(),
6259 "register_fts_property_schema must succeed when source-code profile is active: {:?}",
6260 result.err()
6261 );
6262 }
6263
6264 #[cfg(feature = "sqlite-vec")]
6273 #[test]
6274 fn embedder_max_tokens_8192_handles_chunk_exceeding_512_words() {
6275 let long_text = (0..600u32)
6276 .map(|i| format!("word{i}"))
6277 .collect::<Vec<_>>()
6278 .join(" ");
6279
6280 let db = NamedTempFile::new().expect("temp file");
6281 let schema = Arc::new(SchemaManager::new());
6282
6283 {
6284 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6285 schema.bootstrap(&conn).expect("bootstrap");
6286 conn.execute(
6287 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6288 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'src-1')",
6289 [],
6290 )
6291 .expect("insert node");
6292 conn.execute(
6293 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6294 VALUES (?1, 'doc-1', ?2, 100)",
6295 rusqlite::params!["chunk-long", long_text],
6296 )
6297 .expect("insert long chunk");
6298 }
6299
6300 let embedder = LargeContextTestEmbedder::new("long-context-model", 4, 8192);
6301 let service = AdminService::new(db.path(), Arc::clone(&schema));
6302 let report = service
6303 .regenerate_vector_embeddings(
6304 &embedder,
6305 &VectorRegenerationConfig {
6306 kind: "Document".to_owned(),
6307 profile: "default".to_owned(),
6308 chunking_policy: "per_chunk".to_owned(),
6309 preprocessing_policy: "trim".to_owned(),
6310 },
6311 )
6312 .expect("regenerate with long-context embedder");
6313
6314 assert_eq!(
6315 report.total_chunks, 1,
6316 "600-word text pre-written as one chunk must result in exactly one embedded row"
6317 );
6318 assert_eq!(report.regenerated_rows, 1);
6319 assert_eq!(
6320 embedder.max_tokens(),
6321 8192,
6322 "embedder must advertise 8192 token capacity"
6323 );
6324 }
6325
6326 #[cfg(feature = "sqlite-vec")]
6328 #[derive(Debug)]
6329 struct LargeContextTestEmbedder {
6330 identity: QueryEmbedderIdentity,
6331 vector: Vec<f32>,
6332 max_tokens: usize,
6333 }
6334
6335 #[cfg(feature = "sqlite-vec")]
6336 impl LargeContextTestEmbedder {
6337 fn new(model: &str, dimension: usize, max_tokens: usize) -> Self {
6338 Self {
6339 identity: QueryEmbedderIdentity {
6340 model_identity: model.to_owned(),
6341 model_version: "1.0.0".to_owned(),
6342 dimension,
6343 normalization_policy: "l2".to_owned(),
6344 },
6345 vector: vec![1.0; dimension],
6346 max_tokens,
6347 }
6348 }
6349 }
6350
6351 #[cfg(feature = "sqlite-vec")]
6352 impl QueryEmbedder for LargeContextTestEmbedder {
6353 fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
6354 Ok(self.vector.clone())
6355 }
6356 fn identity(&self) -> QueryEmbedderIdentity {
6357 self.identity.clone()
6358 }
6359 fn max_tokens(&self) -> usize {
6360 self.max_tokens
6361 }
6362 }
6363
6364 #[cfg(feature = "sqlite-vec")]
6368 #[test]
6369 #[allow(clippy::too_many_lines)]
6370 fn regenerate_vector_embeddings_in_process_writes_contract_and_vec_rows() {
6371 let db = NamedTempFile::new().expect("temp file");
6372 let schema = Arc::new(SchemaManager::new());
6373
6374 {
6375 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6376 schema.bootstrap(&conn).expect("bootstrap");
6377 for (row_id, logical_id, created_at, src) in [
6378 ("r1", "node-1", 100, "src1"),
6379 ("r2", "node-2", 101, "src2"),
6380 ("r3", "node-3", 102, "src3"),
6381 ] {
6382 conn.execute(
6383 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6384 VALUES (?1, ?2, 'Doc', '{}', ?3, ?4)",
6385 rusqlite::params![row_id, logical_id, created_at, src],
6386 )
6387 .expect("insert node");
6388 }
6389 for (chunk_id, node_id, text, created_at) in [
6390 ("c1", "node-1", "first document text", 100),
6391 ("c2", "node-2", "second document text", 101),
6392 ("c3", "node-3", "third document text", 102),
6393 ] {
6394 conn.execute(
6395 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6396 VALUES (?1, ?2, ?3, ?4)",
6397 rusqlite::params![chunk_id, node_id, text, created_at],
6398 )
6399 .expect("insert chunk");
6400 }
6401 }
6402
6403 let service = AdminService::new(db.path(), Arc::clone(&schema));
6404 let embedder = TestEmbedder::new("batch-test-model", 4);
6405 let config = VectorRegenerationConfig {
6406 kind: "Doc".to_owned(),
6407 profile: "default".to_owned(),
6408 chunking_policy: "per_chunk".to_owned(),
6409 preprocessing_policy: "trim".to_owned(),
6410 };
6411 let report = service
6412 .regenerate_vector_embeddings_in_process(&embedder, &config)
6413 .expect("in-process regen must succeed");
6414
6415 assert_eq!(report.total_chunks, 3);
6416 assert_eq!(report.regenerated_rows, 3);
6417 assert!(report.contract_persisted);
6418
6419 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6420 let vec_count: i64 = conn
6421 .query_row("SELECT count(*) FROM vec_doc", [], |row| row.get(0))
6422 .expect("vec_doc count");
6423 assert_eq!(vec_count, 3, "one vec row per chunk");
6424
6425 let model_identity: String = conn
6426 .query_row(
6427 "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
6428 [],
6429 |row| row.get(0),
6430 )
6431 .expect("contract row");
6432 assert_eq!(model_identity, "batch-test-model");
6433 }
6434
6435 #[cfg(feature = "sqlite-vec")]
6438 #[test]
6439 #[allow(clippy::too_many_lines)]
6440 fn regenerate_vector_embeddings_targets_per_kind_table() {
6441 let db = NamedTempFile::new().expect("temp file");
6442 let schema = Arc::new(SchemaManager::new());
6443
6444 {
6445 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6446 schema.bootstrap(&conn).expect("bootstrap");
6447 conn.execute(
6448 "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6449 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
6450 [],
6451 )
6452 .expect("insert node");
6453 conn.execute(
6454 "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6455 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
6456 [],
6457 )
6458 .expect("insert chunk");
6459 }
6460
6461 let service = AdminService::new(db.path(), Arc::clone(&schema));
6462 let embedder = TestEmbedder::new("test-model", 4);
6463 let report = service
6464 .regenerate_vector_embeddings(
6465 &embedder,
6466 &VectorRegenerationConfig {
6467 kind: "Document".to_owned(),
6468 profile: "default".to_owned(),
6469 chunking_policy: "per_chunk".to_owned(),
6470 preprocessing_policy: "trim".to_owned(),
6471 },
6472 )
6473 .expect("regenerate vectors");
6474
6475 assert_eq!(report.table_name, "vec_document");
6476 assert_eq!(report.regenerated_rows, 1);
6477
6478 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6479 let vec_count: i64 = conn
6480 .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
6481 .expect("vec_document count");
6482 assert_eq!(vec_count, 1, "rows must be in vec_document");
6483
6484 let old_count: i64 = conn
6485 .query_row(
6486 "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
6487 [],
6488 |r| r.get(0),
6489 )
6490 .expect("sqlite_master check");
6491 assert_eq!(
6492 old_count, 0,
6493 "vec_nodes_active must NOT be created for per-kind regen"
6494 );
6495 }
6496
6497 #[test]
6500 fn get_vec_profile_returns_none_when_no_profile_exists() {
6501 let (db, service) = setup();
6502 let _ = db;
6503 let result = service.get_vec_profile("MyKind").expect("should not error");
6504 assert!(
6505 result.is_none(),
6506 "must return None when no profile registered"
6507 );
6508 }
6509
6510 #[cfg(feature = "sqlite-vec")]
6511 #[test]
6512 fn get_vec_profile_returns_profile_for_registered_kind() {
6513 let db = NamedTempFile::new().expect("temp file");
6514 let schema = Arc::new(SchemaManager::new());
6515 {
6516 let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6517 schema.bootstrap(&conn).expect("bootstrap");
6518 schema
6519 .ensure_vec_kind_profile(&conn, "MyKind", 128)
6520 .expect("ensure_vec_kind_profile");
6521 }
6522
6523 let service = AdminService::new(db.path(), Arc::clone(&schema));
6524 let profile = service.get_vec_profile("MyKind").expect("should not error");
6525 assert!(profile.is_some(), "must return profile after registration");
6526 assert_eq!(profile.unwrap().dimensions, 128);
6527 }
6528
6529 #[test]
6530 fn get_vec_profile_does_not_return_global_sentinel_row() {
6531 let (db, service) = setup();
6532 {
6533 let conn = sqlite::open_connection(db.path()).expect("conn");
6534 conn.execute(
6535 "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
6536 VALUES ('*', 'vec', '{\"model_identity\":\"old-model\",\"dimensions\":384}', 0, 0)",
6537 [],
6538 )
6539 .expect("insert global sentinel");
6540 }
6541 let result = service
6542 .get_vec_profile("SomeKind")
6543 .expect("should not error");
6544 assert!(
6545 result.is_none(),
6546 "per-kind query must not return global ('*', 'vec') row"
6547 );
6548 }
6549}