Skip to main content

fathomdb_engine/admin/
mod.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5use serde::{Deserialize, Serialize};
6
7use crate::rebuild_actor::{RebuildClient, RebuildMode, RebuildRequest, RebuildSubmit};
8
9use crate::{
10    EngineError, ProjectionRepairReport, ProjectionService, ids::new_id,
11    projection::ProjectionTarget, sqlite,
12};
13
14mod fts;
15mod introspection;
16mod operational;
17mod provenance;
18mod vector;
19
20pub use introspection::{
21    Capabilities, CurrentConfig, EmbedderCapability, EmbeddingProfileSummary, FtsKindConfig,
22    KindDescription, VecKindConfig, WorkQueueSummary,
23};
24pub(crate) use vector::canonical_chunk_hash;
25pub use vector::{
26    ConfigureEmbeddingOutcome, ConfigureVecOutcome, VecIndexStatus, VectorSource,
27    load_vector_regeneration_config,
28};
29
30#[cfg(test)]
31use fts::{
32    create_or_replace_fts_kind_table, serialize_property_paths_json, validate_fts_property_paths,
33};
34
/// Results of a physical and structural integrity check on the database.
///
/// Produced by [`AdminService::check_integrity`]; every counter that is
/// non-zero also contributes a human-readable entry to `warnings`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct IntegrityReport {
    /// True when `PRAGMA integrity_check` returned exactly `"ok"`.
    pub physical_ok: bool,
    /// True when `pragma_foreign_key_check` reported zero violations.
    pub foreign_keys_ok: bool,
    /// Chunks of active (non-superseded) nodes with no `fts_nodes` row.
    pub missing_fts_rows: usize,
    /// Active nodes missing a property-FTS row per their registered schema.
    pub missing_property_fts_rows: usize,
    /// `logical_id`s with more than one non-superseded node row.
    pub duplicate_active_logical_ids: usize,
    /// Operational mutation/current rows referencing a missing collection.
    pub operational_missing_collections: usize,
    /// `operational_current` rows whose `last_mutation_id` does not exist.
    pub operational_missing_last_mutations: usize,
    /// Human-readable summaries of the non-zero counters above, plus any
    /// projection-table collision warnings.
    pub warnings: Vec<String>,
}
47
/// A registered FTS property projection schema for a node kind.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct FtsPropertySchemaRecord {
    /// The node kind this schema applies to.
    pub kind: String,
    /// Flat display list of registered JSON property paths
    /// (e.g. `["$.name", "$.title"]`). For recursive entries this lists
    /// only the root path; mode information is carried by
    /// [`Self::entries`].
    pub property_paths: Vec<String>,
    /// Full per-entry schema shape with mode
    /// ([`FtsPropertyPathMode::Scalar`] | [`FtsPropertyPathMode::Recursive`]).
    /// Read this field for mode-accurate round-trip of the registered
    /// schema; `property_paths` above is a lossy convenience view.
    pub entries: Vec<FtsPropertyPathSpec>,
    /// Subtree paths excluded from recursive walks. Empty for
    /// scalar-only schemas or recursive schemas with no exclusions.
    pub exclude_paths: Vec<String>,
    /// Separator used when concatenating extracted values.
    pub separator: String,
    /// Schema format version.
    pub format_version: i64,
}
71
/// Extraction mode for a single registered FTS property path.
///
/// Serialized in `snake_case` (`"scalar"` / `"recursive"`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FtsPropertyPathMode {
    /// Resolve the path and append the scalar value(s). Matches legacy
    /// pre-Phase-4 behaviour. This is the `Default` variant.
    #[default]
    Scalar,
    /// Recursively walk every scalar leaf rooted at the path. Each leaf
    /// contributes one entry to the position map.
    Recursive,
}
84
/// A single registered property-FTS path with its extraction mode.
///
/// `#[non_exhaustive]`: construct via [`FtsPropertyPathSpec::scalar`] /
/// [`FtsPropertyPathSpec::recursive`] rather than a struct literal.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct FtsPropertyPathSpec {
    /// JSON path to the property (must start with `$.`).
    pub path: String,
    /// Whether to treat this path as a scalar or recursively walk it.
    pub mode: FtsPropertyPathMode,
    /// Optional BM25 weight multiplier for this path (1.0 = default).
    /// Must satisfy `0.0 < weight <= 1000.0` when set.
    pub weight: Option<f32>,
}
97
// f32 does not implement Eq (due to NaN), but weights in practice are
// always finite values set by callers, so reflexivity holds.
// NOTE(review): nothing in this module enforces finiteness at construction
// time — `with_weight` accepts any f32 and validation is deferred to
// registration. Confirm a NaN weight can never reach a stored spec before
// relying on this Eq impl (e.g. as a HashMap key).
impl Eq for FtsPropertyPathSpec {}
101
102impl FtsPropertyPathSpec {
103    #[must_use]
104    pub fn scalar(path: impl Into<String>) -> Self {
105        Self {
106            path: path.into(),
107            mode: FtsPropertyPathMode::Scalar,
108            weight: None,
109        }
110    }
111
112    #[must_use]
113    pub fn recursive(path: impl Into<String>) -> Self {
114        Self {
115            path: path.into(),
116            mode: FtsPropertyPathMode::Recursive,
117            weight: None,
118        }
119    }
120
121    /// Set the BM25 weight multiplier for this path.
122    ///
123    /// The weight must satisfy `0.0 < weight <= 1000.0` at registration
124    /// time; this builder method does not validate — validation happens in
125    /// `register_fts_property_schema_with_entries`.
126    #[must_use]
127    pub fn with_weight(mut self, weight: f32) -> Self {
128        self.weight = Some(weight);
129        self
130    }
131}
132
/// Options controlling how a safe database export is performed.
///
/// Defaults to the safe configuration (`force_checkpoint: true`).
#[derive(Clone, Copy, Debug)]
pub struct SafeExportOptions {
    /// When true, runs `PRAGMA wal_checkpoint(FULL)` before copying and fails if
    /// any WAL frames could not be applied (busy != 0). Set to false only in
    /// tests that seed a database without WAL mode.
    pub force_checkpoint: bool,
}
141
142impl Default for SafeExportOptions {
143    fn default() -> Self {
144        Self {
145            force_checkpoint: true,
146        }
147    }
148}
149
// Bridge protocol version recorded in `SafeExportManifest::protocol_version`.
// Must match PROTOCOL_VERSION in fathomdb-admin-bridge.rs
const EXPORT_PROTOCOL_VERSION: u32 = 1;
152
/// Manifest describing a completed safe export.
#[derive(Clone, Debug, Serialize)]
pub struct SafeExportManifest {
    /// Unix timestamp (seconds since epoch) when the export was created.
    pub exported_at: u64,
    /// SHA-256 hex digest of the exported database file.
    pub sha256: String,
    /// Schema version recorded in `fathom_schema_migrations` at export time.
    pub schema_version: u32,
    /// Bridge protocol version compiled into this binary
    /// (see `EXPORT_PROTOCOL_VERSION`).
    pub protocol_version: u32,
    /// Number of `SQLite` pages in the exported database file.
    pub page_count: u64,
}
167
/// Report from tracing all rows associated with a given `source_ref`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct TraceReport {
    /// The `source_ref` value that was traced.
    pub source_ref: String,
    /// Count of node rows carrying this `source_ref`.
    pub node_rows: usize,
    /// Count of edge rows carrying this `source_ref`.
    pub edge_rows: usize,
    /// Count of action rows carrying this `source_ref`.
    pub action_rows: usize,
    /// Count of operational mutation rows carrying this `source_ref`.
    pub operational_mutation_rows: usize,
    /// Logical IDs of the matched nodes.
    pub node_logical_ids: Vec<String>,
    /// IDs of the matched actions.
    pub action_ids: Vec<String>,
    /// IDs of the matched operational mutations.
    pub operational_mutation_ids: Vec<String>,
}
180
/// An edge that was skipped during a restore because an endpoint is missing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SkippedEdge {
    /// Logical ID of the edge that could not be restored.
    pub edge_logical_id: String,
    /// Logical ID of the endpoint node that has no active row.
    pub missing_endpoint: String,
}
187
/// Report from restoring a retired logical ID back to active state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalRestoreReport {
    /// The logical ID that was restored.
    pub logical_id: String,
    /// True when there was nothing to restore (the ID was already active
    /// or had no history) — presumably; confirm against the implementation.
    pub was_noop: bool,
    /// Node rows brought back to active state.
    pub restored_node_rows: usize,
    /// Edge rows brought back to active state.
    pub restored_edge_rows: usize,
    /// Chunk rows restored.
    pub restored_chunk_rows: usize,
    /// FTS rows restored.
    pub restored_fts_rows: usize,
    /// Property-FTS rows restored.
    pub restored_property_fts_rows: usize,
    /// Vector rows restored.
    pub restored_vec_rows: usize,
    /// Edges that could not be restored because an endpoint is missing.
    pub skipped_edges: Vec<SkippedEdge>,
    /// Free-form notes emitted during the restore.
    pub notes: Vec<String>,
}
202
/// Report from permanently purging all rows for a logical ID.
///
/// NOTE(review): unlike [`LogicalRestoreReport`] there is no
/// `deleted_property_fts_rows` counter — confirm whether property-FTS rows
/// are folded into `deleted_fts_rows` or simply unreported.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalPurgeReport {
    /// The logical ID that was purged.
    pub logical_id: String,
    /// True when no rows existed for this logical ID.
    pub was_noop: bool,
    /// Node rows permanently deleted.
    pub deleted_node_rows: usize,
    /// Edge rows permanently deleted.
    pub deleted_edge_rows: usize,
    /// Chunk rows permanently deleted.
    pub deleted_chunk_rows: usize,
    /// FTS rows permanently deleted.
    pub deleted_fts_rows: usize,
    /// Vector rows permanently deleted.
    pub deleted_vec_rows: usize,
    /// Free-form notes emitted during the purge.
    pub notes: Vec<String>,
}
215
/// Options controlling provenance event purging behavior.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProvenancePurgeOptions {
    /// When true, report what would be purged without deleting anything.
    /// NOTE(review): no `#[serde(default)]` here, so deserialization
    /// requires an explicit `dry_run` field — presumably intentional
    /// (fail closed); confirm.
    pub dry_run: bool,
    /// Event types exempted from deletion. Defaults to empty when absent
    /// from the serialized form.
    #[serde(default)]
    pub preserve_event_types: Vec<String>,
}
223
/// Report from a provenance event purge operation.
#[derive(Clone, Debug, Serialize)]
pub struct ProvenancePurgeReport {
    /// Number of events deleted (or that would be deleted on a dry run —
    /// TODO confirm dry-run semantics against the purge implementation).
    pub events_deleted: u64,
    /// Number of events kept due to `preserve_event_types`.
    pub events_preserved: u64,
    /// Timestamp of the oldest event still present, or `None` when no
    /// events remain. NOTE(review): unit (seconds vs millis) is not
    /// visible in this module.
    pub oldest_remaining: Option<i64>,
}
231
/// Service providing administrative operations (integrity checks, exports, restores, purges).
#[derive(Debug)]
pub struct AdminService {
    /// Filesystem path of the SQLite database; each operation opens a
    /// fresh connection against it (see `connect`).
    pub(super) database_path: PathBuf,
    /// Schema manager used to bootstrap every new connection.
    pub(super) schema_manager: Arc<SchemaManager>,
    /// Projection service bound to the same database path and schema.
    pub(super) projections: ProjectionService,
    /// Client side of the rebuild actor's channel.  `None` when the engine
    /// was opened without a rebuild actor (e.g. in tests that use
    /// [`AdminService::new`] directly).
    pub(super) rebuild_client: Option<RebuildClient>,
    /// Shared handle to the writer actor.  `None` when the admin service is
    /// constructed outside a full engine (unit tests).  Required by the
    /// vector projection drain surface.
    pub(super) writer: Option<Arc<crate::WriterActor>>,
}
247
/// Results of a semantic consistency check on the graph data.
///
/// Produced by [`AdminService::check_semantics`]; each counter is the
/// number of rows exhibiting the described inconsistency.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SemanticReport {
    /// Chunks whose `node_logical_id` has no active node.
    pub orphaned_chunks: usize,
    /// Active nodes with a NULL `source_ref` (loss of provenance).
    pub null_source_ref_nodes: usize,
    /// Steps referencing a `run_id` that does not exist in the runs table.
    pub broken_step_fk: usize,
    /// Actions referencing a `step_id` that does not exist in the steps table.
    pub broken_action_fk: usize,
    /// FTS rows whose `chunk_id` does not exist in the chunks table.
    pub stale_fts_rows: usize,
    /// FTS rows whose node has been superseded (`superseded_at` IS NOT NULL on all active rows).
    pub fts_rows_for_superseded_nodes: usize,
    /// Property FTS rows whose node has been superseded or does not exist.
    pub stale_property_fts_rows: usize,
    /// Property FTS rows whose kind has no registered FTS property schema.
    pub orphaned_property_fts_rows: usize,
    /// Property FTS rows whose `kind` does not match the active node's actual kind.
    pub mismatched_kind_property_fts_rows: usize,
    /// Active logical IDs with more than one per-kind FTS property row.
    pub duplicate_property_fts_rows: usize,
    /// Property FTS rows whose `text_content` no longer matches the canonical extraction.
    pub drifted_property_fts_rows: usize,
    /// Active edges where at least one endpoint has no active node.
    pub dangling_edges: usize,
    /// `logical_ids` where every version has been superseded (no active row).
    pub orphaned_supersession_chains: usize,
    /// Vec rows whose backing chunk no longer exists in the chunks table.
    /// Always 0 when built without the `sqlite-vec` feature.
    pub stale_vec_rows: usize,
    /// Compatibility counter for vec rows whose chunk points at missing node history.
    /// Always 0 when built without the `sqlite-vec` feature.
    pub vec_rows_for_superseded_nodes: usize,
    /// Latest-state keys whose latest mutation is a `put` but no current row exists.
    pub missing_operational_current_rows: usize,
    /// Current rows that do not match the latest mutation state.
    pub stale_operational_current_rows: usize,
    /// Mutations written after the owning collection was disabled.
    pub disabled_collection_mutations: usize,
    /// Access metadata rows whose `logical_id` no longer has any node history.
    pub orphaned_last_access_metadata_rows: usize,
    /// Human-readable summaries of the non-zero counters above.
    pub warnings: Vec<String>,
}
291
/// Configuration for regenerating vector embeddings.
///
/// 0.4.0 architectural invariant: vector identity is the embedder's
/// responsibility, not the regeneration config's. This struct carries only
/// WHERE the vectors live and HOW to chunk/preprocess them — never WHAT
/// model produced them. The embedder supplied at regen-call time is the
/// single source of truth for `model_identity`, `model_version`,
/// `dimension`, and `normalization_policy`; the resulting vector profile
/// is stamped directly from [`QueryEmbedder::identity`].
///
/// 0.5.0 breaking change: `table_name` is removed. The vec table name is now
/// derived from `kind` via [`fathomdb_schema::vec_kind_table_name`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct VectorRegenerationConfig {
    /// Node kind whose vectors this config addresses; the vec table name
    /// is derived from it (see struct docs).
    pub kind: String,
    /// Profile name under which the regeneration runs.
    pub profile: String,
    /// Name of the chunking policy applied to source content.
    pub chunking_policy: String,
    /// Name of the preprocessing policy applied before embedding.
    pub preprocessing_policy: String,
}
312
/// Report from a vector embedding regeneration run.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct VectorRegenerationReport {
    /// Profile the regenerated vectors were stamped with.
    pub profile: String,
    /// Vec table the regenerated rows were written into.
    pub table_name: String,
    /// Dimension of the generated embeddings.
    pub dimension: usize,
    /// Total number of chunks considered for regeneration.
    pub total_chunks: usize,
    /// Number of rows actually regenerated.
    pub regenerated_rows: usize,
    /// Whether the vector contract was persisted alongside the run.
    pub contract_persisted: bool,
    /// Free-form notes emitted during the run.
    pub notes: Vec<String>,
}
324
/// Stored FTS tokenizer profile for a node kind.
///
/// Created and updated by [`AdminService::set_fts_profile`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FtsProfile {
    /// Node kind this profile applies to (e.g. `"Article"`).
    pub kind: String,
    /// FTS5 tokenizer string (e.g. `"porter unicode61 remove_diacritics 2"`).
    pub tokenizer: String,
    /// Unix timestamp when the profile was last activated, or `None` if never.
    pub active_at: Option<i64>,
    /// Unix timestamp when the profile row was first created.
    pub created_at: i64,
}
339
/// Stored vector embedding profile (global, kind-agnostic).
///
/// Created and updated by [`AdminService::set_vec_profile`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VecProfile {
    /// Identifier for the embedding model (e.g. `"openai/text-embedding-3-small"`).
    pub model_identity: String,
    /// Optional version string for the model.
    pub model_version: Option<String>,
    /// Number of dimensions produced by the model.
    pub dimensions: u32,
    /// Unix timestamp when the profile was last activated, or `None` if never.
    pub active_at: Option<i64>,
    /// Unix timestamp when the profile row was first created.
    pub created_at: i64,
}
356
/// Estimated cost of rebuilding a projection (FTS table or vector embeddings).
///
/// Returned by [`AdminService::preview_projection_impact`]. All figures are
/// estimates computed without performing the rebuild.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionImpact {
    /// Number of rows that would be processed during a full rebuild.
    pub rows_to_rebuild: u64,
    /// Rough estimated rebuild time in seconds.
    pub estimated_seconds: u64,
    /// Estimated temporary disk space required during rebuild, in bytes.
    pub temp_db_size_bytes: u64,
    /// The tokenizer currently stored in `projection_profiles`, if any.
    pub current_tokenizer: Option<String>,
    /// Reserved for future use; always `None` currently.
    pub target_tokenizer: Option<String>,
}
373
/// Well-known tokenizer preset names mapped to their FTS5 tokenizer strings.
pub const TOKENIZER_PRESETS: &[(&str, &str)] = &[
    (
        "recall-optimized-english",
        "porter unicode61 remove_diacritics 2",
    ),
    ("precision-optimized", "unicode61 remove_diacritics 2"),
    ("global-cjk", "icu"),
    ("substring-trigram", "trigram"),
    ("source-code", "unicode61 tokenchars '._-$@'"),
];

/// Resolve a tokenizer preset name to its FTS5 tokenizer string.
///
/// If `input` matches a known preset name the preset value is returned.
/// Otherwise `input` is returned unchanged (treated as a raw tokenizer string).
pub fn resolve_tokenizer_preset(input: &str) -> &str {
    TOKENIZER_PRESETS
        .iter()
        .find(|(name, _)| *name == input)
        .map_or(input, |(_, value)| *value)
}
398
/// Format version stamped into newly persisted vector contracts.
pub(super) const CURRENT_VECTOR_CONTRACT_FORMAT_VERSION: i64 = 1;
/// Maximum accepted length of a profile name.
pub(super) const MAX_PROFILE_LEN: usize = 128;
/// Maximum accepted length of a chunking/preprocessing policy name.
pub(super) const MAX_POLICY_LEN: usize = 128;
/// Maximum serialized size of a vector contract JSON document.
pub(super) const MAX_CONTRACT_JSON_BYTES: usize = 32 * 1024;
/// Maximum serialized size of audit metadata attached to an operation.
pub(super) const MAX_AUDIT_METADATA_BYTES: usize = 2048;
/// Default page size for operational read queries when no limit is given.
const DEFAULT_OPERATIONAL_READ_LIMIT: usize = 100;
/// Hard cap on the page size for operational read queries.
const MAX_OPERATIONAL_READ_LIMIT: usize = 1000;
406
/// Thread-safe handle to the shared [`AdminService`].
///
/// Cloning the handle is cheap (an `Arc` refcount bump).
#[derive(Clone, Debug)]
pub struct AdminHandle {
    // Shared ownership of the service; exposed via `service()`.
    inner: Arc<AdminService>,
}
412
413impl AdminHandle {
414    /// Wrap an [`AdminService`] in a shared handle.
415    #[must_use]
416    pub fn new(service: AdminService) -> Self {
417        Self {
418            inner: Arc::new(service),
419        }
420    }
421
422    /// Clone the inner `Arc` to the [`AdminService`].
423    #[must_use]
424    pub fn service(&self) -> Arc<AdminService> {
425        Arc::clone(&self.inner)
426    }
427}
428
429impl AdminService {
430    /// Create a new admin service for the database at the given path.
431    #[must_use]
432    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
433        let database_path = path.as_ref().to_path_buf();
434        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
435        Self {
436            database_path,
437            schema_manager,
438            projections,
439            rebuild_client: None,
440            writer: None,
441        }
442    }
443
444    /// Create a new admin service wired to the background rebuild actor.
445    #[must_use]
446    pub fn new_with_rebuild(
447        path: impl AsRef<Path>,
448        schema_manager: Arc<SchemaManager>,
449        rebuild_client: RebuildClient,
450    ) -> Self {
451        let database_path = path.as_ref().to_path_buf();
452        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
453        Self {
454            database_path,
455            schema_manager,
456            projections,
457            rebuild_client: Some(rebuild_client),
458            writer: None,
459        }
460    }
461
462    /// Create a new admin service wired to the rebuild actor AND a shared
463    /// writer-actor handle.  Used by [`crate::runtime::EngineRuntime::open`]
464    /// so admin surfaces that require writer-thread serialization (e.g.
465    /// vector projection drain) can submit batches safely.
466    #[must_use]
467    pub fn new_with_engine(
468        path: impl AsRef<Path>,
469        schema_manager: Arc<SchemaManager>,
470        rebuild_client: RebuildClient,
471        writer: Arc<crate::WriterActor>,
472    ) -> Self {
473        let database_path = path.as_ref().to_path_buf();
474        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
475        Self {
476            database_path,
477            schema_manager,
478            projections,
479            rebuild_client: Some(rebuild_client),
480            writer: Some(writer),
481        }
482    }
483
    /// Open a fresh connection to the engine database and bootstrap the
    /// schema on it.
    ///
    /// When the `sqlite-vec` feature is enabled, the vec-aware opener is
    /// used so the `vec0` module is available on this connection.
    ///
    /// # Errors
    /// Returns [`EngineError`] if opening the connection or bootstrapping
    /// the schema fails.
    pub(super) fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
        #[cfg(feature = "sqlite-vec")]
        let conn = sqlite::open_connection_with_vec(&self.database_path)?;
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&self.database_path)?;
        // NOTE(review): bootstrap is run on every connection — presumably
        // idempotent per SchemaManager's contract; confirm.
        self.schema_manager.bootstrap(&conn)?;
        Ok(conn)
    }
492
    /// Run physical and structural integrity checks and summarize the
    /// results in an [`IntegrityReport`].
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
    pub fn check_integrity(&self) -> Result<IntegrityReport, EngineError> {
        let conn = self.connect()?;

        // Physical page-level check: returns the single string "ok" on success.
        let physical_result: String =
            conn.query_row("PRAGMA integrity_check", [], |row| row.get(0))?;
        // Count FK violations via the table-valued pragma.
        let foreign_key_count: i64 =
            conn.query_row("SELECT count(*) FROM pragma_foreign_key_check", [], |row| {
                row.get(0)
            })?;
        // Chunks of active nodes that have no FTS projection row.
        let missing_fts_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM chunks c
            JOIN nodes n
              ON n.logical_id = c.node_logical_id
             AND n.superseded_at IS NULL
            WHERE NOT EXISTS (
                SELECT 1
                FROM fts_nodes f
                WHERE f.chunk_id = c.id
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Logical IDs with more than one non-superseded node row — the
        // supersession invariant allows at most one active row per ID.
        let duplicate_active: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM (
                SELECT logical_id
                FROM nodes
                WHERE superseded_at IS NULL
                GROUP BY logical_id
                HAVING count(*) > 1
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Mutation and current rows whose collection row is absent
        // (summed across both operational tables).
        let operational_missing_collections: i64 = conn.query_row(
            r"
            SELECT (
                SELECT count(*)
                FROM operational_mutations m
                LEFT JOIN operational_collections c ON c.name = m.collection_name
                WHERE c.name IS NULL
            ) + (
                SELECT count(*)
                FROM operational_current oc
                LEFT JOIN operational_collections c ON c.name = oc.collection_name
                WHERE c.name IS NULL
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Current rows pointing at a last_mutation_id that no longer exists.
        let operational_missing_last_mutations: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_current oc
            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
            WHERE m.id IS NULL
            ",
            [],
            |row| row.get(0),
        )?;

        // Count missing property FTS rows using the same extraction logic as
        // write/rebuild. A pure-SQL check would overcount: nodes whose declared
        // paths legitimately normalize to no values correctly have no row.
        let missing_property_fts_rows = count_missing_property_fts_rows(&conn)?;

        // One human-readable warning per non-zero counter.
        let mut warnings = Vec::new();
        if missing_fts_rows > 0 {
            warnings.push("missing FTS projections detected".to_owned());
        }
        if missing_property_fts_rows > 0 {
            warnings.push("missing property FTS projections detected".to_owned());
        }
        if duplicate_active > 0 {
            warnings.push("duplicate active logical_ids detected".to_owned());
        }
        if operational_missing_collections > 0 {
            warnings.push("operational rows reference missing collections".to_owned());
        }
        if operational_missing_last_mutations > 0 {
            warnings.push("operational current rows reference missing last mutations".to_owned());
        }
        warnings.extend(projection_table_collision_warnings(&conn)?);

        // FIX(review): was `as usize` — unsound on 32-bit targets, wraps negatives silently.
        // Options: (A) try_from().unwrap_or(0) — masks corruption, (B) try_from().expect() —
        // panics on corruption, (C) propagate error. Chose (B) here: a negative count(*)
        // signals data corruption, and the integrity report would be meaningless anyway.
        Ok(IntegrityReport {
            physical_ok: physical_result == "ok",
            foreign_keys_ok: foreign_key_count == 0,
            missing_fts_rows: i64_to_usize(missing_fts_rows),
            missing_property_fts_rows: i64_to_usize(missing_property_fts_rows),
            duplicate_active_logical_ids: i64_to_usize(duplicate_active),
            operational_missing_collections: i64_to_usize(operational_missing_collections),
            operational_missing_last_mutations: i64_to_usize(operational_missing_last_mutations),
            warnings,
        })
    }
600
601    /// # Errors
602    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
603    #[allow(clippy::too_many_lines)]
604    pub fn check_semantics(&self) -> Result<SemanticReport, EngineError> {
605        let conn = self.connect()?;
606
607        let orphaned_chunks: i64 = conn.query_row(
608            r"
609            SELECT count(*)
610            FROM chunks c
611            WHERE NOT EXISTS (
612                SELECT 1 FROM nodes n
613                WHERE n.logical_id = c.node_logical_id
614            )
615            ",
616            [],
617            |row| row.get(0),
618        )?;
619
620        let null_source_ref_nodes: i64 = conn.query_row(
621            "SELECT count(*) FROM nodes WHERE source_ref IS NULL AND superseded_at IS NULL",
622            [],
623            |row| row.get(0),
624        )?;
625
626        let broken_step_fk: i64 = conn.query_row(
627            r"
628            SELECT count(*) FROM steps s
629            WHERE NOT EXISTS (SELECT 1 FROM runs r WHERE r.id = s.run_id)
630            ",
631            [],
632            |row| row.get(0),
633        )?;
634
635        let broken_action_fk: i64 = conn.query_row(
636            r"
637            SELECT count(*) FROM actions a
638            WHERE NOT EXISTS (SELECT 1 FROM steps s WHERE s.id = a.step_id)
639            ",
640            [],
641            |row| row.get(0),
642        )?;
643
644        let stale_fts_rows: i64 = conn.query_row(
645            r"
646            SELECT count(*) FROM fts_nodes f
647            WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = f.chunk_id)
648            ",
649            [],
650            |row| row.get(0),
651        )?;
652
653        let fts_rows_for_superseded_nodes: i64 = conn.query_row(
654            r"
655            SELECT count(*) FROM fts_nodes f
656            WHERE NOT EXISTS (
657                SELECT 1 FROM nodes n
658                WHERE n.logical_id = f.node_logical_id AND n.superseded_at IS NULL
659            )
660            ",
661            [],
662            |row| row.get(0),
663        )?;
664
665        let (
666            stale_property_fts_rows,
667            orphaned_property_fts_rows,
668            mismatched_kind_property_fts_rows,
669            duplicate_property_fts_rows,
670        ) = count_per_kind_property_fts_issues(&conn)?;
671
672        let drifted_property_fts_rows = count_drifted_property_fts_rows(&conn)?;
673
674        let dangling_edges: i64 = conn.query_row(
675            r"
676            SELECT count(*) FROM edges e
677            WHERE e.superseded_at IS NULL AND (
678                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.source_logical_id AND n.superseded_at IS NULL)
679                OR
680                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.target_logical_id AND n.superseded_at IS NULL)
681            )
682            ",
683            [],
684            |row| row.get(0),
685        )?;
686
687        let orphaned_supersession_chains: i64 = conn.query_row(
688            r"
689            SELECT count(*) FROM (
690                SELECT logical_id FROM nodes
691                GROUP BY logical_id
692                HAVING count(*) > 0 AND sum(CASE WHEN superseded_at IS NULL THEN 1 ELSE 0 END) = 0
693            )
694            ",
695            [],
696            |row| row.get(0),
697        )?;
698
699        // Vec stale row detection — iterates per-kind vec tables from projection_profiles.
700        #[cfg(feature = "sqlite-vec")]
701        let (stale_vec_rows, vec_rows_for_superseded_nodes): (i64, i64) = {
702            let kinds: Vec<String> =
703                match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
704                    Ok(mut stmt) => stmt
705                        .query_map([], |row| row.get(0))
706                        .map_err(EngineError::Sqlite)?
707                        .collect::<Result<Vec<_>, _>>()
708                        .map_err(EngineError::Sqlite)?,
709                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
710                        if msg.contains("no such table: projection_profiles") =>
711                    {
712                        vec![]
713                    }
714                    Err(e) => return Err(EngineError::Sqlite(e)),
715                };
716            let mut stale = 0i64;
717            let mut superseded = 0i64;
718            for kind in &kinds {
719                let table = fathomdb_schema::vec_kind_table_name(kind);
720                let stale_sql = format!(
721                    "SELECT count(*) FROM {table} v \
722                     WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = v.chunk_id)"
723                );
724                let superseded_sql = format!(
725                    "SELECT count(*) FROM {table} v \
726                     JOIN chunks c ON c.id = v.chunk_id \
727                     WHERE NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = c.node_logical_id)"
728                );
729                stale += match conn.query_row(&stale_sql, [], |row| row.get(0)) {
730                    Ok(n) => n,
731                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
732                        if msg.contains("no such table:")
733                            || msg.contains("no such module: vec0") =>
734                    {
735                        0
736                    }
737                    Err(e) => return Err(EngineError::Sqlite(e)),
738                };
739                superseded += match conn.query_row(&superseded_sql, [], |row| row.get(0)) {
740                    Ok(n) => n,
741                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
742                        if msg.contains("no such table:")
743                            || msg.contains("no such module: vec0") =>
744                    {
745                        0
746                    }
747                    Err(e) => return Err(EngineError::Sqlite(e)),
748                };
749            }
750            (stale, superseded)
751        };
752        #[cfg(not(feature = "sqlite-vec"))]
753        let stale_vec_rows: i64 = 0;
754        #[cfg(not(feature = "sqlite-vec"))]
755        let vec_rows_for_superseded_nodes: i64 = 0;
756        let missing_operational_current_rows: i64 = conn.query_row(
757            r"
758            SELECT count(*)
759            FROM operational_mutations m
760            JOIN operational_collections c
761              ON c.name = m.collection_name
762             AND c.kind = 'latest_state'
763            WHERE m.op_kind = 'put'
764              AND NOT EXISTS (
765                    SELECT 1
766                    FROM operational_mutations newer
767                    WHERE newer.collection_name = m.collection_name
768                      AND newer.record_key = m.record_key
769                      AND newer.mutation_order > m.mutation_order
770                )
771              AND NOT EXISTS (
772                    SELECT 1
773                    FROM operational_current oc
774                    WHERE oc.collection_name = m.collection_name
775                      AND oc.record_key = m.record_key
776                )
777            ",
778            [],
779            |row| row.get(0),
780        )?;
781        let stale_operational_current_rows: i64 = conn.query_row(
782            r"
783            SELECT count(*)
784            FROM operational_current oc
785            JOIN operational_collections c
786              ON c.name = oc.collection_name
787             AND c.kind = 'latest_state'
788            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
789            WHERE m.id IS NULL
790               OR m.collection_name != oc.collection_name
791               OR m.record_key != oc.record_key
792               OR m.op_kind != 'put'
793               OR m.payload_json != oc.payload_json
794               OR EXISTS (
795                    SELECT 1
796                    FROM operational_mutations newer
797                    WHERE newer.collection_name = oc.collection_name
798                      AND newer.record_key = oc.record_key
799                      AND newer.mutation_order > m.mutation_order
800                )
801            ",
802            [],
803            |row| row.get(0),
804        )?;
805        let disabled_collection_mutations: i64 = conn.query_row(
806            r"
807            SELECT count(*)
808            FROM operational_mutations m
809            JOIN operational_collections c ON c.name = m.collection_name
810            WHERE c.disabled_at IS NOT NULL AND m.created_at > c.disabled_at
811            ",
812            [],
813            |row| row.get(0),
814        )?;
815        let orphaned_last_access_metadata_rows: i64 = conn.query_row(
816            r"
817            SELECT count(*)
818            FROM node_access_metadata am
819            WHERE NOT EXISTS (
820                SELECT 1 FROM nodes n WHERE n.logical_id = am.logical_id
821            )
822            ",
823            [],
824            |row| row.get(0),
825        )?;
826
827        let mut warnings = Vec::new();
828        if orphaned_chunks > 0 {
829            warnings.push(format!(
830                "{orphaned_chunks} orphaned chunk(s) with no surviving node history"
831            ));
832        }
833        if null_source_ref_nodes > 0 {
834            warnings.push(format!(
835                "{null_source_ref_nodes} active node(s) with null source_ref"
836            ));
837        }
838        if broken_step_fk > 0 {
839            warnings.push(format!(
840                "{broken_step_fk} step(s) referencing non-existent run"
841            ));
842        }
843        if broken_action_fk > 0 {
844            warnings.push(format!(
845                "{broken_action_fk} action(s) referencing non-existent step"
846            ));
847        }
848        if stale_fts_rows > 0 {
849            warnings.push(format!(
850                "{stale_fts_rows} stale FTS row(s) referencing missing chunk"
851            ));
852        }
853        if fts_rows_for_superseded_nodes > 0 {
854            warnings.push(format!(
855                "{fts_rows_for_superseded_nodes} FTS row(s) for superseded node(s)"
856            ));
857        }
858        if stale_property_fts_rows > 0 {
859            warnings.push(format!(
860                "{stale_property_fts_rows} stale property FTS row(s) for superseded/missing node(s)"
861            ));
862        }
863        if orphaned_property_fts_rows > 0 {
864            warnings.push(format!(
865                "{orphaned_property_fts_rows} orphaned property FTS row(s) for unregistered kind(s)"
866            ));
867        }
868        if mismatched_kind_property_fts_rows > 0 {
869            warnings.push(format!(
870                "{mismatched_kind_property_fts_rows} property FTS row(s) whose kind does not match the active node"
871            ));
872        }
873        if duplicate_property_fts_rows > 0 {
874            warnings.push(format!(
875                "{duplicate_property_fts_rows} active logical ID(s) with duplicate property FTS rows"
876            ));
877        }
878        if drifted_property_fts_rows > 0 {
879            warnings.push(format!(
880                "{drifted_property_fts_rows} property FTS row(s) with stale text_content"
881            ));
882        }
883        if dangling_edges > 0 {
884            warnings.push(format!(
885                "{dangling_edges} active edge(s) with missing endpoint node"
886            ));
887        }
888        if orphaned_supersession_chains > 0 {
889            warnings.push(format!(
890                "{orphaned_supersession_chains} logical_id(s) with all versions superseded"
891            ));
892        }
893        if stale_vec_rows > 0 {
894            warnings.push(format!(
895                "{stale_vec_rows} stale vec row(s) referencing missing chunk"
896            ));
897        }
898        if vec_rows_for_superseded_nodes > 0 {
899            warnings.push(format!(
900                "{vec_rows_for_superseded_nodes} vec row(s) whose node history is missing"
901            ));
902        }
903        if missing_operational_current_rows > 0 {
904            warnings.push(format!(
905                "{missing_operational_current_rows} latest-state key(s) missing operational_current rows"
906            ));
907        }
908        if stale_operational_current_rows > 0 {
909            warnings.push(format!(
910                "{stale_operational_current_rows} stale operational_current row(s)"
911            ));
912        }
913        if disabled_collection_mutations > 0 {
914            warnings.push(format!(
915                "{disabled_collection_mutations} mutation(s) were written after collection disable"
916            ));
917        }
918        if orphaned_last_access_metadata_rows > 0 {
919            warnings.push(format!(
920                "{orphaned_last_access_metadata_rows} last_access metadata row(s) reference missing node history"
921            ));
922        }
923
924        Ok(SemanticReport {
925            orphaned_chunks: i64_to_usize(orphaned_chunks),
926            null_source_ref_nodes: i64_to_usize(null_source_ref_nodes),
927            broken_step_fk: i64_to_usize(broken_step_fk),
928            broken_action_fk: i64_to_usize(broken_action_fk),
929            stale_fts_rows: i64_to_usize(stale_fts_rows),
930            fts_rows_for_superseded_nodes: i64_to_usize(fts_rows_for_superseded_nodes),
931            stale_property_fts_rows: i64_to_usize(stale_property_fts_rows),
932            orphaned_property_fts_rows: i64_to_usize(orphaned_property_fts_rows),
933            mismatched_kind_property_fts_rows: i64_to_usize(mismatched_kind_property_fts_rows),
934            duplicate_property_fts_rows: i64_to_usize(duplicate_property_fts_rows),
935            drifted_property_fts_rows: i64_to_usize(drifted_property_fts_rows),
936            dangling_edges: i64_to_usize(dangling_edges),
937            orphaned_supersession_chains: i64_to_usize(orphaned_supersession_chains),
938            stale_vec_rows: i64_to_usize(stale_vec_rows),
939            vec_rows_for_superseded_nodes: i64_to_usize(vec_rows_for_superseded_nodes),
940            missing_operational_current_rows: i64_to_usize(missing_operational_current_rows),
941            stale_operational_current_rows: i64_to_usize(stale_operational_current_rows),
942            disabled_collection_mutations: i64_to_usize(disabled_collection_mutations),
943            orphaned_last_access_metadata_rows: i64_to_usize(orphaned_last_access_metadata_rows),
944            warnings,
945        })
946    }
947}
948
949fn projection_table_collision_warnings(
950    conn: &rusqlite::Connection,
951) -> Result<Vec<String>, EngineError> {
952    let fts_kinds = projection_kinds(conn, "SELECT kind FROM fts_property_schemas")?;
953    let mut warnings = legacy_projection_collision_warnings(
954        "FTS property",
955        &fts_kinds,
956        fathomdb_schema::legacy_fts_kind_table_name,
957        fathomdb_schema::fts_kind_table_name,
958    );
959
960    let vec_kinds = projection_kinds(
961        conn,
962        "SELECT kind FROM projection_profiles WHERE facet = 'vec'",
963    )?;
964    warnings.extend(legacy_projection_collision_warnings(
965        "vector",
966        &vec_kinds,
967        fathomdb_schema::legacy_vec_kind_table_name,
968        fathomdb_schema::vec_kind_table_name,
969    ));
970    Ok(warnings)
971}
972
973fn projection_kinds(conn: &rusqlite::Connection, sql: &str) -> Result<Vec<String>, EngineError> {
974    let mut stmt = conn.prepare(sql)?;
975    stmt.query_map([], |r| r.get::<_, String>(0))?
976        .collect::<Result<Vec<_>, _>>()
977        .map_err(EngineError::Sqlite)
978}
979
/// Build one warning per legacy table name that two or more kinds collapse to.
///
/// `legacy_name` / `current_name` map a kind to its legacy and current
/// projection table names. Kinds are grouped by legacy name; groups of size
/// one are silent, larger groups produce a warning listing the colliding
/// kinds (sorted) and the distinct current tables they now map to.
fn legacy_projection_collision_warnings(
    label: &str,
    kinds: &[String],
    legacy_name: fn(&str) -> String,
    current_name: fn(&str) -> String,
) -> Vec<String> {
    // Group kinds by the legacy table name they would have shared. BTreeMap
    // keeps the output deterministic (sorted by legacy table name).
    let mut groups: std::collections::BTreeMap<String, Vec<&str>> =
        std::collections::BTreeMap::new();
    for kind in kinds {
        groups
            .entry(legacy_name(kind))
            .or_default()
            .push(kind.as_str());
    }

    groups
        .into_iter()
        .filter(|(_, members)| members.len() > 1)
        .map(|(legacy_table, mut members)| {
            members.sort_unstable();
            let current_tables: Vec<String> =
                members.iter().map(|kind| current_name(kind)).collect();
            format!(
                "legacy {label} projection table name collision for {legacy_table}: kinds [{}] now map to [{}]",
                members.join(", "),
                current_tables.join(", ")
            )
        })
        .collect()
}
1013
1014/// Count per-kind FTS integrity issues across all registered per-kind tables.
1015/// Returns (stale, orphaned, `mismatched_kind`, duplicate) counts.
1016///
1017/// - Stale: rows in a per-kind table whose node is superseded or missing.
1018/// - Orphaned: rows in a per-kind table for a kind with no registered schema.
1019/// - Mismatched kind: impossible with per-kind tables (always 0).
1020/// - Duplicate: same `node_logical_id` appears more than once in any per-kind table.
1021fn count_per_kind_property_fts_issues(
1022    conn: &rusqlite::Connection,
1023) -> Result<(i64, i64, i64, i64), EngineError> {
1024    // Collect all per-kind virtual tables from sqlite_master.
1025    // Filter by sql LIKE 'CREATE VIRTUAL TABLE%' to exclude FTS5 shadow tables
1026    // (e.g. fts_props_goal_data, fts_props_goal_idx) which share the same prefix.
1027    let per_kind_tables: Vec<String> = {
1028        let mut stmt = conn.prepare(
1029            "SELECT name FROM sqlite_master \
1030             WHERE type='table' AND name LIKE 'fts_props_%' \
1031             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1032        )?;
1033        stmt.query_map([], |r| r.get::<_, String>(0))?
1034            .collect::<Result<Vec<_>, _>>()?
1035    };
1036
1037    let registered_kinds: std::collections::HashSet<String> = {
1038        let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
1039        stmt.query_map([], |r| r.get::<_, String>(0))?
1040            .collect::<Result<std::collections::HashSet<_>, _>>()?
1041    };
1042
1043    let mut stale = 0i64;
1044    let mut orphaned = 0i64;
1045    let mut duplicate = 0i64;
1046
1047    for table in &per_kind_tables {
1048        // Stale: rows whose node_logical_id has no active node.
1049        let kind_stale: i64 = conn.query_row(
1050            &format!(
1051                "SELECT count(*) FROM {table} fp \
1052                 WHERE NOT EXISTS (\
1053                     SELECT 1 FROM nodes n \
1054                     WHERE n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL\
1055                 )"
1056            ),
1057            [],
1058            |r| r.get(0),
1059        )?;
1060        stale += kind_stale;
1061
1062        // Duplicate: same node_logical_id more than once.
1063        let kind_dup: i64 = conn.query_row(
1064            &format!(
1065                "SELECT count(*) FROM (\
1066                     SELECT node_logical_id FROM {table} \
1067                     GROUP BY node_logical_id HAVING count(*) > 1\
1068                 )"
1069            ),
1070            [],
1071            |r| r.get(0),
1072        )?;
1073        duplicate += kind_dup;
1074
1075        // Orphaned: this per-kind table has no corresponding schema.
1076        // Determine which kind this table corresponds to by checking all registered kinds.
1077        let table_has_schema = registered_kinds
1078            .iter()
1079            .any(|k| fathomdb_schema::fts_kind_table_name(k) == *table);
1080        if !table_has_schema {
1081            let table_rows: i64 =
1082                conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))?;
1083            orphaned += table_rows;
1084        }
1085    }
1086
1087    // Mismatched kind is always 0 with per-kind tables.
1088    Ok((stale, orphaned, 0, duplicate))
1089}
1090
1091/// Count active nodes that should have a property FTS row (extraction yields a value)
1092/// but don't. Uses the same extraction logic as write/rebuild to avoid false positives
1093/// for nodes whose declared paths legitimately normalize to no values.
1094fn count_missing_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1095    let schemas = crate::writer::load_fts_property_schemas(conn)?;
1096    if schemas.is_empty() {
1097        return Ok(0);
1098    }
1099
1100    let mut missing = 0i64;
1101    for (kind, schema) in &schemas {
1102        let table = fathomdb_schema::fts_kind_table_name(kind);
1103        // If the per-kind table doesn't exist yet, all nodes with extractable values are missing.
1104        let table_exists: bool = conn
1105            .query_row(
1106                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1107                [table.as_str()],
1108                |r| r.get::<_, i64>(0),
1109            )
1110            .unwrap_or(0)
1111            > 0;
1112
1113        if table_exists {
1114            let mut stmt = conn.prepare(&format!(
1115                "SELECT n.logical_id, n.properties FROM nodes n \
1116                 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
1117                   AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
1118            ))?;
1119            let rows = stmt.query_map([kind.as_str()], |row| {
1120                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1121            })?;
1122            for row in rows {
1123                let (_logical_id, properties_str) = row?;
1124                let props: serde_json::Value =
1125                    serde_json::from_str(&properties_str).unwrap_or_default();
1126                if crate::writer::extract_property_fts(&props, schema)
1127                    .0
1128                    .is_some()
1129                {
1130                    missing += 1;
1131                }
1132            }
1133        } else {
1134            // Per-kind table doesn't exist yet — count all nodes with extractable values.
1135            let mut stmt = conn.prepare(
1136                "SELECT n.logical_id, n.properties FROM nodes n \
1137                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
1138            )?;
1139            let rows = stmt.query_map([kind.as_str()], |row| {
1140                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1141            })?;
1142            for row in rows {
1143                let (_logical_id, properties_str) = row?;
1144                let props: serde_json::Value =
1145                    serde_json::from_str(&properties_str).unwrap_or_default();
1146                if crate::writer::extract_property_fts(&props, schema)
1147                    .0
1148                    .is_some()
1149                {
1150                    missing += 1;
1151                }
1152            }
1153        }
1154    }
1155    Ok(missing)
1156}
1157
1158/// Count property FTS rows whose persisted text has drifted from the current
1159/// canonical value computed by the FTS extractors. Handles both per-kind
1160/// table shapes:
1161///
1162/// - Non-weighted (legacy / default): single `text_content` column per row.
1163///   Compared against `extract_property_fts(...)`.
1164/// - Weighted: one column per registered path (named by `fts_column_name`).
1165///   Compared against `extract_property_fts_columns(...)`; any per-column
1166///   mismatch counts the row as drifted exactly once.
1167///
1168/// This catches:
1169/// - rows whose text no longer matches the current node properties and schema
1170/// - rows that should have been removed (extraction now yields no value)
1171fn count_drifted_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1172    let schemas = crate::writer::load_fts_property_schemas(conn)?;
1173    if schemas.is_empty() {
1174        return Ok(0);
1175    }
1176
1177    let mut drifted = 0i64;
1178    for (kind, schema) in &schemas {
1179        let table = fathomdb_schema::fts_kind_table_name(kind);
1180        // If the per-kind table doesn't exist, no rows to check.
1181        let table_exists: bool = conn
1182            .query_row(
1183                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1184                [table.as_str()],
1185                |r| r.get::<_, i64>(0),
1186            )
1187            .unwrap_or(0)
1188            > 0;
1189        if !table_exists {
1190            continue;
1191        }
1192
1193        // Dispatch on the persisted schema, not on the live table columns.
1194        // Registration (`register_fts_property_schema_with_entries` in
1195        // `admin/fts.rs`) chooses the per-kind table layout by checking
1196        // whether any entry carries a weight; mirroring that invariant here
1197        // keeps the two code paths in lockstep. A live-column probe would
1198        // misclassify a weighted schema whose registered paths happened to
1199        // include literal `$.text_content` (collapsing to a `text_content`
1200        // column), silently running the non-weighted comparator against
1201        // per-column storage.
1202        drifted += if schema.is_weighted() {
1203            count_drifted_weighted(conn, kind, &table, schema)?
1204        } else {
1205            count_drifted_non_weighted(conn, kind, &table, schema)?
1206        };
1207    }
1208    Ok(drifted)
1209}
1210
1211/// Drift count for the non-weighted (single `text_content` column) per-kind
1212/// FTS table. Preserves the historic query shape.
1213fn count_drifted_non_weighted(
1214    conn: &rusqlite::Connection,
1215    kind: &str,
1216    table: &str,
1217    schema: &crate::writer::PropertyFtsSchema,
1218) -> Result<i64, EngineError> {
1219    let mut drifted = 0i64;
1220    let mut stmt = conn.prepare(&format!(
1221        "SELECT fp.node_logical_id, fp.text_content, n.properties \
1222         FROM {table} fp \
1223         JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1224         WHERE n.kind = ?1"
1225    ))?;
1226    let rows = stmt.query_map([kind], |row| {
1227        Ok((
1228            row.get::<_, String>(0)?,
1229            row.get::<_, String>(1)?,
1230            row.get::<_, String>(2)?,
1231        ))
1232    })?;
1233    for row in rows {
1234        let (_logical_id, stored_text, properties_str) = row?;
1235        let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1236        let (expected, _positions, _stats) = crate::writer::extract_property_fts(&props, schema);
1237        match expected {
1238            Some(text) if text == stored_text => {}
1239            _ => drifted += 1,
1240        }
1241    }
1242    Ok(drifted)
1243}
1244
1245/// Drift count for a weighted (per-column) per-kind FTS table. One column per
1246/// registered path, named via `fts_column_name`. A row counts as drifted if
1247/// any per-column value differs from the canonical extraction.
1248fn count_drifted_weighted(
1249    conn: &rusqlite::Connection,
1250    kind: &str,
1251    table: &str,
1252    schema: &crate::writer::PropertyFtsSchema,
1253) -> Result<i64, EngineError> {
1254    // Column names come from `fts_column_name` and are restricted to
1255    // `[a-zA-Z0-9_]`, so direct interpolation + quoted identifiers are safe.
1256    let columns: Vec<String> = schema
1257        .paths
1258        .iter()
1259        .map(|entry| {
1260            let is_recursive = matches!(entry.mode, crate::writer::PropertyPathMode::Recursive);
1261            fathomdb_schema::fts_column_name(&entry.path, is_recursive)
1262        })
1263        .collect();
1264    if columns.is_empty() {
1265        // Weighted table with no registered columns is impossible in
1266        // practice (register always writes specs), but guard defensively:
1267        // nothing to compare against, so nothing drifts.
1268        return Ok(0);
1269    }
1270
1271    let select_cols: String = columns
1272        .iter()
1273        .map(|c| format!("fp.\"{c}\""))
1274        .collect::<Vec<_>>()
1275        .join(", ");
1276    let sql = format!(
1277        "SELECT fp.node_logical_id, {select_cols}, n.properties \
1278         FROM {table} fp \
1279         JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1280         WHERE n.kind = ?1"
1281    );
1282    let mut stmt = conn.prepare(&sql)?;
1283    // Column layout in the result set:
1284    //   [0] node_logical_id
1285    //   [1..1+columns.len()] per-spec stored text, in `columns` order
1286    //   [last] properties JSON
1287    let props_col_idx = columns.len() + 1;
1288    let rows = stmt.query_map([kind], |row| {
1289        let mut stored: Vec<String> = Vec::with_capacity(columns.len());
1290        for i in 0..columns.len() {
1291            stored.push(row.get::<_, String>(i + 1)?);
1292        }
1293        let properties: String = row.get(props_col_idx)?;
1294        Ok((stored, properties))
1295    })?;
1296
1297    let mut drifted = 0i64;
1298    for row in rows {
1299        let (stored, properties_str) = row?;
1300        let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1301        let expected = crate::writer::extract_property_fts_columns(&props, schema);
1302        // `extract_property_fts_columns` returns entries in schema-path order,
1303        // which matches `columns`. Compare per-column; any mismatch counts
1304        // the row as drifted exactly once.
1305        let row_drifted = if expected.len() == stored.len() {
1306            expected
1307                .iter()
1308                .zip(stored.iter())
1309                .any(|((_name, exp_text), stored_text)| exp_text != stored_text)
1310        } else {
1311            true
1312        };
1313        if row_drifted {
1314            drifted += 1;
1315        }
1316    }
1317    Ok(drifted)
1318}
1319
1320/// Convert a non-negative i64 count to usize, panicking on negative values
1321/// which would indicate data corruption.
1322#[allow(clippy::expect_used)]
1323pub(super) fn i64_to_usize(val: i64) -> usize {
1324    usize::try_from(val).expect("count(*) must be non-negative")
1325}
1326
1327pub(super) fn persist_simple_provenance_event(
1328    conn: &rusqlite::Connection,
1329    event_type: &str,
1330    subject: &str,
1331    metadata: Option<serde_json::Value>,
1332) -> Result<(), EngineError> {
1333    let metadata_json = metadata.map(|value| value.to_string()).unwrap_or_default();
1334    conn.execute(
1335        "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1336        rusqlite::params![new_id(), event_type, subject, metadata_json],
1337    )?;
1338    Ok(())
1339}
1340
1341pub(super) fn rebuild_operational_current_rows(
1342    tx: &rusqlite::Transaction<'_>,
1343    collections: &[String],
1344) -> Result<usize, EngineError> {
1345    let mut rebuilt_rows = 0usize;
1346    clear_operational_current_rows(tx, collections)?;
1347    let mut ins_current = tx.prepare_cached(
1348        "INSERT INTO operational_current \
1349         (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1350         VALUES (?1, ?2, ?3, ?4, ?5)",
1351    )?;
1352
1353    for collection in collections {
1354        let mut stmt = tx.prepare(
1355            "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
1356             FROM operational_mutations \
1357             WHERE collection_name = ?1 \
1358             ORDER BY record_key, mutation_order",
1359        )?;
1360        let mut latest_by_key: std::collections::HashMap<String, Option<(String, i64, String)>> =
1361            std::collections::HashMap::new();
1362        let rows = stmt.query_map([collection], operational::map_operational_mutation_row)?;
1363        for row in rows {
1364            let mutation = row?;
1365            match mutation.op_kind.as_str() {
1366                "put" => {
1367                    latest_by_key.insert(
1368                        mutation.record_key,
1369                        Some((mutation.payload_json, mutation.created_at, mutation.id)),
1370                    );
1371                }
1372                "delete" => {
1373                    latest_by_key.insert(mutation.record_key, None);
1374                }
1375                _ => {}
1376            }
1377        }
1378
1379        for (record_key, state) in latest_by_key {
1380            if let Some((payload_json, updated_at, last_mutation_id)) = state {
1381                ins_current.execute(rusqlite::params![
1382                    collection,
1383                    record_key,
1384                    payload_json,
1385                    updated_at,
1386                    last_mutation_id,
1387                ])?;
1388                rebuilt_rows += 1;
1389            }
1390        }
1391    }
1392
1393    drop(ins_current);
1394    Ok(rebuilt_rows)
1395}
1396
1397pub(super) fn clear_operational_current_rows(
1398    tx: &rusqlite::Transaction<'_>,
1399    collections: &[String],
1400) -> Result<(), EngineError> {
1401    let mut delete_current =
1402        tx.prepare_cached("DELETE FROM operational_current WHERE collection_name = ?1")?;
1403    let mut delete_secondary_current = tx.prepare_cached(
1404        "DELETE FROM operational_secondary_index_entries \
1405         WHERE collection_name = ?1 AND subject_kind = 'current'",
1406    )?;
1407    for collection in collections {
1408        delete_secondary_current.execute([collection])?;
1409        delete_current.execute([collection])?;
1410    }
1411    drop(delete_secondary_current);
1412    drop(delete_current);
1413    Ok(())
1414}
1415
1416#[cfg(test)]
1417#[allow(clippy::expect_used, deprecated)]
1418mod tests {
1419    use std::fs;
1420    use std::sync::Arc;
1421
1422    use fathomdb_schema::SchemaManager;
1423    use tempfile::NamedTempFile;
1424
1425    use super::{
1426        AdminService, FtsPropertyPathMode, FtsPropertyPathSpec, SafeExportOptions,
1427        VectorRegenerationConfig,
1428    };
1429    use crate::embedder::{BatchEmbedder, EmbedderError, QueryEmbedder, QueryEmbedderIdentity};
1430    use crate::projection::ProjectionTarget;
1431    use crate::sqlite;
1432    use crate::{EngineError, OperationalCollectionKind, OperationalRegisterRequest};
1433
1434    #[cfg(feature = "sqlite-vec")]
1435    use crate::{ExecutionCoordinator, TelemetryCounters};
1436
1437    #[cfg(feature = "sqlite-vec")]
1438    use fathomdb_query::QueryBuilder;
1439
1440    #[cfg(feature = "sqlite-vec")]
1441    use super::load_vector_regeneration_config;
1442
    /// In-process embedder used by the regeneration test suite. The
    /// vector is parameterized so individual tests can distinguish which
    /// embedder produced which profile row.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct TestEmbedder {
        /// Identity reported by both the query and batch embedder impls.
        identity: QueryEmbedderIdentity,
        /// Fixed vector returned for every embed call.
        vector: Vec<f32>,
    }
1452
1453    #[allow(dead_code)]
1454    impl TestEmbedder {
1455        fn new(model: &str, dimension: usize) -> Self {
1456            Self {
1457                identity: QueryEmbedderIdentity {
1458                    model_identity: model.to_owned(),
1459                    model_version: "1.0.0".to_owned(),
1460                    dimension,
1461                    normalization_policy: "l2".to_owned(),
1462                },
1463                vector: vec![1.0; dimension],
1464            }
1465        }
1466    }
1467
    impl QueryEmbedder for TestEmbedder {
        /// Always returns the fixed test vector regardless of input text.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        fn max_tokens(&self) -> usize {
            512
        }
    }
1479
1480    impl BatchEmbedder for TestEmbedder {
1481        fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, EmbedderError> {
1482            Ok(texts.iter().map(|_| self.vector.clone()).collect())
1483        }
1484        fn identity(&self) -> QueryEmbedderIdentity {
1485            self.identity.clone()
1486        }
1487        fn max_tokens(&self) -> usize {
1488            512
1489        }
1490    }
1491
    /// Embedder that always fails — used to exercise the post-request
    /// failure audit path without the complexity of subprocess machinery.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct FailingEmbedder {
        /// Identity is still reported normally; only embedding calls fail.
        identity: QueryEmbedderIdentity,
    }
1499
    impl QueryEmbedder for FailingEmbedder {
        /// Always fails with a fixed test error, never producing a vector.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Err(EmbedderError::Failed("test failure".to_owned()))
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        fn max_tokens(&self) -> usize {
            512
        }
    }
1511
1512    #[allow(dead_code)]
1513    #[cfg(unix)]
1514    fn set_file_mode(path: &std::path::Path, mode: u32) {
1515        use std::os::unix::fs::PermissionsExt;
1516
1517        let mut permissions = fs::metadata(path).expect("script metadata").permissions();
1518        permissions.set_mode(mode);
1519        fs::set_permissions(path, permissions).expect("chmod");
1520    }
1521
    /// No-op stub on non-unix targets, where file modes do not apply.
    #[allow(dead_code)]
    #[cfg(not(unix))]
    fn set_file_mode(_path: &std::path::Path, _mode: u32) {}
1525    fn setup() -> (NamedTempFile, AdminService) {
1526        let db = NamedTempFile::new().expect("temp file");
1527        let schema = Arc::new(SchemaManager::new());
1528        {
1529            let conn = sqlite::open_connection(db.path()).expect("connection");
1530            schema.bootstrap(&conn).expect("bootstrap");
1531        }
1532        let service = AdminService::new(db.path(), Arc::clone(&schema));
1533        (db, service)
1534    }
1535
    /// A freshly bootstrapped database must report zero duplicate active
    /// logical ids and zero operational-consistency issues.
    #[test]
    fn check_integrity_includes_active_uniqueness_count() {
        let (_db, service) = setup();
        let report = service.check_integrity().expect("integrity check");
        assert_eq!(report.duplicate_active_logical_ids, 0);
        assert_eq!(report.operational_missing_collections, 0);
        assert_eq!(report.operational_missing_last_mutations, 0);
    }
1544
    /// Two kinds ('Foo-Bar' and 'Foo_Bar') that collapse to the same legacy
    /// FTS table name must surface a collision warning from the integrity
    /// check.
    #[test]
    fn check_integrity_warns_for_legacy_projection_name_collisions() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register two schemas whose kinds differ only by '-' vs '_'.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Foo-Bar', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("insert first schema");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Foo_Bar', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("insert second schema");
        }

        let report = service.check_integrity().expect("integrity check");

        assert!(report.warnings.iter().any(|warning| {
            warning.contains("legacy FTS property projection table name collision")
        }));
    }
1570
1571    #[test]
1572    fn trace_source_returns_node_logical_ids() {
1573        let (db, service) = setup();
1574        {
1575            let conn = sqlite::open_connection(db.path()).expect("conn");
1576            conn.execute(
1577                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1578                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 'source-1')",
1579                [],
1580            )
1581            .expect("insert node");
1582        }
1583        let report = service.trace_source("source-1").expect("trace");
1584        assert_eq!(report.node_rows, 1);
1585        assert_eq!(report.node_logical_ids, vec!["lg1"]);
1586    }
1587
    #[test]
    fn trace_source_includes_operational_mutations() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register a latest_state collection so the mutation below is valid.
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            // One mutation attributed to 'source-1'; trace should find it.
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"ok\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert mutation");
        }

        // Tracing must report operational mutations alongside graph rows.
        let report = service.trace_source("source-1").expect("trace");
        assert_eq!(report.operational_mutation_rows, 1);
        assert_eq!(report.operational_mutation_ids, vec!["m1"]);
    }
1613
1614    #[test]
1615    fn excise_source_restores_prior_active_node() {
1616        let (db, service) = setup();
1617        {
1618            let conn = sqlite::open_connection(db.path()).expect("conn");
1619            conn.execute(
1620                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1621                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
1622                [],
1623            )
1624            .expect("insert v1 superseded");
1625            conn.execute(
1626                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1627                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
1628                [],
1629            )
1630            .expect("insert v2 active");
1631        }
1632        service.excise_source("source-2").expect("excise");
1633        {
1634            let conn = sqlite::open_connection(db.path()).expect("conn");
1635            let active_row_id: String = conn
1636                .query_row(
1637                    "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
1638                    [],
1639                    |row| row.get(0),
1640                )
1641                .expect("active row exists after excise");
1642            assert_eq!(active_row_id, "r1");
1643        }
1644    }
1645
    #[test]
    fn excise_source_deletes_operational_mutations_and_repairs_latest_state_current() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            // m1 (source-1) precedes m2 (source-2) for the same record key; the
            // current row points at m2 and must fall back to m1 after excise.
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert prior mutation");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'source-2', 200, 2)",
                [],
            )
            .expect("insert excised mutation");
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 200, 'm2')",
                [],
            )
            .expect("insert current row");
        }

        // Sanity check: the doomed mutation is traceable before excision.
        let traced = service
            .trace_source("source-2")
            .expect("trace before excise");
        assert_eq!(traced.operational_mutation_rows, 1);
        assert_eq!(traced.operational_mutation_ids, vec!["m2"]);

        // After excision the same source must trace to nothing.
        let excised = service.excise_source("source-2").expect("excise");
        assert_eq!(excised.operational_mutation_rows, 0);
        assert!(excised.operational_mutation_ids.is_empty());

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let remaining: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_mutations WHERE source_ref = 'source-2'",
                    [],
                    |row| row.get(0),
                )
                .expect("remaining count");
            assert_eq!(remaining, 0);

            // The latest-state current row must be rebuilt from the surviving
            // mutation m1, not left dangling on the deleted m2.
            let current: (String, String) = conn
                .query_row(
                    "SELECT payload_json, last_mutation_id FROM operational_current \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                    [],
                    |row| Ok((row.get(0)?, row.get(1)?)),
                )
                .expect("rebuilt current row");
            assert_eq!(current.0, "{\"status\":\"old\"}");
            assert_eq!(current.1, "m1");
        }
    }
1714
    #[test]
    fn restore_logical_id_reestablishes_last_pre_retire_content_and_attached_edges() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Graph fixture: 'doc-1' with a chunk, plus a TAGGED edge to
            // 'topic-1'.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Simulate a retire at t=200: provenance events first, then the
            // supersession updates, then drop the FTS rows.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear fts");
        }

        // Restore should bring back the node, its edge, its chunk, and the
        // FTS projection in one operation.
        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 1);
        assert_eq!(report.restored_chunk_rows, 1);
        assert_eq!(report.restored_fts_rows, 1);

        // Verify the restored rows are active again and searchable.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active node count");
        assert_eq!(active_node_count, 1);
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(fts_count, 1);
    }
1805
    #[test]
    fn restore_logical_id_restores_edges_retired_after_the_node_retire_event() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // The edge retire event (t=201) lands AFTER the node retire event
            // (t=200) — the scenario this test exists to cover.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 201, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 201 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // The restore must pick up edges retired at-or-after the node retire,
        // not only those retired at exactly the same timestamp.
        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_edge_rows, 1);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
    }
1867
    #[test]
    fn restore_logical_id_prefers_latest_retired_revision_when_timestamps_tie() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Two retired revisions of 'doc-1' share identical created_at and
            // superseded_at values; insertion order is the only tie-breaker.
            conn.execute(
                "INSERT INTO nodes \
                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-older', 'doc-1', 'Document', '{\"title\":\"older\"}', 100, 200, 'forget-1')",
                [],
            )
            .expect("insert older retired node");
            conn.execute(
                "INSERT INTO nodes \
                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-newer', 'doc-1', 'Document', '{\"title\":\"newer\"}', 100, 200, 'forget-1')",
                [],
            )
            .expect("insert newer retired node");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-older', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert older retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-newer', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert newer retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");

        assert!(!report.was_noop);
        // The later-inserted revision must win the tie and become active.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_row: (String, String) = conn
            .query_row(
                "SELECT row_id, properties FROM nodes \
                 WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("restored active row");
        assert_eq!(active_row.0, "node-row-newer");
        assert_eq!(active_row.1, "{\"title\":\"newer\"}");
    }
1916
    #[test]
    fn purge_logical_id_removes_retired_content_and_records_tombstone() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Retired node plus its chunk, a retired edge, and an FTS row —
            // everything the purge should take with it.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired edge");
            conn.execute(
                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-1', 'doc-1', 'Document', 'budget narrative')",
                [],
            )
            .expect("insert fts");
        }

        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.deleted_node_rows, 1);
        assert_eq!(report.deleted_edge_rows, 1);
        assert_eq!(report.deleted_chunk_rows, 1);
        assert_eq!(report.deleted_fts_rows, 1);

        // Nothing about 'doc-1' may remain in the graph tables...
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_nodes: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining nodes");
        assert_eq!(remaining_nodes, 0);
        let remaining_edges: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining edges");
        assert_eq!(remaining_edges, 0);
        let remaining_chunks: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining chunks");
        assert_eq!(remaining_chunks, 0);
        // ...but the purge itself must leave exactly one provenance tombstone.
        let purge_events: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'purge_logical_id' AND subject = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("purge events");
        assert_eq!(purge_events, 1);
    }
1991
1992    #[test]
1993    fn check_semantics_accepts_preserved_retired_chunks() {
1994        let (db, service) = setup();
1995        {
1996            let conn = sqlite::open_connection(db.path()).expect("conn");
1997            conn.execute(
1998                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1999                 VALUES ('node-row-1', 'doc-1', 'Document', '{}', 100, 200, 'seed')",
2000                [],
2001            )
2002            .expect("insert retired node");
2003            conn.execute(
2004                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
2005                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
2006                [],
2007            )
2008            .expect("insert chunk");
2009        }
2010
2011        let report = service.check_semantics().expect("semantics");
2012        assert_eq!(report.orphaned_chunks, 0);
2013    }
2014
2015    #[test]
2016    fn check_semantics_detects_missing_retired_node_history_for_preserved_chunks() {
2017        let (db, service) = setup();
2018        {
2019            let conn = sqlite::open_connection(db.path()).expect("conn");
2020            conn.execute(
2021                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
2022                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
2023                [],
2024            )
2025            .expect("insert orphaned chunk");
2026        }
2027
2028        let report = service.check_semantics().expect("semantics");
2029        assert_eq!(report.orphaned_chunks, 1);
2030    }
2031
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn check_semantics_detects_missing_retired_node_history_for_preserved_vec_rows() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // 4-dim profile; zeroblob(16) below matches 4 × f32.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Doc", 4)
                .expect("ensure vec kind profile");
            let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
            // Orphaned chunk: 'ghost-doc' has no node history at all.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
                [],
            )
            .expect("insert orphaned chunk");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
        }

        // Both the chunk and its vec row must be flagged.
        let report = service.check_semantics().expect("semantics");
        assert_eq!(report.orphaned_chunks, 1);
        assert_eq!(report.vec_rows_for_superseded_nodes, 1);
    }
2062
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_reestablishes_vector_search_without_reingest() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            // Already-retired node with a preserved chunk + vec row.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            let vec_table = fathomdb_schema::vec_kind_table_name("Document");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // Run an actual vector search through the engine to prove the
        // restored row is visible without re-ingestion.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restore should make the preserved vec row visible again without re-ingest"
        );
    }
2125
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn purge_logical_id_deletes_vec_rows_for_retired_content() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            // Retired node with a chunk + vec row; purge must remove all of it.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            let vec_table = fathomdb_schema::vec_kind_table_name("Document");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
        }

        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.deleted_vec_rows, 1);

        // The kind's vec table must be empty afterwards.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_table = fathomdb_schema::vec_kind_table_name("Document");
        let vec_count: i64 = conn
            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
                row.get(0)
            })
            .expect("vec count");
        assert_eq!(vec_count, 0);
    }
2170
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_restores_visibility_of_regenerated_vectors() {
        let (db, service) = setup();

        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
        }

        // Generate embeddings through the regeneration path (rather than
        // inserting raw vec rows) so this test covers that pipeline.
        let embedder = TestEmbedder::new("test-model", 4);
        service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate");

        // Retire the node AFTER regeneration, then restore it.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // A live vector search must surface the restored node again.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restored logical_id should become visible through regenerated vectors"
        );
    }
2244
2245    #[test]
2246    fn check_semantics_clean_db_returns_zeros() {
2247        let (_db, service) = setup();
2248        let report = service.check_semantics().expect("semantics check");
2249        assert_eq!(report.orphaned_chunks, 0);
2250        assert_eq!(report.null_source_ref_nodes, 0);
2251        assert_eq!(report.broken_step_fk, 0);
2252        assert_eq!(report.broken_action_fk, 0);
2253        assert_eq!(report.stale_fts_rows, 0);
2254        assert_eq!(report.fts_rows_for_superseded_nodes, 0);
2255        assert_eq!(report.dangling_edges, 0);
2256        assert_eq!(report.orphaned_supersession_chains, 0);
2257        assert_eq!(report.stale_vec_rows, 0);
2258        assert_eq!(report.vec_rows_for_superseded_nodes, 0);
2259        assert_eq!(report.missing_operational_current_rows, 0);
2260        assert_eq!(report.stale_operational_current_rows, 0);
2261        assert_eq!(report.disabled_collection_mutations, 0);
2262        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
2263        assert_eq!(report.duplicate_property_fts_rows, 0);
2264        assert_eq!(report.drifted_property_fts_rows, 0);
2265        assert!(report.warnings.is_empty());
2266    }
2267
2268    #[test]
2269    fn register_operational_collection_persists_and_emits_provenance() {
2270        let (db, service) = setup();
2271        let record = service
2272            .register_operational_collection(&OperationalRegisterRequest {
2273                name: "connector_health".to_owned(),
2274                kind: OperationalCollectionKind::LatestState,
2275                schema_json: "{}".to_owned(),
2276                retention_json: "{}".to_owned(),
2277                filter_fields_json: "[]".to_owned(),
2278                validation_json: String::new(),
2279                secondary_indexes_json: "[]".to_owned(),
2280                format_version: 1,
2281            })
2282            .expect("register collection");
2283
2284        assert_eq!(record.name, "connector_health");
2285        assert_eq!(record.kind, OperationalCollectionKind::LatestState);
2286        assert_eq!(record.schema_json, "{}");
2287        assert_eq!(record.retention_json, "{}");
2288        assert_eq!(record.filter_fields_json, "[]");
2289        assert!(record.created_at > 0);
2290        assert_eq!(record.disabled_at, None);
2291
2292        let described = service
2293            .describe_operational_collection("connector_health")
2294            .expect("describe collection")
2295            .expect("collection exists");
2296        assert_eq!(described, record);
2297
2298        let conn = sqlite::open_connection(db.path()).expect("conn");
2299        let provenance_count: i64 = conn
2300            .query_row(
2301                "SELECT count(*) FROM provenance_events \
2302                 WHERE event_type = 'operational_collection_registered' AND subject = 'connector_health'",
2303                [],
2304                |row| row.get(0),
2305            )
2306            .expect("provenance count");
2307        assert_eq!(provenance_count, 1);
2308    }
2309
2310    #[test]
2311    fn register_and_update_operational_collection_validation_round_trip() {
2312        let (db, service) = setup();
2313        let record = service
2314            .register_operational_collection(&OperationalRegisterRequest {
2315                name: "connector_health".to_owned(),
2316                kind: OperationalCollectionKind::LatestState,
2317                schema_json: "{}".to_owned(),
2318                retention_json: "{}".to_owned(),
2319                filter_fields_json: "[]".to_owned(),
2320                validation_json: String::new(),
2321                secondary_indexes_json: "[]".to_owned(),
2322                format_version: 1,
2323            })
2324            .expect("register collection");
2325        assert_eq!(record.validation_json, "");
2326
2327        let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
2328        let updated = service
2329            .update_operational_collection_validation("connector_health", validation_json)
2330            .expect("update validation");
2331        assert_eq!(updated.validation_json, validation_json);
2332
2333        let described = service
2334            .describe_operational_collection("connector_health")
2335            .expect("describe collection")
2336            .expect("collection exists");
2337        assert_eq!(described.validation_json, validation_json);
2338
2339        let conn = sqlite::open_connection(db.path()).expect("conn");
2340        let provenance_count: i64 = conn
2341            .query_row(
2342                "SELECT count(*) FROM provenance_events \
2343                 WHERE event_type = 'operational_collection_validation_updated' \
2344                   AND subject = 'connector_health'",
2345                [],
2346                |row| row.get(0),
2347            )
2348            .expect("provenance count");
2349        assert_eq!(provenance_count, 1);
2350    }
2351
    #[test]
    fn register_update_and_rebuild_operational_secondary_indexes_round_trip() {
        // End-to-end check of secondary indexes on an append-only collection:
        // register with no indexes, seed two append mutations, attach an index
        // definition (which backfills entries from the existing mutations),
        // then delete the entries out-of-band and verify rebuild restores them.
        let (db, service) = setup();
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        assert_eq!(record.secondary_indexes_json, "[]");

        // Seed two appends through the writer actor; the block scope drops the
        // writer before the admin calls below run.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "secondary-index-seed".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"bob","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("seed writes");
        }

        // Attaching the index definition after the fact must backfill one
        // entry per pre-existing append mutation.
        let secondary_indexes_json = r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#;
        let updated = service
            .update_operational_collection_secondary_indexes("audit_log", secondary_indexes_json)
            .expect("update secondary indexes");
        assert_eq!(updated.secondary_indexes_json, secondary_indexes_json);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'audit_log' AND index_name = 'actor_ts'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count");
        assert_eq!(entry_count, 2);
        // Simulate index corruption by deleting every entry behind the
        // engine's back.
        conn.execute(
            "DELETE FROM operational_secondary_index_entries WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear index entries");
        drop(conn);

        // Rebuild must regenerate both mutation-backed entries; the assertion
        // on current_entries_rebuilt == 0 pins that this append-only
        // collection contributes no current-row entries.
        let rebuild = service
            .rebuild_operational_secondary_indexes("audit_log")
            .expect("rebuild secondary indexes");
        assert_eq!(rebuild.collection_name, "audit_log");
        assert_eq!(rebuild.mutation_entries_rebuilt, 2);
        assert_eq!(rebuild.current_entries_rebuilt, 0);
    }
2438
2439    #[test]
2440    fn register_operational_collection_rejects_invalid_validation_contract() {
2441        let (_db, service) = setup();
2442
2443        let error = service
2444            .register_operational_collection(&OperationalRegisterRequest {
2445                name: "connector_health".to_owned(),
2446                kind: OperationalCollectionKind::LatestState,
2447                schema_json: "{}".to_owned(),
2448                retention_json: "{}".to_owned(),
2449                filter_fields_json: "[]".to_owned(),
2450                validation_json: r#"{"format_version":1,"mode":"enforce","fields":[{"name":"status","type":"string","minimum":0}]}"#
2451                    .to_owned(),
2452                secondary_indexes_json: "[]".to_owned(),
2453                format_version: 1,
2454            })
2455            .expect_err("invalid validation contract should reject");
2456
2457        assert!(matches!(error, EngineError::InvalidWrite(_)));
2458        assert!(error.to_string().contains("minimum/maximum"));
2459    }
2460
    #[test]
    fn validate_operational_collection_history_reports_invalid_rows_without_mutation() {
        // Register with a validation contract in `disabled` mode so that a
        // non-conforming row can be written, then run the read-only history
        // validator and assert it reports the bad row without altering data
        // or emitting a provenance event.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#
                    .to_owned(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Seed one conforming ("ok") and one non-conforming ("bogus") append.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "history-validation".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"status":"ok"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"status":"bogus"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Exactly one of the two rows violates the enum constraint, and the
        // issue identifies the offending record key.
        let report = service
            .validate_operational_collection_history("audit_log")
            .expect("validate history");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.checked_rows, 2);
        assert_eq!(report.invalid_row_count, 1);
        assert_eq!(report.issues.len(), 1);
        assert_eq!(report.issues[0].record_key, "evt-2");
        assert!(report.issues[0].message.contains("must be one of"));

        // Both mutations are still present: validation did not delete or
        // repair anything.
        let trace = service
            .trace_operational_collection("audit_log", None)
            .expect("trace");
        assert_eq!(trace.mutation_count, 2);

        // Count == 0 pins that history validation emits no provenance event.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_history_validated' \
                   AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
2543
    #[test]
    fn trace_operational_collection_returns_mutations_and_current_rows() {
        // A single Put into a latest-state collection should be visible via
        // trace as one mutation and one current row for the traced key.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Write one Put through the writer actor, scoped so the writer is
        // dropped before the trace call below.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }

        // Trace filtered to the written record key echoes the filter back and
        // returns the stored mutation plus the materialized current row.
        let report = service
            .trace_operational_collection("connector_health", Some("gmail"))
            .expect("trace");
        assert_eq!(report.collection_name, "connector_health");
        assert_eq!(report.record_key.as_deref(), Some("gmail"));
        assert_eq!(report.mutation_count, 1);
        assert_eq!(report.current_count, 1);
        assert_eq!(report.mutations[0].op_kind, "put");
        assert_eq!(report.current_rows[0].payload_json, r#"{"status":"ok"}"#);
    }
2600
2601    #[test]
2602    fn trace_operational_collection_rejects_unknown_collection() {
2603        let (_db, service) = setup();
2604
2605        let error = service
2606            .trace_operational_collection("missing_collection", None)
2607            .expect_err("unknown collection should fail");
2608
2609        assert!(matches!(error, EngineError::InvalidWrite(_)));
2610        assert!(error.to_string().contains("is not registered"));
2611    }
2612
    #[test]
    fn rebuild_operational_current_repairs_missing_latest_state_rows() {
        // Delete a latest-state current row behind the engine's back, confirm
        // check_semantics flags it as missing, then verify
        // rebuild_operational_current restores the row from the mutation log.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // One Put establishes both a mutation and a current row for "gmail".
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        // Simulate corruption: remove the materialized current row directly.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 1);

        let repair = service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");
        assert_eq!(repair.collections_rebuilt, 1);
        assert_eq!(repair.current_rows_rebuilt, 1);

        // Semantics must be clean again after the repair.
        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);

        // The restored row carries the payload from the original Put.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2690
    #[test]
    fn rebuild_operational_current_restores_latest_state_secondary_index_entries() {
        // Like the missing-current-row repair test, but with a latest-state
        // secondary index attached: rebuilding the current row must also bring
        // back its subject_kind = 'current' index entry.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // One Put creates the current row (and, per the assertion below, one
        // current-subject index entry).
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        // Verify the entry exists, then delete the current row out-of-band.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let entry_count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_secondary_index_entries \
                     WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                    [],
                    |row| row.get(0),
                )
                .expect("secondary index count before repair");
            assert_eq!(entry_count, 1);
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        // After the repair the current-subject index entry is back.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count after repair");
        assert_eq!(entry_count, 1);
    }
2769
    #[test]
    fn operational_current_semantics_and_rebuild_follow_mutation_order() {
        // Seed mutations directly via SQL with ids deliberately out of lexical
        // order ('m3' has mutation_order 1, 'm1' has mutation_order 3) to pin
        // that semantics checks and rebuild follow mutation_order, not id.
        // The winning mutation is the order-3 put with status "new".
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("seed collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m3', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed first put");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'delete', '', 'src-2', 100, 2)",
                [],
            )
            .expect("seed delete");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'src-3', 100, 3)",
                [],
            )
            .expect("seed final put");
            // Current row matches the order-3 mutation, so semantics should
            // see neither a missing nor a stale row.
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 100, 'm1')",
                [],
            )
            .expect("seed current");
        }

        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 0);
        assert_eq!(before.stale_operational_current_rows, 0);

        // Remove the current row out-of-band to force a "missing" finding.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        let missing = service.check_semantics().expect("semantics after delete");
        assert_eq!(missing.missing_operational_current_rows, 1);
        assert_eq!(missing.stale_operational_current_rows, 0);

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);
        assert_eq!(after.stale_operational_current_rows, 0);

        // The rebuilt row must carry the payload of the highest-order
        // mutation ("new"), not the lexically-first id 'm1'... which here is
        // the same row — the decisive check is that "old" did not win.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
2847
    #[test]
    fn disable_operational_collection_sets_disabled_at_and_emits_provenance() {
        // Disabling a collection must stamp disabled_at, survive a describe
        // round-trip, reject subsequent writes with "is disabled", and emit
        // exactly one provenance event.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        let record = service
            .disable_operational_collection("audit_log")
            .expect("disable collection");
        assert_eq!(record.name, "audit_log");
        assert!(record.disabled_at.is_some());

        // Describe must report the same disabled_at timestamp.
        let disabled_at = record.disabled_at.expect("disabled_at");
        let described = service
            .describe_operational_collection("audit_log")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described.disabled_at, Some(disabled_at));

        // A write attempt against the disabled collection is rejected at
        // submit time with an InvalidWrite error.
        let writer = crate::WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            crate::ProvenanceMode::Warn,
            Arc::new(crate::TelemetryCounters::default()),
        )
        .expect("writer");
        let error = writer
            .submit(crate::WriteRequest {
                label: "disabled-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![crate::OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("disabled collection should reject writes");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_disabled' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2919
    #[test]
    fn purge_operational_collection_deletes_append_only_rows_before_cutoff() {
        // Seed three append mutations at created_at 100/200/300 via raw SQL,
        // purge with cutoff 250, and verify: two rows deleted, only evt-3
        // (created_at 300) survives, and one provenance event is recorded.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_all\"}', 1, 100)",
                [],
            )
            .expect("seed collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-1', 'audit_log', 'evt-1', 'append', '{\"seq\":1}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed event 1");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-2', 'audit_log', 'evt-2', 'append', '{\"seq\":2}', 'src-2', 200, 2)",
                [],
            )
            .expect("seed event 2");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-3', 'audit_log', 'evt-3', 'append', '{\"seq\":3}', 'src-3', 300, 3)",
                [],
            )
            .expect("seed event 3");
        }

        // Cutoff 250 is strictly after evt-1 (100) and evt-2 (200) but before
        // evt-3 (300), so exactly two mutations should be purged.
        let report = service
            .purge_operational_collection("audit_log", 250)
            .expect("purge collection");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.deleted_mutations, 2);
        assert_eq!(report.before_timestamp, 250);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-3".to_owned()]);
        // The purge itself is audited with a dedicated provenance event type.
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_purged' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2985
    // A dry-run compaction must report how many rows keep_last would delete
    // while leaving the table untouched and emitting no provenance event.
    #[test]
    fn compact_operational_collection_dry_run_reports_without_mutation() {
        let (db, service) = setup();
        {
            // Seed a keep_last(max_rows=2) collection with three events so
            // exactly one row is a compaction candidate.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // dry_run = true: the report still counts the candidate deletion.
        let report = service
            .compact_operational_collection("audit_log", true)
            .expect("compact collection");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.deleted_mutations, 1);
        assert!(report.dry_run);
        assert_eq!(report.before_timestamp, None);

        // All three rows must survive a dry run ...
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count");
        assert_eq!(remaining_count, 3);
        // ... and no provenance event may be written for a compaction that
        // did not actually run.
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
3040
    // With retention keep_last(max_rows=2) and three seeded rows, a real
    // compaction must delete the single oldest row and log one provenance
    // event for the collection.
    #[test]
    fn compact_operational_collection_keep_last_deletes_oldest_rows() {
        let (db, service) = setup();
        {
            // Seed collection + three events (mutation_order 1..=3) via SQL;
            // the connection closes at scope end, before the service acts.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // dry_run = false: this time the deletion actually happens.
        let report = service
            .compact_operational_collection("audit_log", false)
            .expect("compact collection");
        assert_eq!(report.deleted_mutations, 1);
        assert!(!report.dry_run);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        // The oldest event (evt-1) is gone; the two newest remain.
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        // Unlike a dry run, a real compaction records a provenance event.
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
3098
    // End-to-end retention flow for a keep_last collection: plan, then dry
    // run, then execute — checking surviving rows and the bookkeeping in
    // operational_retention_runs at each stage.
    #[test]
    fn plan_and_run_operational_retention_keep_last() {
        let (db, service) = setup();
        {
            // Seed keep_last(max_rows=2) with three events (ts 100/200/300).
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // Planning only inspects: one collection examined, one candidate
        // deletion (3 rows vs max_rows=2), and no prior retention run.
        let plan = service
            .plan_operational_retention(1_000, None, Some(10))
            .expect("plan retention");
        assert_eq!(plan.collections_examined, 1);
        assert_eq!(plan.items[0].collection_name, "audit_log");
        assert_eq!(
            plan.items[0].action_kind,
            crate::operational::OperationalRetentionActionKind::KeepLast
        );
        assert_eq!(plan.items[0].candidate_deletions, 1);
        assert_eq!(plan.items[0].max_rows, Some(2));
        assert_eq!(plan.items[0].last_run_at, None);

        // A dry run reports the same outcome without deleting anything.
        let dry_run = service
            .run_operational_retention(1_000, None, Some(10), true)
            .expect("dry-run retention");
        assert!(dry_run.dry_run);
        assert_eq!(dry_run.collections_acted_on, 1);
        assert_eq!(dry_run.items[0].deleted_mutations, 1);
        assert_eq!(dry_run.items[0].rows_remaining, 2);

        // After the dry run: all rows intact, no retention run recorded.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count after dry run");
        assert_eq!(remaining_count, 3);
        let retention_run_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_retention_runs WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("retention run count");
        assert_eq!(retention_run_count, 0);
        // Release the verification connection before the service writes.
        drop(conn);

        // Executing for real deletes the single over-budget row.
        let executed = service
            .run_operational_retention(1_000, None, Some(10), false)
            .expect("execute retention");
        assert_eq!(executed.collections_acted_on, 1);
        assert_eq!(executed.items[0].deleted_mutations, 1);
        assert_eq!(executed.items[0].rows_remaining, 2);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        // The oldest event is gone; the two newest remain.
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        // The executed run is recorded with executed_at equal to the 1_000
        // timestamp passed to run_operational_retention.
        let last_run_at: i64 = conn
            .query_row(
                "SELECT executed_at FROM operational_retention_runs \
                 WHERE collection_name = 'audit_log' ORDER BY executed_at DESC LIMIT 1",
                [],
                |row| row.get(0),
            )
            .expect("last run at");
        assert_eq!(last_run_at, 1_000);
    }
3197
3198    #[test]
3199    fn dry_run_operational_retention_does_not_mark_noop_collection_as_acted_on() {
3200        let (db, service) = setup();
3201        let conn = sqlite::open_connection(db.path()).expect("conn");
3202        conn.execute(
3203            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
3204             VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
3205            [],
3206        )
3207        .expect("seed collection");
3208        for (index, created_at) in [(1_i64, 100_i64), (2, 200)] {
3209            conn.execute(
3210                "INSERT INTO operational_mutations \
3211                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
3212                 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
3213                rusqlite::params![
3214                    format!("evt-{index}"),
3215                    format!("{{\"seq\":{index}}}"),
3216                    created_at,
3217                    index,
3218                ],
3219            )
3220            .expect("seed event");
3221        }
3222        drop(conn);
3223
3224        let dry_run = service
3225            .run_operational_retention(1_000, None, Some(10), true)
3226            .expect("dry-run retention");
3227        assert!(dry_run.dry_run);
3228        assert_eq!(dry_run.collections_acted_on, 0);
3229        assert_eq!(dry_run.items[0].deleted_mutations, 0);
3230        assert_eq!(dry_run.items[0].rows_remaining, 2);
3231    }
3232
3233    #[test]
3234    fn compact_operational_collection_rejects_latest_state() {
3235        let (_db, service) = setup();
3236        service
3237            .register_operational_collection(&OperationalRegisterRequest {
3238                name: "connector_health".to_owned(),
3239                kind: OperationalCollectionKind::LatestState,
3240                schema_json: "{}".to_owned(),
3241                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3242                filter_fields_json: "[]".to_owned(),
3243                validation_json: String::new(),
3244                secondary_indexes_json: "[]".to_owned(),
3245                format_version: 1,
3246            })
3247            .expect("register collection");
3248
3249        let error = service
3250            .compact_operational_collection("connector_health", false)
3251            .expect_err("latest_state compaction should be rejected");
3252        assert!(matches!(error, EngineError::InvalidWrite(_)));
3253        assert!(error.to_string().contains("append_only_log"));
3254    }
3255
3256    #[test]
3257    fn register_operational_collection_persists_filter_fields_json() {
3258        let (_db, service) = setup();
3259
3260        let record = service
3261            .register_operational_collection(&OperationalRegisterRequest {
3262                name: "audit_log".to_owned(),
3263                kind: OperationalCollectionKind::AppendOnlyLog,
3264                schema_json: "{}".to_owned(),
3265                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3266                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3267                validation_json: String::new(),
3268                secondary_indexes_json: "[]".to_owned(),
3269                format_version: 1,
3270            })
3271            .expect("register collection");
3272
3273        assert_eq!(
3274            record.filter_fields_json,
3275            r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#
3276        );
3277    }
3278
    // Filtered reads over an append-only collection must combine declared
    // filter clauses (prefix on a string field AND a timestamp range) and
    // return only the rows matching all of them.
    #[test]
    fn read_operational_collection_filters_append_only_rows_by_declared_fields() {
        let (db, service) = setup();
        // Declare actor/seq/ts as filterable fields with their modes.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"seq","type":"integer","modes":["exact","range"]},{"name":"ts","type":"timestamp","modes":["exact","range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Append three events through the writer actor so the regular
            // write path (not raw SQL) populates the collection. The writer
            // is dropped at scope end before the service reads.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","seq":1,"ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","seq":2,"ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-3".to_owned(),
                            payload_json: r#"{"actor":"bob","seq":3,"ts":300}"#.to_owned(),
                            source_ref: Some("src-3".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Prefix "alice" matches evt-1 and evt-2; the ts range 150..250
        // then narrows the result to evt-2 (ts 200) alone.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("filtered read");

        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.row_count, 1);
        assert!(!report.was_limited);
        assert_eq!(report.rows.len(), 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
        // The payload comes back exactly as written.
        assert_eq!(
            report.rows[0].payload_json,
            r#"{"actor":"alice-admin","seq":2,"ts":200}"#
        );
    }
3367
    // When the derived operational_filter_values rows are absent, a filtered
    // read should still succeed via the registered secondary index
    // (append_only_field_time over actor + ts).
    #[test]
    fn read_operational_collection_uses_secondary_index_when_filter_values_are_missing() {
        let (db, service) = setup();
        // Register with both a filter contract and a secondary index so the
        // read has a fallback path when filter values are missing.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Append two events via the writer actor; dropped at scope end.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }
        // Simulate missing derived filter values by deleting them, forcing
        // the read to fall back to the secondary index.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        conn.execute(
            "DELETE FROM operational_filter_values WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear filter values");
        drop(conn);

        // Same prefix + range query as the declared-fields test: only evt-2
        // (actor "alice-admin", ts 200) matches both clauses.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("secondary-index read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3450
    // Filtered reads must be rejected in two cases: (1) the target is a
    // latest_state collection (only append_only_log supports them), and
    // (2) a filter references a field not in the declared contract.
    #[test]
    fn read_operational_collection_rejects_undeclared_fields_and_latest_state_collections() {
        let (_db, service) = setup();
        // Case 1: latest_state collection — even with a declared field the
        // read itself is unsupported for this kind.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: r#"[{"name":"status","type":"string","modes":["exact"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        let latest_state_error = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "connector_health".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "status".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("ok".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("latest_state filtered reads should be rejected");
        // The error message names the supported collection kind.
        assert!(latest_state_error.to_string().contains("append_only_log"));

        // Case 2: append-only collection that declares only "actor".
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register append-only collection");

        // Filtering on a field outside the contract must fail loudly.
        let undeclared_error = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "missing".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("x".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("undeclared field should be rejected");
        assert!(undeclared_error.to_string().contains("undeclared"));
    }
3506
    // With two matching rows and limit = 1, the read must return a single
    // row, echo the applied limit, and flag the result as truncated.
    #[test]
    fn read_operational_collection_applies_limit_and_reports_truncation() {
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["prefix"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Append two events, both matching the "alice" prefix; the
            // writer is dropped at scope end before the service reads.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice-1"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-2"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Prefix {
                    field: "actor".to_owned(),
                    value: "alice".to_owned(),
                }],
                limit: Some(1),
            })
            .expect("limited read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.applied_limit, 1);
        assert!(report.was_limited);
        // The surviving row is the newer event (evt-2).
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3578
    // A database created before filter contracts existed (seeded here with a
    // hand-built pre-v10 schema that lacks filter metadata) must reject
    // filtered reads until an explicit filter contract is installed via
    // update_operational_collection_filters, after which reads work.
    #[test]
    fn preexisting_operational_collection_can_gain_filter_contract_after_upgrade() {
        let db = NamedTempFile::new().expect("temp db");
        let conn = sqlite::open_connection(db.path()).expect("conn");
        // Build the legacy tables by hand — note there is no
        // filter_fields_json column — and seed one collection + one event.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT 100,
                disabled_at INTEGER
            );
            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT 100,
                mutation_order INTEGER NOT NULL DEFAULT 1
            );
            INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}', 1, 100);
            INSERT INTO operational_mutations
                (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order)
            VALUES
                ('evt-1', 'audit_log', 'evt-1', 'append', '{"actor":"alice","ts":0}', 'src-1', 100, 1);
            "#,
        )
        .expect("seed pre-v10 schema");
        drop(conn);

        // Before any contract exists, every filter field is undeclared.
        let service = AdminService::new(db.path(), Arc::new(SchemaManager::new()));
        let pre_update = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "actor".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("alice".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("read should reject undeclared fields before migration update");
        assert!(pre_update.to_string().contains("undeclared"));

        // Install a filter contract on the legacy collection.
        let updated = service
            .update_operational_collection_filters(
                "audit_log",
                r#"[{"name":"actor","type":"string","modes":["exact"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#,
            )
            .expect("update filter contract");
        assert!(updated.filter_fields_json.contains("\"actor\""));

        // A filtered read now succeeds and finds the seeded event (ts 0).
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Range {
                    field: "ts".to_owned(),
                    lower: Some(0),
                    upper: Some(0),
                }],
                limit: Some(10),
            })
            .expect("read after explicit filter update");
        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-1");
    }
3650
3651    #[cfg(feature = "sqlite-vec")]
3652    #[test]
3653    fn check_semantics_detects_stale_vec_rows() {
3654        use crate::sqlite::open_connection_with_vec;
3655
3656        let db = NamedTempFile::new().expect("temp file");
3657        let schema = Arc::new(SchemaManager::new());
3658        {
3659            let conn = open_connection_with_vec(db.path()).expect("vec conn");
3660            schema.bootstrap(&conn).expect("bootstrap");
3661            schema
3662                .ensure_vec_kind_profile(&conn, "Doc", 3)
3663                .expect("vec kind profile");
3664            // Insert a vec row whose chunk does not exist.
3665            let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
3666                .iter()
3667                .flat_map(|f| f.to_le_bytes())
3668                .collect();
3669            let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
3670            conn.execute(
3671                &format!(
3672                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('ghost-chunk', ?1)"
3673                ),
3674                rusqlite::params![bytes],
3675            )
3676            .expect("insert stale vec row");
3677        }
3678        let service = AdminService::new(db.path(), Arc::clone(&schema));
3679        let report = service.check_semantics().expect("semantics check");
3680        assert_eq!(report.stale_vec_rows, 1);
3681        assert!(
3682            report.warnings.iter().any(|w| w.contains("stale vec")),
3683            "warning must mention stale vec"
3684        );
3685    }
3686
3687    #[cfg(feature = "sqlite-vec")]
3688    #[test]
3689    fn restore_vector_profiles_recreates_vec_table_from_metadata() {
3690        let db = NamedTempFile::new().expect("temp file");
3691        let schema = Arc::new(SchemaManager::new());
3692        {
3693            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3694            schema.bootstrap(&conn).expect("bootstrap");
3695            conn.execute(
3696                "INSERT INTO vector_profiles (profile, table_name, dimension, enabled) \
3697                 VALUES ('default', 'vec_nodes_active', 3, 1)",
3698                [],
3699            )
3700            .expect("insert vector profile");
3701        }
3702
3703        let service = AdminService::new(db.path(), Arc::clone(&schema));
3704        let report = service
3705            .restore_vector_profiles()
3706            .expect("restore vector profiles");
3707        assert_eq!(
3708            report.targets,
3709            vec![crate::projection::ProjectionTarget::Vec]
3710        );
3711        assert_eq!(report.rebuilt_rows, 1);
3712
3713        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3714        let count: i64 = conn
3715            .query_row(
3716                "SELECT count(*) FROM sqlite_schema WHERE name = 'vec_nodes_active'",
3717                [],
3718                |row| row.get(0),
3719            )
3720            .expect("vec schema count");
3721        assert_eq!(count, 1, "vec table should exist after restore");
3722    }
3723
3724    #[cfg(feature = "sqlite-vec")]
3725    #[test]
3726    fn load_vector_regeneration_config_supports_json_and_toml() {
3727        let dir = tempfile::tempdir().expect("temp dir");
3728        let json_path = dir.path().join("regen.json");
3729        let toml_path = dir.path().join("regen.toml");
3730
3731        let config = VectorRegenerationConfig {
3732            kind: "Document".to_owned(),
3733            profile: "default".to_owned(),
3734            chunking_policy: "per_chunk".to_owned(),
3735            preprocessing_policy: "trim".to_owned(),
3736        };
3737
3738        fs::write(&json_path, serde_json::to_string(&config).expect("json")).expect("write json");
3739        fs::write(&toml_path, toml::to_string(&config).expect("toml")).expect("write toml");
3740
3741        let parsed_json = load_vector_regeneration_config(&json_path).expect("json parse");
3742        let parsed_toml = load_vector_regeneration_config(&toml_path).expect("toml parse");
3743
3744        assert_eq!(parsed_json, config);
3745        assert_eq!(parsed_toml, config);
3746    }
3747
3748    /// The 0.4.0 rewrite removed the identity fields from the config.
3749    /// Any client that still serializes the pre-0.4 fields must be
3750    /// rejected AT THE SERDE BOUNDARY with a clear error — never
3751    /// silently accepted.
3752    #[test]
3753    fn regenerate_vector_embeddings_config_rejects_old_identity_fields() {
3754        // Pre-0.5.0 configs that include old fields (table_name, model_identity, etc.)
3755        // must be rejected at the serde boundary due to deny_unknown_fields.
3756        let legacy_json = r#"{
3757            "kind": "Document",
3758            "profile": "default",
3759            "table_name": "vec_nodes_active",
3760            "model_identity": "old-model",
3761            "model_version": "1.0",
3762            "dimension": 4,
3763            "normalization_policy": "l2",
3764            "chunking_policy": "per_chunk",
3765            "preprocessing_policy": "trim",
3766            "generator_command": ["/bin/echo"]
3767        }"#;
3768        let result: Result<VectorRegenerationConfig, _> = serde_json::from_str(legacy_json);
3769        assert!(
3770            result.is_err(),
3771            "legacy identity fields must be rejected at deserialization"
3772        );
3773    }
3774
3775    #[cfg(all(not(feature = "sqlite-vec"), unix))]
3776    #[test]
3777    fn regenerate_vector_embeddings_unsupported_vec_capability_writes_request_and_failed_audit() {
3778        let db = NamedTempFile::new().expect("temp file");
3779        let schema = Arc::new(SchemaManager::new());
3780
3781        {
3782            let conn = sqlite::open_connection(db.path()).expect("connection");
3783            schema.bootstrap(&conn).expect("bootstrap");
3784            conn.execute(
3785                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3786                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3787                [],
3788            )
3789            .expect("insert node");
3790            conn.execute(
3791                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3792                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3793                [],
3794            )
3795            .expect("insert chunk");
3796        }
3797
3798        let service = AdminService::new(db.path(), Arc::clone(&schema));
3799        let embedder = TestEmbedder::new("test-model", 4);
3800        let error = service
3801            .regenerate_vector_embeddings(
3802                &embedder,
3803                &VectorRegenerationConfig {
3804                    kind: "Document".to_owned(),
3805                    profile: "default".to_owned(),
3806                    chunking_policy: "per_chunk".to_owned(),
3807                    preprocessing_policy: "trim".to_owned(),
3808                },
3809            )
3810            .expect_err("sqlite-vec capability should be required");
3811
3812        assert!(error.to_string().contains("unsupported vec capability"));
3813
3814        let conn = sqlite::open_connection(db.path()).expect("connection");
3815        let request_count: i64 = conn
3816            .query_row(
3817                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
3818                [],
3819                |row| row.get(0),
3820            )
3821            .expect("request count");
3822        assert_eq!(request_count, 1);
3823        let failed_count: i64 = conn
3824            .query_row(
3825                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3826                [],
3827                |row| row.get(0),
3828            )
3829            .expect("failed count");
3830        assert_eq!(failed_count, 1);
3831        let metadata_json: String = conn
3832            .query_row(
3833                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
3834                [],
3835                |row| row.get(0),
3836            )
3837            .expect("failed metadata");
3838        assert!(metadata_json.contains("\"failure_class\":\"unsupported vec capability\""));
3839    }
3840
3841    #[cfg(feature = "sqlite-vec")]
3842    #[test]
3843    #[allow(clippy::too_many_lines)]
3844    fn regenerate_vector_embeddings_rebuilds_embeddings_via_embedder() {
3845        let db = NamedTempFile::new().expect("temp file");
3846        let schema = Arc::new(SchemaManager::new());
3847
3848        {
3849            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3850            schema.bootstrap(&conn).expect("bootstrap");
3851            conn.execute(
3852                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3853                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3854                [],
3855            )
3856            .expect("insert node");
3857            conn.execute(
3858                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3859                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3860                [],
3861            )
3862            .expect("insert chunk 1");
3863            conn.execute(
3864                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3865                 VALUES ('chunk-2', 'doc-1', 'travel plan', 101)",
3866                [],
3867            )
3868            .expect("insert chunk 2");
3869        }
3870
3871        let service = AdminService::new(db.path(), Arc::clone(&schema));
3872        let embedder = TestEmbedder::new("test-model", 4);
3873        let report = service
3874            .regenerate_vector_embeddings(
3875                &embedder,
3876                &VectorRegenerationConfig {
3877                    kind: "Document".to_owned(),
3878                    profile: "default".to_owned(),
3879                    chunking_policy: "per_chunk".to_owned(),
3880                    preprocessing_policy: "trim".to_owned(),
3881                },
3882            )
3883            .expect("regenerate vectors");
3884
3885        let expected_vec_table = fathomdb_schema::vec_kind_table_name("Document");
3886        assert_eq!(report.profile, "default");
3887        assert_eq!(report.table_name, expected_vec_table);
3888        assert_eq!(report.dimension, 4);
3889        assert_eq!(report.total_chunks, 2);
3890        assert_eq!(report.regenerated_rows, 2);
3891        assert!(report.contract_persisted);
3892
3893        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3894        let vec_count: i64 = conn
3895            .query_row(
3896                &format!("SELECT count(*) FROM {expected_vec_table}"),
3897                [],
3898                |row| row.get(0),
3899            )
3900            .expect("vec count");
3901        assert_eq!(vec_count, 2);
3902
3903        // The persisted vector contract must reflect the embedder
3904        // identity — not any string the caller passed in, because the
3905        // caller never passes one.
3906        let (model_identity, model_version, dimension, normalization_policy): (
3907            String,
3908            String,
3909            i64,
3910            String,
3911        ) = conn
3912            .query_row(
3913                "SELECT model_identity, model_version, dimension, normalization_policy \
3914                 FROM vector_embedding_contracts WHERE profile = 'default'",
3915                [],
3916                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3917            )
3918            .expect("contract row");
3919        assert_eq!(model_identity, "test-model");
3920        assert_eq!(model_version, "1.0.0");
3921        assert_eq!(dimension, 4);
3922        assert_eq!(normalization_policy, "l2");
3923
3924        let contract_format_version: i64 = conn
3925            .query_row(
3926                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
3927                [],
3928                |row| row.get(0),
3929            )
3930            .expect("contract_format_version");
3931        assert_eq!(contract_format_version, 1);
3932        let request_count: i64 = conn
3933            .query_row(
3934                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
3935                [],
3936                |row| row.get(0),
3937            )
3938            .expect("request audit count");
3939        assert_eq!(request_count, 1);
3940        let apply_count: i64 = conn
3941            .query_row(
3942                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
3943                [],
3944                |row| row.get(0),
3945            )
3946            .expect("apply audit count");
3947        assert_eq!(apply_count, 1);
3948        let apply_metadata: String = conn
3949            .query_row(
3950                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
3951                [],
3952                |row| row.get(0),
3953            )
3954            .expect("apply metadata");
3955        assert!(apply_metadata.contains("\"profile\":\"default\""));
3956        assert!(apply_metadata.contains("\"snapshot_hash\":"));
3957        assert!(apply_metadata.contains("\"model_identity\":\"test-model\""));
3958    }
3959
3960    #[cfg(feature = "sqlite-vec")]
3961    #[test]
3962    #[allow(clippy::too_many_lines)]
3963    fn regenerate_vector_embeddings_embedder_failure_leaves_contract_and_vec_rows_unchanged() {
3964        let db = NamedTempFile::new().expect("temp file");
3965        let schema = Arc::new(SchemaManager::new());
3966
3967        {
3968            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3969            schema.bootstrap(&conn).expect("bootstrap");
3970            conn.execute(
3971                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3972                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3973                [],
3974            )
3975            .expect("insert node");
3976            conn.execute(
3977                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3978                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3979                [],
3980            )
3981            .expect("insert chunk");
3982            schema
3983                .ensure_vec_kind_profile(&conn, "Document", 4)
3984                .expect("ensure vec kind profile");
3985            conn.execute(
3986                r"
3987                INSERT INTO vector_embedding_contracts (
3988                    profile,
3989                    table_name,
3990                    model_identity,
3991                    model_version,
3992                    dimension,
3993                    normalization_policy,
3994                    chunking_policy,
3995                    preprocessing_policy,
3996                    generator_command_json,
3997                    applied_at,
3998                    snapshot_hash
3999                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
4000                ",
4001                rusqlite::params![
4002                    "default",
4003                    fathomdb_schema::vec_kind_table_name("Document"),
4004                    "old-model",
4005                    "0.9.0",
4006                    4,
4007                    "l2",
4008                    "per_chunk",
4009                    "trim",
4010                    "[]",
4011                    111,
4012                    "old-snapshot"
4013                ],
4014            )
4015            .expect("seed contract");
4016            let vec_table = fathomdb_schema::vec_kind_table_name("Document");
4017            conn.execute(
4018                &format!(
4019                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))"
4020                ),
4021                [],
4022            )
4023            .expect("seed vec row");
4024        }
4025
4026        let service = AdminService::new(db.path(), Arc::clone(&schema));
4027        let failing = FailingEmbedder {
4028            identity: QueryEmbedderIdentity {
4029                model_identity: "new-model".to_owned(),
4030                model_version: "1.0.0".to_owned(),
4031                dimension: 4,
4032                normalization_policy: "l2".to_owned(),
4033            },
4034        };
4035        let error = service
4036            .regenerate_vector_embeddings(
4037                &failing,
4038                &VectorRegenerationConfig {
4039                    kind: "Document".to_owned(),
4040                    profile: "default".to_owned(),
4041                    chunking_policy: "per_chunk".to_owned(),
4042                    preprocessing_policy: "trim".to_owned(),
4043                },
4044            )
4045            .expect_err("embedder should fail");
4046
4047        assert!(error.to_string().contains("embedder failure"));
4048
4049        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4050        let model_identity: String = conn
4051            .query_row(
4052                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
4053                [],
4054                |row| row.get(0),
4055            )
4056            .expect("model identity");
4057        assert_eq!(model_identity, "old-model");
4058        let snapshot_hash: String = conn
4059            .query_row(
4060                "SELECT snapshot_hash FROM vector_embedding_contracts WHERE profile = 'default'",
4061                [],
4062                |row| row.get(0),
4063            )
4064            .expect("snapshot hash");
4065        assert_eq!(snapshot_hash, "old-snapshot");
4066        let vec_table = fathomdb_schema::vec_kind_table_name("Document");
4067        let vec_count: i64 = conn
4068            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
4069                row.get(0)
4070            })
4071            .expect("vec count");
4072        assert_eq!(vec_count, 1);
4073        let failure_count: i64 = conn
4074            .query_row(
4075                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
4076                [],
4077                |row| row.get(0),
4078            )
4079            .expect("failure count");
4080        assert_eq!(failure_count, 1);
4081        let failure_metadata: String = conn
4082            .query_row(
4083                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
4084                [],
4085                |row| row.get(0),
4086            )
4087            .expect("failure metadata");
4088        assert!(failure_metadata.contains("\"failure_class\":\"embedder failure\""));
4089    }
4090
4091    // Subprocess generator tests (snapshot-drift-via-concurrent-writer,
4092    // timeout, stdout/stderr overflow, oversized input, excessive chunk
4093    // count, malformed JSON, world-writable executable, disallowed
4094    // executable root, environment preservation) were removed in 0.4.0
4095    // along with the subprocess generator pattern itself. The failure
4096    // modes they exercised belong to the deleted
4097    // `run_vector_generator_bounded` pipeline and have no equivalent in
4098    // the direct-embedder path. See
4099    // `.claude/memory/project_vector_identity_invariant.md`.
4100
4101    #[cfg(feature = "sqlite-vec")]
4102    #[test]
4103    fn regenerate_vector_embeddings_rejects_whitespace_only_profile_before_mutation() {
4104        let db = NamedTempFile::new().expect("temp file");
4105        let schema = Arc::new(SchemaManager::new());
4106        {
4107            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4108            schema.bootstrap(&conn).expect("bootstrap");
4109            conn.execute(
4110                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4111                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
4112                [],
4113            )
4114            .expect("insert node");
4115            conn.execute(
4116                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4117                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
4118                [],
4119            )
4120            .expect("insert chunk");
4121        }
4122
4123        let service = AdminService::new(db.path(), Arc::clone(&schema));
4124        let embedder = TestEmbedder::new("test-model", 4);
4125        let error = service
4126            .regenerate_vector_embeddings(
4127                &embedder,
4128                &VectorRegenerationConfig {
4129                    kind: "Document".to_owned(),
4130                    profile: "   ".to_owned(),
4131                    chunking_policy: "per_chunk".to_owned(),
4132                    preprocessing_policy: "trim".to_owned(),
4133                },
4134            )
4135            .expect_err("whitespace profile should be rejected");
4136
4137        assert!(error.to_string().contains("invalid contract"));
4138        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4139        let contract_count: i64 = conn
4140            .query_row(
4141                "SELECT count(*) FROM vector_embedding_contracts",
4142                [],
4143                |row| row.get(0),
4144            )
4145            .expect("contract count");
4146        assert_eq!(contract_count, 0);
4147        let provenance_count: i64 = conn
4148            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
4149                row.get(0)
4150            })
4151            .expect("provenance count");
4152        assert_eq!(provenance_count, 0);
4153    }
4154
4155    #[cfg(feature = "sqlite-vec")]
4156    #[test]
4157    fn regenerate_vector_embeddings_rejects_future_contract_format_version() {
4158        let db = NamedTempFile::new().expect("temp file");
4159        let schema = Arc::new(SchemaManager::new());
4160        {
4161            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4162            schema.bootstrap(&conn).expect("bootstrap");
4163            conn.execute(
4164                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4165                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
4166                [],
4167            )
4168            .expect("insert node");
4169            conn.execute(
4170                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4171                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
4172                [],
4173            )
4174            .expect("insert chunk");
4175            conn.execute(
4176                r"
4177                INSERT INTO vector_embedding_contracts (
4178                    profile,
4179                    table_name,
4180                    model_identity,
4181                    model_version,
4182                    dimension,
4183                    normalization_policy,
4184                    chunking_policy,
4185                    preprocessing_policy,
4186                    generator_command_json,
4187                    applied_at,
4188                    snapshot_hash,
4189                    contract_format_version,
4190                    updated_at
4191                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
4192                ",
4193                rusqlite::params![
4194                    "default",
4195                    "vec_nodes_active",
4196                    "old-model",
4197                    "0.9.0",
4198                    4,
4199                    "l2",
4200                    "per_chunk",
4201                    "trim",
4202                    "[]",
4203                    111,
4204                    "old-snapshot",
4205                    99,
4206                    111,
4207                ],
4208            )
4209            .expect("seed future contract");
4210        }
4211
4212        let service = AdminService::new(db.path(), Arc::clone(&schema));
4213        let embedder = TestEmbedder::new("test-model", 4);
4214        let error = service
4215            .regenerate_vector_embeddings(
4216                &embedder,
4217                &VectorRegenerationConfig {
4218                    kind: "Document".to_owned(),
4219                    profile: "default".to_owned(),
4220                    chunking_policy: "per_chunk".to_owned(),
4221                    preprocessing_policy: "trim".to_owned(),
4222                },
4223            )
4224            .expect_err("future contract version should be rejected");
4225
4226        assert!(error.to_string().contains("unsupported"));
4227        assert!(error.to_string().contains("format version"));
4228    }
4229
4230    #[test]
4231    fn check_semantics_detects_orphaned_chunk() {
4232        let (db, service) = setup();
4233        {
4234            // Open without FK enforcement to insert chunk with no active node.
4235            let conn = sqlite::open_connection(db.path()).expect("conn");
4236            conn.execute(
4237                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4238                 VALUES ('c1', 'ghost-node', 'text', 100)",
4239                [],
4240            )
4241            .expect("insert orphaned chunk");
4242        }
4243        let report = service.check_semantics().expect("semantics check");
4244        assert_eq!(report.orphaned_chunks, 1);
4245    }
4246
4247    #[test]
4248    fn check_semantics_detects_null_source_ref() {
4249        let (db, service) = setup();
4250        {
4251            let conn = sqlite::open_connection(db.path()).expect("conn");
4252            conn.execute(
4253                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4254                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100)",
4255                [],
4256            )
4257            .expect("insert node with null source_ref");
4258        }
4259        let report = service.check_semantics().expect("semantics check");
4260        assert_eq!(report.null_source_ref_nodes, 1);
4261    }
4262
4263    #[test]
4264    fn check_semantics_detects_broken_step_fk() {
4265        let (db, service) = setup();
4266        {
4267            // Explicitly disable FK enforcement for this connection so we can insert
4268            // an orphaned step (ghost run_id) to simulate a partial-write failure.
4269            let conn = sqlite::open_connection(db.path()).expect("conn");
4270            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4271                .expect("disable FK");
4272            conn.execute(
4273                "INSERT INTO steps (id, run_id, kind, status, properties, created_at) \
4274                 VALUES ('s1', 'ghost-run', 'llm', 'completed', '{}', 100)",
4275                [],
4276            )
4277            .expect("insert step with ghost run_id");
4278        }
4279        let report = service.check_semantics().expect("semantics check");
4280        assert_eq!(report.broken_step_fk, 1);
4281    }
4282
4283    #[test]
4284    fn check_semantics_detects_broken_action_fk() {
4285        let (db, service) = setup();
4286        {
4287            let conn = sqlite::open_connection(db.path()).expect("conn");
4288            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4289                .expect("disable FK");
4290            conn.execute(
4291                "INSERT INTO actions (id, step_id, kind, status, properties, created_at) \
4292                 VALUES ('a1', 'ghost-step', 'emit', 'completed', '{}', 100)",
4293                [],
4294            )
4295            .expect("insert action with ghost step_id");
4296        }
4297        let report = service.check_semantics().expect("semantics check");
4298        assert_eq!(report.broken_action_fk, 1);
4299    }
4300
4301    #[test]
4302    fn check_semantics_detects_stale_fts_rows() {
4303        let (db, service) = setup();
4304        {
4305            let conn = sqlite::open_connection(db.path()).expect("conn");
4306            // FTS virtual tables have no FK constraints; insert a row referencing
4307            // a chunk_id that does not exist in the chunks table.
4308            conn.execute(
4309                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4310                 VALUES ('ghost-chunk', 'any-node', 'Meeting', 'stale content')",
4311                [],
4312            )
4313            .expect("insert stale FTS row");
4314        }
4315        let report = service.check_semantics().expect("semantics check");
4316        assert_eq!(report.stale_fts_rows, 1);
4317    }
4318
    /// `check_semantics` must count FTS rows that reference a logical_id
    /// whose only node version is superseded (no active row remains).
    #[test]
    fn check_semantics_detects_fts_rows_for_superseded_nodes() {
        let (db, service) = setup();
        {
            // Scoped so the connection is dropped before the check runs.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Insert a node that has been fully superseded (superseded_at IS NOT NULL).
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg-sup', 'Meeting', '{}', 100, 200, 'src-1')",
                [],
            )
            .expect("insert superseded node");
            // Insert an FTS row for the superseded node's logical_id.
            conn.execute(
                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('ck-x', 'lg-sup', 'Meeting', 'superseded content')",
                [],
            )
            .expect("insert FTS row for superseded node");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.fts_rows_for_superseded_nodes, 1);
    }
4342
    /// `check_semantics` must count edges whose target logical_id has no
    /// node row ("dangling" edges).
    #[test]
    fn check_semantics_detects_dangling_edges() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // FK enforcement is switched off for this connection so the
            // dangling edge below can be inserted at all.
            conn.execute_batch("PRAGMA foreign_keys = OFF;")
                .expect("disable FK");
            // One active node as source; target does not exist — edge is dangling.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'lg-src', 'Meeting', '{}', 100, 'src-1')",
                [],
            )
            .expect("insert source node");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('e1', 'edge-1', 'lg-src', 'ghost-target', 'LINKS', '{}', 100, 'src-1')",
                [],
            )
            .expect("insert dangling edge");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.dangling_edges, 1);
    }
4368
    /// `check_semantics` must count logical_ids whose supersession chain has
    /// no active head (every version carries a superseded_at timestamp).
    #[test]
    fn check_semantics_detects_orphaned_supersession_chains() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Every version of this logical_id is superseded — no active row remains.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg-orphaned', 'Meeting', '{}', 100, 200, 'src-1')",
                [],
            )
            .expect("insert fully superseded node");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.orphaned_supersession_chains, 1);
    }
4385
    #[test]
    fn check_semantics_detects_mismatched_kind_property_fts_rows() {
        // With per-kind tables, mismatched_kind is always 0 — rows in fts_props_<kind>
        // must belong to that kind by construction. However, orphaned rows (per-kind table
        // with no registered schema) serve as the equivalent signal and are tested via
        // check_semantics_detects_fts_rows_for_superseded_nodes. This test verifies
        // mismatched_kind is 0 even when per-kind table rows exist for a node.
        // NOTE(review): that cross-reference points at a superseded-node test,
        // not an orphaned-per-kind-table test — confirm the intended test name.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register a legacy (bare path array) schema for Goal.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            // Create the per-kind table and insert a correctly-kind row.
            let table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
                [],
            )
            .expect("insert per-kind FTS row");
        }
        let report = service.check_semantics().expect("semantics check");
        // Per-kind tables make mismatched_kind impossible — always 0.
        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
    }
4425
    /// Two per-kind FTS rows sharing a single node logical_id must surface
    /// as one duplicate in the report (counted per duplicated id, not per
    /// extra row — two rows yield a count of 1 here).
    #[test]
    fn check_semantics_detects_duplicate_property_fts_rows() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            // Create the per-kind table and insert two rows for the same logical ID.
            let table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
                [],
            )
            .expect("insert first property FTS row");
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2 duplicate')"),
                [],
            )
            .expect("insert duplicate property FTS row");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.duplicate_property_fts_rows, 1);
    }
4458
    /// A per-kind FTS row whose stored text no longer matches the canonical
    /// extraction from the node's properties must be counted as drifted.
    #[test]
    fn check_semantics_detects_drifted_property_fts_text() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Legacy (non-weighted) schema: single path $.name.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Current name\"}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            // Create per-kind table and insert a row with outdated text content.
            let table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            // 'Old stale name' differs from the canonical extraction of $.name.
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Old stale name')"),
                [],
            )
            .expect("insert stale property FTS row");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(report.drifted_property_fts_rows, 1);
    }
4492
    /// A per-kind FTS row for a node whose properties yield NO extractable
    /// value (registered path absent) must be counted as drifted — the row
    /// should not exist at all.
    #[test]
    fn check_semantics_detects_property_fts_row_that_should_not_exist() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.searchable\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Node does NOT have $.searchable — extraction yields no value.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'goal-1', 'Goal', '{\"other\":\"field\"}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            // Create per-kind table and insert a phantom row that should not exist.
            let table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'phantom text')"),
                [],
            )
            .expect("insert phantom property FTS row");
        }
        let report = service.check_semantics().expect("semantics check");
        assert_eq!(
            report.drifted_property_fts_rows, 1,
            "row that should not exist must be counted as drifted"
        );
    }
4530
4531    /// Regression (0.5.2 Pack A): weighted (per-column) FTS property schemas
4532    /// used to crash `check_semantics` with
4533    /// `SqliteError: no such column: fp.text_content` because the drift
4534    /// counter hardcoded the non-weighted column shape. A clean DB with a
4535    /// weighted schema must now report 0 drift without panicking.
4536    #[test]
4537    fn check_semantics_clean_on_weighted_fts_schema_does_not_panic() {
4538        let (db, service) = setup();
4539        // Register a weighted schema (two specs: one scalar, one recursive).
4540        // Eager mode writes the schema row and creates the per-kind weighted
4541        // FTS table with per-path columns.
4542        // Weighted schemas (at least one entry with a weight) trigger the
4543        // per-column FTS table layout in `create_or_replace_fts_kind_table`.
4544        let entries = vec![
4545            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4546            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4547        ];
4548        service
4549            .register_fts_property_schema_with_entries(
4550                "Article",
4551                &entries,
4552                Some(" "),
4553                &[],
4554                crate::rebuild_actor::RebuildMode::Eager,
4555            )
4556            .expect("register weighted schema");
4557
4558        // Insert a node and a matching FTS row whose per-column values match
4559        // the canonical extraction (expected: 0 drift).
4560        {
4561            let conn = sqlite::open_connection(db.path()).expect("conn");
4562            let properties = r#"{"title":"Hello","body":{"text":"world"}}"#;
4563            conn.execute(
4564                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4565                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4566                [properties],
4567            )
4568            .expect("insert node");
4569
4570            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4571            let (_kind, schema) = schemas
4572                .iter()
4573                .find(|(k, _)| k == "Article")
4574                .expect("weighted schema present");
4575            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4576            let cols = crate::writer::extract_property_fts_columns(&props, schema);
4577
4578            let table = fathomdb_schema::fts_kind_table_name("Article");
4579            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4580            let placeholders: Vec<String> =
4581                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4582            let sql = format!(
4583                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4584                cols = col_names.join(", "),
4585                placeholders = placeholders.join(", "),
4586            );
4587            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4588            let params: Vec<&dyn rusqlite::ToSql> =
4589                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4590                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4591                    .collect();
4592            conn.execute(&sql, params.as_slice())
4593                .expect("insert weighted FTS row");
4594        }
4595
4596        let report = service
4597            .check_semantics()
4598            .expect("semantics check must not crash on weighted schema");
4599        assert_eq!(report.drifted_property_fts_rows, 0);
4600    }
4601
4602    /// Regression (0.5.2 Pack A): drift detection in weighted FTS tables.
4603    /// After tampering a per-column value, `check_semantics` must count the
4604    /// row as drifted (once per row, regardless of how many columns mismatch).
4605    #[test]
4606    fn check_semantics_detects_drifted_property_fts_text_weighted() {
4607        let (db, service) = setup();
4608        // Weighted schemas (at least one entry with a weight) trigger the
4609        // per-column FTS table layout in `create_or_replace_fts_kind_table`.
4610        let entries = vec![
4611            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4612            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4613        ];
4614        service
4615            .register_fts_property_schema_with_entries(
4616                "Article",
4617                &entries,
4618                Some(" "),
4619                &[],
4620                crate::rebuild_actor::RebuildMode::Eager,
4621            )
4622            .expect("register weighted schema");
4623
4624        let title_col = fathomdb_schema::fts_column_name("$.title", false);
4625
4626        {
4627            let conn = sqlite::open_connection(db.path()).expect("conn");
4628            let properties = r#"{"title":"Current","body":{"text":"body"}}"#;
4629            conn.execute(
4630                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4631                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4632                [properties],
4633            )
4634            .expect("insert node");
4635
4636            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4637            let (_kind, schema) = schemas
4638                .iter()
4639                .find(|(k, _)| k == "Article")
4640                .expect("weighted schema present");
4641            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4642            let cols = crate::writer::extract_property_fts_columns(&props, schema);
4643
4644            let table = fathomdb_schema::fts_kind_table_name("Article");
4645            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4646            let placeholders: Vec<String> =
4647                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4648            let sql = format!(
4649                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4650                cols = col_names.join(", "),
4651                placeholders = placeholders.join(", "),
4652            );
4653            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4654            let params: Vec<&dyn rusqlite::ToSql> =
4655                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4656                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4657                    .collect();
4658            conn.execute(&sql, params.as_slice())
4659                .expect("insert weighted FTS row");
4660
4661            // Tamper the title column so it no longer matches canonical extraction.
4662            conn.execute(
4663                &format!("UPDATE {table} SET {title_col} = 'tampered' WHERE node_logical_id = 'article-1'"),
4664                [],
4665            )
4666            .expect("tamper weighted FTS row");
4667        }
4668
4669        let report = service.check_semantics().expect("semantics check");
4670        assert_eq!(report.drifted_property_fts_rows, 1);
4671    }
4672
    /// Regression (0.5.2 Pack A): a DB with both a weighted and a non-weighted
    /// per-kind FTS table must report 0 drift when both are in sync. This
    /// exercises the dispatcher: weighted path for one kind, non-weighted
    /// path for the other, in a single `check_semantics` call.
    #[test]
    fn check_semantics_mixed_weighted_and_non_weighted_schemas() {
        let (db, service) = setup();

        // Weighted schema for Article (per-column layout — requires weights).
        let weighted_entries = vec![
            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
        ];
        service
            .register_fts_property_schema_with_entries(
                "Article",
                &weighted_entries,
                Some(" "),
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register weighted schema");

        // Non-weighted (single path) schema for Goal. The legacy JSON shape
        // (bare array of scalar paths) yields a non-weighted per-kind table.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register non-weighted schema");
            let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create non-weighted per-kind table");

            // Insert Article node + matching weighted FTS row.
            let article_props = r#"{"title":"Hello","body":{"text":"world"}}"#;
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
                [article_props],
            )
            .expect("insert article");

            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
            let (_k, article_schema) = schemas
                .iter()
                .find(|(k, _)| k == "Article")
                .expect("Article schema present");
            let props: serde_json::Value =
                serde_json::from_str(article_props).expect("parse article props");
            let cols = crate::writer::extract_property_fts_columns(&props, article_schema);
            let article_table = fathomdb_schema::fts_kind_table_name("Article");
            // Build the INSERT dynamically: ?1 binds node_logical_id, so the
            // per-path column placeholders start at ?2.
            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
            let placeholders: Vec<String> =
                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
            let sql = format!(
                "INSERT INTO {article_table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
                cols = col_names.join(", "),
                placeholders = placeholders.join(", "),
            );
            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
            let params: Vec<&dyn rusqlite::ToSql> =
                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
                    .collect();
            conn.execute(&sql, params.as_slice())
                .expect("insert weighted FTS row");

            // Insert Goal node + matching non-weighted FTS row. Canonical
            // extraction for legacy schema on $.name yields the string
            // "Goal One".
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'goal-1', 'Goal', '{\"name\":\"Goal One\"}', 100, 'src-2')",
                [],
            )
            .expect("insert goal node");
            conn.execute(
                &format!("INSERT INTO {goal_table} (node_logical_id, text_content) VALUES ('goal-1', 'Goal One')"),
                [],
            )
            .expect("insert non-weighted FTS row");
        }

        let report = service
            .check_semantics()
            .expect("semantics check must handle both shapes");
        assert_eq!(
            report.drifted_property_fts_rows, 0,
            "clean mixed weighted + non-weighted DB must report 0 drift"
        );
    }
4771
4772    /// Regression (0.5.2 follow-up, review note): a weighted schema whose
4773    /// path set includes literal `$.text_content` collapses via
4774    /// `fts_column_name` to a `text_content` column. A live-column probe
4775    /// would then misclassify this weighted table as non-weighted and run
4776    /// the single-blob comparator against per-column storage — no crash,
4777    /// but silently incorrect drift counts (a clean DB would report drift).
4778    /// Dispatching on the persisted schema shape (any entry with
4779    /// `weight.is_some()` ⇒ weighted) avoids the collision.
4780    ///
4781    /// This test writes a CLEAN weighted row (per-column values matching
4782    /// canonical extraction) and asserts zero drift. Under the old
4783    /// live-column dispatcher the non-weighted comparator would read the
4784    /// single-path column value, compare it against the multi-path blob
4785    /// concatenation produced by `extract_property_fts`, and spuriously
4786    /// report drift.
4787    #[test]
4788    fn check_semantics_weighted_schema_with_text_content_path() {
4789        let (db, service) = setup();
4790        let entries = vec![
4791            FtsPropertyPathSpec::scalar("$.text_content").with_weight(2.0),
4792            FtsPropertyPathSpec::scalar("$.title").with_weight(1.0),
4793        ];
4794        service
4795            .register_fts_property_schema_with_entries(
4796                "Article",
4797                &entries,
4798                Some(" "),
4799                &[],
4800                crate::rebuild_actor::RebuildMode::Eager,
4801            )
4802            .expect("register weighted schema with $.text_content path");
4803
4804        {
4805            let conn = sqlite::open_connection(db.path()).expect("conn");
4806            // Two distinct, non-empty scalar values. The non-weighted
4807            // comparator would expect their joined blob ("canonical body
4808            // Hello") as the single `text_content` column value; the
4809            // weighted (per-column) layout stores them separately, so on a
4810            // live-column probe the dispatcher would misclassify and
4811            // spuriously report drift.
4812            let properties = r#"{"text_content":"canonical body","title":"Hello"}"#;
4813            conn.execute(
4814                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4815                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4816                [properties],
4817            )
4818            .expect("insert node");
4819
4820            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4821            let (_kind, schema) = schemas
4822                .iter()
4823                .find(|(k, _)| k == "Article")
4824                .expect("weighted schema present");
4825            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4826            let cols = crate::writer::extract_property_fts_columns(&props, schema);
4827
4828            let table = fathomdb_schema::fts_kind_table_name("Article");
4829            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4830            let placeholders: Vec<String> =
4831                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4832            let sql = format!(
4833                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4834                cols = col_names.join(", "),
4835                placeholders = placeholders.join(", "),
4836            );
4837            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4838            let params: Vec<&dyn rusqlite::ToSql> =
4839                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4840                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4841                    .collect();
4842            conn.execute(&sql, params.as_slice())
4843                .expect("insert weighted FTS row");
4844        }
4845
4846        let report = service.check_semantics().expect("semantics check");
4847        assert_eq!(
4848            report.drifted_property_fts_rows, 0,
4849            "weighted schema whose path collapses to `text_content` must be \
4850             dispatched as weighted (per-column comparator); a clean DB \
4851             must report 0 drift"
4852        );
4853    }
4854
4855    #[test]
4856    fn safe_export_writes_manifest_with_sha256() {
4857        let (_db, service) = setup();
4858        let export_dir = tempfile::TempDir::new().expect("temp dir");
4859        let export_path = export_dir.path().join("backup.db");
4860
4861        let manifest = service
4862            .safe_export(
4863                &export_path,
4864                SafeExportOptions {
4865                    force_checkpoint: false,
4866                },
4867            )
4868            .expect("export");
4869
4870        assert!(export_path.exists(), "exported db should exist");
4871        let manifest_path = export_dir.path().join("backup.db.export-manifest.json");
4872        assert!(
4873            manifest_path.exists(),
4874            "manifest file should exist at {}",
4875            manifest_path.display()
4876        );
4877        assert_eq!(manifest.sha256.len(), 64, "sha256 should be 64 hex chars");
4878        assert!(
4879            manifest.exported_at > 0,
4880            "exported_at should be a unix timestamp"
4881        );
4882        assert_eq!(
4883            manifest.schema_version,
4884            SchemaManager::new().current_version().0,
4885            "schema_version should match the live schema version"
4886        );
4887        assert_eq!(manifest.protocol_version, 1, "protocol_version should be 1");
4888        assert!(manifest.page_count > 0, "page_count should be positive");
4889    }
4890
    /// `safe_export` must carry operational-collection validation contracts
    /// into the exported database byte-for-byte.
    #[test]
    fn safe_export_preserves_operational_validation_contracts() {
        let (_db, service) = setup();
        // An enforce-mode validation contract with one required enum field.
        let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: validation_json.to_owned(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("backup.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export");

        // Read the contract back directly from the exported copy.
        let exported = sqlite::open_connection(&export_path).expect("exported conn");
        let exported_validation_json: String = exported
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json");
        assert_eq!(exported_validation_json, validation_json);
    }
4929
    /// With `force_checkpoint: false`, `safe_export` is expected (per the
    /// test name) to skip the WAL checkpoint pragma entirely, so it must
    /// succeed — and still produce a full manifest — on a non-WAL database.
    #[test]
    fn safe_export_force_checkpoint_false_skips_wal_pragma() {
        let (_db, service) = setup();
        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("no-wal.db");

        // force_checkpoint: false must not error even on a non-WAL database
        let manifest = service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export with no checkpoint");

        assert!(
            manifest.page_count > 0,
            "page_count must be populated regardless of checkpoint mode"
        );
        assert_eq!(
            manifest.schema_version,
            SchemaManager::new().current_version().0
        );
        assert_eq!(manifest.protocol_version, 1);
    }
4956
    /// Even without forcing a checkpoint, `safe_export` must include
    /// committed rows that still live only in the WAL file.
    #[test]
    fn safe_export_force_checkpoint_false_still_captures_wal_backed_changes() {
        let (db, service) = setup();
        // NOTE: `conn` stays open for the rest of the test, so SQLite does
        // not checkpoint the WAL on connection close before the export runs.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let journal_mode: String = conn
            .query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))
            .expect("enable wal");
        assert_eq!(journal_mode.to_lowercase(), "wal");
        // Disable auto-checkpointing so the insert below remains resident
        // in the WAL rather than being folded into the main db file.
        let auto_checkpoint_pages: i64 = conn
            .query_row("PRAGMA wal_autocheckpoint=0", [], |row| row.get(0))
            .expect("disable auto checkpoint");
        assert_eq!(auto_checkpoint_pages, 0);
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('r-wal', 'lg-wal', 'Meeting', '{}', 100, 'src-wal')",
            [],
        )
        .expect("insert wal-backed node");

        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("wal-backed.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export wal-backed db");

        let exported = sqlite::open_connection(&export_path).expect("open exported db");
        let exported_count: i64 = exported
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-wal'",
                [],
                |row| row.get(0),
            )
            .expect("count exported nodes");
        assert_eq!(
            exported_count, 1,
            "safe_export must include committed rows that are still resident in the WAL"
        );
    }
5000
5001    #[test]
5002    fn excise_source_removes_searchable_content_after_excision() {
5003        let (db, service) = setup();
5004        {
5005            let conn = sqlite::open_connection(db.path()).expect("conn");
5006            conn.execute(
5007                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
5008                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
5009                [],
5010            )
5011            .expect("insert v1");
5012            conn.execute(
5013                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5014                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
5015                [],
5016            )
5017            .expect("insert v2");
5018            conn.execute(
5019                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
5020                 VALUES ('ck1', 'lg1', 'hello world', 100)",
5021                [],
5022            )
5023            .expect("insert chunk");
5024        }
5025        service.excise_source("source-2").expect("excise");
5026        {
5027            let conn = sqlite::open_connection(db.path()).expect("conn");
5028            let fts_count: i64 = conn
5029                .query_row(
5030                    "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'ck1'",
5031                    [],
5032                    |row| row.get(0),
5033                )
5034                .expect("fts count");
5035            assert_eq!(
5036                fts_count, 0,
5037                "excised content should not remain searchable after excise"
5038            );
5039        }
5040    }
5041
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn excise_source_cleans_chunks_and_vec_rows_for_excised_version() {
        // Excising a source must (a) restore the prior node version as active,
        // and (b) delete the excised version's chunks, vec rows, and FTS rows.
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Register a 4-dimension vec profile for the 'Meeting' kind so the
            // per-kind vec table exists before we insert into it.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Meeting", 4)
                .expect("ensure vec kind profile");
            // v1 is already superseded (at 200); v2 is the active version and
            // comes from 'source-2', the source we will excise.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
                [],
            )
            .expect("insert v1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
                [],
            )
            .expect("insert v2");
            // Chunk created alongside v2 (created_at 200) — belongs to the
            // excised version and must be cleaned up.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('ck1', 'lg1', 'new content', 200)",
                [],
            )
            .expect("insert chunk");
            let vec_table = fathomdb_schema::vec_kind_table_name("Meeting");
            // zeroblob(16) — presumably 4 f32 lanes for the 4-dim profile
            // above; TODO confirm against the vec table's column encoding.
            conn.execute(
                &format!(
                    "INSERT INTO {vec_table} (chunk_id, embedding) VALUES ('ck1', zeroblob(16))"
                ),
                [],
            )
            .expect("insert vec row");
        }

        service.excise_source("source-2").expect("excise");

        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        // v1 must be reinstated as the active row once v2 is excised.
        let active_row: String = conn
            .query_row(
                "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("restored active row");
        assert_eq!(active_row, "r1");
        let chunk_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE node_logical_id = 'lg1'",
                [],
                |row| row.get(0),
            )
            .expect("chunk count");
        assert_eq!(
            chunk_count, 0,
            "excised source content must not survive as chunks"
        );
        // The vec table must be empty — the only row belonged to 'ck1'.
        let vec_table = fathomdb_schema::vec_kind_table_name("Meeting");
        let vec_count: i64 = conn
            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
                row.get(0)
            })
            .expect("vec count");
        assert_eq!(vec_count, 0, "excised source vec rows must be removed");
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE node_logical_id = 'lg1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(
            fts_count, 0,
            "excised source content must not remain searchable"
        );
    }
5121
5122    #[test]
5123    fn export_page_count_matches_exported_file() {
5124        let (_db, service) = setup();
5125        let export_dir = tempfile::TempDir::new().expect("temp dir");
5126        let export_path = export_dir.path().join("page-count.db");
5127
5128        let manifest = service
5129            .safe_export(
5130                &export_path,
5131                SafeExportOptions {
5132                    force_checkpoint: false,
5133                },
5134            )
5135            .expect("export");
5136
5137        let exported = sqlite::open_connection(&export_path).expect("open exported db");
5138        let actual_page_count: u64 = exported
5139            .query_row("PRAGMA page_count", [], |row| row.get(0))
5140            .expect("page_count from exported file");
5141
5142        assert_eq!(
5143            manifest.page_count, actual_page_count,
5144            "manifest page_count must match the exported file's PRAGMA page_count"
5145        );
5146    }
5147
5148    #[test]
5149    fn no_temp_file_after_successful_export() {
5150        let (_db, service) = setup();
5151        let export_dir = tempfile::TempDir::new().expect("temp dir");
5152        let export_path = export_dir.path().join("no-tmp.db");
5153
5154        service
5155            .safe_export(
5156                &export_path,
5157                SafeExportOptions {
5158                    force_checkpoint: false,
5159                },
5160            )
5161            .expect("export");
5162
5163        let tmp_files: Vec<_> = fs::read_dir(export_dir.path())
5164            .expect("read export dir")
5165            .filter_map(Result::ok)
5166            .filter(|e| e.path().extension().is_some_and(|ext| ext == "tmp"))
5167            .collect();
5168
5169        assert!(
5170            tmp_files.is_empty(),
5171            "no .tmp files should remain after a successful export, found: {tmp_files:?}"
5172        );
5173    }
5174
5175    #[test]
5176    fn export_manifest_is_valid_json() {
5177        let (_db, service) = setup();
5178        let export_dir = tempfile::TempDir::new().expect("temp dir");
5179        let export_path = export_dir.path().join("valid-json.db");
5180
5181        service
5182            .safe_export(
5183                &export_path,
5184                SafeExportOptions {
5185                    force_checkpoint: false,
5186                },
5187            )
5188            .expect("export");
5189
5190        let manifest_path = export_dir.path().join("valid-json.db.export-manifest.json");
5191        let manifest_contents = fs::read_to_string(&manifest_path).expect("read manifest");
5192        let parsed: serde_json::Value =
5193            serde_json::from_str(&manifest_contents).expect("manifest must be valid JSON");
5194
5195        assert!(
5196            parsed.get("exported_at").is_some(),
5197            "manifest must contain exported_at"
5198        );
5199        assert!(
5200            parsed.get("sha256").is_some(),
5201            "manifest must contain sha256"
5202        );
5203        assert!(
5204            parsed.get("schema_version").is_some(),
5205            "manifest must contain schema_version"
5206        );
5207        assert!(
5208            parsed.get("protocol_version").is_some(),
5209            "manifest must contain protocol_version"
5210        );
5211        assert!(
5212            parsed.get("page_count").is_some(),
5213            "manifest must contain page_count"
5214        );
5215    }
5216
5217    #[test]
5218    fn provenance_purge_dry_run_reports_counts() {
5219        let (db, service) = setup();
5220        {
5221            let conn = sqlite::open_connection(db.path()).expect("conn");
5222            conn.execute(
5223                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5224                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5225                [],
5226            )
5227            .expect("insert p1");
5228            conn.execute(
5229                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5230                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5231                [],
5232            )
5233            .expect("insert p2");
5234            conn.execute(
5235                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5236                 VALUES ('p3', 'excise', 'lg3', 'src-1', 300)",
5237                [],
5238            )
5239            .expect("insert p3");
5240        }
5241
5242        let options = super::ProvenancePurgeOptions {
5243            dry_run: true,
5244            preserve_event_types: Vec::new(),
5245        };
5246        let report = service
5247            .purge_provenance_events(250, &options)
5248            .expect("dry run purge");
5249
5250        assert_eq!(report.events_deleted, 2);
5251        assert_eq!(report.events_preserved, 1);
5252        assert!(report.oldest_remaining.is_some());
5253
5254        let conn = sqlite::open_connection(db.path()).expect("conn");
5255        let total: i64 = conn
5256            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5257                row.get(0)
5258            })
5259            .expect("count");
5260        assert_eq!(total, 3, "dry_run must not delete any events");
5261    }
5262
5263    #[test]
5264    fn provenance_purge_deletes_old_events() {
5265        let (db, service) = setup();
5266        {
5267            let conn = sqlite::open_connection(db.path()).expect("conn");
5268            conn.execute(
5269                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5270                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5271                [],
5272            )
5273            .expect("insert p1");
5274            conn.execute(
5275                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5276                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5277                [],
5278            )
5279            .expect("insert p2");
5280        }
5281
5282        let options = super::ProvenancePurgeOptions {
5283            dry_run: false,
5284            preserve_event_types: Vec::new(),
5285        };
5286        let report = service
5287            .purge_provenance_events(150, &options)
5288            .expect("purge");
5289
5290        assert_eq!(report.events_deleted, 1);
5291        assert_eq!(report.events_preserved, 1);
5292        assert_eq!(report.oldest_remaining, Some(200));
5293
5294        let conn = sqlite::open_connection(db.path()).expect("conn");
5295        let remaining: i64 = conn
5296            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5297                row.get(0)
5298            })
5299            .expect("count");
5300        assert_eq!(remaining, 1);
5301    }
5302
5303    #[test]
5304    fn provenance_purge_preserves_specified_types() {
5305        let (db, service) = setup();
5306        {
5307            let conn = sqlite::open_connection(db.path()).expect("conn");
5308            conn.execute(
5309                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5310                 VALUES ('p1', 'excise', 'lg1', 'src-1', 100)",
5311                [],
5312            )
5313            .expect("insert p1");
5314            conn.execute(
5315                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5316                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 100)",
5317                [],
5318            )
5319            .expect("insert p2");
5320            conn.execute(
5321                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5322                 VALUES ('p3', 'node_insert', 'lg3', 'src-1', 100)",
5323                [],
5324            )
5325            .expect("insert p3");
5326        }
5327
5328        let options = super::ProvenancePurgeOptions {
5329            dry_run: false,
5330            preserve_event_types: Vec::new(),
5331        };
5332        let report = service
5333            .purge_provenance_events(500, &options)
5334            .expect("purge");
5335
5336        assert_eq!(report.events_deleted, 2);
5337        assert_eq!(report.events_preserved, 1);
5338
5339        let conn = sqlite::open_connection(db.path()).expect("conn");
5340        let remaining_type: String = conn
5341            .query_row("SELECT event_type FROM provenance_events", [], |row| {
5342                row.get(0)
5343            })
5344            .expect("remaining event type");
5345        assert_eq!(remaining_type, "excise");
5346    }
5347
5348    #[test]
5349    fn provenance_purge_noop_with_zero_timestamp() {
5350        let (db, service) = setup();
5351        {
5352            let conn = sqlite::open_connection(db.path()).expect("conn");
5353            conn.execute(
5354                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5355                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5356                [],
5357            )
5358            .expect("insert p1");
5359        }
5360
5361        let options = super::ProvenancePurgeOptions {
5362            dry_run: false,
5363            preserve_event_types: Vec::new(),
5364        };
5365        let report = service.purge_provenance_events(0, &options).expect("purge");
5366
5367        assert_eq!(report.events_deleted, 0);
5368        assert_eq!(report.events_preserved, 1);
5369        assert_eq!(report.oldest_remaining, Some(100));
5370    }
5371
    #[test]
    fn restore_skips_edge_when_counterpart_purged() {
        // Restoring a node must not resurrect an edge whose other endpoint has
        // been purged entirely; such edges are reported as skipped instead.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A (doc-1) and node B (doc-2)
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire both A and B, and the edge. The provenance events record
            // the retirement; the UPDATEs below apply it to the live rows.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            // Simulate purge of B: delete node rows but leave the edge intact
            // to reproduce the dangling-edge scenario the validation guards against.
            conn.execute("DELETE FROM nodes WHERE logical_id = 'doc-2'", [])
                .expect("purge node B rows");
        }

        // Restore A — the edge should be skipped because B has no active node
        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 0, "edge should not be restored");
        // The skipped edge must be reported along with its missing endpoint.
        assert_eq!(report.skipped_edges.len(), 1);
        assert_eq!(report.skipped_edges[0].edge_logical_id, "edge-1");
        assert_eq!(report.skipped_edges[0].missing_endpoint, "doc-2");

        // Verify the edge is still retired in the database
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 0, "edge must remain retired");
    }
5452
    #[test]
    fn restore_restores_edges_to_active_nodes() {
        // Restoring a node whose edge counterpart is still active must bring
        // the retired edge back along with the node.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A and node B (B stays active)
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire only A (and the edge); B remains active throughout.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore A — B is active, so the edge should be restored normally
        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert!(report.restored_edge_rows > 0, "edge should be restored");
        assert!(
            report.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        // Confirm the edge row itself is active again in the database.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1, "edge must be active");
    }
5524
    #[test]
    fn restore_restores_edges_when_both_restored() {
        // When both endpoints of a retired edge are restored (in either
        // order), the second restore must bring the edge back.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A and node B
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire both A and B (plus the edge) via provenance events and
            // matching supersede updates.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-b', 'node_retire', 'doc-2', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event B");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore B first — edge is skipped because A is still retired
        let report_b = service.restore_logical_id("doc-2").expect("restore B");
        assert!(!report_b.was_noop);

        // Restore A — B is now active, so the edge should be restored
        let report_a = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report_a.was_noop);
        assert_eq!(report_a.restored_node_rows, 1);
        assert!(
            report_a.restored_edge_rows > 0,
            "edge should be restored when both endpoints active"
        );
        assert!(
            report_a.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        // Confirm the edge row itself is active again in the database.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(
            active_edge_count, 1,
            "edge must be active after both endpoints restored"
        );
    }
5617
5618    // ── FTS property schema end-to-end tests ──────────────────────────
5619
5620    #[test]
5621    fn fts_property_schema_crud_round_trip() {
5622        let (_db, service) = setup();
5623
5624        // Register
5625        let record = service
5626            .register_fts_property_schema(
5627                "Meeting",
5628                &["$.title".to_owned(), "$.summary".to_owned()],
5629                None,
5630            )
5631            .expect("register");
5632        assert_eq!(record.kind, "Meeting");
5633        assert_eq!(record.property_paths, vec!["$.title", "$.summary"]);
5634        assert_eq!(record.separator, " ");
5635        assert_eq!(record.format_version, 1);
5636
5637        // Describe
5638        let described = service
5639            .describe_fts_property_schema("Meeting")
5640            .expect("describe")
5641            .expect("should exist");
5642        assert_eq!(described, record);
5643
5644        // Describe missing kind
5645        let missing = service
5646            .describe_fts_property_schema("NoSuchKind")
5647            .expect("describe missing");
5648        assert!(missing.is_none());
5649
5650        // List
5651        let list = service.list_fts_property_schemas().expect("list");
5652        assert_eq!(list.len(), 1);
5653        assert_eq!(list[0].kind, "Meeting");
5654
5655        // Update (idempotent upsert)
5656        let updated = service
5657            .register_fts_property_schema(
5658                "Meeting",
5659                &["$.title".to_owned(), "$.notes".to_owned()],
5660                Some("\n"),
5661            )
5662            .expect("update");
5663        assert_eq!(updated.property_paths, vec!["$.title", "$.notes"]);
5664        assert_eq!(updated.separator, "\n");
5665
5666        // Remove
5667        service
5668            .remove_fts_property_schema("Meeting")
5669            .expect("remove");
5670        let after_remove = service
5671            .describe_fts_property_schema("Meeting")
5672            .expect("describe after remove");
5673        assert!(after_remove.is_none());
5674
5675        // Remove non-existent is an error
5676        let err = service.remove_fts_property_schema("Meeting");
5677        assert!(err.is_err());
5678    }
5679
5680    #[test]
5681    fn describe_fts_property_schema_round_trips_recursive_entries() {
5682        let (_db, service) = setup();
5683
5684        let entries = vec![
5685            FtsPropertyPathSpec::scalar("$.title"),
5686            FtsPropertyPathSpec::recursive("$.payload"),
5687        ];
5688        let exclude = vec!["$.payload.private".to_owned()];
5689        let registered = service
5690            .register_fts_property_schema_with_entries(
5691                "KnowledgeItem",
5692                &entries,
5693                Some(" "),
5694                &exclude,
5695                crate::rebuild_actor::RebuildMode::Eager,
5696            )
5697            .expect("register recursive");
5698
5699        // The register entry point now echoes back the fully-populated
5700        // record via the same load helper used by describe/list.
5701        assert_eq!(registered.entries, entries);
5702        assert_eq!(registered.exclude_paths, exclude);
5703        assert_eq!(registered.property_paths, vec!["$.title", "$.payload"]);
5704
5705        let described = service
5706            .describe_fts_property_schema("KnowledgeItem")
5707            .expect("describe")
5708            .expect("should exist");
5709        assert_eq!(described.kind, "KnowledgeItem");
5710        assert_eq!(described.entries, entries);
5711        assert_eq!(described.exclude_paths, exclude);
5712        assert_eq!(described.property_paths, vec!["$.title", "$.payload"]);
5713        assert_eq!(described.separator, " ");
5714        assert_eq!(described.format_version, 1);
5715    }
5716
5717    #[test]
5718    fn list_fts_property_schemas_round_trips_recursive_entries() {
5719        let (_db, service) = setup();
5720
5721        let entries = vec![
5722            FtsPropertyPathSpec::scalar("$.title"),
5723            FtsPropertyPathSpec::recursive("$.payload"),
5724        ];
5725        let exclude = vec!["$.payload.secret".to_owned()];
5726        service
5727            .register_fts_property_schema_with_entries(
5728                "KnowledgeItem",
5729                &entries,
5730                Some(" "),
5731                &exclude,
5732                crate::rebuild_actor::RebuildMode::Eager,
5733            )
5734            .expect("register recursive");
5735
5736        let listed = service.list_fts_property_schemas().expect("list");
5737        assert_eq!(listed.len(), 1);
5738        let record = &listed[0];
5739        assert_eq!(record.kind, "KnowledgeItem");
5740        assert_eq!(record.entries, entries);
5741        assert_eq!(record.exclude_paths, exclude);
5742        assert_eq!(record.property_paths, vec!["$.title", "$.payload"]);
5743    }
5744
5745    #[test]
5746    fn describe_fts_property_schema_round_trips_scalar_only_entries() {
5747        let (_db, service) = setup();
5748
5749        service
5750            .register_fts_property_schema(
5751                "Meeting",
5752                &["$.title".to_owned(), "$.summary".to_owned()],
5753                None,
5754            )
5755            .expect("register scalar");
5756
5757        let described = service
5758            .describe_fts_property_schema("Meeting")
5759            .expect("describe")
5760            .expect("should exist");
5761        assert_eq!(described.property_paths, vec!["$.title", "$.summary"]);
5762        assert_eq!(described.entries.len(), 2);
5763        for entry in &described.entries {
5764            assert_eq!(
5765                entry.mode,
5766                FtsPropertyPathMode::Scalar,
5767                "scalar-only schema should deserialize every entry as Scalar"
5768            );
5769        }
5770        assert!(described.exclude_paths.is_empty());
5771    }
5772
    #[test]
    fn restore_reestablishes_property_fts_visibility() {
        // Scenario: a Document node is seeded with a property schema, a chunk,
        // and a per-kind property FTS row; the node is then retired (superseded
        // plus derived FTS rows cleared). Restoring its logical id must
        // re-materialize exactly one property FTS row with the original text.
        let (db, service) = setup();
        let doc_table = fathomdb_schema::fts_kind_table_name("Document");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register a property schema for Document kind.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Document', '[\"$.title\", \"$.body\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Create the per-kind FTS table.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {doc_table} USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create per-kind table");
            // Insert an active node with extractable properties.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{\"title\":\"Budget\",\"body\":\"Q3 forecast\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Insert a chunk so restore has something to work with for FTS.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget text', 100)",
                [],
            )
            .expect("insert chunk");
            // Insert property FTS row into per-kind table (as write path would).
            conn.execute(
                &format!(
                    "INSERT INTO {doc_table} (node_logical_id, text_content) \
                     VALUES ('doc-1', 'Budget Q3 forecast')"
                ),
                [],
            )
            .expect("insert property fts");
            // Simulate retire: supersede node, clear FTS.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("supersede");
            // Both the chunk-level and property-level FTS projections are
            // cleared, mirroring what retire does to derived state.
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear chunk fts");
            conn.execute(&format!("DELETE FROM {doc_table}"), [])
                .expect("clear property fts");
        }

        // Restore must report that one property FTS row came back.
        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_property_fts_rows, 1);

        // Verify the property FTS row was recreated in the per-kind table.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let prop_fts_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts count");
        assert_eq!(prop_fts_count, 1, "property FTS must be restored");

        // The restored text must be the schema-extracted concatenation, not
        // the chunk text — restore rebuilds from node properties.
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts text");
        assert_eq!(text, "Budget Q3 forecast");
    }
5858
5859    #[test]
5860    fn safe_export_preserves_fts_property_schemas() {
5861        let (_db, service) = setup();
5862        service
5863            .register_fts_property_schema(
5864                "Goal",
5865                &["$.name".to_owned(), "$.rationale".to_owned()],
5866                None,
5867            )
5868            .expect("register schema");
5869
5870        let export_dir = tempfile::TempDir::new().expect("temp dir");
5871        let export_path = export_dir.path().join("backup.db");
5872        service
5873            .safe_export(
5874                &export_path,
5875                SafeExportOptions {
5876                    force_checkpoint: false,
5877                },
5878            )
5879            .expect("export");
5880
5881        // Open the exported DB and verify the schema survived.
5882        let exported_conn = rusqlite::Connection::open(&export_path).expect("open exported db");
5883        let kind: String = exported_conn
5884            .query_row(
5885                "SELECT kind FROM fts_property_schemas WHERE kind = 'Goal'",
5886                [],
5887                |row| row.get(0),
5888            )
5889            .expect("schema must exist in export");
5890        assert_eq!(kind, "Goal");
5891        let paths_json: String = exported_conn
5892            .query_row(
5893                "SELECT property_paths_json FROM fts_property_schemas WHERE kind = 'Goal'",
5894                [],
5895                |row| row.get(0),
5896            )
5897            .expect("paths must exist");
5898        let paths: Vec<String> = serde_json::from_str(&paths_json).expect("valid json");
5899        assert_eq!(paths, vec!["$.name", "$.rationale"]);
5900    }
5901
    #[test]
    #[allow(clippy::too_many_lines)]
    fn export_recovery_rebuilds_property_fts_from_canonical_state() {
        // Scenario: export a DB with two projected Goal nodes, then corrupt the
        // exported copy's derived FTS rows (one wrong, one missing) and verify
        // a full FTS rebuild against the export restores both from canonical
        // node state and leaves integrity/semantics checks clean.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        // Register a schema and insert two nodes with extractable properties.
        service
            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
            .expect("register");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 1");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property FTS row 1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-2', 'goal-2', 'Goal', '{\"name\":\"Launch redesign\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 2");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-2', 'Launch redesign')"
                ),
                [],
            )
            .expect("insert property FTS row 2");
        }

        // Export.
        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("backup.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export");

        // Corrupt the derived rows: replace correct text with wrong text for
        // goal-1, and delete the row for goal-2 entirely. This exercises both
        // corrupted-but-present rows and missing rows in the same recovery.
        {
            let conn = rusqlite::Connection::open(&export_path).expect("open export");
            // Bootstrap the exported DB to get per-kind tables.
            SchemaManager::new()
                .bootstrap(&conn)
                .expect("bootstrap export");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
            )
            .expect("delete old row");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'completely wrong stale text')"
                ),
                [],
            )
            .expect("insert corrupted row");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
            )
            .expect("delete goal-2 row");
        }

        // Open the exported DB and rebuild projections from canonical state.
        let schema = Arc::new(SchemaManager::new());
        let exported_service = AdminService::new(&export_path, Arc::clone(&schema));
        exported_service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");

        // Verify the per-kind table has the correct rows after recovery.
        let conn = rusqlite::Connection::open(&export_path).expect("open export for verify");
        let goal1_text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-1 text after rebuild");
        assert_eq!(
            goal1_text, "Ship v2",
            "goal-1 text must be corrected by rebuild"
        );

        // The deleted row must come back from the node's canonical properties.
        let goal2_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-2 count");
        assert_eq!(goal2_count, 1, "goal-2 row must be restored by rebuild");

        // The injected corruption must have been purged, not merely shadowed.
        let stale_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE text_content = 'completely wrong stale text'"),
                [],
                |r| r.get(0),
            )
            .expect("stale count");
        assert_eq!(stale_count, 0, "corrupted text must be gone after rebuild");

        // Verify integrity and semantics are clean after recovery.
        let integrity = exported_service.check_integrity().expect("integrity");
        assert_eq!(integrity.missing_property_fts_rows, 0);
        let semantics = exported_service.check_semantics().expect("semantics");
        assert_eq!(semantics.drifted_property_fts_rows, 0);
        assert_eq!(semantics.orphaned_property_fts_rows, 0);
        assert_eq!(semantics.duplicate_property_fts_rows, 0);
    }
6031
6032    #[test]
6033    fn check_integrity_no_false_positives_for_empty_extraction() {
6034        let (db, service) = setup();
6035        {
6036            let conn = sqlite::open_connection(db.path()).expect("conn");
6037            // Register a schema that looks for $.searchable
6038            conn.execute(
6039                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6040                 VALUES ('Ticket', '[\"$.searchable\"]', ' ')",
6041                [],
6042            )
6043            .expect("register schema");
6044            // Insert a node whose properties do NOT contain $.searchable —
6045            // correctly has no property FTS row.
6046            conn.execute(
6047                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6048                 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"status\":\"open\"}', 100, 'seed')",
6049                [],
6050            )
6051            .expect("insert node");
6052        }
6053
6054        let report = service.check_integrity().expect("integrity");
6055        assert_eq!(
6056            report.missing_property_fts_rows, 0,
6057            "node with no extractable values must not be counted as missing"
6058        );
6059    }
6060
6061    #[test]
6062    fn check_integrity_detects_genuinely_missing_property_fts_rows() {
6063        let (db, service) = setup();
6064        {
6065            let conn = sqlite::open_connection(db.path()).expect("conn");
6066            conn.execute(
6067                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6068                 VALUES ('Ticket', '[\"$.title\"]', ' ')",
6069                [],
6070            )
6071            .expect("register schema");
6072            // Insert a node WITH an extractable $.title but no property FTS row.
6073            conn.execute(
6074                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6075                 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"title\":\"fix login bug\"}', 100, 'seed')",
6076                [],
6077            )
6078            .expect("insert node");
6079        }
6080
6081        let report = service.check_integrity().expect("integrity");
6082        assert_eq!(
6083            report.missing_property_fts_rows, 1,
6084            "node with extractable values but no property FTS row must be detected"
6085        );
6086    }
6087
6088    #[test]
6089    fn rebuild_projections_fts_restores_missing_property_fts_rows() {
6090        let (db, service) = setup();
6091        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6092        {
6093            let conn = sqlite::open_connection(db.path()).expect("conn");
6094            conn.execute(
6095                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6096                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6097                [],
6098            )
6099            .expect("register schema");
6100            conn.execute(
6101                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6102                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6103                [],
6104            )
6105            .expect("insert node");
6106            // Deliberately do NOT insert a property FTS row.
6107        }
6108
6109        let report = service
6110            .rebuild_projections(ProjectionTarget::Fts)
6111            .expect("rebuild");
6112        assert!(
6113            report.rebuilt_rows >= 1,
6114            "rebuild must insert at least one property FTS row"
6115        );
6116
6117        let conn = sqlite::open_connection(db.path()).expect("conn");
6118        let text: String = conn
6119            .query_row(
6120                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6121                [],
6122                |row| row.get(0),
6123            )
6124            .expect("property FTS row must exist after rebuild");
6125        assert_eq!(text, "Ship v2");
6126    }
6127
    #[test]
    fn rebuild_missing_projections_fills_gap_for_deleted_property_fts_row() {
        // Scenario: a projected property FTS row is inserted and then deleted
        // out-of-band (simulated corruption). The targeted gap-fill rebuild
        // must re-insert exactly that one row.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register the schema directly via SQL so the eager rebuild that
            // the service API performs is not involved.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Create per-kind table and insert then delete to simulate corruption.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property fts");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
            )
            .expect("delete property fts");
        }

        // Gap-fill rebuild only touches missing projections; it must report
        // at least the one row it re-inserted.
        let report = service
            .rebuild_missing_projections()
            .expect("rebuild missing");
        assert!(
            report.rebuilt_rows >= 1,
            "missing rebuild must insert the gap-fill row"
        );

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "gap-fill must restore exactly one property FTS row"
        );
    }
6190
    #[test]
    fn remove_schema_then_rebuild_cleans_stale_property_fts_rows() {
        // This test verifies that a full FTS rebuild clears per-kind tables whose
        // schema has been removed (orphaned state). We create the orphaned state
        // directly via SQL (bypassing the service API, which now eagerly deletes rows
        // on schema removal) to simulate a table that was left populated from a
        // previous registration cycle.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Create per-kind table WITHOUT registering a schema — simulates orphaned rows
            // that remain after schema removal (or pre-existing table from a previous cycle).
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            // Populate the orphaned table directly, as a stale projection would.
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property fts");
        }

        // No schema registered — per-kind table has orphaned rows.
        let semantics = service.check_semantics().expect("semantics");
        assert_eq!(
            semantics.orphaned_property_fts_rows, 1,
            "orphaned property FTS rows must be detected with no registered schema"
        );

        // Full rebuild should clean them (no schema means nothing to rebuild).
        service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "rebuild must delete rows from per-kind tables with no registered schema"
        );
    }
6250
6251    mod validate_fts_property_paths_tests {
6252        use super::super::validate_fts_property_paths;
6253
6254        #[test]
6255        fn valid_simple_path() {
6256            assert!(validate_fts_property_paths(&["$.name".to_owned()]).is_ok());
6257        }
6258
6259        #[test]
6260        fn valid_nested_path() {
6261            assert!(validate_fts_property_paths(&["$.address.city".to_owned()]).is_ok());
6262        }
6263
6264        #[test]
6265        fn valid_underscore_segment() {
6266            assert!(validate_fts_property_paths(&["$.a_b".to_owned()]).is_ok());
6267        }
6268
6269        #[test]
6270        fn rejects_bare_prefix() {
6271            let result = validate_fts_property_paths(&["$.".to_owned()]);
6272            assert!(result.is_err(), "path '$.' must be rejected");
6273        }
6274
6275        #[test]
6276        fn rejects_double_dot() {
6277            let result = validate_fts_property_paths(&["$..x".to_owned()]);
6278            assert!(result.is_err(), "path '$..x' must be rejected");
6279        }
6280
6281        #[test]
6282        fn rejects_trailing_dot() {
6283            let result = validate_fts_property_paths(&["$.foo.".to_owned()]);
6284            assert!(result.is_err(), "path '$.foo.' must be rejected");
6285        }
6286
6287        #[test]
6288        fn rejects_space_in_segment() {
6289            let result = validate_fts_property_paths(&["$.foo bar".to_owned()]);
6290            assert!(result.is_err(), "path '$.foo bar' must be rejected");
6291        }
6292
6293        #[test]
6294        fn rejects_bracket_syntax() {
6295            let result = validate_fts_property_paths(&["$.foo[0]".to_owned()]);
6296            assert!(result.is_err(), "path '$.foo[0]' must be rejected");
6297        }
6298
6299        #[test]
6300        fn rejects_duplicates() {
6301            let result = validate_fts_property_paths(&["$.name".to_owned(), "$.name".to_owned()]);
6302            assert!(result.is_err(), "duplicate paths must be rejected");
6303        }
6304
6305        #[test]
6306        fn rejects_empty_list() {
6307            let result = validate_fts_property_paths(&[]);
6308            assert!(result.is_err(), "empty path list must be rejected");
6309        }
6310    }
6311
6312    // --- A-6: per-kind FTS table tests ---
6313
6314    #[test]
6315    fn register_fts_schema_writes_to_per_kind_table() {
6316        // After A-6: register_fts_property_schema writes rows to fts_props_<kind>,
6317        // NOT to fts_node_properties.
6318        let (db, service) = setup();
6319        {
6320            let conn = sqlite::open_connection(db.path()).expect("conn");
6321            // Insert a node before registering the schema.
6322            conn.execute(
6323                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6324                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6325                [],
6326            )
6327            .expect("insert node");
6328        }
6329
6330        // Register schema — this triggers eager rebuild which writes to per-kind table.
6331        service
6332            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6333            .expect("register schema");
6334
6335        let conn = sqlite::open_connection(db.path()).expect("conn");
6336        let table = fathomdb_schema::fts_kind_table_name("Goal");
6337        // Per-kind table must have the row.
6338        let per_kind_count: i64 = conn
6339            .query_row(
6340                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6341                [],
6342                |row| row.get(0),
6343            )
6344            .expect("per-kind count");
6345        assert_eq!(
6346            per_kind_count, 1,
6347            "per-kind table must have the row after registration"
6348        );
6349    }
6350
6351    #[test]
6352    fn remove_fts_schema_deletes_from_per_kind_table() {
6353        // After A-6: remove_fts_property_schema deletes rows from fts_props_<kind>.
6354        let (db, service) = setup();
6355        {
6356            let conn = sqlite::open_connection(db.path()).expect("conn");
6357            conn.execute(
6358                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6359                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6360                [],
6361            )
6362            .expect("insert node");
6363        }
6364
6365        service
6366            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6367            .expect("register schema");
6368        service
6369            .remove_fts_property_schema("Goal")
6370            .expect("remove schema");
6371
6372        let conn = sqlite::open_connection(db.path()).expect("conn");
6373        let table = fathomdb_schema::fts_kind_table_name("Goal");
6374        let per_kind_count: i64 = conn
6375            .query_row(
6376                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6377                [],
6378                |row| row.get(0),
6379            )
6380            .expect("per-kind count");
6381        assert_eq!(
6382            per_kind_count, 0,
6383            "per-kind table must be empty after schema removal"
6384        );
6385    }
6386
6387    // --- B-1: weight field tests ---
6388
6389    #[test]
6390    fn fts_path_spec_with_weight_builder() {
6391        let spec = FtsPropertyPathSpec::scalar("$.title").with_weight(5.0);
6392        assert_eq!(spec.weight, Some(5.0));
6393        assert_eq!(spec.path, "$.title");
6394        assert_eq!(spec.mode, FtsPropertyPathMode::Scalar);
6395    }
6396
6397    #[test]
6398    fn fts_path_spec_serialize_with_weight() {
6399        use super::serialize_property_paths_json;
6400        let entries = vec![
6401            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6402            FtsPropertyPathSpec::scalar("$.body"),
6403        ];
6404        let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6405        // Must use rich object format because a weight is present
6406        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6407        let paths = v
6408            .get("paths")
6409            .expect("paths key")
6410            .as_array()
6411            .expect("array");
6412        assert_eq!(paths.len(), 2);
6413        // First entry has weight
6414        assert_eq!(
6415            paths[0].get("path").and_then(serde_json::Value::as_str),
6416            Some("$.title")
6417        );
6418        assert_eq!(
6419            paths[0].get("weight").and_then(serde_json::Value::as_f64),
6420            Some(2.0)
6421        );
6422        // Second entry has no weight field
6423        assert!(
6424            paths[1].get("weight").is_none(),
6425            "unweighted spec must omit weight field"
6426        );
6427    }
6428
6429    #[test]
6430    fn fts_path_spec_serialize_no_weights() {
6431        use super::serialize_property_paths_json;
6432        let entries = vec![
6433            FtsPropertyPathSpec::scalar("$.title"),
6434            FtsPropertyPathSpec::scalar("$.payload"),
6435        ];
6436        let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6437        // Must use bare string array (backward compat)
6438        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6439        assert!(
6440            v.is_array(),
6441            "all-scalar no-weight schema must serialize as bare string array"
6442        );
6443        let arr = v.as_array().expect("array");
6444        assert_eq!(arr.len(), 2);
6445        assert_eq!(arr[0].as_str(), Some("$.title"));
6446        assert_eq!(arr[1].as_str(), Some("$.payload"));
6447    }
6448
6449    #[test]
6450    fn fts_weight_validation_out_of_range() {
6451        let (_db, service) = setup();
6452        // weight = 0.0 must be rejected
6453        let entries_zero = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(0.0)];
6454        let result = service.register_fts_property_schema_with_entries(
6455            "Article",
6456            &entries_zero,
6457            None,
6458            &[],
6459            crate::rebuild_actor::RebuildMode::Eager,
6460        );
6461        assert!(result.is_err(), "weight 0.0 must be rejected");
6462        let err_msg = result.expect_err("weight 0.0 must be rejected").to_string();
6463        assert!(
6464            err_msg.contains("weight"),
6465            "error must mention weight: {err_msg}"
6466        );
6467
6468        // weight = 1001.0 must be rejected
6469        let entries_big = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(1001.0)];
6470        let result = service.register_fts_property_schema_with_entries(
6471            "Article",
6472            &entries_big,
6473            None,
6474            &[],
6475            crate::rebuild_actor::RebuildMode::Eager,
6476        );
6477        assert!(result.is_err(), "weight 1001.0 must be rejected");
6478    }
6479
6480    #[test]
6481    fn fts_weight_validation_valid() {
6482        let (_db, service) = setup();
6483        let entries = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(10.0)];
6484        let result = service.register_fts_property_schema_with_entries(
6485            "Article",
6486            &entries,
6487            None,
6488            &[],
6489            crate::rebuild_actor::RebuildMode::Eager,
6490        );
6491        assert!(
6492            result.is_ok(),
6493            "weight 10.0 must be accepted: {:?}",
6494            result.err()
6495        );
6496    }
6497
6498    // --- B-2: create_or_replace_fts_kind_table tests ---
6499
6500    #[test]
6501    fn create_or_replace_creates_multi_column_table() {
6502        use super::create_or_replace_fts_kind_table;
6503        let (db, _service) = setup();
6504        let conn = sqlite::open_connection(db.path()).expect("conn");
6505        let specs = vec![
6506            FtsPropertyPathSpec::scalar("$.title"),
6507            FtsPropertyPathSpec::recursive("$.payload"),
6508        ];
6509        create_or_replace_fts_kind_table(
6510            &conn,
6511            "Article",
6512            &specs,
6513            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6514        )
6515        .expect("create table");
6516
6517        // Verify table exists and has the expected columns.
6518        let table = fathomdb_schema::fts_kind_table_name("Article");
6519        // node_logical_id column
6520        let count: i64 = conn
6521            .query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))
6522            .expect("count");
6523        assert_eq!(count, 0, "new table must be empty");
6524
6525        // Verify columns exist by inserting a row with named columns
6526        let title_col = fathomdb_schema::fts_column_name("$.title", false);
6527        let payload_col = fathomdb_schema::fts_column_name("$.payload", true);
6528        conn.execute(
6529            &format!(
6530                "INSERT INTO {table} (node_logical_id, {title_col}, {payload_col}) VALUES ('id1', 'hello', 'world')"
6531            ),
6532            [],
6533        )
6534        .expect("insert with per-spec columns must succeed");
6535    }
6536
6537    #[test]
6538    fn create_or_replace_drops_and_recreates() {
6539        use super::create_or_replace_fts_kind_table;
6540        let (db, _service) = setup();
6541        let conn = sqlite::open_connection(db.path()).expect("conn");
6542
6543        // First call: 1 spec
6544        let specs_v1 = vec![FtsPropertyPathSpec::scalar("$.title")];
6545        create_or_replace_fts_kind_table(
6546            &conn,
6547            "Post",
6548            &specs_v1,
6549            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6550        )
6551        .expect("create v1");
6552
6553        // Second call: 2 specs (different layout)
6554        let specs_v2 = vec![
6555            FtsPropertyPathSpec::scalar("$.title"),
6556            FtsPropertyPathSpec::scalar("$.summary"),
6557        ];
6558        create_or_replace_fts_kind_table(
6559            &conn,
6560            "Post",
6561            &specs_v2,
6562            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6563        )
6564        .expect("create v2");
6565
6566        // Verify new layout: summary column must exist
6567        let table = fathomdb_schema::fts_kind_table_name("Post");
6568        let summary_col = fathomdb_schema::fts_column_name("$.summary", false);
6569        conn.execute(
6570            &format!("INSERT INTO {table} (node_logical_id, {summary_col}) VALUES ('id1', 'text')"),
6571            [],
6572        )
6573        .expect("second layout must allow summary column");
6574    }
6575
6576    #[test]
6577    fn create_or_replace_invalid_tokenizer() {
6578        use super::create_or_replace_fts_kind_table;
6579        let (db, _service) = setup();
6580        let conn = sqlite::open_connection(db.path()).expect("conn");
6581        let specs = vec![FtsPropertyPathSpec::scalar("$.title")];
6582        let result = create_or_replace_fts_kind_table(&conn, "Post", &specs, "'; DROP TABLE --");
6583        assert!(result.is_err(), "invalid tokenizer must be rejected");
6584        let err_msg = result
6585            .expect_err("invalid tokenizer must be rejected")
6586            .to_string();
6587        assert!(
6588            err_msg.contains("tokenizer"),
6589            "error must mention tokenizer: {err_msg}"
6590        );
6591    }
6592
6593    #[test]
6594    fn register_with_weights_creates_per_column_table() {
6595        let (db, service) = setup();
6596        let entries = vec![
6597            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6598            FtsPropertyPathSpec::scalar("$.body"),
6599        ];
6600        service
6601            .register_fts_property_schema_with_entries(
6602                "Article",
6603                &entries,
6604                None,
6605                &[],
6606                crate::rebuild_actor::RebuildMode::Eager,
6607            )
6608            .expect("register");
6609
6610        // Per-kind table must have per-spec columns, not just text_content
6611        let conn = sqlite::open_connection(db.path()).expect("conn");
6612        let table = fathomdb_schema::fts_kind_table_name("Article");
6613        let title_col = fathomdb_schema::fts_column_name("$.title", false);
6614        let body_col = fathomdb_schema::fts_column_name("$.body", false);
6615        // If the columns exist, insert must succeed
6616        conn.execute(
6617            &format!(
6618                "INSERT INTO {table} (node_logical_id, {title_col}, {body_col}) VALUES ('art-1', 'hello', 'world')"
6619            ),
6620            [],
6621        )
6622        .expect("per-spec columns must exist after registration with weights");
6623    }
6624
6625    #[test]
6626    fn weighted_to_unweighted_downgrade_recreates_table() {
6627        let (db, service) = setup();
6628
6629        // First register with weights (creates per-spec column layout).
6630        let weighted_entries = vec![
6631            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6632            FtsPropertyPathSpec::scalar("$.body"),
6633        ];
6634        service
6635            .register_fts_property_schema_with_entries(
6636                "Article",
6637                &weighted_entries,
6638                None,
6639                &[],
6640                crate::rebuild_actor::RebuildMode::Eager,
6641            )
6642            .expect("register weighted");
6643
6644        // Re-register the same kind WITHOUT weights.
6645        let unweighted_entries = vec![
6646            FtsPropertyPathSpec::scalar("$.title"),
6647            FtsPropertyPathSpec::scalar("$.body"),
6648        ];
6649        service
6650            .register_fts_property_schema_with_entries(
6651                "Article",
6652                &unweighted_entries,
6653                None,
6654                &[],
6655                crate::rebuild_actor::RebuildMode::Eager,
6656            )
6657            .expect("re-register unweighted");
6658
6659        // After downgrade, the table must have the text_content column
6660        // (legacy single-column layout), not the per-spec columns.
6661        let conn = sqlite::open_connection(db.path()).expect("conn");
6662        let table = fathomdb_schema::fts_kind_table_name("Article");
6663        let result = conn.execute(
6664            &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('art-1', 'hello world')"),
6665            [],
6666        );
6667        assert!(
6668            result.is_ok(),
6669            "text_content column must exist after weighted-to-unweighted downgrade"
6670        );
6671    }
6672
6673    // --- Pack A+G: profile CRUD + tokenizer presets ---
6674
6675    #[test]
6676    fn set_get_fts_profile_roundtrip() {
6677        let (_db, service) = setup();
6678        let profile = service
6679            .set_fts_profile("book", "unicode61")
6680            .expect("set_fts_profile");
6681        assert_eq!(profile.kind, "book");
6682        assert_eq!(profile.tokenizer, "unicode61");
6683
6684        let got = service
6685            .get_fts_profile("book")
6686            .expect("get_fts_profile")
6687            .expect("should be Some");
6688        assert_eq!(got.kind, "book");
6689        assert_eq!(got.tokenizer, "unicode61");
6690    }
6691
6692    #[test]
6693    fn fts_profile_upsert() {
6694        let (_db, service) = setup();
6695        service
6696            .set_fts_profile("article", "unicode61")
6697            .expect("first set");
6698        service
6699            .set_fts_profile("article", "porter unicode61 remove_diacritics 2")
6700            .expect("second set");
6701        let got = service
6702            .get_fts_profile("article")
6703            .expect("get")
6704            .expect("Some");
6705        assert_eq!(got.tokenizer, "porter unicode61 remove_diacritics 2");
6706    }
6707
6708    #[test]
6709    fn invalid_tokenizer_rejected() {
6710        let (_db, service) = setup();
6711        let result = service.set_fts_profile("book", "'; DROP TABLE nodes --");
6712        assert!(result.is_err(), "invalid tokenizer must be rejected");
6713        let msg = result.expect_err("must be Err").to_string();
6714        assert!(
6715            msg.contains("tokenizer") || msg.contains("invalid"),
6716            "error must mention tokenizer or invalid: {msg}"
6717        );
6718    }
6719
6720    #[test]
6721    fn preset_recall_optimized_english() {
6722        assert_eq!(
6723            super::resolve_tokenizer_preset("recall-optimized-english"),
6724            "porter unicode61 remove_diacritics 2"
6725        );
6726    }
6727
6728    #[test]
6729    fn preset_precision_optimized() {
6730        assert_eq!(
6731            super::resolve_tokenizer_preset("precision-optimized"),
6732            "unicode61 remove_diacritics 2"
6733        );
6734    }
6735
6736    #[test]
6737    fn preset_global_cjk() {
6738        assert_eq!(super::resolve_tokenizer_preset("global-cjk"), "icu");
6739    }
6740
6741    #[test]
6742    fn preset_substring_trigram() {
6743        assert_eq!(
6744            super::resolve_tokenizer_preset("substring-trigram"),
6745            "trigram"
6746        );
6747    }
6748
6749    #[test]
6750    fn preset_source_code() {
6751        assert_eq!(
6752            super::resolve_tokenizer_preset("source-code"),
6753            "unicode61 tokenchars '._-$@'"
6754        );
6755    }
6756
6757    #[test]
6758    fn preview_fts_row_count() {
6759        let (db, service) = setup();
6760        {
6761            let conn = sqlite::open_connection(db.path()).expect("conn");
6762            for i in 0..5u32 {
6763                conn.execute(
6764                    "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6765                     VALUES (?1, ?2, 'book', '{}', 100, 'src')",
6766                    rusqlite::params![format!("r{i}"), format!("lg{i}")],
6767                )
6768                .expect("insert node");
6769            }
6770            // Insert one superseded node that must NOT count
6771            conn.execute(
6772                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, superseded_at) \
6773                 VALUES ('r99', 'lg99', 'book', '{}', 100, 'src', 200)",
6774                [],
6775            )
6776            .expect("insert superseded");
6777        }
6778        let impact = service
6779            .preview_projection_impact("book", "fts")
6780            .expect("preview");
6781        assert_eq!(impact.rows_to_rebuild, 5);
6782    }
6783
6784    #[test]
6785    fn preview_populates_current_tokenizer() {
6786        let (_db, service) = setup();
6787        service
6788            .set_fts_profile("doc", "trigram")
6789            .expect("set profile");
6790        let impact = service
6791            .preview_projection_impact("doc", "fts")
6792            .expect("preview");
6793        assert_eq!(impact.current_tokenizer, Some("trigram".to_owned()));
6794        assert_eq!(impact.target_tokenizer, None);
6795    }
6796
6797    // --- Review fix: tokenizer allowlist alignment ---
6798
6799    #[test]
6800    fn create_or_replace_source_code_tokenizer_is_accepted() {
6801        // The source-code preset expands to "unicode61 tokenchars '._-$@'" which
6802        // contains `.`, `-`, `$`, `@`. The allowlist in create_or_replace_fts_kind_table
6803        // must accept these characters (matching set_fts_profile's allowlist).
6804        use super::create_or_replace_fts_kind_table;
6805        let (db, _service) = setup();
6806        let conn = sqlite::open_connection(db.path()).expect("conn");
6807        let specs = vec![FtsPropertyPathSpec::scalar("$.symbol")];
6808        let source_code_tokenizer = "unicode61 tokenchars '._-$@'";
6809        let result =
6810            create_or_replace_fts_kind_table(&conn, "Symbol", &specs, source_code_tokenizer);
6811        assert!(
6812            result.is_ok(),
6813            "source-code tokenizer string must be accepted by create_or_replace_fts_kind_table: {:?}",
6814            result.err()
6815        );
6816    }
6817
6818    #[test]
6819    fn source_code_profile_round_trip_through_register_fts_schema() {
6820        // Verify that set_fts_profile("source-code") followed by
6821        // register_fts_property_schema succeeds end-to-end.
6822        // Previously failed because set_fts_profile accepted "unicode61 tokenchars '._-$@'"
6823        // but create_or_replace_fts_kind_table rejected it (only allowed " '_").
6824        let db = tempfile::NamedTempFile::new().expect("temp file");
6825        let schema = Arc::new(fathomdb_schema::SchemaManager::new());
6826
6827        // Bootstrap the schema (creates projection_profiles table via migration 20).
6828        {
6829            let _coord = crate::ExecutionCoordinator::open(
6830                db.path(),
6831                Arc::clone(&schema),
6832                None,
6833                1,
6834                Arc::new(crate::TelemetryCounters::default()),
6835                None,
6836            )
6837            .expect("coordinator opens for bootstrap");
6838        }
6839
6840        let service = AdminService::new(db.path(), Arc::clone(&schema));
6841
6842        // Set source-code profile (uses preset resolver, stores "unicode61 tokenchars '._-$@'").
6843        service
6844            .set_fts_profile("Symbol", "source-code")
6845            .expect("set_fts_profile with source-code preset must succeed");
6846
6847        // Register an FTS schema for this kind — this calls create_or_replace_fts_kind_table
6848        // with the tokenizer from the profile row.
6849        let result = service.register_fts_property_schema("Symbol", &["$.name".to_owned()], None);
6850        assert!(
6851            result.is_ok(),
6852            "register_fts_property_schema must succeed when source-code profile is active: {:?}",
6853            result.err()
6854        );
6855    }
6856
6857    // --- 0.5.0 item 5: max_tokens() capacity ---
6858
6859    /// A stub embedder with `max_tokens=8192` can embed a pre-written chunk
6860    /// whose text exceeds 512 words without error. Verifies that `max_tokens()`
6861    /// advertises the correct capacity and that `regenerate_vector_embeddings`
6862    /// produces one vector row for one stored chunk, regardless of chunk length.
6863    /// (The engine does not re-chunk at regen time; splitting is the caller's
6864    /// responsibility at write time.)
6865    #[cfg(feature = "sqlite-vec")]
6866    #[test]
6867    fn embedder_max_tokens_8192_handles_chunk_exceeding_512_words() {
6868        let long_text = (0..600u32)
6869            .map(|i| format!("word{i}"))
6870            .collect::<Vec<_>>()
6871            .join(" ");
6872
6873        let db = NamedTempFile::new().expect("temp file");
6874        let schema = Arc::new(SchemaManager::new());
6875
6876        {
6877            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6878            schema.bootstrap(&conn).expect("bootstrap");
6879            conn.execute(
6880                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6881                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'src-1')",
6882                [],
6883            )
6884            .expect("insert node");
6885            conn.execute(
6886                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6887                 VALUES (?1, 'doc-1', ?2, 100)",
6888                rusqlite::params!["chunk-long", long_text],
6889            )
6890            .expect("insert long chunk");
6891        }
6892
6893        let embedder = LargeContextTestEmbedder::new("long-context-model", 4, 8192);
6894        let service = AdminService::new(db.path(), Arc::clone(&schema));
6895        let report = service
6896            .regenerate_vector_embeddings(
6897                &embedder,
6898                &VectorRegenerationConfig {
6899                    kind: "Document".to_owned(),
6900                    profile: "default".to_owned(),
6901                    chunking_policy: "per_chunk".to_owned(),
6902                    preprocessing_policy: "trim".to_owned(),
6903                },
6904            )
6905            .expect("regenerate with long-context embedder");
6906
6907        assert_eq!(
6908            report.total_chunks, 1,
6909            "600-word text pre-written as one chunk must result in exactly one embedded row"
6910        );
6911        assert_eq!(report.regenerated_rows, 1);
6912        assert_eq!(
6913            embedder.max_tokens(),
6914            8192,
6915            "embedder must advertise 8192 token capacity"
6916        );
6917    }
6918
    /// Stub embedder with a configurable `max_tokens` for long-context tests.
    #[cfg(feature = "sqlite-vec")]
    #[derive(Debug)]
    struct LargeContextTestEmbedder {
        // Identity reported back via `QueryEmbedder::identity()`.
        identity: QueryEmbedderIdentity,
        // Fixed vector returned from every `embed_query` call.
        vector: Vec<f32>,
        // Capacity advertised via `QueryEmbedder::max_tokens()`.
        max_tokens: usize,
    }
6927
6928    #[cfg(feature = "sqlite-vec")]
6929    impl LargeContextTestEmbedder {
6930        fn new(model: &str, dimension: usize, max_tokens: usize) -> Self {
6931            Self {
6932                identity: QueryEmbedderIdentity {
6933                    model_identity: model.to_owned(),
6934                    model_version: "1.0.0".to_owned(),
6935                    dimension,
6936                    normalization_policy: "l2".to_owned(),
6937                },
6938                vector: vec![1.0; dimension],
6939                max_tokens,
6940            }
6941        }
6942    }
6943
    #[cfg(feature = "sqlite-vec")]
    impl QueryEmbedder for LargeContextTestEmbedder {
        /// Returns the fixed stub vector, ignoring the query text.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        /// Reports the identity configured in `new`.
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        /// Advertises the token capacity configured in `new`.
        fn max_tokens(&self) -> usize {
            self.max_tokens
        }
    }
6956
6957    /// Item 7 integration test: register schema, write nodes, call
6958    /// `regenerate_vector_embeddings_in_process`, verify contract row and
6959    /// that vec rows exist for every chunk.
6960    #[cfg(feature = "sqlite-vec")]
6961    #[test]
6962    #[allow(clippy::too_many_lines)]
6963    fn regenerate_vector_embeddings_in_process_writes_contract_and_vec_rows() {
6964        let db = NamedTempFile::new().expect("temp file");
6965        let schema = Arc::new(SchemaManager::new());
6966
6967        {
6968            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6969            schema.bootstrap(&conn).expect("bootstrap");
6970            for (row_id, logical_id, created_at, src) in [
6971                ("r1", "node-1", 100, "src1"),
6972                ("r2", "node-2", 101, "src2"),
6973                ("r3", "node-3", 102, "src3"),
6974            ] {
6975                conn.execute(
6976                    "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6977                     VALUES (?1, ?2, 'Doc', '{}', ?3, ?4)",
6978                    rusqlite::params![row_id, logical_id, created_at, src],
6979                )
6980                .expect("insert node");
6981            }
6982            for (chunk_id, node_id, text, created_at) in [
6983                ("c1", "node-1", "first document text", 100),
6984                ("c2", "node-2", "second document text", 101),
6985                ("c3", "node-3", "third document text", 102),
6986            ] {
6987                conn.execute(
6988                    "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6989                     VALUES (?1, ?2, ?3, ?4)",
6990                    rusqlite::params![chunk_id, node_id, text, created_at],
6991                )
6992                .expect("insert chunk");
6993            }
6994        }
6995
6996        let service = AdminService::new(db.path(), Arc::clone(&schema));
6997        let embedder = TestEmbedder::new("batch-test-model", 4);
6998        let config = VectorRegenerationConfig {
6999            kind: "Doc".to_owned(),
7000            profile: "default".to_owned(),
7001            chunking_policy: "per_chunk".to_owned(),
7002            preprocessing_policy: "trim".to_owned(),
7003        };
7004        let report = service
7005            .regenerate_vector_embeddings_in_process(&embedder, &config)
7006            .expect("in-process regen must succeed");
7007
7008        assert_eq!(report.total_chunks, 3);
7009        assert_eq!(report.regenerated_rows, 3);
7010        assert!(report.contract_persisted);
7011
7012        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7013        let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
7014        let vec_count: i64 = conn
7015            .query_row(&format!("SELECT count(*) FROM {vec_table}"), [], |row| {
7016                row.get(0)
7017            })
7018            .expect("vec_doc count");
7019        assert_eq!(vec_count, 3, "one vec row per chunk");
7020
7021        let model_identity: String = conn
7022            .query_row(
7023                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
7024                [],
7025                |row| row.get(0),
7026            )
7027            .expect("contract row");
7028        assert_eq!(model_identity, "batch-test-model");
7029    }
7030
7031    // --- 0.5.0 item 6: per-kind vec regeneration ---
7032
7033    #[cfg(feature = "sqlite-vec")]
7034    #[test]
7035    #[allow(clippy::too_many_lines)]
7036    fn regenerate_vector_embeddings_targets_per_kind_table() {
7037        let db = NamedTempFile::new().expect("temp file");
7038        let schema = Arc::new(SchemaManager::new());
7039
7040        {
7041            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7042            schema.bootstrap(&conn).expect("bootstrap");
7043            conn.execute(
7044                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
7045                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
7046                [],
7047            )
7048            .expect("insert node");
7049            conn.execute(
7050                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
7051                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
7052                [],
7053            )
7054            .expect("insert chunk");
7055        }
7056
7057        let service = AdminService::new(db.path(), Arc::clone(&schema));
7058        let embedder = TestEmbedder::new("test-model", 4);
7059        let report = service
7060            .regenerate_vector_embeddings(
7061                &embedder,
7062                &VectorRegenerationConfig {
7063                    kind: "Document".to_owned(),
7064                    profile: "default".to_owned(),
7065                    chunking_policy: "per_chunk".to_owned(),
7066                    preprocessing_policy: "trim".to_owned(),
7067                },
7068            )
7069            .expect("regenerate vectors");
7070
7071        let expected_vec_table = fathomdb_schema::vec_kind_table_name("Document");
7072        assert_eq!(report.table_name, expected_vec_table);
7073        assert_eq!(report.regenerated_rows, 1);
7074
7075        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7076        let vec_count: i64 = conn
7077            .query_row(
7078                &format!("SELECT count(*) FROM {expected_vec_table}"),
7079                [],
7080                |row| row.get(0),
7081            )
7082            .expect("vec_document count");
7083        assert_eq!(vec_count, 1, "rows must be in vec_document");
7084
7085        let old_count: i64 = conn
7086            .query_row(
7087                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
7088                [],
7089                |r| r.get(0),
7090            )
7091            .expect("sqlite_master check");
7092        assert_eq!(
7093            old_count, 0,
7094            "vec_nodes_active must NOT be created for per-kind regen"
7095        );
7096    }
7097
7098    // --- 0.5.0 item 6 step 5: get_vec_profile reads per-kind key ---
7099
7100    #[test]
7101    fn get_vec_profile_returns_none_when_no_profile_exists() {
7102        let (db, service) = setup();
7103        let _ = db;
7104        let result = service.get_vec_profile("MyKind").expect("should not error");
7105        assert!(
7106            result.is_none(),
7107            "must return None when no profile registered"
7108        );
7109    }
7110
7111    #[cfg(feature = "sqlite-vec")]
7112    #[test]
7113    fn get_vec_profile_returns_profile_for_registered_kind() {
7114        let db = NamedTempFile::new().expect("temp file");
7115        let schema = Arc::new(SchemaManager::new());
7116        {
7117            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
7118            schema.bootstrap(&conn).expect("bootstrap");
7119            schema
7120                .ensure_vec_kind_profile(&conn, "MyKind", 128)
7121                .expect("ensure_vec_kind_profile");
7122        }
7123
7124        let service = AdminService::new(db.path(), Arc::clone(&schema));
7125        let profile = service.get_vec_profile("MyKind").expect("should not error");
7126        assert!(profile.is_some(), "must return profile after registration");
7127        assert_eq!(profile.expect("profile present").dimensions, 128);
7128    }
7129
    #[test]
    fn get_vec_profile_does_not_return_global_sentinel_row() {
        let (db, service) = setup();
        // Plant a global ('*', 'vec') sentinel row directly in
        // projection_profiles; the per-kind lookup must not fall back to it.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
                 VALUES ('*', 'vec', '{\"model_identity\":\"old-model\",\"dimensions\":384}', 0, 0)",
                [],
            )
            .expect("insert global sentinel");
        }
        // Query a kind that has no row of its own.
        let result = service
            .get_vec_profile("SomeKind")
            .expect("should not error");
        assert!(
            result.is_none(),
            "per-kind query must not return global ('*', 'vec') row"
        );
    }
7150}