Skip to main content

fathomdb_engine/admin/
mod.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::mpsc::SyncSender;
4
5use fathomdb_schema::SchemaManager;
6use serde::{Deserialize, Serialize};
7
8use crate::rebuild_actor::{RebuildMode, RebuildRequest};
9
10use crate::{
11    EngineError, ProjectionRepairReport, ProjectionService, ids::new_id,
12    projection::ProjectionTarget, sqlite,
13};
14
15mod fts;
16mod operational;
17mod provenance;
18mod vector;
19
20pub use vector::load_vector_regeneration_config;
21
22#[cfg(test)]
23use fts::{
24    create_or_replace_fts_kind_table, serialize_property_paths_json, validate_fts_property_paths,
25};
26
/// Results of a physical and structural integrity check on the database.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct IntegrityReport {
    /// True when `PRAGMA integrity_check` returned the literal string `"ok"`.
    pub physical_ok: bool,
    /// True when `pragma_foreign_key_check` reported zero violations.
    pub foreign_keys_ok: bool,
    /// Chunks of active (non-superseded) nodes with no `fts_nodes` row.
    pub missing_fts_rows: usize,
    /// Active nodes whose registered property-FTS schema should have produced
    /// a row, but none exists.
    pub missing_property_fts_rows: usize,
    /// `logical_id`s with more than one non-superseded node row.
    pub duplicate_active_logical_ids: usize,
    /// Operational mutation/current rows referencing a missing collection.
    pub operational_missing_collections: usize,
    /// Operational current rows whose `last_mutation_id` does not exist.
    pub operational_missing_last_mutations: usize,
    /// Human-readable summaries of any non-zero counters above.
    pub warnings: Vec<String>,
}
39
/// A registered FTS property projection schema for a node kind.
///
/// `property_paths` is a flattened display view; [`Self::entries`] is the
/// authoritative, mode-bearing representation of the schema.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct FtsPropertySchemaRecord {
    /// The node kind this schema applies to.
    pub kind: String,
    /// Flat display list of registered JSON property paths
    /// (e.g. `["$.name", "$.title"]`). For recursive entries this lists
    /// only the root path; mode information is carried by
    /// [`Self::entries`].
    pub property_paths: Vec<String>,
    /// Full per-entry schema shape with mode
    /// ([`FtsPropertyPathMode::Scalar`] | [`FtsPropertyPathMode::Recursive`]).
    /// Read this field for mode-accurate round-trip of the registered
    /// schema.
    pub entries: Vec<FtsPropertyPathSpec>,
    /// Subtree paths excluded from recursive walks. Empty for
    /// scalar-only schemas or recursive schemas with no exclusions.
    pub exclude_paths: Vec<String>,
    /// Separator used when concatenating extracted values.
    pub separator: String,
    /// Schema format version.
    pub format_version: i64,
}
63
/// Extraction mode for a single registered FTS property path.
///
/// Serialized in `snake_case` (`"scalar"` / `"recursive"`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FtsPropertyPathMode {
    /// Resolve the path and append the scalar value(s). Matches legacy
    /// pre-Phase-4 behaviour. This is the [`Default`] variant.
    #[default]
    Scalar,
    /// Recursively walk every scalar leaf rooted at the path. Each leaf
    /// contributes one entry to the position map.
    Recursive,
}
76
/// A single registered property-FTS path with its extraction mode.
///
/// Marked `#[non_exhaustive]`: outside this crate, construct via
/// [`Self::scalar`] / [`Self::recursive`] rather than a struct literal.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct FtsPropertyPathSpec {
    /// JSON path to the property (must start with `$.`).
    pub path: String,
    /// Whether to treat this path as a scalar or recursively walk it.
    pub mode: FtsPropertyPathMode,
    /// Optional BM25 weight multiplier for this path (1.0 = default).
    /// Must satisfy `0.0 < weight <= 1000.0` when set.
    pub weight: Option<f32>,
}

// f32 does not implement Eq (due to NaN), but weights in practice are
// always finite values set by callers, so reflexivity holds.
impl Eq for FtsPropertyPathSpec {}
93
94impl FtsPropertyPathSpec {
95    #[must_use]
96    pub fn scalar(path: impl Into<String>) -> Self {
97        Self {
98            path: path.into(),
99            mode: FtsPropertyPathMode::Scalar,
100            weight: None,
101        }
102    }
103
104    #[must_use]
105    pub fn recursive(path: impl Into<String>) -> Self {
106        Self {
107            path: path.into(),
108            mode: FtsPropertyPathMode::Recursive,
109            weight: None,
110        }
111    }
112
113    /// Set the BM25 weight multiplier for this path.
114    ///
115    /// The weight must satisfy `0.0 < weight <= 1000.0` at registration
116    /// time; this builder method does not validate — validation happens in
117    /// `register_fts_property_schema_with_entries`.
118    #[must_use]
119    pub fn with_weight(mut self, weight: f32) -> Self {
120        self.weight = Some(weight);
121        self
122    }
123}
124
/// Options controlling how a safe database export is performed.
///
/// Defaults ([`Default`]) enable checkpointing, which is the safe choice for
/// production databases running in WAL mode.
#[derive(Clone, Copy, Debug)]
pub struct SafeExportOptions {
    /// When true, runs `PRAGMA wal_checkpoint(FULL)` before copying and fails if
    /// any WAL frames could not be applied (busy != 0). Set to false only in
    /// tests that seed a database without WAL mode.
    pub force_checkpoint: bool,
}
133
134impl Default for SafeExportOptions {
135    fn default() -> Self {
136        Self {
137            force_checkpoint: true,
138        }
139    }
140}
141
// Must match PROTOCOL_VERSION in fathomdb-admin-bridge.rs
/// Bridge protocol version compiled into this binary (presumably the value
/// recorded in [`SafeExportManifest::protocol_version`] — confirm in the
/// export code).
const EXPORT_PROTOCOL_VERSION: u32 = 1;
144
/// Manifest describing a completed safe export.
#[derive(Clone, Debug, Serialize)]
pub struct SafeExportManifest {
    /// Unix timestamp (seconds since epoch) when the export was created.
    pub exported_at: u64,
    /// SHA-256 hex digest of the exported database file.
    pub sha256: String,
    /// Schema version recorded in `fathom_schema_migrations` at export time.
    pub schema_version: u32,
    /// Bridge protocol version compiled into this binary.
    /// Must match `PROTOCOL_VERSION` in `fathomdb-admin-bridge.rs`
    /// (see [`EXPORT_PROTOCOL_VERSION`]).
    pub protocol_version: u32,
    /// Number of `SQLite` pages in the exported database file.
    pub page_count: u64,
}
159
/// Report from tracing all rows associated with a given `source_ref`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct TraceReport {
    /// The `source_ref` value that was traced.
    pub source_ref: String,
    /// Number of node rows matching the `source_ref`.
    pub node_rows: usize,
    /// Number of edge rows matching the `source_ref`.
    pub edge_rows: usize,
    /// Number of action rows matching the `source_ref`.
    pub action_rows: usize,
    /// Number of operational mutation rows matching the `source_ref`.
    pub operational_mutation_rows: usize,
    /// Logical IDs of the matched nodes.
    pub node_logical_ids: Vec<String>,
    /// IDs of the matched actions.
    pub action_ids: Vec<String>,
    /// IDs of the matched operational mutations.
    pub operational_mutation_ids: Vec<String>,
}
172
/// An edge that was skipped during a restore because an endpoint is missing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SkippedEdge {
    /// Logical ID of the edge that could not be restored.
    pub edge_logical_id: String,
    /// Identifier of the endpoint that was absent at restore time
    /// (presumably a node logical ID — confirm against the restore code).
    pub missing_endpoint: String,
}
179
/// Report from restoring a retired logical ID back to active state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalRestoreReport {
    /// The logical ID that was restored.
    pub logical_id: String,
    /// True when the operation made no changes (nothing to restore).
    pub was_noop: bool,
    /// Count of node rows restored.
    pub restored_node_rows: usize,
    /// Count of edge rows restored.
    pub restored_edge_rows: usize,
    /// Count of chunk rows restored.
    pub restored_chunk_rows: usize,
    /// Count of FTS rows restored.
    pub restored_fts_rows: usize,
    /// Count of property-FTS rows restored.
    pub restored_property_fts_rows: usize,
    /// Count of vector rows restored.
    pub restored_vec_rows: usize,
    /// Edges left un-restored because an endpoint was missing.
    pub skipped_edges: Vec<SkippedEdge>,
    /// Free-form informational notes about the restore.
    pub notes: Vec<String>,
}
194
/// Report from permanently purging all rows for a logical ID.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalPurgeReport {
    /// The logical ID that was purged.
    pub logical_id: String,
    /// True when the operation made no changes (nothing to purge).
    pub was_noop: bool,
    /// Count of node rows deleted.
    pub deleted_node_rows: usize,
    /// Count of edge rows deleted.
    pub deleted_edge_rows: usize,
    /// Count of chunk rows deleted.
    pub deleted_chunk_rows: usize,
    /// Count of FTS rows deleted.
    pub deleted_fts_rows: usize,
    /// Count of vector rows deleted.
    // NOTE(review): unlike `LogicalRestoreReport` there is no
    // `deleted_property_fts_rows` counter — confirm whether property-FTS
    // deletions are folded into `deleted_fts_rows` or simply unreported.
    pub deleted_vec_rows: usize,
    /// Free-form informational notes about the purge.
    pub notes: Vec<String>,
}
207
/// Options controlling provenance event purging behavior.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProvenancePurgeOptions {
    /// When true, the purge is only simulated (no rows are deleted).
    pub dry_run: bool,
    /// Event types exempt from purging. Defaults to empty when absent from
    /// the serialized input (`#[serde(default)]`).
    #[serde(default)]
    pub preserve_event_types: Vec<String>,
}
215
/// Report from a provenance event purge operation.
#[derive(Clone, Debug, Serialize)]
pub struct ProvenancePurgeReport {
    /// Number of events removed by the purge.
    pub events_deleted: u64,
    /// Number of events retained (e.g. via `preserve_event_types`).
    pub events_preserved: u64,
    /// Timestamp of the oldest event still present, or `None` if the table is
    /// empty (presumably Unix seconds — confirm against the purge code).
    pub oldest_remaining: Option<i64>,
}
223
/// Service providing administrative operations (integrity checks, exports, restores, purges).
#[derive(Debug)]
pub struct AdminService {
    /// Path to the `SQLite` database file this service administers.
    pub(super) database_path: PathBuf,
    /// Schema manager used to bootstrap every connection opened by `connect`.
    pub(super) schema_manager: Arc<SchemaManager>,
    /// Projection service bound to the same database path and schema manager.
    pub(super) projections: ProjectionService,
    /// Sender side of the rebuild actor's channel.  `None` when the engine
    /// was opened without a rebuild actor (e.g. in tests that use
    /// [`AdminService::new`] directly).
    pub(super) rebuild_sender: Option<SyncSender<RebuildRequest>>,
}
235
/// Results of a semantic consistency check on the graph data.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SemanticReport {
    /// Chunks whose `node_logical_id` has no active node.
    pub orphaned_chunks: usize,
    /// Active nodes with a NULL `source_ref` (loss of provenance).
    pub null_source_ref_nodes: usize,
    /// Steps referencing a `run_id` that does not exist in the runs table.
    pub broken_step_fk: usize,
    /// Actions referencing a `step_id` that does not exist in the steps table.
    pub broken_action_fk: usize,
    /// FTS rows whose `chunk_id` does not exist in the chunks table.
    pub stale_fts_rows: usize,
    /// FTS rows whose node has been superseded (`superseded_at` IS NOT NULL on all active rows).
    pub fts_rows_for_superseded_nodes: usize,
    /// Property FTS rows whose node has been superseded or does not exist.
    pub stale_property_fts_rows: usize,
    /// Property FTS rows whose kind has no registered FTS property schema.
    pub orphaned_property_fts_rows: usize,
    /// Property FTS rows whose `kind` does not match the active node's actual kind.
    pub mismatched_kind_property_fts_rows: usize,
    /// Active logical IDs with more than one per-kind FTS property row.
    pub duplicate_property_fts_rows: usize,
    /// Property FTS rows whose `text_content` no longer matches the canonical extraction.
    pub drifted_property_fts_rows: usize,
    /// Active edges where at least one endpoint has no active node.
    pub dangling_edges: usize,
    /// `logical_ids` where every version has been superseded (no active row).
    pub orphaned_supersession_chains: usize,
    /// Vec rows whose backing chunk no longer exists in the chunks table.
    pub stale_vec_rows: usize,
    /// Compatibility counter for vec rows whose chunk points at missing node history.
    pub vec_rows_for_superseded_nodes: usize,
    /// Latest-state keys whose latest mutation is a `put` but no current row exists.
    pub missing_operational_current_rows: usize,
    /// Current rows that do not match the latest mutation state.
    pub stale_operational_current_rows: usize,
    /// Mutations written after the owning collection was disabled.
    pub disabled_collection_mutations: usize,
    /// Access metadata rows whose `logical_id` no longer has any node history.
    pub orphaned_last_access_metadata_rows: usize,
    /// Human-readable summaries of any non-zero counters above.
    pub warnings: Vec<String>,
}
279
/// Configuration for regenerating vector embeddings.
///
/// 0.4.0 architectural invariant: vector identity is the embedder's
/// responsibility, not the regeneration config's. This struct carries only
/// WHERE the vectors live and HOW to chunk/preprocess them — never WHAT
/// model produced them. The embedder supplied at regen-call time is the
/// single source of truth for `model_identity`, `model_version`,
/// `dimension`, and `normalization_policy`; the resulting vector profile
/// is stamped directly from [`QueryEmbedder::identity`].
///
/// 0.5.0 breaking change: `table_name` is removed. The vec table name is now
/// derived from `kind` via [`fathomdb_schema::vec_kind_table_name`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct VectorRegenerationConfig {
    /// Node kind whose vectors are regenerated; also determines the vec
    /// table name (see the 0.5.0 note above).
    pub kind: String,
    /// Name of the vector profile this config belongs to.
    pub profile: String,
    /// Identifier of the chunking policy to apply during regeneration.
    pub chunking_policy: String,
    /// Identifier of the preprocessing policy to apply during regeneration.
    pub preprocessing_policy: String,
}
300
/// Report from a vector embedding regeneration run.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct VectorRegenerationReport {
    /// Name of the vector profile that was regenerated.
    pub profile: String,
    /// Name of the vec table the rows were written to.
    pub table_name: String,
    /// Embedding dimension of the regenerated vectors.
    pub dimension: usize,
    /// Total number of chunks considered by the run.
    pub total_chunks: usize,
    /// Number of vector rows actually regenerated.
    pub regenerated_rows: usize,
    /// True when the vector contract was persisted as part of the run.
    pub contract_persisted: bool,
    /// Free-form informational notes about the run.
    pub notes: Vec<String>,
}
312
/// Stored FTS tokenizer profile for a node kind.
///
/// Created and updated by [`AdminService::set_fts_profile`]. Preset names
/// from [`TOKENIZER_PRESETS`] can be resolved to tokenizer strings via
/// [`resolve_tokenizer_preset`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FtsProfile {
    /// Node kind this profile applies to (e.g. `"Article"`).
    pub kind: String,
    /// FTS5 tokenizer string (e.g. `"porter unicode61 remove_diacritics 2"`).
    pub tokenizer: String,
    /// Unix timestamp when the profile was last activated, or `None` if never.
    pub active_at: Option<i64>,
    /// Unix timestamp when the profile row was first created.
    pub created_at: i64,
}
327
/// Stored vector embedding profile (global, kind-agnostic).
///
/// Created and updated by [`AdminService::set_vec_profile`]. Unlike
/// [`FtsProfile`], this profile is not keyed by node kind.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VecProfile {
    /// Identifier for the embedding model (e.g. `"openai/text-embedding-3-small"`).
    pub model_identity: String,
    /// Optional version string for the model.
    pub model_version: Option<String>,
    /// Number of dimensions produced by the model.
    pub dimensions: u32,
    /// Unix timestamp when the profile was last activated, or `None` if never.
    pub active_at: Option<i64>,
    /// Unix timestamp when the profile row was first created.
    pub created_at: i64,
}
344
/// Estimated cost of rebuilding a projection (FTS table or vector embeddings).
///
/// Returned by [`AdminService::preview_projection_impact`]. All size and time
/// figures are rough estimates, not guarantees.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionImpact {
    /// Number of rows that would be processed during a full rebuild.
    pub rows_to_rebuild: u64,
    /// Rough estimated rebuild time in seconds.
    pub estimated_seconds: u64,
    /// Estimated temporary disk space required during rebuild, in bytes.
    pub temp_db_size_bytes: u64,
    /// The tokenizer currently stored in `projection_profiles`, if any.
    pub current_tokenizer: Option<String>,
    /// Reserved for future use; always `None` currently.
    pub target_tokenizer: Option<String>,
}
361
/// Well-known tokenizer preset names mapped to their FTS5 tokenizer strings.
pub const TOKENIZER_PRESETS: &[(&str, &str)] = &[
    (
        "recall-optimized-english",
        "porter unicode61 remove_diacritics 2",
    ),
    ("precision-optimized", "unicode61 remove_diacritics 2"),
    ("global-cjk", "icu"),
    ("substring-trigram", "trigram"),
    ("source-code", "unicode61 tokenchars '._-$@'"),
];

/// Resolve a tokenizer preset name to its FTS5 tokenizer string.
///
/// Returns the preset's tokenizer string when `input` names a known preset in
/// [`TOKENIZER_PRESETS`]; otherwise returns `input` unchanged (treated as a
/// raw tokenizer string).
pub fn resolve_tokenizer_preset(input: &str) -> &str {
    TOKENIZER_PRESETS
        .iter()
        .copied()
        .find(|&(name, _)| name == input)
        .map_or(input, |(_, value)| value)
}
386
// NOTE(review): the limits below appear to be enforced in the fts / vector /
// operational submodules (not visible here) — confirm byte-vs-char semantics
// for the length caps there.
/// Format version stamped into persisted vector contracts.
pub(super) const CURRENT_VECTOR_CONTRACT_FORMAT_VERSION: i64 = 1;
/// Maximum accepted length of a profile name.
pub(super) const MAX_PROFILE_LEN: usize = 128;
/// Maximum accepted length of a chunking/preprocessing policy name.
pub(super) const MAX_POLICY_LEN: usize = 128;
/// Maximum serialized size of a vector contract JSON document, in bytes.
pub(super) const MAX_CONTRACT_JSON_BYTES: usize = 32 * 1024;
/// Maximum serialized size of audit metadata, in bytes.
pub(super) const MAX_AUDIT_METADATA_BYTES: usize = 2048;
/// Default page size for operational reads when the caller supplies none.
const DEFAULT_OPERATIONAL_READ_LIMIT: usize = 100;
/// Hard cap on the page size for operational reads.
const MAX_OPERATIONAL_READ_LIMIT: usize = 1000;
394
/// Thread-safe handle to the shared [`AdminService`].
#[derive(Clone, Debug)]
pub struct AdminHandle {
    /// The shared service; cloning the handle only bumps this `Arc`.
    inner: Arc<AdminService>,
}
400
401impl AdminHandle {
402    /// Wrap an [`AdminService`] in a shared handle.
403    #[must_use]
404    pub fn new(service: AdminService) -> Self {
405        Self {
406            inner: Arc::new(service),
407        }
408    }
409
410    /// Clone the inner `Arc` to the [`AdminService`].
411    #[must_use]
412    pub fn service(&self) -> Arc<AdminService> {
413        Arc::clone(&self.inner)
414    }
415}
416
417impl AdminService {
418    /// Create a new admin service for the database at the given path.
419    #[must_use]
420    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
421        let database_path = path.as_ref().to_path_buf();
422        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
423        Self {
424            database_path,
425            schema_manager,
426            projections,
427            rebuild_sender: None,
428        }
429    }
430
431    /// Create a new admin service wired to the background rebuild actor.
432    #[must_use]
433    pub fn new_with_rebuild(
434        path: impl AsRef<Path>,
435        schema_manager: Arc<SchemaManager>,
436        rebuild_sender: SyncSender<RebuildRequest>,
437    ) -> Self {
438        let database_path = path.as_ref().to_path_buf();
439        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
440        Self {
441            database_path,
442            schema_manager,
443            projections,
444            rebuild_sender: Some(rebuild_sender),
445        }
446    }
447
    /// Open a connection to the admin database and bootstrap the schema on it.
    ///
    /// When the `sqlite-vec` feature is enabled the connection is opened with
    /// the vec extension loaded so per-kind vec tables are queryable.
    ///
    /// # Errors
    /// Returns [`EngineError`] if opening the connection or bootstrapping the
    /// schema fails.
    pub(super) fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
        #[cfg(feature = "sqlite-vec")]
        let conn = sqlite::open_connection_with_vec(&self.database_path)?;
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&self.database_path)?;
        self.schema_manager.bootstrap(&conn)?;
        Ok(conn)
    }
456
    /// Run physical and structural integrity checks and summarize the results
    /// in an [`IntegrityReport`].
    ///
    /// Checks performed: SQLite's `PRAGMA integrity_check`, foreign-key
    /// violations, FTS coverage of active-node chunks, property-FTS coverage,
    /// duplicate active `logical_id`s, and operational-table referential
    /// integrity.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
    pub fn check_integrity(&self) -> Result<IntegrityReport, EngineError> {
        let conn = self.connect()?;

        // "ok" is SQLite's literal success value for integrity_check.
        let physical_result: String =
            conn.query_row("PRAGMA integrity_check", [], |row| row.get(0))?;
        // Every row emitted by pragma_foreign_key_check is a violation.
        let foreign_key_count: i64 =
            conn.query_row("SELECT count(*) FROM pragma_foreign_key_check", [], |row| {
                row.get(0)
            })?;
        // Chunks of active (non-superseded) nodes that lack an fts_nodes row.
        let missing_fts_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM chunks c
            JOIN nodes n
              ON n.logical_id = c.node_logical_id
             AND n.superseded_at IS NULL
            WHERE NOT EXISTS (
                SELECT 1
                FROM fts_nodes f
                WHERE f.chunk_id = c.id
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // logical_ids with more than one active (non-superseded) node version.
        let duplicate_active: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM (
                SELECT logical_id
                FROM nodes
                WHERE superseded_at IS NULL
                GROUP BY logical_id
                HAVING count(*) > 1
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Mutation + current rows whose collection_name has no collections row.
        let operational_missing_collections: i64 = conn.query_row(
            r"
            SELECT (
                SELECT count(*)
                FROM operational_mutations m
                LEFT JOIN operational_collections c ON c.name = m.collection_name
                WHERE c.name IS NULL
            ) + (
                SELECT count(*)
                FROM operational_current oc
                LEFT JOIN operational_collections c ON c.name = oc.collection_name
                WHERE c.name IS NULL
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Current rows whose last_mutation_id points at a deleted mutation.
        let operational_missing_last_mutations: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_current oc
            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
            WHERE m.id IS NULL
            ",
            [],
            |row| row.get(0),
        )?;

        // Count missing property FTS rows using the same extraction logic as
        // write/rebuild. A pure-SQL check would overcount: nodes whose declared
        // paths legitimately normalize to no values correctly have no row.
        let missing_property_fts_rows = count_missing_property_fts_rows(&conn)?;

        // One human-readable warning per non-zero counter.
        let mut warnings = Vec::new();
        if missing_fts_rows > 0 {
            warnings.push("missing FTS projections detected".to_owned());
        }
        if missing_property_fts_rows > 0 {
            warnings.push("missing property FTS projections detected".to_owned());
        }
        if duplicate_active > 0 {
            warnings.push("duplicate active logical_ids detected".to_owned());
        }
        if operational_missing_collections > 0 {
            warnings.push("operational rows reference missing collections".to_owned());
        }
        if operational_missing_last_mutations > 0 {
            warnings.push("operational current rows reference missing last mutations".to_owned());
        }

        // FIX(review): was `as usize` — unsound on 32-bit targets, wraps negatives silently.
        // Options: (A) try_from().unwrap_or(0) — masks corruption, (B) try_from().expect() —
        // panics on corruption, (C) propagate error. Chose (B) here: a negative count(*)
        // signals data corruption, and the integrity report would be meaningless anyway.
        Ok(IntegrityReport {
            physical_ok: physical_result == "ok",
            foreign_keys_ok: foreign_key_count == 0,
            missing_fts_rows: i64_to_usize(missing_fts_rows),
            missing_property_fts_rows: i64_to_usize(missing_property_fts_rows),
            duplicate_active_logical_ids: i64_to_usize(duplicate_active),
            operational_missing_collections: i64_to_usize(operational_missing_collections),
            operational_missing_last_mutations: i64_to_usize(operational_missing_last_mutations),
            warnings,
        })
    }
563
564    /// # Errors
565    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
566    #[allow(clippy::too_many_lines)]
567    pub fn check_semantics(&self) -> Result<SemanticReport, EngineError> {
568        let conn = self.connect()?;
569
570        let orphaned_chunks: i64 = conn.query_row(
571            r"
572            SELECT count(*)
573            FROM chunks c
574            WHERE NOT EXISTS (
575                SELECT 1 FROM nodes n
576                WHERE n.logical_id = c.node_logical_id
577            )
578            ",
579            [],
580            |row| row.get(0),
581        )?;
582
583        let null_source_ref_nodes: i64 = conn.query_row(
584            "SELECT count(*) FROM nodes WHERE source_ref IS NULL AND superseded_at IS NULL",
585            [],
586            |row| row.get(0),
587        )?;
588
589        let broken_step_fk: i64 = conn.query_row(
590            r"
591            SELECT count(*) FROM steps s
592            WHERE NOT EXISTS (SELECT 1 FROM runs r WHERE r.id = s.run_id)
593            ",
594            [],
595            |row| row.get(0),
596        )?;
597
598        let broken_action_fk: i64 = conn.query_row(
599            r"
600            SELECT count(*) FROM actions a
601            WHERE NOT EXISTS (SELECT 1 FROM steps s WHERE s.id = a.step_id)
602            ",
603            [],
604            |row| row.get(0),
605        )?;
606
607        let stale_fts_rows: i64 = conn.query_row(
608            r"
609            SELECT count(*) FROM fts_nodes f
610            WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = f.chunk_id)
611            ",
612            [],
613            |row| row.get(0),
614        )?;
615
616        let fts_rows_for_superseded_nodes: i64 = conn.query_row(
617            r"
618            SELECT count(*) FROM fts_nodes f
619            WHERE NOT EXISTS (
620                SELECT 1 FROM nodes n
621                WHERE n.logical_id = f.node_logical_id AND n.superseded_at IS NULL
622            )
623            ",
624            [],
625            |row| row.get(0),
626        )?;
627
628        let (
629            stale_property_fts_rows,
630            orphaned_property_fts_rows,
631            mismatched_kind_property_fts_rows,
632            duplicate_property_fts_rows,
633        ) = count_per_kind_property_fts_issues(&conn)?;
634
635        let drifted_property_fts_rows = count_drifted_property_fts_rows(&conn)?;
636
637        let dangling_edges: i64 = conn.query_row(
638            r"
639            SELECT count(*) FROM edges e
640            WHERE e.superseded_at IS NULL AND (
641                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.source_logical_id AND n.superseded_at IS NULL)
642                OR
643                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.target_logical_id AND n.superseded_at IS NULL)
644            )
645            ",
646            [],
647            |row| row.get(0),
648        )?;
649
650        let orphaned_supersession_chains: i64 = conn.query_row(
651            r"
652            SELECT count(*) FROM (
653                SELECT logical_id FROM nodes
654                GROUP BY logical_id
655                HAVING count(*) > 0 AND sum(CASE WHEN superseded_at IS NULL THEN 1 ELSE 0 END) = 0
656            )
657            ",
658            [],
659            |row| row.get(0),
660        )?;
661
662        // Vec stale row detection — iterates per-kind vec tables from projection_profiles.
663        #[cfg(feature = "sqlite-vec")]
664        let (stale_vec_rows, vec_rows_for_superseded_nodes): (i64, i64) = {
665            let kinds: Vec<String> =
666                match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
667                    Ok(mut stmt) => stmt
668                        .query_map([], |row| row.get(0))
669                        .map_err(EngineError::Sqlite)?
670                        .collect::<Result<Vec<_>, _>>()
671                        .map_err(EngineError::Sqlite)?,
672                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
673                        if msg.contains("no such table: projection_profiles") =>
674                    {
675                        vec![]
676                    }
677                    Err(e) => return Err(EngineError::Sqlite(e)),
678                };
679            let mut stale = 0i64;
680            let mut superseded = 0i64;
681            for kind in &kinds {
682                let table = fathomdb_schema::vec_kind_table_name(kind);
683                let stale_sql = format!(
684                    "SELECT count(*) FROM {table} v \
685                     WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = v.chunk_id)"
686                );
687                let superseded_sql = format!(
688                    "SELECT count(*) FROM {table} v \
689                     JOIN chunks c ON c.id = v.chunk_id \
690                     WHERE NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = c.node_logical_id)"
691                );
692                stale += match conn.query_row(&stale_sql, [], |row| row.get(0)) {
693                    Ok(n) => n,
694                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
695                        if msg.contains("no such table:")
696                            || msg.contains("no such module: vec0") =>
697                    {
698                        0
699                    }
700                    Err(e) => return Err(EngineError::Sqlite(e)),
701                };
702                superseded += match conn.query_row(&superseded_sql, [], |row| row.get(0)) {
703                    Ok(n) => n,
704                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
705                        if msg.contains("no such table:")
706                            || msg.contains("no such module: vec0") =>
707                    {
708                        0
709                    }
710                    Err(e) => return Err(EngineError::Sqlite(e)),
711                };
712            }
713            (stale, superseded)
714        };
715        #[cfg(not(feature = "sqlite-vec"))]
716        let stale_vec_rows: i64 = 0;
717        #[cfg(not(feature = "sqlite-vec"))]
718        let vec_rows_for_superseded_nodes: i64 = 0;
719        let missing_operational_current_rows: i64 = conn.query_row(
720            r"
721            SELECT count(*)
722            FROM operational_mutations m
723            JOIN operational_collections c
724              ON c.name = m.collection_name
725             AND c.kind = 'latest_state'
726            WHERE m.op_kind = 'put'
727              AND NOT EXISTS (
728                    SELECT 1
729                    FROM operational_mutations newer
730                    WHERE newer.collection_name = m.collection_name
731                      AND newer.record_key = m.record_key
732                      AND newer.mutation_order > m.mutation_order
733                )
734              AND NOT EXISTS (
735                    SELECT 1
736                    FROM operational_current oc
737                    WHERE oc.collection_name = m.collection_name
738                      AND oc.record_key = m.record_key
739                )
740            ",
741            [],
742            |row| row.get(0),
743        )?;
744        let stale_operational_current_rows: i64 = conn.query_row(
745            r"
746            SELECT count(*)
747            FROM operational_current oc
748            JOIN operational_collections c
749              ON c.name = oc.collection_name
750             AND c.kind = 'latest_state'
751            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
752            WHERE m.id IS NULL
753               OR m.collection_name != oc.collection_name
754               OR m.record_key != oc.record_key
755               OR m.op_kind != 'put'
756               OR m.payload_json != oc.payload_json
757               OR EXISTS (
758                    SELECT 1
759                    FROM operational_mutations newer
760                    WHERE newer.collection_name = oc.collection_name
761                      AND newer.record_key = oc.record_key
762                      AND newer.mutation_order > m.mutation_order
763                )
764            ",
765            [],
766            |row| row.get(0),
767        )?;
768        let disabled_collection_mutations: i64 = conn.query_row(
769            r"
770            SELECT count(*)
771            FROM operational_mutations m
772            JOIN operational_collections c ON c.name = m.collection_name
773            WHERE c.disabled_at IS NOT NULL AND m.created_at > c.disabled_at
774            ",
775            [],
776            |row| row.get(0),
777        )?;
778        let orphaned_last_access_metadata_rows: i64 = conn.query_row(
779            r"
780            SELECT count(*)
781            FROM node_access_metadata am
782            WHERE NOT EXISTS (
783                SELECT 1 FROM nodes n WHERE n.logical_id = am.logical_id
784            )
785            ",
786            [],
787            |row| row.get(0),
788        )?;
789
790        let mut warnings = Vec::new();
791        if orphaned_chunks > 0 {
792            warnings.push(format!(
793                "{orphaned_chunks} orphaned chunk(s) with no surviving node history"
794            ));
795        }
796        if null_source_ref_nodes > 0 {
797            warnings.push(format!(
798                "{null_source_ref_nodes} active node(s) with null source_ref"
799            ));
800        }
801        if broken_step_fk > 0 {
802            warnings.push(format!(
803                "{broken_step_fk} step(s) referencing non-existent run"
804            ));
805        }
806        if broken_action_fk > 0 {
807            warnings.push(format!(
808                "{broken_action_fk} action(s) referencing non-existent step"
809            ));
810        }
811        if stale_fts_rows > 0 {
812            warnings.push(format!(
813                "{stale_fts_rows} stale FTS row(s) referencing missing chunk"
814            ));
815        }
816        if fts_rows_for_superseded_nodes > 0 {
817            warnings.push(format!(
818                "{fts_rows_for_superseded_nodes} FTS row(s) for superseded node(s)"
819            ));
820        }
821        if stale_property_fts_rows > 0 {
822            warnings.push(format!(
823                "{stale_property_fts_rows} stale property FTS row(s) for superseded/missing node(s)"
824            ));
825        }
826        if orphaned_property_fts_rows > 0 {
827            warnings.push(format!(
828                "{orphaned_property_fts_rows} orphaned property FTS row(s) for unregistered kind(s)"
829            ));
830        }
831        if mismatched_kind_property_fts_rows > 0 {
832            warnings.push(format!(
833                "{mismatched_kind_property_fts_rows} property FTS row(s) whose kind does not match the active node"
834            ));
835        }
836        if duplicate_property_fts_rows > 0 {
837            warnings.push(format!(
838                "{duplicate_property_fts_rows} active logical ID(s) with duplicate property FTS rows"
839            ));
840        }
841        if drifted_property_fts_rows > 0 {
842            warnings.push(format!(
843                "{drifted_property_fts_rows} property FTS row(s) with stale text_content"
844            ));
845        }
846        if dangling_edges > 0 {
847            warnings.push(format!(
848                "{dangling_edges} active edge(s) with missing endpoint node"
849            ));
850        }
851        if orphaned_supersession_chains > 0 {
852            warnings.push(format!(
853                "{orphaned_supersession_chains} logical_id(s) with all versions superseded"
854            ));
855        }
856        if stale_vec_rows > 0 {
857            warnings.push(format!(
858                "{stale_vec_rows} stale vec row(s) referencing missing chunk"
859            ));
860        }
861        if vec_rows_for_superseded_nodes > 0 {
862            warnings.push(format!(
863                "{vec_rows_for_superseded_nodes} vec row(s) whose node history is missing"
864            ));
865        }
866        if missing_operational_current_rows > 0 {
867            warnings.push(format!(
868                "{missing_operational_current_rows} latest-state key(s) missing operational_current rows"
869            ));
870        }
871        if stale_operational_current_rows > 0 {
872            warnings.push(format!(
873                "{stale_operational_current_rows} stale operational_current row(s)"
874            ));
875        }
876        if disabled_collection_mutations > 0 {
877            warnings.push(format!(
878                "{disabled_collection_mutations} mutation(s) were written after collection disable"
879            ));
880        }
881        if orphaned_last_access_metadata_rows > 0 {
882            warnings.push(format!(
883                "{orphaned_last_access_metadata_rows} last_access metadata row(s) reference missing node history"
884            ));
885        }
886
887        Ok(SemanticReport {
888            orphaned_chunks: i64_to_usize(orphaned_chunks),
889            null_source_ref_nodes: i64_to_usize(null_source_ref_nodes),
890            broken_step_fk: i64_to_usize(broken_step_fk),
891            broken_action_fk: i64_to_usize(broken_action_fk),
892            stale_fts_rows: i64_to_usize(stale_fts_rows),
893            fts_rows_for_superseded_nodes: i64_to_usize(fts_rows_for_superseded_nodes),
894            stale_property_fts_rows: i64_to_usize(stale_property_fts_rows),
895            orphaned_property_fts_rows: i64_to_usize(orphaned_property_fts_rows),
896            mismatched_kind_property_fts_rows: i64_to_usize(mismatched_kind_property_fts_rows),
897            duplicate_property_fts_rows: i64_to_usize(duplicate_property_fts_rows),
898            drifted_property_fts_rows: i64_to_usize(drifted_property_fts_rows),
899            dangling_edges: i64_to_usize(dangling_edges),
900            orphaned_supersession_chains: i64_to_usize(orphaned_supersession_chains),
901            stale_vec_rows: i64_to_usize(stale_vec_rows),
902            vec_rows_for_superseded_nodes: i64_to_usize(vec_rows_for_superseded_nodes),
903            missing_operational_current_rows: i64_to_usize(missing_operational_current_rows),
904            stale_operational_current_rows: i64_to_usize(stale_operational_current_rows),
905            disabled_collection_mutations: i64_to_usize(disabled_collection_mutations),
906            orphaned_last_access_metadata_rows: i64_to_usize(orphaned_last_access_metadata_rows),
907            warnings,
908        })
909    }
910}
911
912/// Count per-kind FTS integrity issues across all registered per-kind tables.
913/// Returns (stale, orphaned, `mismatched_kind`, duplicate) counts.
914///
915/// - Stale: rows in a per-kind table whose node is superseded or missing.
916/// - Orphaned: rows in a per-kind table for a kind with no registered schema.
917/// - Mismatched kind: impossible with per-kind tables (always 0).
918/// - Duplicate: same `node_logical_id` appears more than once in any per-kind table.
919fn count_per_kind_property_fts_issues(
920    conn: &rusqlite::Connection,
921) -> Result<(i64, i64, i64, i64), EngineError> {
922    // Collect all per-kind virtual tables from sqlite_master.
923    // Filter by sql LIKE 'CREATE VIRTUAL TABLE%' to exclude FTS5 shadow tables
924    // (e.g. fts_props_goal_data, fts_props_goal_idx) which share the same prefix.
925    let per_kind_tables: Vec<String> = {
926        let mut stmt = conn.prepare(
927            "SELECT name FROM sqlite_master \
928             WHERE type='table' AND name LIKE 'fts_props_%' \
929             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
930        )?;
931        stmt.query_map([], |r| r.get::<_, String>(0))?
932            .collect::<Result<Vec<_>, _>>()?
933    };
934
935    let registered_kinds: std::collections::HashSet<String> = {
936        let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
937        stmt.query_map([], |r| r.get::<_, String>(0))?
938            .collect::<Result<std::collections::HashSet<_>, _>>()?
939    };
940
941    let mut stale = 0i64;
942    let mut orphaned = 0i64;
943    let mut duplicate = 0i64;
944
945    for table in &per_kind_tables {
946        // Stale: rows whose node_logical_id has no active node.
947        let kind_stale: i64 = conn.query_row(
948            &format!(
949                "SELECT count(*) FROM {table} fp \
950                 WHERE NOT EXISTS (\
951                     SELECT 1 FROM nodes n \
952                     WHERE n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL\
953                 )"
954            ),
955            [],
956            |r| r.get(0),
957        )?;
958        stale += kind_stale;
959
960        // Duplicate: same node_logical_id more than once.
961        let kind_dup: i64 = conn.query_row(
962            &format!(
963                "SELECT count(*) FROM (\
964                     SELECT node_logical_id FROM {table} \
965                     GROUP BY node_logical_id HAVING count(*) > 1\
966                 )"
967            ),
968            [],
969            |r| r.get(0),
970        )?;
971        duplicate += kind_dup;
972
973        // Orphaned: this per-kind table has no corresponding schema.
974        // Determine which kind this table corresponds to by checking all registered kinds.
975        let table_has_schema = registered_kinds
976            .iter()
977            .any(|k| fathomdb_schema::fts_kind_table_name(k) == *table);
978        if !table_has_schema {
979            let table_rows: i64 =
980                conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))?;
981            orphaned += table_rows;
982        }
983    }
984
985    // Mismatched kind is always 0 with per-kind tables.
986    Ok((stale, orphaned, 0, duplicate))
987}
988
989/// Count active nodes that should have a property FTS row (extraction yields a value)
990/// but don't. Uses the same extraction logic as write/rebuild to avoid false positives
991/// for nodes whose declared paths legitimately normalize to no values.
992fn count_missing_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
993    let schemas = crate::writer::load_fts_property_schemas(conn)?;
994    if schemas.is_empty() {
995        return Ok(0);
996    }
997
998    let mut missing = 0i64;
999    for (kind, schema) in &schemas {
1000        let table = fathomdb_schema::fts_kind_table_name(kind);
1001        // If the per-kind table doesn't exist yet, all nodes with extractable values are missing.
1002        let table_exists: bool = conn
1003            .query_row(
1004                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1005                [table.as_str()],
1006                |r| r.get::<_, i64>(0),
1007            )
1008            .unwrap_or(0)
1009            > 0;
1010
1011        if table_exists {
1012            let mut stmt = conn.prepare(&format!(
1013                "SELECT n.logical_id, n.properties FROM nodes n \
1014                 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
1015                   AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
1016            ))?;
1017            let rows = stmt.query_map([kind.as_str()], |row| {
1018                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1019            })?;
1020            for row in rows {
1021                let (_logical_id, properties_str) = row?;
1022                let props: serde_json::Value =
1023                    serde_json::from_str(&properties_str).unwrap_or_default();
1024                if crate::writer::extract_property_fts(&props, schema)
1025                    .0
1026                    .is_some()
1027                {
1028                    missing += 1;
1029                }
1030            }
1031        } else {
1032            // Per-kind table doesn't exist yet — count all nodes with extractable values.
1033            let mut stmt = conn.prepare(
1034                "SELECT n.logical_id, n.properties FROM nodes n \
1035                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
1036            )?;
1037            let rows = stmt.query_map([kind.as_str()], |row| {
1038                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1039            })?;
1040            for row in rows {
1041                let (_logical_id, properties_str) = row?;
1042                let props: serde_json::Value =
1043                    serde_json::from_str(&properties_str).unwrap_or_default();
1044                if crate::writer::extract_property_fts(&props, schema)
1045                    .0
1046                    .is_some()
1047                {
1048                    missing += 1;
1049                }
1050            }
1051        }
1052    }
1053    Ok(missing)
1054}
1055
1056/// Count property FTS rows whose `text_content` has drifted from the current canonical
1057/// value computed by `compute_property_fts_text(...)`. This catches:
1058/// - rows whose text no longer matches the current node properties and schema
1059/// - rows that should have been removed (extraction now yields no value)
1060fn count_drifted_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1061    let schemas = crate::writer::load_fts_property_schemas(conn)?;
1062    if schemas.is_empty() {
1063        return Ok(0);
1064    }
1065
1066    let mut drifted = 0i64;
1067    for (kind, schema) in &schemas {
1068        let table = fathomdb_schema::fts_kind_table_name(kind);
1069        // If the per-kind table doesn't exist, no rows to check.
1070        let table_exists: bool = conn
1071            .query_row(
1072                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1073                [table.as_str()],
1074                |r| r.get::<_, i64>(0),
1075            )
1076            .unwrap_or(0)
1077            > 0;
1078        if !table_exists {
1079            continue;
1080        }
1081        let mut stmt = conn.prepare(&format!(
1082            "SELECT fp.node_logical_id, fp.text_content, n.properties \
1083             FROM {table} fp \
1084             JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1085             WHERE n.kind = ?1"
1086        ))?;
1087        let rows = stmt.query_map([kind.as_str()], |row| {
1088            Ok((
1089                row.get::<_, String>(0)?,
1090                row.get::<_, String>(1)?,
1091                row.get::<_, String>(2)?,
1092            ))
1093        })?;
1094        for row in rows {
1095            let (_logical_id, stored_text, properties_str) = row?;
1096            let props: serde_json::Value =
1097                serde_json::from_str(&properties_str).unwrap_or_default();
1098            let (expected, _positions, _stats) =
1099                crate::writer::extract_property_fts(&props, schema);
1100            match expected {
1101                Some(text) if text == stored_text => {}
1102                _ => drifted += 1,
1103            }
1104        }
1105    }
1106    Ok(drifted)
1107}
1108
1109/// Convert a non-negative i64 count to usize, panicking on negative values
1110/// which would indicate data corruption.
1111#[allow(clippy::expect_used)]
1112pub(super) fn i64_to_usize(val: i64) -> usize {
1113    usize::try_from(val).expect("count(*) must be non-negative")
1114}
1115
1116pub(super) fn persist_simple_provenance_event(
1117    conn: &rusqlite::Connection,
1118    event_type: &str,
1119    subject: &str,
1120    metadata: Option<serde_json::Value>,
1121) -> Result<(), EngineError> {
1122    let metadata_json = metadata.map(|value| value.to_string()).unwrap_or_default();
1123    conn.execute(
1124        "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1125        rusqlite::params![new_id(), event_type, subject, metadata_json],
1126    )?;
1127    Ok(())
1128}
1129
1130pub(super) fn rebuild_operational_current_rows(
1131    tx: &rusqlite::Transaction<'_>,
1132    collections: &[String],
1133) -> Result<usize, EngineError> {
1134    let mut rebuilt_rows = 0usize;
1135    clear_operational_current_rows(tx, collections)?;
1136    let mut ins_current = tx.prepare_cached(
1137        "INSERT INTO operational_current \
1138         (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1139         VALUES (?1, ?2, ?3, ?4, ?5)",
1140    )?;
1141
1142    for collection in collections {
1143        let mut stmt = tx.prepare(
1144            "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
1145             FROM operational_mutations \
1146             WHERE collection_name = ?1 \
1147             ORDER BY record_key, mutation_order",
1148        )?;
1149        let mut latest_by_key: std::collections::HashMap<String, Option<(String, i64, String)>> =
1150            std::collections::HashMap::new();
1151        let rows = stmt.query_map([collection], operational::map_operational_mutation_row)?;
1152        for row in rows {
1153            let mutation = row?;
1154            match mutation.op_kind.as_str() {
1155                "put" => {
1156                    latest_by_key.insert(
1157                        mutation.record_key,
1158                        Some((mutation.payload_json, mutation.created_at, mutation.id)),
1159                    );
1160                }
1161                "delete" => {
1162                    latest_by_key.insert(mutation.record_key, None);
1163                }
1164                _ => {}
1165            }
1166        }
1167
1168        for (record_key, state) in latest_by_key {
1169            if let Some((payload_json, updated_at, last_mutation_id)) = state {
1170                ins_current.execute(rusqlite::params![
1171                    collection,
1172                    record_key,
1173                    payload_json,
1174                    updated_at,
1175                    last_mutation_id,
1176                ])?;
1177                rebuilt_rows += 1;
1178            }
1179        }
1180    }
1181
1182    drop(ins_current);
1183    Ok(rebuilt_rows)
1184}
1185
1186pub(super) fn clear_operational_current_rows(
1187    tx: &rusqlite::Transaction<'_>,
1188    collections: &[String],
1189) -> Result<(), EngineError> {
1190    let mut delete_current =
1191        tx.prepare_cached("DELETE FROM operational_current WHERE collection_name = ?1")?;
1192    let mut delete_secondary_current = tx.prepare_cached(
1193        "DELETE FROM operational_secondary_index_entries \
1194         WHERE collection_name = ?1 AND subject_kind = 'current'",
1195    )?;
1196    for collection in collections {
1197        delete_secondary_current.execute([collection])?;
1198        delete_current.execute([collection])?;
1199    }
1200    drop(delete_secondary_current);
1201    drop(delete_current);
1202    Ok(())
1203}
1204
1205#[cfg(test)]
1206#[allow(clippy::expect_used)]
1207mod tests {
1208    use std::fs;
1209    use std::sync::Arc;
1210
1211    use fathomdb_schema::SchemaManager;
1212    use tempfile::NamedTempFile;
1213
1214    use super::{
1215        AdminService, FtsPropertyPathMode, FtsPropertyPathSpec, SafeExportOptions,
1216        VectorRegenerationConfig,
1217    };
1218    use crate::embedder::{BatchEmbedder, EmbedderError, QueryEmbedder, QueryEmbedderIdentity};
1219    use crate::projection::ProjectionTarget;
1220    use crate::sqlite;
1221    use crate::{EngineError, OperationalCollectionKind, OperationalRegisterRequest};
1222
1223    #[cfg(feature = "sqlite-vec")]
1224    use crate::{ExecutionCoordinator, TelemetryCounters};
1225
1226    #[cfg(feature = "sqlite-vec")]
1227    use fathomdb_query::QueryBuilder;
1228
1229    #[cfg(feature = "sqlite-vec")]
1230    use super::load_vector_regeneration_config;
1231
    /// In-process embedder used by the regeneration test suite. The
    /// vector is parameterized so individual tests can distinguish which
    /// embedder produced which profile row.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct TestEmbedder {
        // Identity metadata reported by the embedder trait impls.
        identity: QueryEmbedderIdentity,
        // Fixed vector returned by every embed call.
        vector: Vec<f32>,
    }
1241
1242    #[allow(dead_code)]
1243    impl TestEmbedder {
1244        fn new(model: &str, dimension: usize) -> Self {
1245            Self {
1246                identity: QueryEmbedderIdentity {
1247                    model_identity: model.to_owned(),
1248                    model_version: "1.0.0".to_owned(),
1249                    dimension,
1250                    normalization_policy: "l2".to_owned(),
1251                },
1252                vector: vec![1.0; dimension],
1253            }
1254        }
1255    }
1256
1257    impl QueryEmbedder for TestEmbedder {
1258        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
1259            Ok(self.vector.clone())
1260        }
1261        fn identity(&self) -> QueryEmbedderIdentity {
1262            self.identity.clone()
1263        }
1264        fn max_tokens(&self) -> usize {
1265            512
1266        }
1267    }
1268
1269    impl BatchEmbedder for TestEmbedder {
1270        fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, EmbedderError> {
1271            Ok(texts.iter().map(|_| self.vector.clone()).collect())
1272        }
1273        fn identity(&self) -> QueryEmbedderIdentity {
1274            self.identity.clone()
1275        }
1276        fn max_tokens(&self) -> usize {
1277            512
1278        }
1279    }
1280
    /// Embedder that always fails — used to exercise the post-request
    /// failure audit path without the complexity of subprocess machinery.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct FailingEmbedder {
        // Identity is still reported normally; only embedding calls fail.
        identity: QueryEmbedderIdentity,
    }
1288
1289    impl QueryEmbedder for FailingEmbedder {
1290        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
1291            Err(EmbedderError::Failed("test failure".to_owned()))
1292        }
1293        fn identity(&self) -> QueryEmbedderIdentity {
1294            self.identity.clone()
1295        }
1296        fn max_tokens(&self) -> usize {
1297            512
1298        }
1299    }
1300
1301    #[allow(dead_code)]
1302    #[cfg(unix)]
1303    fn set_file_mode(path: &std::path::Path, mode: u32) {
1304        use std::os::unix::fs::PermissionsExt;
1305
1306        let mut permissions = fs::metadata(path).expect("script metadata").permissions();
1307        permissions.set_mode(mode);
1308        fs::set_permissions(path, permissions).expect("chmod");
1309    }
1310
    #[allow(dead_code)]
    #[cfg(not(unix))]
    // No-op fallback: non-Unix platforms have no chmod-style mode bits.
    fn set_file_mode(_path: &std::path::Path, _mode: u32) {}
1314    fn setup() -> (NamedTempFile, AdminService) {
1315        let db = NamedTempFile::new().expect("temp file");
1316        let schema = Arc::new(SchemaManager::new());
1317        {
1318            let conn = sqlite::open_connection(db.path()).expect("connection");
1319            schema.bootstrap(&conn).expect("bootstrap");
1320        }
1321        let service = AdminService::new(db.path(), Arc::clone(&schema));
1322        (db, service)
1323    }
1324
1325    #[test]
1326    fn check_integrity_includes_active_uniqueness_count() {
1327        let (_db, service) = setup();
1328        let report = service.check_integrity().expect("integrity check");
1329        assert_eq!(report.duplicate_active_logical_ids, 0);
1330        assert_eq!(report.operational_missing_collections, 0);
1331        assert_eq!(report.operational_missing_last_mutations, 0);
1332    }
1333
1334    #[test]
1335    fn trace_source_returns_node_logical_ids() {
1336        let (db, service) = setup();
1337        {
1338            let conn = sqlite::open_connection(db.path()).expect("conn");
1339            conn.execute(
1340                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1341                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 'source-1')",
1342                [],
1343            )
1344            .expect("insert node");
1345        }
1346        let report = service.trace_source("source-1").expect("trace");
1347        assert_eq!(report.node_rows, 1);
1348        assert_eq!(report.node_logical_ids, vec!["lg1"]);
1349    }
1350
1351    #[test]
1352    fn trace_source_includes_operational_mutations() {
1353        let (db, service) = setup();
1354        {
1355            let conn = sqlite::open_connection(db.path()).expect("conn");
1356            conn.execute(
1357                "INSERT INTO operational_collections \
1358                 (name, kind, schema_json, retention_json, format_version, created_at) \
1359                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
1360                [],
1361            )
1362            .expect("insert collection");
1363            conn.execute(
1364                "INSERT INTO operational_mutations \
1365                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1366                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"ok\"}', 'source-1', 100, 1)",
1367                [],
1368            )
1369            .expect("insert mutation");
1370        }
1371
1372        let report = service.trace_source("source-1").expect("trace");
1373        assert_eq!(report.operational_mutation_rows, 1);
1374        assert_eq!(report.operational_mutation_ids, vec!["m1"]);
1375    }
1376
1377    #[test]
1378    fn excise_source_restores_prior_active_node() {
1379        let (db, service) = setup();
1380        {
1381            let conn = sqlite::open_connection(db.path()).expect("conn");
1382            conn.execute(
1383                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1384                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
1385                [],
1386            )
1387            .expect("insert v1 superseded");
1388            conn.execute(
1389                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
1390                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
1391                [],
1392            )
1393            .expect("insert v2 active");
1394        }
1395        service.excise_source("source-2").expect("excise");
1396        {
1397            let conn = sqlite::open_connection(db.path()).expect("conn");
1398            let active_row_id: String = conn
1399                .query_row(
1400                    "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
1401                    [],
1402                    |row| row.get(0),
1403                )
1404                .expect("active row exists after excise");
1405            assert_eq!(active_row_id, "r1");
1406        }
1407    }
1408
    #[test]
    fn excise_source_deletes_operational_mutations_and_repairs_latest_state_current() {
        let (db, service) = setup();
        // Seed a latest_state collection with two put mutations for the same
        // record key ('gmail') from different sources, plus a current row
        // pointing at the later mutation (m2 / source-2).
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert prior mutation");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'source-2', 200, 2)",
                [],
            )
            .expect("insert excised mutation");
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 200, 'm2')",
                [],
            )
            .expect("insert current row");
        }

        // Sanity check: tracing source-2 sees exactly the m2 mutation.
        let traced = service
            .trace_source("source-2")
            .expect("trace before excise");
        assert_eq!(traced.operational_mutation_rows, 1);
        assert_eq!(traced.operational_mutation_ids, vec!["m2"]);

        // After excision, the report for source-2 shows no mutations left.
        let excised = service.excise_source("source-2").expect("excise");
        assert_eq!(excised.operational_mutation_rows, 0);
        assert!(excised.operational_mutation_ids.is_empty());

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // No mutation attributed to source-2 remains.
            let remaining: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_mutations WHERE source_ref = 'source-2'",
                    [],
                    |row| row.get(0),
                )
                .expect("remaining count");
            assert_eq!(remaining, 0);

            // The current row was rebuilt from the surviving mutation m1.
            let current: (String, String) = conn
                .query_row(
                    "SELECT payload_json, last_mutation_id FROM operational_current \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                    [],
                    |row| Ok((row.get(0)?, row.get(1)?)),
                )
                .expect("rebuilt current row");
            assert_eq!(current.0, "{\"status\":\"old\"}");
            assert_eq!(current.1, "m1");
        }
    }
1477
    #[test]
    fn restore_logical_id_reestablishes_last_pre_retire_content_and_attached_edges() {
        let (db, service) = setup();
        // Seed a document node, a topic node, a chunk on the document, and a
        // TAGGED edge between them; then retire node and edge (superseded_at)
        // with matching node_retire/edge_retire provenance events, and wipe
        // the FTS index so restore has to repopulate it.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retirement provenance events at t=200 for both the node and edge.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear fts");
        }

        // Restore must bring back one node row, one edge row, one chunk, and
        // one FTS row — the full pre-retire state.
        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 1);
        assert_eq!(report.restored_chunk_rows, 1);
        assert_eq!(report.restored_fts_rows, 1);

        // Verify the database state directly: node and edge active again,
        // FTS row for the chunk repopulated.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active node count");
        assert_eq!(active_node_count, 1);
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(fts_count, 1);
    }
1568
    /// An edge retired slightly *after* its source node (edge_retire at
    /// t=201 vs. the node's node_retire at t=200) must still be resurrected
    /// when the node's logical id is restored — restore must not limit edge
    /// recovery to rows retired at exactly the node's retire timestamp.
    #[test]
    fn restore_logical_id_restores_edges_retired_after_the_node_retire_event() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Seed an active node, its edge target, and the edge itself.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire events: node at 200, edge one tick later at 201.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 201, '')",
                [],
            )
            .expect("insert edge retire event");
            // Mirror the retire events onto the rows' superseded_at columns.
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 201 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_edge_rows, 1);

        // The edge must be active (superseded_at cleared) again.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
    }
1630
    /// Two retired revisions of the same logical id share identical
    /// `created_at` (100) and `superseded_at` (200) timestamps; restore must
    /// deterministically pick the later-inserted revision ('node-row-newer')
    /// rather than an arbitrary one of the tie.
    #[test]
    fn restore_logical_id_prefers_latest_retired_revision_when_timestamps_tie() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Older revision — timestamps identical to the newer one below.
            conn.execute(
                "INSERT INTO nodes \
                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-older', 'doc-1', 'Document', '{\"title\":\"older\"}', 100, 200, 'forget-1')",
                [],
            )
            .expect("insert older retired node");
            // Newer revision — the one restore is expected to resurrect.
            conn.execute(
                "INSERT INTO nodes \
                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-newer', 'doc-1', 'Document', '{\"title\":\"newer\"}', 100, 200, 'forget-1')",
                [],
            )
            .expect("insert newer retired node");
            // Matching retire events, also tied at created_at = 200.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-older', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert older retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-newer', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert newer retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");

        assert!(!report.was_noop);
        // Exactly one active row is expected; query_row errors if zero rows
        // match, so this doubles as an existence check.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_row: (String, String) = conn
            .query_row(
                "SELECT row_id, properties FROM nodes \
                 WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("restored active row");
        assert_eq!(active_row.0, "node-row-newer");
        assert_eq!(active_row.1, "{\"title\":\"newer\"}");
    }
1679
    /// Purging a logical id hard-deletes all of its retired content — node
    /// revisions, edges that reference it, chunks, and FTS rows — and records
    /// a single `purge_logical_id` provenance event as the tombstone.
    #[test]
    fn purge_logical_id_removes_retired_content_and_records_tombstone() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Seed already-retired content (superseded_at = 200) plus its
            // chunk and FTS projection rows.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired edge");
            conn.execute(
                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-1', 'doc-1', 'Document', 'budget narrative')",
                [],
            )
            .expect("insert fts");
        }

        // The report must account for every deleted row category.
        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.deleted_node_rows, 1);
        assert_eq!(report.deleted_edge_rows, 1);
        assert_eq!(report.deleted_chunk_rows, 1);
        assert_eq!(report.deleted_fts_rows, 1);

        // Rows must be physically gone — not merely superseded.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_nodes: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining nodes");
        assert_eq!(remaining_nodes, 0);
        let remaining_edges: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining edges");
        assert_eq!(remaining_edges, 0);
        let remaining_chunks: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining chunks");
        assert_eq!(remaining_chunks, 0);
        // Exactly one tombstone event is recorded for the purge.
        let purge_events: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'purge_logical_id' AND subject = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("purge events");
        assert_eq!(purge_events, 1);
    }
1754
1755    #[test]
1756    fn check_semantics_accepts_preserved_retired_chunks() {
1757        let (db, service) = setup();
1758        {
1759            let conn = sqlite::open_connection(db.path()).expect("conn");
1760            conn.execute(
1761                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1762                 VALUES ('node-row-1', 'doc-1', 'Document', '{}', 100, 200, 'seed')",
1763                [],
1764            )
1765            .expect("insert retired node");
1766            conn.execute(
1767                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1768                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1769                [],
1770            )
1771            .expect("insert chunk");
1772        }
1773
1774        let report = service.check_semantics().expect("semantics");
1775        assert_eq!(report.orphaned_chunks, 0);
1776    }
1777
1778    #[test]
1779    fn check_semantics_detects_missing_retired_node_history_for_preserved_chunks() {
1780        let (db, service) = setup();
1781        {
1782            let conn = sqlite::open_connection(db.path()).expect("conn");
1783            conn.execute(
1784                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1785                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
1786                [],
1787            )
1788            .expect("insert orphaned chunk");
1789        }
1790
1791        let report = service.check_semantics().expect("semantics");
1792        assert_eq!(report.orphaned_chunks, 1);
1793    }
1794
    /// A vec row whose chunk points at a logical id with no node history is
    /// flagged twice by the semantics check: the chunk counts as orphaned and
    /// the vec row is reported under `vec_rows_for_superseded_nodes`.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn check_semantics_detects_missing_retired_node_history_for_preserved_vec_rows() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Materialize the per-kind vec table (vec_doc) with a 4-dim profile.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Doc", 4)
                .expect("ensure vec kind profile");
            // 'ghost-doc' has no row in `nodes` at all — not even retired.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
                [],
            )
            .expect("insert orphaned chunk");
            // zeroblob(16) = 16 zero bytes, matching the 4-dim profile above.
            conn.execute(
                "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
        }

        let report = service.check_semantics().expect("semantics");
        assert_eq!(report.orphaned_chunks, 1);
        assert_eq!(report.vec_rows_for_superseded_nodes, 1);
    }
1822
    /// Restoring a retired node must make its preserved vec row visible to
    /// vector search again — without any re-ingest or re-embedding. Verified
    /// end to end by running a real vector query through the read path.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_reestablishes_vector_search_without_reingest() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Per-kind vec table for 'Document' with a 4-dim profile.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            // Node is seeded already retired (superseded_at = 200); its chunk
            // and vec row are preserved alongside it.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            // zeroblob(16) = 16 zero bytes for the 4-dim embedding.
            conn.execute(
                "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
            // Retire event that restore_logical_id will consult.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // Exercise the real read path: a vector search for the restored node.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restore should make the preserved vec row visible again without re-ingest"
        );
    }
1882
    /// Purging retired content must also delete its rows from the per-kind
    /// vec table, so no unreachable embeddings linger after the purge.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn purge_logical_id_deletes_vec_rows_for_retired_content() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Per-kind vec table for 'Document' with a 4-dim profile.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            // Already-retired node (superseded_at = 200) with chunk + vec row.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
        }

        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.deleted_vec_rows, 1);

        // The vec table must now be empty.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 0);
    }
1921
    /// Embeddings produced by `regenerate_vector_embeddings` (rather than by
    /// the original ingest) must survive a retire/restore cycle: after the
    /// node is restored, a vector search finds it again through those
    /// regenerated vectors.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_restores_visibility_of_regenerated_vectors() {
        let (db, service) = setup();

        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Seed an active node and its chunk; no vec rows yet — they are
            // created by the regeneration step below.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
        }

        // Regenerate embeddings for all 'Document' chunks with a 4-dim
        // test embedder.
        let embedder = TestEmbedder::new("test-model", 4);
        service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate");

        // Retire the node after regeneration (event + superseded_at marker).
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // Prove visibility through the actual vector-search read path.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restored logical_id should become visible through regenerated vectors"
        );
    }
1995
1996    #[test]
1997    fn check_semantics_clean_db_returns_zeros() {
1998        let (_db, service) = setup();
1999        let report = service.check_semantics().expect("semantics check");
2000        assert_eq!(report.orphaned_chunks, 0);
2001        assert_eq!(report.null_source_ref_nodes, 0);
2002        assert_eq!(report.broken_step_fk, 0);
2003        assert_eq!(report.broken_action_fk, 0);
2004        assert_eq!(report.stale_fts_rows, 0);
2005        assert_eq!(report.fts_rows_for_superseded_nodes, 0);
2006        assert_eq!(report.dangling_edges, 0);
2007        assert_eq!(report.orphaned_supersession_chains, 0);
2008        assert_eq!(report.stale_vec_rows, 0);
2009        assert_eq!(report.vec_rows_for_superseded_nodes, 0);
2010        assert_eq!(report.missing_operational_current_rows, 0);
2011        assert_eq!(report.stale_operational_current_rows, 0);
2012        assert_eq!(report.disabled_collection_mutations, 0);
2013        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
2014        assert_eq!(report.duplicate_property_fts_rows, 0);
2015        assert_eq!(report.drifted_property_fts_rows, 0);
2016        assert!(report.warnings.is_empty());
2017    }
2018
    /// Registering an operational collection returns the persisted record,
    /// makes it retrievable via describe, and emits exactly one
    /// `operational_collection_registered` provenance event.
    #[test]
    fn register_operational_collection_persists_and_emits_provenance() {
        let (db, service) = setup();
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        // Returned record must echo the request plus server-set fields.
        assert_eq!(record.name, "connector_health");
        assert_eq!(record.kind, OperationalCollectionKind::LatestState);
        assert_eq!(record.schema_json, "{}");
        assert_eq!(record.retention_json, "{}");
        assert_eq!(record.filter_fields_json, "[]");
        assert!(record.created_at > 0);
        assert_eq!(record.disabled_at, None);

        // Describe must round-trip the exact same record.
        let described = service
            .describe_operational_collection("connector_health")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described, record);

        // Exactly one registration provenance event for this collection.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_registered' AND subject = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2060
    /// Updating a collection's validation contract after registration
    /// round-trips through describe and emits an
    /// `operational_collection_validation_updated` provenance event.
    #[test]
    fn register_and_update_operational_collection_validation_round_trip() {
        let (db, service) = setup();
        // Register with an empty validation contract to start from.
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        assert_eq!(record.validation_json, "");

        // Attach an enforced contract; the update must return it verbatim.
        let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
        let updated = service
            .update_operational_collection_validation("connector_health", validation_json)
            .expect("update validation");
        assert_eq!(updated.validation_json, validation_json);

        // Describe must reflect the persisted contract as well.
        let described = service
            .describe_operational_collection("connector_health")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described.validation_json, validation_json);

        // Exactly one validation-updated provenance event.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_validation_updated' \
                   AND subject = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2102
    /// Full round trip for operational secondary indexes: register a
    /// collection, seed two append records through the writer actor, attach a
    /// secondary-index definition (which backfills entries for the existing
    /// records), then wipe the entries table directly and verify that
    /// `rebuild_operational_secondary_indexes` recreates them.
    #[test]
    fn register_update_and_rebuild_operational_secondary_indexes_round_trip() {
        let (db, service) = setup();
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        assert_eq!(record.secondary_indexes_json, "[]");

        // Seed two append records via the writer actor so there is existing
        // data for the index-definition update to backfill.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "secondary-index-seed".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"bob","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("seed writes");
        }

        // Attaching the index definition must backfill one entry per record.
        let secondary_indexes_json = r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#;
        let updated = service
            .update_operational_collection_secondary_indexes("audit_log", secondary_indexes_json)
            .expect("update secondary indexes");
        assert_eq!(updated.secondary_indexes_json, secondary_indexes_json);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'audit_log' AND index_name = 'actor_ts'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count");
        assert_eq!(entry_count, 2);
        // Simulate entry loss by deleting the backfilled entries directly.
        conn.execute(
            "DELETE FROM operational_secondary_index_entries WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear index entries");
        drop(conn);

        // Rebuild must restore the two mutation-backed entries.
        let rebuild = service
            .rebuild_operational_secondary_indexes("audit_log")
            .expect("rebuild secondary indexes");
        assert_eq!(rebuild.collection_name, "audit_log");
        assert_eq!(rebuild.mutation_entries_rebuilt, 2);
        assert_eq!(rebuild.current_entries_rebuilt, 0);
    }
2189
2190    #[test]
2191    fn register_operational_collection_rejects_invalid_validation_contract() {
2192        let (_db, service) = setup();
2193
2194        let error = service
2195            .register_operational_collection(&OperationalRegisterRequest {
2196                name: "connector_health".to_owned(),
2197                kind: OperationalCollectionKind::LatestState,
2198                schema_json: "{}".to_owned(),
2199                retention_json: "{}".to_owned(),
2200                filter_fields_json: "[]".to_owned(),
2201                validation_json: r#"{"format_version":1,"mode":"enforce","fields":[{"name":"status","type":"string","minimum":0}]}"#
2202                    .to_owned(),
2203                secondary_indexes_json: "[]".to_owned(),
2204                format_version: 1,
2205            })
2206            .expect_err("invalid validation contract should reject");
2207
2208        assert!(matches!(error, EngineError::InvalidWrite(_)));
2209        assert!(error.to_string().contains("minimum/maximum"));
2210    }
2211
    // Read-only history validation: a stored row that violates the field
    // contract is reported, while the database itself is left untouched
    // (no rows rewritten, no provenance event emitted).
    #[test]
    fn validate_operational_collection_history_reports_invalid_rows_without_mutation() {
        let (db, service) = setup();
        // NOTE(review): the contract is registered with mode "disabled", yet
        // the validation call below still reports the enum violation —
        // presumably history validation applies the contract regardless of
        // its enforcement mode; confirm this is intended.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#
                    .to_owned(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append one valid row ("ok" is in the enum) and one invalid row
        // ("bogus" is not). The writer is scoped so it is dropped before the
        // service reads the database back.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "history-validation".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"status":"ok"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"status":"bogus"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Both rows are checked; only the enum violation is reported.
        let report = service
            .validate_operational_collection_history("audit_log")
            .expect("validate history");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.checked_rows, 2);
        assert_eq!(report.invalid_row_count, 1);
        assert_eq!(report.issues.len(), 1);
        assert_eq!(report.issues[0].record_key, "evt-2");
        assert!(report.issues[0].message.contains("must be one of"));

        // Validation must not have deleted or rewritten any mutation rows.
        let trace = service
            .trace_operational_collection("audit_log", None)
            .expect("trace");
        assert_eq!(trace.mutation_count, 2);

        // Validation is read-only, so no provenance event may be written.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_history_validated' \
                   AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
2294
    // Tracing a latest-state collection filtered to one record key returns
    // both the mutation history and the materialized current row.
    #[test]
    fn trace_operational_collection_returns_mutations_and_current_rows() {
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Single put for key "gmail"; the writer is scoped so it is dropped
        // before the service traces the collection.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }

        // The report carries the filter key back, one mutation ("put"),
        // and one current row whose payload matches what was written.
        let report = service
            .trace_operational_collection("connector_health", Some("gmail"))
            .expect("trace");
        assert_eq!(report.collection_name, "connector_health");
        assert_eq!(report.record_key.as_deref(), Some("gmail"));
        assert_eq!(report.mutation_count, 1);
        assert_eq!(report.current_count, 1);
        assert_eq!(report.mutations[0].op_kind, "put");
        assert_eq!(report.current_rows[0].payload_json, r#"{"status":"ok"}"#);
    }
2351
2352    #[test]
2353    fn trace_operational_collection_rejects_unknown_collection() {
2354        let (_db, service) = setup();
2355
2356        let error = service
2357            .trace_operational_collection("missing_collection", None)
2358            .expect_err("unknown collection should fail");
2359
2360        assert!(matches!(error, EngineError::InvalidWrite(_)));
2361        assert!(error.to_string().contains("is not registered"));
2362    }
2363
    // Simulates a lost `operational_current` row (deleted out-of-band) and
    // verifies that the semantics check flags it and that a targeted rebuild
    // restores the row from the mutation log.
    #[test]
    fn rebuild_operational_current_repairs_missing_latest_state_rows() {
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Write one put through the normal path so both the mutation log and
        // the current table are populated; the writer is scoped so it is
        // dropped before we tamper with the database.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        // Corrupt the store: drop the current row directly, leaving the
        // mutation log intact.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 1);

        // Rebuild only the affected collection and confirm the repair counts.
        let repair = service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");
        assert_eq!(repair.collections_rebuilt, 1);
        assert_eq!(repair.current_rows_rebuilt, 1);

        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);

        // The restored row must carry the payload from the original put.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2441
    // Like the missing-row repair test, but for a collection with a
    // latest-state secondary index: after the current row is deleted
    // out-of-band, a rebuild must restore the index entry as well.
    #[test]
    fn rebuild_operational_current_restores_latest_state_secondary_index_entries() {
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // One put populates the mutation log, current table, and the
        // "status_current" index; the writer is scoped so it is dropped
        // before we tamper with the database.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        // Confirm the index entry exists, then delete the current row
        // directly to simulate corruption.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let entry_count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_secondary_index_entries \
                     WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                    [],
                    |row| row.get(0),
                )
                .expect("secondary index count before repair");
            assert_eq!(entry_count, 1);
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        // The rebuild must leave exactly one 'current' index entry again.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count after repair");
        assert_eq!(entry_count, 1);
    }
2520
    // Both the semantics check and the rebuild must order mutations by the
    // `mutation_order` column, not by id or insertion order: the ids here
    // ('m3', 'm2', 'm1') are deliberately anti-correlated with
    // `mutation_order` (1, 2, 3) so an implementation sorting by id would
    // resolve the wrong final state.
    #[test]
    fn operational_current_semantics_and_rebuild_follow_mutation_order() {
        let (db, service) = setup();
        // Seed the collection and a put -> delete -> put history directly
        // via SQL, plus a current row matching the last mutation ('m1').
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("seed collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m3', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed first put");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'delete', '', 'src-2', 100, 2)",
                [],
            )
            .expect("seed delete");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'src-3', 100, 3)",
                [],
            )
            .expect("seed final put");
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 100, 'm1')",
                [],
            )
            .expect("seed current");
        }

        // The seeded current row matches the last mutation, so the store is
        // clean: nothing missing, nothing stale.
        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 0);
        assert_eq!(before.stale_operational_current_rows, 0);

        // Remove the current row to force a rebuild from the mutation log.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        let missing = service.check_semantics().expect("semantics after delete");
        assert_eq!(missing.missing_operational_current_rows, 1);
        assert_eq!(missing.stale_operational_current_rows, 0);

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);
        assert_eq!(after.stale_operational_current_rows, 0);

        // The rebuild must resolve to the highest mutation_order ("new"),
        // not to whatever id sorts last.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
2598
    // Disabling a collection must stamp `disabled_at`, surface that stamp
    // through `describe`, reject subsequent writes, and emit exactly one
    // provenance event.
    #[test]
    fn disable_operational_collection_sets_disabled_at_and_emits_provenance() {
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        let record = service
            .disable_operational_collection("audit_log")
            .expect("disable collection");
        assert_eq!(record.name, "audit_log");
        assert!(record.disabled_at.is_some());

        // `describe` must report the same timestamp the disable call returned.
        let disabled_at = record.disabled_at.expect("disabled_at");
        let described = service
            .describe_operational_collection("audit_log")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described.disabled_at, Some(disabled_at));

        // A write through the normal writer path must now be rejected.
        let writer = crate::WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            crate::ProvenanceMode::Warn,
            Arc::new(crate::TelemetryCounters::default()),
        )
        .expect("writer");
        let error = writer
            .submit(crate::WriteRequest {
                label: "disabled-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![crate::OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("disabled collection should reject writes");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // Exactly one 'operational_collection_disabled' provenance event.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_disabled' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2670
    // Purging an append-only collection with a cutoff timestamp deletes
    // every mutation created strictly before the cutoff, keeps later rows,
    // and emits a provenance event.
    #[test]
    fn purge_operational_collection_deletes_append_only_rows_before_cutoff() {
        let (db, service) = setup();
        // Seed three events at created_at 100 / 200 / 300 directly via SQL;
        // a cutoff of 250 should remove exactly the first two.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_all\"}', 1, 100)",
                [],
            )
            .expect("seed collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-1', 'audit_log', 'evt-1', 'append', '{\"seq\":1}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed event 1");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-2', 'audit_log', 'evt-2', 'append', '{\"seq\":2}', 'src-2', 200, 2)",
                [],
            )
            .expect("seed event 2");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('evt-3', 'audit_log', 'evt-3', 'append', '{\"seq\":3}', 'src-3', 300, 3)",
                [],
            )
            .expect("seed event 3");
        }

        let report = service
            .purge_operational_collection("audit_log", 250)
            .expect("purge collection");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.deleted_mutations, 2);
        assert_eq!(report.before_timestamp, 250);

        // Only the event at created_at 300 survives.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-3".to_owned()]);
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_purged' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2736
    // A dry-run compaction of a keep_last collection reports what WOULD be
    // deleted (3 rows, max_rows=2 -> 1 candidate) without deleting anything
    // and without emitting a provenance event.
    #[test]
    fn compact_operational_collection_dry_run_reports_without_mutation() {
        let (db, service) = setup();
        // Seed a keep_last(max_rows=2) collection with three events.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // dry_run = true: report the single over-limit row.
        let report = service
            .compact_operational_collection("audit_log", true)
            .expect("compact collection");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.deleted_mutations, 1);
        assert!(report.dry_run);
        assert_eq!(report.before_timestamp, None);

        // All three rows are still present and no provenance was written.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count");
        assert_eq!(remaining_count, 3);
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
2791
    // A real (non-dry-run) compaction of a keep_last(max_rows=2) collection
    // with three events deletes the oldest row by mutation_order and emits
    // one provenance event.
    #[test]
    fn compact_operational_collection_keep_last_deletes_oldest_rows() {
        let (db, service) = setup();
        // Same seed as the dry-run test: three events, limit of two.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        let report = service
            .compact_operational_collection("audit_log", false)
            .expect("compact collection");
        assert_eq!(report.deleted_mutations, 1);
        assert!(!report.dry_run);

        // Oldest event (evt-1) is gone; the newest two remain in order.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2849
    // End-to-end retention flow for a keep_last(max_rows=2) collection with
    // three rows: planning reports one candidate deletion, a dry run leaves
    // the data and run-log untouched, and a real run deletes the oldest row
    // and records the execution timestamp.
    #[test]
    fn plan_and_run_operational_retention_keep_last() {
        let (db, service) = setup();
        // Seed three events (created_at 100/200/300) against a limit of two.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // Plan at now=1000 with no collection filter and a limit of 10:
        // one KeepLast item with one over-limit row, never run before.
        let plan = service
            .plan_operational_retention(1_000, None, Some(10))
            .expect("plan retention");
        assert_eq!(plan.collections_examined, 1);
        assert_eq!(plan.items[0].collection_name, "audit_log");
        assert_eq!(
            plan.items[0].action_kind,
            crate::operational::OperationalRetentionActionKind::KeepLast
        );
        assert_eq!(plan.items[0].candidate_deletions, 1);
        assert_eq!(plan.items[0].max_rows, Some(2));
        assert_eq!(plan.items[0].last_run_at, None);

        // Dry run reports the same numbers as a real run would...
        let dry_run = service
            .run_operational_retention(1_000, None, Some(10), true)
            .expect("dry-run retention");
        assert!(dry_run.dry_run);
        assert_eq!(dry_run.collections_acted_on, 1);
        assert_eq!(dry_run.items[0].deleted_mutations, 1);
        assert_eq!(dry_run.items[0].rows_remaining, 2);

        // ...but must not delete rows or record a retention run.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count after dry run");
        assert_eq!(remaining_count, 3);
        let retention_run_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_retention_runs WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("retention run count");
        assert_eq!(retention_run_count, 0);
        // Release the connection before the service writes.
        drop(conn);

        let executed = service
            .run_operational_retention(1_000, None, Some(10), false)
            .expect("execute retention");
        assert_eq!(executed.collections_acted_on, 1);
        assert_eq!(executed.items[0].deleted_mutations, 1);
        assert_eq!(executed.items[0].rows_remaining, 2);

        // Oldest event deleted; run log records executed_at = 1000.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        let last_run_at: i64 = conn
            .query_row(
                "SELECT executed_at FROM operational_retention_runs \
                 WHERE collection_name = 'audit_log' ORDER BY executed_at DESC LIMIT 1",
                [],
                |row| row.get(0),
            )
            .expect("last run at");
        assert_eq!(last_run_at, 1_000);
    }
2948
2949    #[test]
2950    fn dry_run_operational_retention_does_not_mark_noop_collection_as_acted_on() {
2951        let (db, service) = setup();
2952        let conn = sqlite::open_connection(db.path()).expect("conn");
2953        conn.execute(
2954            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2955             VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2956            [],
2957        )
2958        .expect("seed collection");
2959        for (index, created_at) in [(1_i64, 100_i64), (2, 200)] {
2960            conn.execute(
2961                "INSERT INTO operational_mutations \
2962                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2963                 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2964                rusqlite::params![
2965                    format!("evt-{index}"),
2966                    format!("{{\"seq\":{index}}}"),
2967                    created_at,
2968                    index,
2969                ],
2970            )
2971            .expect("seed event");
2972        }
2973        drop(conn);
2974
2975        let dry_run = service
2976            .run_operational_retention(1_000, None, Some(10), true)
2977            .expect("dry-run retention");
2978        assert!(dry_run.dry_run);
2979        assert_eq!(dry_run.collections_acted_on, 0);
2980        assert_eq!(dry_run.items[0].deleted_mutations, 0);
2981        assert_eq!(dry_run.items[0].rows_remaining, 2);
2982    }
2983
2984    #[test]
2985    fn compact_operational_collection_rejects_latest_state() {
2986        let (_db, service) = setup();
2987        service
2988            .register_operational_collection(&OperationalRegisterRequest {
2989                name: "connector_health".to_owned(),
2990                kind: OperationalCollectionKind::LatestState,
2991                schema_json: "{}".to_owned(),
2992                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2993                filter_fields_json: "[]".to_owned(),
2994                validation_json: String::new(),
2995                secondary_indexes_json: "[]".to_owned(),
2996                format_version: 1,
2997            })
2998            .expect("register collection");
2999
3000        let error = service
3001            .compact_operational_collection("connector_health", false)
3002            .expect_err("latest_state compaction should be rejected");
3003        assert!(matches!(error, EngineError::InvalidWrite(_)));
3004        assert!(error.to_string().contains("append_only_log"));
3005    }
3006
3007    #[test]
3008    fn register_operational_collection_persists_filter_fields_json() {
3009        let (_db, service) = setup();
3010
3011        let record = service
3012            .register_operational_collection(&OperationalRegisterRequest {
3013                name: "audit_log".to_owned(),
3014                kind: OperationalCollectionKind::AppendOnlyLog,
3015                schema_json: "{}".to_owned(),
3016                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3017                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3018                validation_json: String::new(),
3019                secondary_indexes_json: "[]".to_owned(),
3020                format_version: 1,
3021            })
3022            .expect("register collection");
3023
3024        assert_eq!(
3025            record.filter_fields_json,
3026            r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#
3027        );
3028    }
3029
    #[test]
    fn read_operational_collection_filters_append_only_rows_by_declared_fields() {
        // End-to-end: register an append-only collection with a declared
        // filter contract, ingest three events through the writer actor,
        // then read back with a prefix clause AND a range clause. Only the
        // single row matching both may come back.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                // Declares actor (exact/prefix), seq (exact/range) and
                // ts (exact/range) as the filterable fields.
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"seq","type":"integer","modes":["exact","range"]},{"name":"ts","type":"timestamp","modes":["exact","range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Ingest through the writer actor so the rows take the normal
            // write path; the scope drops the writer before reading.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        // Matches the actor prefix but ts=100 is outside
                        // the [150, 250] range below.
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","seq":1,"ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        // Matches both clauses: "alice-admin" carries the
                        // "alice" prefix and ts=200 is inside the range.
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","seq":2,"ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                        // Matches neither clause.
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-3".to_owned(),
                            payload_json: r#"{"actor":"bob","seq":3,"ts":300}"#.to_owned(),
                            source_ref: Some("src-3".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Conjunction of a prefix filter and a timestamp range filter.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("filtered read");

        // Only evt-2 satisfies both clauses; the limit of 10 is not hit.
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.row_count, 1);
        assert!(!report.was_limited);
        assert_eq!(report.rows.len(), 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
        assert_eq!(
            report.rows[0].payload_json,
            r#"{"actor":"alice-admin","seq":2,"ts":200}"#
        );
    }
3118
    #[test]
    fn read_operational_collection_uses_secondary_index_when_filter_values_are_missing() {
        // Register the collection with a secondary index over (actor, ts),
        // ingest two events, then delete the extracted filter-value rows.
        // With operational_filter_values empty the read must still resolve
        // the filtered query — presumably via the secondary index the
        // collection declared.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                // Field+time index on actor/ts, the same fields the read
                // below filters on.
                secondary_indexes_json: r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#.to_owned(),
                validation_json: String::new(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Ingest through the writer actor; the scope drops the writer
            // before the test mutates the database directly.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        // ts=100 falls outside the [150, 250] range below.
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        // Prefix "alice" matches and ts=200 is in range.
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }
        // Remove the extracted filter values so the primary filter path
        // has nothing to match against.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        conn.execute(
            "DELETE FROM operational_filter_values WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear filter values");
        drop(conn);

        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("secondary-index read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3201
    #[test]
    fn read_operational_collection_rejects_undeclared_fields_and_latest_state_collections() {
        // Two rejection paths: (1) filtered reads against a latest_state
        // collection; (2) filtering on a field not in the declared
        // contract of an append-only collection.
        let (_db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: r#"[{"name":"status","type":"string","modes":["exact"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        // Even with "status" declared, a latest_state collection cannot be
        // read with filters; the error names the supported kind.
        let latest_state_error = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "connector_health".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "status".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("ok".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("latest_state filtered reads should be rejected");
        assert!(latest_state_error.to_string().contains("append_only_log"));

        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                // Only "actor" is declared; "missing" below is not.
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register append-only collection");

        // Filtering on an undeclared field must fail with an error that
        // says so.
        let undeclared_error = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "missing".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("x".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("undeclared field should be rejected");
        assert!(undeclared_error.to_string().contains("undeclared"));
    }
3257
    #[test]
    fn read_operational_collection_applies_limit_and_reports_truncation() {
        // Two matching rows plus limit=1 must yield one row with
        // was_limited set, so callers can tell the result was truncated.
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["prefix"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        {
            // Ingest through the writer actor; the scope drops the writer
            // before the read.
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    // Both events match the "alice" prefix filter below.
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice-1"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-2"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Prefix {
                    field: "actor".to_owned(),
                    value: "alice".to_owned(),
                }],
                limit: Some(1),
            })
            .expect("limited read");

        // evt-2 (the later write) is the row that survives the limit.
        assert_eq!(report.row_count, 1);
        assert_eq!(report.applied_limit, 1);
        assert!(report.was_limited);
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3329
    #[test]
    fn preexisting_operational_collection_can_gain_filter_contract_after_upgrade() {
        // Hand-build a legacy ("pre-v10") database whose
        // operational_collections table has no filter contract column,
        // then open it with a current AdminService. Filtered reads must be
        // rejected until update_operational_collection_filters() installs
        // a contract, after which they work.
        let db = NamedTempFile::new().expect("temp db");
        let conn = sqlite::open_connection(db.path()).expect("conn");
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT 100,
                disabled_at INTEGER
            );
            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT 100,
                mutation_order INTEGER NOT NULL DEFAULT 1
            );
            INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}', 1, 100);
            INSERT INTO operational_mutations
                (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order)
            VALUES
                ('evt-1', 'audit_log', 'evt-1', 'append', '{"actor":"alice","ts":0}', 'src-1', 100, 1);
            "#,
        )
        .expect("seed pre-v10 schema");
        drop(conn);

        let service = AdminService::new(db.path(), Arc::new(SchemaManager::new()));
        // Without a filter contract every field is undeclared, so the
        // read must fail rather than silently scan.
        let pre_update = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "actor".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("alice".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("read should reject undeclared fields before migration update");
        assert!(pre_update.to_string().contains("undeclared"));

        // Install the filter contract explicitly after the upgrade.
        let updated = service
            .update_operational_collection_filters(
                "audit_log",
                r#"[{"name":"actor","type":"string","modes":["exact"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#,
            )
            .expect("update filter contract");
        assert!(updated.filter_fields_json.contains("\"actor\""));

        // The seeded event has ts=0, so a [0, 0] range matches it.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Range {
                    field: "ts".to_owned(),
                    lower: Some(0),
                    upper: Some(0),
                }],
                limit: Some(10),
            })
            .expect("read after explicit filter update");
        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-1");
    }
3401
3402    #[cfg(feature = "sqlite-vec")]
3403    #[test]
3404    fn check_semantics_detects_stale_vec_rows() {
3405        use crate::sqlite::open_connection_with_vec;
3406
3407        let db = NamedTempFile::new().expect("temp file");
3408        let schema = Arc::new(SchemaManager::new());
3409        {
3410            let conn = open_connection_with_vec(db.path()).expect("vec conn");
3411            schema.bootstrap(&conn).expect("bootstrap");
3412            schema
3413                .ensure_vec_kind_profile(&conn, "Doc", 3)
3414                .expect("vec kind profile");
3415            // Insert a vec row whose chunk does not exist.
3416            let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
3417                .iter()
3418                .flat_map(|f| f.to_le_bytes())
3419                .collect();
3420            conn.execute(
3421                "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('ghost-chunk', ?1)",
3422                rusqlite::params![bytes],
3423            )
3424            .expect("insert stale vec row");
3425        }
3426        let service = AdminService::new(db.path(), Arc::clone(&schema));
3427        let report = service.check_semantics().expect("semantics check");
3428        assert_eq!(report.stale_vec_rows, 1);
3429        assert!(
3430            report.warnings.iter().any(|w| w.contains("stale vec")),
3431            "warning must mention stale vec"
3432        );
3433    }
3434
3435    #[cfg(feature = "sqlite-vec")]
3436    #[test]
3437    fn restore_vector_profiles_recreates_vec_table_from_metadata() {
3438        let db = NamedTempFile::new().expect("temp file");
3439        let schema = Arc::new(SchemaManager::new());
3440        {
3441            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3442            schema.bootstrap(&conn).expect("bootstrap");
3443            conn.execute(
3444                "INSERT INTO vector_profiles (profile, table_name, dimension, enabled) \
3445                 VALUES ('default', 'vec_nodes_active', 3, 1)",
3446                [],
3447            )
3448            .expect("insert vector profile");
3449        }
3450
3451        let service = AdminService::new(db.path(), Arc::clone(&schema));
3452        let report = service
3453            .restore_vector_profiles()
3454            .expect("restore vector profiles");
3455        assert_eq!(
3456            report.targets,
3457            vec![crate::projection::ProjectionTarget::Vec]
3458        );
3459        assert_eq!(report.rebuilt_rows, 1);
3460
3461        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3462        let count: i64 = conn
3463            .query_row(
3464                "SELECT count(*) FROM sqlite_schema WHERE name = 'vec_nodes_active'",
3465                [],
3466                |row| row.get(0),
3467            )
3468            .expect("vec schema count");
3469        assert_eq!(count, 1, "vec table should exist after restore");
3470    }
3471
3472    #[cfg(feature = "sqlite-vec")]
3473    #[test]
3474    fn load_vector_regeneration_config_supports_json_and_toml() {
3475        let dir = tempfile::tempdir().expect("temp dir");
3476        let json_path = dir.path().join("regen.json");
3477        let toml_path = dir.path().join("regen.toml");
3478
3479        let config = VectorRegenerationConfig {
3480            kind: "Document".to_owned(),
3481            profile: "default".to_owned(),
3482            chunking_policy: "per_chunk".to_owned(),
3483            preprocessing_policy: "trim".to_owned(),
3484        };
3485
3486        fs::write(&json_path, serde_json::to_string(&config).expect("json")).expect("write json");
3487        fs::write(&toml_path, toml::to_string(&config).expect("toml")).expect("write toml");
3488
3489        let parsed_json = load_vector_regeneration_config(&json_path).expect("json parse");
3490        let parsed_toml = load_vector_regeneration_config(&toml_path).expect("toml parse");
3491
3492        assert_eq!(parsed_json, config);
3493        assert_eq!(parsed_toml, config);
3494    }
3495
3496    /// The 0.4.0 rewrite removed the identity fields from the config.
3497    /// Any client that still serializes the pre-0.4 fields must be
3498    /// rejected AT THE SERDE BOUNDARY with a clear error — never
3499    /// silently accepted.
3500    #[test]
3501    fn regenerate_vector_embeddings_config_rejects_old_identity_fields() {
3502        // Pre-0.5.0 configs that include old fields (table_name, model_identity, etc.)
3503        // must be rejected at the serde boundary due to deny_unknown_fields.
3504        let legacy_json = r#"{
3505            "kind": "Document",
3506            "profile": "default",
3507            "table_name": "vec_nodes_active",
3508            "model_identity": "old-model",
3509            "model_version": "1.0",
3510            "dimension": 4,
3511            "normalization_policy": "l2",
3512            "chunking_policy": "per_chunk",
3513            "preprocessing_policy": "trim",
3514            "generator_command": ["/bin/echo"]
3515        }"#;
3516        let result: Result<VectorRegenerationConfig, _> = serde_json::from_str(legacy_json);
3517        assert!(
3518            result.is_err(),
3519            "legacy identity fields must be rejected at deserialization"
3520        );
3521    }
3522
    #[cfg(all(not(feature = "sqlite-vec"), unix))]
    #[test]
    fn regenerate_vector_embeddings_unsupported_vec_capability_writes_request_and_failed_audit() {
        // Runs only when the sqlite-vec feature is OFF: regeneration must
        // fail with an "unsupported vec capability" error, yet still leave
        // an audit trail — one 'requested' and one 'failed' provenance
        // event, the latter carrying the failure class in its metadata.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        {
            // Seed a node plus one chunk so regeneration has work to
            // attempt before hitting the capability check.
            let conn = sqlite::open_connection(db.path()).expect("connection");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("sqlite-vec capability should be required");

        assert!(error.to_string().contains("unsupported vec capability"));

        // The request must have been recorded before the failure...
        let conn = sqlite::open_connection(db.path()).expect("connection");
        let request_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("request count");
        assert_eq!(request_count, 1);
        // ...and the failure itself recorded as its own event.
        let failed_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failed count");
        assert_eq!(failed_count, 1);
        // The failed event's metadata must classify the failure.
        let metadata_json: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failed metadata");
        assert!(metadata_json.contains("\"failure_class\":\"unsupported vec capability\""));
    }
3588
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_rebuilds_embeddings_via_embedder() {
        // Happy path: seed one Document node with two chunks, run a full
        // regeneration through the in-process TestEmbedder, and verify the
        // returned report, the vec-table row count, the persisted contract,
        // and the provenance audit trail.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        {
            // Scoped block: the seeding connection is dropped before the
            // service opens its own connection to the same file.
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk 1");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-2', 'doc-1', 'travel plan', 101)",
                [],
            )
            .expect("insert chunk 2");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let report = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate vectors");

        // Both seeded chunks must be embedded into the kind-derived table.
        assert_eq!(report.profile, "default");
        assert_eq!(report.table_name, "vec_document");
        assert_eq!(report.dimension, 4);
        assert_eq!(report.total_chunks, 2);
        assert_eq!(report.regenerated_rows, 2);
        assert!(report.contract_persisted);

        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 2);

        // The persisted vector contract must reflect the embedder
        // identity — not any string the caller passed in, because the
        // caller never passes one.
        let (model_identity, model_version, dimension, normalization_policy): (
            String,
            String,
            i64,
            String,
        ) = conn
            .query_row(
                "SELECT model_identity, model_version, dimension, normalization_policy \
                 FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .expect("contract row");
        assert_eq!(model_identity, "test-model");
        assert_eq!(model_version, "1.0.0");
        assert_eq!(dimension, 4);
        assert_eq!(normalization_policy, "l2");

        // A freshly written contract carries format version 1.
        let contract_format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(contract_format_version, 1);
        // Audit trail: exactly one 'requested' and one 'apply' event for
        // this profile, and the apply metadata names the profile, the
        // snapshot hash, and the embedder's model identity.
        let request_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("request audit count");
        assert_eq!(request_count, 1);
        let apply_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("apply audit count");
        assert_eq!(apply_count, 1);
        let apply_metadata: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("apply metadata");
        assert!(apply_metadata.contains("\"profile\":\"default\""));
        assert!(apply_metadata.contains("\"snapshot_hash\":"));
        assert!(apply_metadata.contains("\"model_identity\":\"test-model\""));
    }
3702
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_embedder_failure_leaves_contract_and_vec_rows_unchanged() {
        // Failure atomicity: seed a pre-existing contract and one vec row,
        // then run a regeneration whose embedder always fails. The old
        // contract and vec row must survive untouched, and the failure must
        // be recorded in the provenance log.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        {
            // Scoped seeding connection, dropped before the service runs.
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
            schema
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            // Seed a prior contract ('old-model' / 'old-snapshot') that the
            // failed run must not overwrite.
            conn.execute(
                r"
                INSERT INTO vector_embedding_contracts (
                    profile,
                    table_name,
                    model_identity,
                    model_version,
                    dimension,
                    normalization_policy,
                    chunking_policy,
                    preprocessing_policy,
                    generator_command_json,
                    applied_at,
                    snapshot_hash
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
                ",
                rusqlite::params![
                    "default",
                    "vec_document",
                    "old-model",
                    "0.9.0",
                    4,
                    "l2",
                    "per_chunk",
                    "trim",
                    "[]",
                    111,
                    "old-snapshot"
                ],
            )
            .expect("seed contract");
            // Seed one existing embedding row (4 x f32 = 16 zero bytes).
            conn.execute(
                "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("seed vec row");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        // FailingEmbedder advertises a valid identity but errors on embed.
        let failing = FailingEmbedder {
            identity: QueryEmbedderIdentity {
                model_identity: "new-model".to_owned(),
                model_version: "1.0.0".to_owned(),
                dimension: 4,
                normalization_policy: "l2".to_owned(),
            },
        };
        let error = service
            .regenerate_vector_embeddings(
                &failing,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("embedder should fail");

        assert!(error.to_string().contains("embedder failure"));

        // The original contract identity, snapshot hash, and vec row must
        // all be exactly as seeded.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let model_identity: String = conn
            .query_row(
                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("model identity");
        assert_eq!(model_identity, "old-model");
        let snapshot_hash: String = conn
            .query_row(
                "SELECT snapshot_hash FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("snapshot hash");
        assert_eq!(snapshot_hash, "old-snapshot");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 1);
        // The failure itself is audited with a classifying metadata field.
        let failure_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failure count");
        assert_eq!(failure_count, 1);
        let failure_metadata: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failure metadata");
        assert!(failure_metadata.contains("\"failure_class\":\"embedder failure\""));
    }
3827
3828    // Subprocess generator tests (snapshot-drift-via-concurrent-writer,
3829    // timeout, stdout/stderr overflow, oversized input, excessive chunk
3830    // count, malformed JSON, world-writable executable, disallowed
3831    // executable root, environment preservation) were removed in 0.4.0
3832    // along with the subprocess generator pattern itself. The failure
3833    // modes they exercised belong to the deleted
3834    // `run_vector_generator_bounded` pipeline and have no equivalent in
3835    // the direct-embedder path. See
3836    // `.claude/memory/project_vector_identity_invariant.md`.
3837
3838    #[cfg(feature = "sqlite-vec")]
3839    #[test]
3840    fn regenerate_vector_embeddings_rejects_whitespace_only_profile_before_mutation() {
3841        let db = NamedTempFile::new().expect("temp file");
3842        let schema = Arc::new(SchemaManager::new());
3843        {
3844            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3845            schema.bootstrap(&conn).expect("bootstrap");
3846            conn.execute(
3847                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
3848                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
3849                [],
3850            )
3851            .expect("insert node");
3852            conn.execute(
3853                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3854                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
3855                [],
3856            )
3857            .expect("insert chunk");
3858        }
3859
3860        let service = AdminService::new(db.path(), Arc::clone(&schema));
3861        let embedder = TestEmbedder::new("test-model", 4);
3862        let error = service
3863            .regenerate_vector_embeddings(
3864                &embedder,
3865                &VectorRegenerationConfig {
3866                    kind: "Document".to_owned(),
3867                    profile: "   ".to_owned(),
3868                    chunking_policy: "per_chunk".to_owned(),
3869                    preprocessing_policy: "trim".to_owned(),
3870                },
3871            )
3872            .expect_err("whitespace profile should be rejected");
3873
3874        assert!(error.to_string().contains("invalid contract"));
3875        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3876        let contract_count: i64 = conn
3877            .query_row(
3878                "SELECT count(*) FROM vector_embedding_contracts",
3879                [],
3880                |row| row.get(0),
3881            )
3882            .expect("contract count");
3883        assert_eq!(contract_count, 0);
3884        let provenance_count: i64 = conn
3885            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
3886                row.get(0)
3887            })
3888            .expect("provenance count");
3889        assert_eq!(provenance_count, 0);
3890    }
3891
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn regenerate_vector_embeddings_rejects_future_contract_format_version() {
        // Forward-compatibility guard: if an existing contract for the
        // profile declares a format version newer than this build
        // understands (99 here), regeneration must refuse to proceed
        // rather than rewrite a contract it cannot interpret.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
            // Seed a contract whose contract_format_version (99) is ahead
            // of anything this code version can write or read.
            conn.execute(
                r"
                INSERT INTO vector_embedding_contracts (
                    profile,
                    table_name,
                    model_identity,
                    model_version,
                    dimension,
                    normalization_policy,
                    chunking_policy,
                    preprocessing_policy,
                    generator_command_json,
                    applied_at,
                    snapshot_hash,
                    contract_format_version,
                    updated_at
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
                ",
                rusqlite::params![
                    "default",
                    "vec_nodes_active",
                    "old-model",
                    "0.9.0",
                    4,
                    "l2",
                    "per_chunk",
                    "trim",
                    "[]",
                    111,
                    "old-snapshot",
                    99,
                    111,
                ],
            )
            .expect("seed future contract");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("future contract version should be rejected");

        // The error message must identify the unsupported format version.
        assert!(error.to_string().contains("unsupported"));
        assert!(error.to_string().contains("format version"));
    }
3966
3967    #[test]
3968    fn check_semantics_detects_orphaned_chunk() {
3969        let (db, service) = setup();
3970        {
3971            // Open without FK enforcement to insert chunk with no active node.
3972            let conn = sqlite::open_connection(db.path()).expect("conn");
3973            conn.execute(
3974                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
3975                 VALUES ('c1', 'ghost-node', 'text', 100)",
3976                [],
3977            )
3978            .expect("insert orphaned chunk");
3979        }
3980        let report = service.check_semantics().expect("semantics check");
3981        assert_eq!(report.orphaned_chunks, 1);
3982    }
3983
3984    #[test]
3985    fn check_semantics_detects_null_source_ref() {
3986        let (db, service) = setup();
3987        {
3988            let conn = sqlite::open_connection(db.path()).expect("conn");
3989            conn.execute(
3990                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
3991                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100)",
3992                [],
3993            )
3994            .expect("insert node with null source_ref");
3995        }
3996        let report = service.check_semantics().expect("semantics check");
3997        assert_eq!(report.null_source_ref_nodes, 1);
3998    }
3999
4000    #[test]
4001    fn check_semantics_detects_broken_step_fk() {
4002        let (db, service) = setup();
4003        {
4004            // Explicitly disable FK enforcement for this connection so we can insert
4005            // an orphaned step (ghost run_id) to simulate a partial-write failure.
4006            let conn = sqlite::open_connection(db.path()).expect("conn");
4007            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4008                .expect("disable FK");
4009            conn.execute(
4010                "INSERT INTO steps (id, run_id, kind, status, properties, created_at) \
4011                 VALUES ('s1', 'ghost-run', 'llm', 'completed', '{}', 100)",
4012                [],
4013            )
4014            .expect("insert step with ghost run_id");
4015        }
4016        let report = service.check_semantics().expect("semantics check");
4017        assert_eq!(report.broken_step_fk, 1);
4018    }
4019
4020    #[test]
4021    fn check_semantics_detects_broken_action_fk() {
4022        let (db, service) = setup();
4023        {
4024            let conn = sqlite::open_connection(db.path()).expect("conn");
4025            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4026                .expect("disable FK");
4027            conn.execute(
4028                "INSERT INTO actions (id, step_id, kind, status, properties, created_at) \
4029                 VALUES ('a1', 'ghost-step', 'emit', 'completed', '{}', 100)",
4030                [],
4031            )
4032            .expect("insert action with ghost step_id");
4033        }
4034        let report = service.check_semantics().expect("semantics check");
4035        assert_eq!(report.broken_action_fk, 1);
4036    }
4037
4038    #[test]
4039    fn check_semantics_detects_stale_fts_rows() {
4040        let (db, service) = setup();
4041        {
4042            let conn = sqlite::open_connection(db.path()).expect("conn");
4043            // FTS virtual tables have no FK constraints; insert a row referencing
4044            // a chunk_id that does not exist in the chunks table.
4045            conn.execute(
4046                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4047                 VALUES ('ghost-chunk', 'any-node', 'Meeting', 'stale content')",
4048                [],
4049            )
4050            .expect("insert stale FTS row");
4051        }
4052        let report = service.check_semantics().expect("semantics check");
4053        assert_eq!(report.stale_fts_rows, 1);
4054    }
4055
4056    #[test]
4057    fn check_semantics_detects_fts_rows_for_superseded_nodes() {
4058        let (db, service) = setup();
4059        {
4060            let conn = sqlite::open_connection(db.path()).expect("conn");
4061            // Insert a node that has been fully superseded (superseded_at IS NOT NULL).
4062            conn.execute(
4063                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4064                 VALUES ('r1', 'lg-sup', 'Meeting', '{}', 100, 200, 'src-1')",
4065                [],
4066            )
4067            .expect("insert superseded node");
4068            // Insert an FTS row for the superseded node's logical_id.
4069            conn.execute(
4070                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4071                 VALUES ('ck-x', 'lg-sup', 'Meeting', 'superseded content')",
4072                [],
4073            )
4074            .expect("insert FTS row for superseded node");
4075        }
4076        let report = service.check_semantics().expect("semantics check");
4077        assert_eq!(report.fts_rows_for_superseded_nodes, 1);
4078    }
4079
4080    #[test]
4081    fn check_semantics_detects_dangling_edges() {
4082        let (db, service) = setup();
4083        {
4084            let conn = sqlite::open_connection(db.path()).expect("conn");
4085            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4086                .expect("disable FK");
4087            // One active node as source; target does not exist — edge is dangling.
4088            conn.execute(
4089                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4090                 VALUES ('r1', 'lg-src', 'Meeting', '{}', 100, 'src-1')",
4091                [],
4092            )
4093            .expect("insert source node");
4094            conn.execute(
4095                "INSERT INTO edges \
4096                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4097                 VALUES ('e1', 'edge-1', 'lg-src', 'ghost-target', 'LINKS', '{}', 100, 'src-1')",
4098                [],
4099            )
4100            .expect("insert dangling edge");
4101        }
4102        let report = service.check_semantics().expect("semantics check");
4103        assert_eq!(report.dangling_edges, 1);
4104    }
4105
4106    #[test]
4107    fn check_semantics_detects_orphaned_supersession_chains() {
4108        let (db, service) = setup();
4109        {
4110            let conn = sqlite::open_connection(db.path()).expect("conn");
4111            // Every version of this logical_id is superseded — no active row remains.
4112            conn.execute(
4113                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4114                 VALUES ('r1', 'lg-orphaned', 'Meeting', '{}', 100, 200, 'src-1')",
4115                [],
4116            )
4117            .expect("insert fully superseded node");
4118        }
4119        let report = service.check_semantics().expect("semantics check");
4120        assert_eq!(report.orphaned_supersession_chains, 1);
4121    }
4122
4123    #[test]
4124    fn check_semantics_detects_mismatched_kind_property_fts_rows() {
4125        // With per-kind tables, mismatched_kind is always 0 — rows in fts_props_<kind>
4126        // must belong to that kind by construction. However, orphaned rows (per-kind table
4127        // with no registered schema) serve as the equivalent signal and are tested via
4128        // check_semantics_detects_fts_rows_for_superseded_nodes. This test verifies
4129        // mismatched_kind is 0 even when per-kind table rows exist for a node.
4130        let (db, service) = setup();
4131        {
4132            let conn = sqlite::open_connection(db.path()).expect("conn");
4133            conn.execute(
4134                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4135                 VALUES ('Goal', '[\"$.name\"]', ' ')",
4136                [],
4137            )
4138            .expect("register schema");
4139            conn.execute(
4140                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4141                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4142                [],
4143            )
4144            .expect("insert node");
4145            // Create the per-kind table and insert a correctly-kind row.
4146            let table = fathomdb_schema::fts_kind_table_name("Goal");
4147            conn.execute_batch(&format!(
4148                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4149                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4150            ))
4151            .expect("create per-kind table");
4152            conn.execute(
4153                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4154                [],
4155            )
4156            .expect("insert per-kind FTS row");
4157        }
4158        let report = service.check_semantics().expect("semantics check");
4159        // Per-kind tables make mismatched_kind impossible — always 0.
4160        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
4161    }
4162
4163    #[test]
4164    fn check_semantics_detects_duplicate_property_fts_rows() {
4165        let (db, service) = setup();
4166        {
4167            let conn = sqlite::open_connection(db.path()).expect("conn");
4168            conn.execute(
4169                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4170                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4171                [],
4172            )
4173            .expect("insert node");
4174            // Create the per-kind table and insert two rows for the same logical ID.
4175            let table = fathomdb_schema::fts_kind_table_name("Goal");
4176            conn.execute_batch(&format!(
4177                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4178                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4179            ))
4180            .expect("create per-kind table");
4181            conn.execute(
4182                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4183                [],
4184            )
4185            .expect("insert first property FTS row");
4186            conn.execute(
4187                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2 duplicate')"),
4188                [],
4189            )
4190            .expect("insert duplicate property FTS row");
4191        }
4192        let report = service.check_semantics().expect("semantics check");
4193        assert_eq!(report.duplicate_property_fts_rows, 1);
4194    }
4195
4196    #[test]
4197    fn check_semantics_detects_drifted_property_fts_text() {
4198        let (db, service) = setup();
4199        {
4200            let conn = sqlite::open_connection(db.path()).expect("conn");
4201            conn.execute(
4202                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4203                 VALUES ('Goal', '[\"$.name\"]', ' ')",
4204                [],
4205            )
4206            .expect("register schema");
4207            conn.execute(
4208                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4209                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Current name\"}', 100, 'src-1')",
4210                [],
4211            )
4212            .expect("insert node");
4213            // Create per-kind table and insert a row with outdated text content.
4214            let table = fathomdb_schema::fts_kind_table_name("Goal");
4215            conn.execute_batch(&format!(
4216                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4217                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4218            ))
4219            .expect("create per-kind table");
4220            conn.execute(
4221                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Old stale name')"),
4222                [],
4223            )
4224            .expect("insert stale property FTS row");
4225        }
4226        let report = service.check_semantics().expect("semantics check");
4227        assert_eq!(report.drifted_property_fts_rows, 1);
4228    }
4229
4230    #[test]
4231    fn check_semantics_detects_property_fts_row_that_should_not_exist() {
4232        let (db, service) = setup();
4233        {
4234            let conn = sqlite::open_connection(db.path()).expect("conn");
4235            conn.execute(
4236                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4237                 VALUES ('Goal', '[\"$.searchable\"]', ' ')",
4238                [],
4239            )
4240            .expect("register schema");
4241            // Node does NOT have $.searchable — extraction yields no value.
4242            conn.execute(
4243                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4244                 VALUES ('r1', 'goal-1', 'Goal', '{\"other\":\"field\"}', 100, 'src-1')",
4245                [],
4246            )
4247            .expect("insert node");
4248            // Create per-kind table and insert a phantom row that should not exist.
4249            let table = fathomdb_schema::fts_kind_table_name("Goal");
4250            conn.execute_batch(&format!(
4251                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4252                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4253            ))
4254            .expect("create per-kind table");
4255            conn.execute(
4256                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'phantom text')"),
4257                [],
4258            )
4259            .expect("insert phantom property FTS row");
4260        }
4261        let report = service.check_semantics().expect("semantics check");
4262        assert_eq!(
4263            report.drifted_property_fts_rows, 1,
4264            "row that should not exist must be counted as drifted"
4265        );
4266    }
4267
4268    #[test]
4269    fn safe_export_writes_manifest_with_sha256() {
4270        let (_db, service) = setup();
4271        let export_dir = tempfile::TempDir::new().expect("temp dir");
4272        let export_path = export_dir.path().join("backup.db");
4273
4274        let manifest = service
4275            .safe_export(
4276                &export_path,
4277                SafeExportOptions {
4278                    force_checkpoint: false,
4279                },
4280            )
4281            .expect("export");
4282
4283        assert!(export_path.exists(), "exported db should exist");
4284        let manifest_path = export_dir.path().join("backup.db.export-manifest.json");
4285        assert!(
4286            manifest_path.exists(),
4287            "manifest file should exist at {}",
4288            manifest_path.display()
4289        );
4290        assert_eq!(manifest.sha256.len(), 64, "sha256 should be 64 hex chars");
4291        assert!(
4292            manifest.exported_at > 0,
4293            "exported_at should be a unix timestamp"
4294        );
4295        assert_eq!(
4296            manifest.schema_version,
4297            SchemaManager::new().current_version().0,
4298            "schema_version should match the live schema version"
4299        );
4300        assert_eq!(manifest.protocol_version, 1, "protocol_version should be 1");
4301        assert!(manifest.page_count > 0, "page_count should be positive");
4302    }
4303
    #[test]
    fn safe_export_preserves_operational_validation_contracts() {
        // Registers an operational collection carrying a non-trivial
        // validation contract, exports the database, and verifies the
        // contract JSON survives the export byte-for-byte.
        let (_db, service) = setup();
        let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: validation_json.to_owned(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("backup.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export");

        // Read the contract back from the *exported* file, not the live
        // database, to prove the export carried it over unchanged.
        let exported = sqlite::open_connection(&export_path).expect("exported conn");
        let exported_validation_json: String = exported
            .query_row(
                "SELECT validation_json FROM operational_collections WHERE name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("validation_json");
        assert_eq!(exported_validation_json, validation_json);
    }
4342
4343    #[test]
4344    fn safe_export_force_checkpoint_false_skips_wal_pragma() {
4345        let (_db, service) = setup();
4346        let export_dir = tempfile::TempDir::new().expect("temp dir");
4347        let export_path = export_dir.path().join("no-wal.db");
4348
4349        // force_checkpoint: false must not error even on a non-WAL database
4350        let manifest = service
4351            .safe_export(
4352                &export_path,
4353                SafeExportOptions {
4354                    force_checkpoint: false,
4355                },
4356            )
4357            .expect("export with no checkpoint");
4358
4359        assert!(
4360            manifest.page_count > 0,
4361            "page_count must be populated regardless of checkpoint mode"
4362        );
4363        assert_eq!(
4364            manifest.schema_version,
4365            SchemaManager::new().current_version().0
4366        );
4367        assert_eq!(manifest.protocol_version, 1);
4368    }
4369
    #[test]
    fn safe_export_force_checkpoint_false_still_captures_wal_backed_changes() {
        // Switch the database to WAL mode and disable auto-checkpointing so
        // the inserted row is guaranteed to reside only in the WAL file,
        // then verify the export still captures it even without a forced
        // checkpoint (the export path must read through the WAL).
        let (db, service) = setup();
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let journal_mode: String = conn
            .query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))
            .expect("enable wal");
        assert_eq!(journal_mode.to_lowercase(), "wal");
        // wal_autocheckpoint=0 stops SQLite from moving WAL pages into the
        // main db file behind the test's back.
        let auto_checkpoint_pages: i64 = conn
            .query_row("PRAGMA wal_autocheckpoint=0", [], |row| row.get(0))
            .expect("disable auto checkpoint");
        assert_eq!(auto_checkpoint_pages, 0);
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('r-wal', 'lg-wal', 'Meeting', '{}', 100, 'src-wal')",
            [],
        )
        .expect("insert wal-backed node");

        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("wal-backed.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export wal-backed db");

        let exported = sqlite::open_connection(&export_path).expect("open exported db");
        let exported_count: i64 = exported
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-wal'",
                [],
                |row| row.get(0),
            )
            .expect("count exported nodes");
        assert_eq!(
            exported_count, 1,
            "safe_export must include committed rows that are still resident in the WAL"
        );
    }
4413
    #[test]
    fn excise_source_removes_searchable_content_after_excision() {
        // Seed two versions of the same logical node — v1 (superseded, from
        // 'source-1') and v2 (active, from 'source-2') — plus a chunk, then
        // excise 'source-2' and confirm the chunk's FTS row is gone.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
                [],
            )
            .expect("insert v1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
                [],
            )
            .expect("insert v2");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('ck1', 'lg1', 'hello world', 100)",
                [],
            )
            .expect("insert chunk");
        }
        service.excise_source("source-2").expect("excise");
        {
            // No fts_nodes row for the chunk may remain after excision.
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let fts_count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'ck1'",
                    [],
                    |row| row.get(0),
                )
                .expect("fts count");
            assert_eq!(
                fts_count, 0,
                "excised content should not remain searchable after excise"
            );
        }
    }
4454
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn excise_source_cleans_chunks_and_vec_rows_for_excised_version() {
        // Excising 'source-2' must (a) make the previously superseded v1 row
        // the active version again, and (b) remove the excised version's
        // chunks, vector rows, and FTS rows so no derived content survives.
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Register a 4-dimension vec profile so a vec table exists for
            // the 'Meeting' kind before rows are inserted into it.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Meeting", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
                [],
            )
            .expect("insert v1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
                [],
            )
            .expect("insert v2");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('ck1', 'lg1', 'new content', 200)",
                [],
            )
            .expect("insert chunk");
            // zeroblob(16) — presumably 4 dims x 4-byte floats to match the
            // profile above; TODO confirm the embedding encoding.
            conn.execute(
                "INSERT INTO vec_meeting (chunk_id, embedding) VALUES ('ck1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
        }

        service.excise_source("source-2").expect("excise");

        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        // v1 ('r1') should be the active row again once its successor is gone.
        let active_row: String = conn
            .query_row(
                "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("restored active row");
        assert_eq!(active_row, "r1");
        let chunk_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE node_logical_id = 'lg1'",
                [],
                |row| row.get(0),
            )
            .expect("chunk count");
        assert_eq!(
            chunk_count, 0,
            "excised source content must not survive as chunks"
        );
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_meeting", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 0, "excised source vec rows must be removed");
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE node_logical_id = 'lg1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(
            fts_count, 0,
            "excised source content must not remain searchable"
        );
    }
4528
4529    #[test]
4530    fn export_page_count_matches_exported_file() {
4531        let (_db, service) = setup();
4532        let export_dir = tempfile::TempDir::new().expect("temp dir");
4533        let export_path = export_dir.path().join("page-count.db");
4534
4535        let manifest = service
4536            .safe_export(
4537                &export_path,
4538                SafeExportOptions {
4539                    force_checkpoint: false,
4540                },
4541            )
4542            .expect("export");
4543
4544        let exported = sqlite::open_connection(&export_path).expect("open exported db");
4545        let actual_page_count: u64 = exported
4546            .query_row("PRAGMA page_count", [], |row| row.get(0))
4547            .expect("page_count from exported file");
4548
4549        assert_eq!(
4550            manifest.page_count, actual_page_count,
4551            "manifest page_count must match the exported file's PRAGMA page_count"
4552        );
4553    }
4554
4555    #[test]
4556    fn no_temp_file_after_successful_export() {
4557        let (_db, service) = setup();
4558        let export_dir = tempfile::TempDir::new().expect("temp dir");
4559        let export_path = export_dir.path().join("no-tmp.db");
4560
4561        service
4562            .safe_export(
4563                &export_path,
4564                SafeExportOptions {
4565                    force_checkpoint: false,
4566                },
4567            )
4568            .expect("export");
4569
4570        let tmp_files: Vec<_> = fs::read_dir(export_dir.path())
4571            .expect("read export dir")
4572            .filter_map(Result::ok)
4573            .filter(|e| e.path().extension().is_some_and(|ext| ext == "tmp"))
4574            .collect();
4575
4576        assert!(
4577            tmp_files.is_empty(),
4578            "no .tmp files should remain after a successful export, found: {tmp_files:?}"
4579        );
4580    }
4581
4582    #[test]
4583    fn export_manifest_is_valid_json() {
4584        let (_db, service) = setup();
4585        let export_dir = tempfile::TempDir::new().expect("temp dir");
4586        let export_path = export_dir.path().join("valid-json.db");
4587
4588        service
4589            .safe_export(
4590                &export_path,
4591                SafeExportOptions {
4592                    force_checkpoint: false,
4593                },
4594            )
4595            .expect("export");
4596
4597        let manifest_path = export_dir.path().join("valid-json.db.export-manifest.json");
4598        let manifest_contents = fs::read_to_string(&manifest_path).expect("read manifest");
4599        let parsed: serde_json::Value =
4600            serde_json::from_str(&manifest_contents).expect("manifest must be valid JSON");
4601
4602        assert!(
4603            parsed.get("exported_at").is_some(),
4604            "manifest must contain exported_at"
4605        );
4606        assert!(
4607            parsed.get("sha256").is_some(),
4608            "manifest must contain sha256"
4609        );
4610        assert!(
4611            parsed.get("schema_version").is_some(),
4612            "manifest must contain schema_version"
4613        );
4614        assert!(
4615            parsed.get("protocol_version").is_some(),
4616            "manifest must contain protocol_version"
4617        );
4618        assert!(
4619            parsed.get("page_count").is_some(),
4620            "manifest must contain page_count"
4621        );
4622    }
4623
    #[test]
    fn provenance_purge_dry_run_reports_counts() {
        // With dry_run set, the purge must report accurate deleted/preserved
        // counts without touching the table. Cutoff 250 puts p1 (100) and
        // p2 (200) in scope for deletion; p3 (300) is newer and preserved.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
                [],
            )
            .expect("insert p1");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
                [],
            )
            .expect("insert p2");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
                 VALUES ('p3', 'excise', 'lg3', 'src-1', 300)",
                [],
            )
            .expect("insert p3");
        }

        let options = super::ProvenancePurgeOptions {
            dry_run: true,
            preserve_event_types: Vec::new(),
        };
        let report = service
            .purge_provenance_events(250, &options)
            .expect("dry run purge");

        assert_eq!(report.events_deleted, 2);
        assert_eq!(report.events_preserved, 1);
        assert!(report.oldest_remaining.is_some());

        // Re-count the table to prove dry_run performed no actual deletions.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let total: i64 = conn
            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
                row.get(0)
            })
            .expect("count");
        assert_eq!(total, 3, "dry_run must not delete any events");
    }
4669
4670    #[test]
4671    fn provenance_purge_deletes_old_events() {
4672        let (db, service) = setup();
4673        {
4674            let conn = sqlite::open_connection(db.path()).expect("conn");
4675            conn.execute(
4676                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4677                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
4678                [],
4679            )
4680            .expect("insert p1");
4681            conn.execute(
4682                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4683                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
4684                [],
4685            )
4686            .expect("insert p2");
4687        }
4688
4689        let options = super::ProvenancePurgeOptions {
4690            dry_run: false,
4691            preserve_event_types: Vec::new(),
4692        };
4693        let report = service
4694            .purge_provenance_events(150, &options)
4695            .expect("purge");
4696
4697        assert_eq!(report.events_deleted, 1);
4698        assert_eq!(report.events_preserved, 1);
4699        assert_eq!(report.oldest_remaining, Some(200));
4700
4701        let conn = sqlite::open_connection(db.path()).expect("conn");
4702        let remaining: i64 = conn
4703            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
4704                row.get(0)
4705            })
4706            .expect("count");
4707        assert_eq!(remaining, 1);
4708    }
4709
4710    #[test]
4711    fn provenance_purge_preserves_specified_types() {
4712        let (db, service) = setup();
4713        {
4714            let conn = sqlite::open_connection(db.path()).expect("conn");
4715            conn.execute(
4716                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4717                 VALUES ('p1', 'excise', 'lg1', 'src-1', 100)",
4718                [],
4719            )
4720            .expect("insert p1");
4721            conn.execute(
4722                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4723                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 100)",
4724                [],
4725            )
4726            .expect("insert p2");
4727            conn.execute(
4728                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4729                 VALUES ('p3', 'node_insert', 'lg3', 'src-1', 100)",
4730                [],
4731            )
4732            .expect("insert p3");
4733        }
4734
4735        let options = super::ProvenancePurgeOptions {
4736            dry_run: false,
4737            preserve_event_types: Vec::new(),
4738        };
4739        let report = service
4740            .purge_provenance_events(500, &options)
4741            .expect("purge");
4742
4743        assert_eq!(report.events_deleted, 2);
4744        assert_eq!(report.events_preserved, 1);
4745
4746        let conn = sqlite::open_connection(db.path()).expect("conn");
4747        let remaining_type: String = conn
4748            .query_row("SELECT event_type FROM provenance_events", [], |row| {
4749                row.get(0)
4750            })
4751            .expect("remaining event type");
4752        assert_eq!(remaining_type, "excise");
4753    }
4754
4755    #[test]
4756    fn provenance_purge_noop_with_zero_timestamp() {
4757        let (db, service) = setup();
4758        {
4759            let conn = sqlite::open_connection(db.path()).expect("conn");
4760            conn.execute(
4761                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
4762                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
4763                [],
4764            )
4765            .expect("insert p1");
4766        }
4767
4768        let options = super::ProvenancePurgeOptions {
4769            dry_run: false,
4770            preserve_event_types: Vec::new(),
4771        };
4772        let report = service.purge_provenance_events(0, &options).expect("purge");
4773
4774        assert_eq!(report.events_deleted, 0);
4775        assert_eq!(report.events_preserved, 1);
4776        assert_eq!(report.oldest_remaining, Some(100));
4777    }
4778
    #[test]
    fn restore_skips_edge_when_counterpart_purged() {
        // Restoring a node must not resurrect an edge whose other endpoint
        // no longer has any node rows (it was purged); the edge is instead
        // reported in `skipped_edges` and stays retired.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A (doc-1) and node B (doc-2)
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire both A and B, and the edge
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            // Simulate purge of B: delete node rows but leave the edge intact
            // to reproduce the dangling-edge scenario the validation guards against.
            conn.execute("DELETE FROM nodes WHERE logical_id = 'doc-2'", [])
                .expect("purge node B rows");
        }

        // Restore A — the edge should be skipped because B has no active node
        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 0, "edge should not be restored");
        assert_eq!(report.skipped_edges.len(), 1);
        assert_eq!(report.skipped_edges[0].edge_logical_id, "edge-1");
        assert_eq!(report.skipped_edges[0].missing_endpoint, "doc-2");

        // Verify the edge is still retired in the database
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 0, "edge must remain retired");
    }
4859
    #[test]
    fn restore_restores_edges_to_active_nodes() {
        // Restoring a retired node whose edge counterpart is still active
        // must also restore the retired edge between them.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A and node B (B stays active)
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire only A
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore A — B is active, so the edge should be restored normally
        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert!(report.restored_edge_rows > 0, "edge should be restored");
        assert!(
            report.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        // The edge must be active again (superseded_at cleared).
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1, "edge must be active");
    }
4931
    #[test]
    fn restore_restores_edges_when_both_restored() {
        // When both endpoints of a retired edge are restored (in either
        // order), the edge must come back once its second endpoint becomes
        // active again.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A and node B
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire both A and B
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-b', 'node_retire', 'doc-2', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event B");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore B first — edge is skipped because A is still retired
        let report_b = service.restore_logical_id("doc-2").expect("restore B");
        assert!(!report_b.was_noop);

        // Restore A — B is now active, so the edge should be restored
        let report_a = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report_a.was_noop);
        assert_eq!(report_a.restored_node_rows, 1);
        assert!(
            report_a.restored_edge_rows > 0,
            "edge should be restored when both endpoints active"
        );
        assert!(
            report_a.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(
            active_edge_count, 1,
            "edge must be active after both endpoints restored"
        );
    }
5024
5025    // ── FTS property schema end-to-end tests ──────────────────────────
5026
    #[test]
    fn fts_property_schema_crud_round_trip() {
        // Exercises the full register -> describe -> list -> update ->
        // remove lifecycle of an FTS property schema, including the error
        // on removing a schema that no longer exists.
        let (_db, service) = setup();

        // Register
        let record = service
            .register_fts_property_schema(
                "Meeting",
                &["$.title".to_owned(), "$.summary".to_owned()],
                None,
            )
            .expect("register");
        assert_eq!(record.kind, "Meeting");
        assert_eq!(record.property_paths, vec!["$.title", "$.summary"]);
        // None separator falls back to a single space.
        assert_eq!(record.separator, " ");
        assert_eq!(record.format_version, 1);

        // Describe
        let described = service
            .describe_fts_property_schema("Meeting")
            .expect("describe")
            .expect("should exist");
        assert_eq!(described, record);

        // Describe missing kind
        let missing = service
            .describe_fts_property_schema("NoSuchKind")
            .expect("describe missing");
        assert!(missing.is_none());

        // List
        let list = service.list_fts_property_schemas().expect("list");
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].kind, "Meeting");

        // Update (idempotent upsert)
        let updated = service
            .register_fts_property_schema(
                "Meeting",
                &["$.title".to_owned(), "$.notes".to_owned()],
                Some("\n"),
            )
            .expect("update");
        assert_eq!(updated.property_paths, vec!["$.title", "$.notes"]);
        assert_eq!(updated.separator, "\n");

        // Remove
        service
            .remove_fts_property_schema("Meeting")
            .expect("remove");
        let after_remove = service
            .describe_fts_property_schema("Meeting")
            .expect("describe after remove");
        assert!(after_remove.is_none());

        // Remove non-existent is an error
        let err = service.remove_fts_property_schema("Meeting");
        assert!(err.is_err());
    }
5086
5087    #[test]
5088    fn describe_fts_property_schema_round_trips_recursive_entries() {
5089        let (_db, service) = setup();
5090
5091        let entries = vec![
5092            FtsPropertyPathSpec::scalar("$.title"),
5093            FtsPropertyPathSpec::recursive("$.payload"),
5094        ];
5095        let exclude = vec!["$.payload.private".to_owned()];
5096        let registered = service
5097            .register_fts_property_schema_with_entries(
5098                "KnowledgeItem",
5099                &entries,
5100                Some(" "),
5101                &exclude,
5102                crate::rebuild_actor::RebuildMode::Eager,
5103            )
5104            .expect("register recursive");
5105
5106        // The register entry point now echoes back the fully-populated
5107        // record via the same load helper used by describe/list.
5108        assert_eq!(registered.entries, entries);
5109        assert_eq!(registered.exclude_paths, exclude);
5110        assert_eq!(registered.property_paths, vec!["$.title", "$.payload"]);
5111
5112        let described = service
5113            .describe_fts_property_schema("KnowledgeItem")
5114            .expect("describe")
5115            .expect("should exist");
5116        assert_eq!(described.kind, "KnowledgeItem");
5117        assert_eq!(described.entries, entries);
5118        assert_eq!(described.exclude_paths, exclude);
5119        assert_eq!(described.property_paths, vec!["$.title", "$.payload"]);
5120        assert_eq!(described.separator, " ");
5121        assert_eq!(described.format_version, 1);
5122    }
5123
5124    #[test]
5125    fn list_fts_property_schemas_round_trips_recursive_entries() {
5126        let (_db, service) = setup();
5127
5128        let entries = vec![
5129            FtsPropertyPathSpec::scalar("$.title"),
5130            FtsPropertyPathSpec::recursive("$.payload"),
5131        ];
5132        let exclude = vec!["$.payload.secret".to_owned()];
5133        service
5134            .register_fts_property_schema_with_entries(
5135                "KnowledgeItem",
5136                &entries,
5137                Some(" "),
5138                &exclude,
5139                crate::rebuild_actor::RebuildMode::Eager,
5140            )
5141            .expect("register recursive");
5142
5143        let listed = service.list_fts_property_schemas().expect("list");
5144        assert_eq!(listed.len(), 1);
5145        let record = &listed[0];
5146        assert_eq!(record.kind, "KnowledgeItem");
5147        assert_eq!(record.entries, entries);
5148        assert_eq!(record.exclude_paths, exclude);
5149        assert_eq!(record.property_paths, vec!["$.title", "$.payload"]);
5150    }
5151
5152    #[test]
5153    fn describe_fts_property_schema_round_trips_scalar_only_entries() {
5154        let (_db, service) = setup();
5155
5156        service
5157            .register_fts_property_schema(
5158                "Meeting",
5159                &["$.title".to_owned(), "$.summary".to_owned()],
5160                None,
5161            )
5162            .expect("register scalar");
5163
5164        let described = service
5165            .describe_fts_property_schema("Meeting")
5166            .expect("describe")
5167            .expect("should exist");
5168        assert_eq!(described.property_paths, vec!["$.title", "$.summary"]);
5169        assert_eq!(described.entries.len(), 2);
5170        for entry in &described.entries {
5171            assert_eq!(
5172                entry.mode,
5173                FtsPropertyPathMode::Scalar,
5174                "scalar-only schema should deserialize every entry as Scalar"
5175            );
5176        }
5177        assert!(described.exclude_paths.is_empty());
5178    }
5179
    #[test]
    fn restore_reestablishes_property_fts_visibility() {
        // End-to-end restore check: after a node is retired (superseded) and
        // its FTS rows are cleared, restore_logical_id must re-derive the
        // per-kind property FTS row from the node's stored properties.
        let (db, service) = setup();
        let doc_table = fathomdb_schema::fts_kind_table_name("Document");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register a property schema for Document kind.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Document', '[\"$.title\", \"$.body\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Create the per-kind FTS table.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {doc_table} USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create per-kind table");
            // Insert an active node with extractable properties.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{\"title\":\"Budget\",\"body\":\"Q3 forecast\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Insert a chunk so restore has something to work with for FTS.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget text', 100)",
                [],
            )
            .expect("insert chunk");
            // Insert property FTS row into per-kind table (as write path would).
            conn.execute(
                &format!(
                    "INSERT INTO {doc_table} (node_logical_id, text_content) \
                     VALUES ('doc-1', 'Budget Q3 forecast')"
                ),
                [],
            )
            .expect("insert property fts");
            // Simulate retire: supersede node, clear FTS.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("supersede");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear chunk fts");
            conn.execute(&format!("DELETE FROM {doc_table}"), [])
                .expect("clear property fts");
        }

        // Restore must report exactly one re-derived property FTS row.
        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_property_fts_rows, 1);

        // Verify the property FTS row was recreated in the per-kind table.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let prop_fts_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts count");
        assert_eq!(prop_fts_count, 1, "property FTS must be restored");

        // Restored text must match what the write path originally projected
        // ($.title and $.body joined by the schema's ' ' separator).
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts text");
        assert_eq!(text, "Budget Q3 forecast");
    }
5265
5266    #[test]
5267    fn safe_export_preserves_fts_property_schemas() {
5268        let (_db, service) = setup();
5269        service
5270            .register_fts_property_schema(
5271                "Goal",
5272                &["$.name".to_owned(), "$.rationale".to_owned()],
5273                None,
5274            )
5275            .expect("register schema");
5276
5277        let export_dir = tempfile::TempDir::new().expect("temp dir");
5278        let export_path = export_dir.path().join("backup.db");
5279        service
5280            .safe_export(
5281                &export_path,
5282                SafeExportOptions {
5283                    force_checkpoint: false,
5284                },
5285            )
5286            .expect("export");
5287
5288        // Open the exported DB and verify the schema survived.
5289        let exported_conn = rusqlite::Connection::open(&export_path).expect("open exported db");
5290        let kind: String = exported_conn
5291            .query_row(
5292                "SELECT kind FROM fts_property_schemas WHERE kind = 'Goal'",
5293                [],
5294                |row| row.get(0),
5295            )
5296            .expect("schema must exist in export");
5297        assert_eq!(kind, "Goal");
5298        let paths_json: String = exported_conn
5299            .query_row(
5300                "SELECT property_paths_json FROM fts_property_schemas WHERE kind = 'Goal'",
5301                [],
5302                |row| row.get(0),
5303            )
5304            .expect("paths must exist");
5305        let paths: Vec<String> = serde_json::from_str(&paths_json).expect("valid json");
5306        assert_eq!(paths, vec!["$.name", "$.rationale"]);
5307    }
5308
    #[test]
    #[allow(clippy::too_many_lines)]
    fn export_recovery_rebuilds_property_fts_from_canonical_state() {
        // Disaster-recovery scenario: after exporting, the derived property
        // FTS rows in the copy are corrupted (wrong text) or deleted; a full
        // FTS projection rebuild against the export must repair both cases
        // from the canonical node rows alone.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        // Register a schema and insert two nodes with extractable properties.
        service
            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
            .expect("register");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 1");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property FTS row 1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-2', 'goal-2', 'Goal', '{\"name\":\"Launch redesign\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 2");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-2', 'Launch redesign')"
                ),
                [],
            )
            .expect("insert property FTS row 2");
        }

        // Export.
        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("backup.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export");

        // Corrupt the derived rows: replace correct text with wrong text for
        // goal-1, and delete the row for goal-2 entirely. This exercises both
        // corrupted-but-present rows and missing rows in the same recovery.
        {
            let conn = rusqlite::Connection::open(&export_path).expect("open export");
            // Bootstrap the exported DB to get per-kind tables.
            SchemaManager::new()
                .bootstrap(&conn)
                .expect("bootstrap export");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
            )
            .expect("delete old row");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'completely wrong stale text')"
                ),
                [],
            )
            .expect("insert corrupted row");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
            )
            .expect("delete goal-2 row");
        }

        // Open the exported DB and rebuild projections from canonical state.
        let schema = Arc::new(SchemaManager::new());
        let exported_service = AdminService::new(&export_path, Arc::clone(&schema));
        exported_service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");

        // Verify the per-kind table has the correct rows after recovery.
        let conn = rusqlite::Connection::open(&export_path).expect("open export for verify");
        let goal1_text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-1 text after rebuild");
        assert_eq!(
            goal1_text, "Ship v2",
            "goal-1 text must be corrected by rebuild"
        );

        let goal2_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-2 count");
        assert_eq!(goal2_count, 1, "goal-2 row must be restored by rebuild");

        // The planted stale text must not survive anywhere in the table.
        let stale_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE text_content = 'completely wrong stale text'"),
                [],
                |r| r.get(0),
            )
            .expect("stale count");
        assert_eq!(stale_count, 0, "corrupted text must be gone after rebuild");

        // Verify integrity and semantics are clean after recovery.
        let integrity = exported_service.check_integrity().expect("integrity");
        assert_eq!(integrity.missing_property_fts_rows, 0);
        let semantics = exported_service.check_semantics().expect("semantics");
        assert_eq!(semantics.drifted_property_fts_rows, 0);
        assert_eq!(semantics.orphaned_property_fts_rows, 0);
        assert_eq!(semantics.duplicate_property_fts_rows, 0);
    }
5438
    #[test]
    fn check_integrity_no_false_positives_for_empty_extraction() {
        // A registered schema whose path extracts nothing from a node must
        // not cause the integrity check to demand a property FTS row.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register a schema that looks for $.searchable
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Ticket', '[\"$.searchable\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Insert a node whose properties do NOT contain $.searchable —
            // correctly has no property FTS row.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"status\":\"open\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
        }

        let report = service.check_integrity().expect("integrity");
        assert_eq!(
            report.missing_property_fts_rows, 0,
            "node with no extractable values must not be counted as missing"
        );
    }
5467
    #[test]
    fn check_integrity_detects_genuinely_missing_property_fts_rows() {
        // When a node DOES carry an extractable value and its property FTS
        // row is absent, the integrity check must count it as missing.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Ticket', '[\"$.title\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Insert a node WITH an extractable $.title but no property FTS row.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"title\":\"fix login bug\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
        }

        let report = service.check_integrity().expect("integrity");
        assert_eq!(
            report.missing_property_fts_rows, 1,
            "node with extractable values but no property FTS row must be detected"
        );
    }
5494
    #[test]
    fn rebuild_projections_fts_restores_missing_property_fts_rows() {
        // A full FTS rebuild must materialize property FTS rows that were
        // never written: schema + node exist, derived row absent.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Deliberately do NOT insert a property FTS row.
        }

        let report = service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");
        assert!(
            report.rebuilt_rows >= 1,
            "rebuild must insert at least one property FTS row"
        );

        // The rebuilt row must carry the extracted $.name value.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("property FTS row must exist after rebuild");
        assert_eq!(text, "Ship v2");
    }
5534
    #[test]
    fn rebuild_missing_projections_fills_gap_for_deleted_property_fts_row() {
        // Gap-fill mode: a property FTS row that existed and was deleted
        // (simulated corruption) must be restored by
        // rebuild_missing_projections.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Create per-kind table and insert then delete to simulate corruption.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property fts");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
            )
            .expect("delete property fts");
        }

        let report = service
            .rebuild_missing_projections()
            .expect("rebuild missing");
        assert!(
            report.rebuilt_rows >= 1,
            "missing rebuild must insert the gap-fill row"
        );

        // Exactly one row must come back — no duplicates from the gap-fill.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "gap-fill must restore exactly one property FTS row"
        );
    }
5597
    #[test]
    fn remove_schema_then_rebuild_cleans_stale_property_fts_rows() {
        // This test verifies that a full FTS rebuild clears per-kind tables whose
        // schema has been removed (orphaned state). We create the orphaned state
        // directly via SQL (bypassing the service API, which now eagerly deletes rows
        // on schema removal) to simulate a table that was left populated from a
        // previous registration cycle.
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Create per-kind table WITHOUT registering a schema — simulates orphaned rows
            // that remain after schema removal (or pre-existing table from a previous cycle).
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property fts");
        }

        // No schema registered — per-kind table has orphaned rows.
        let semantics = service.check_semantics().expect("semantics");
        assert_eq!(
            semantics.orphaned_property_fts_rows, 1,
            "orphaned property FTS rows must be detected with no registered schema"
        );

        // Full rebuild should clean them (no schema means nothing to rebuild).
        service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");

        // The orphaned row must be gone after the rebuild.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 0,
            "rebuild must delete rows from per-kind tables with no registered schema"
        );
    }
5657
5658    mod validate_fts_property_paths_tests {
5659        use super::super::validate_fts_property_paths;
5660
5661        #[test]
5662        fn valid_simple_path() {
5663            assert!(validate_fts_property_paths(&["$.name".to_owned()]).is_ok());
5664        }
5665
5666        #[test]
5667        fn valid_nested_path() {
5668            assert!(validate_fts_property_paths(&["$.address.city".to_owned()]).is_ok());
5669        }
5670
5671        #[test]
5672        fn valid_underscore_segment() {
5673            assert!(validate_fts_property_paths(&["$.a_b".to_owned()]).is_ok());
5674        }
5675
5676        #[test]
5677        fn rejects_bare_prefix() {
5678            let result = validate_fts_property_paths(&["$.".to_owned()]);
5679            assert!(result.is_err(), "path '$.' must be rejected");
5680        }
5681
5682        #[test]
5683        fn rejects_double_dot() {
5684            let result = validate_fts_property_paths(&["$..x".to_owned()]);
5685            assert!(result.is_err(), "path '$..x' must be rejected");
5686        }
5687
5688        #[test]
5689        fn rejects_trailing_dot() {
5690            let result = validate_fts_property_paths(&["$.foo.".to_owned()]);
5691            assert!(result.is_err(), "path '$.foo.' must be rejected");
5692        }
5693
5694        #[test]
5695        fn rejects_space_in_segment() {
5696            let result = validate_fts_property_paths(&["$.foo bar".to_owned()]);
5697            assert!(result.is_err(), "path '$.foo bar' must be rejected");
5698        }
5699
5700        #[test]
5701        fn rejects_bracket_syntax() {
5702            let result = validate_fts_property_paths(&["$.foo[0]".to_owned()]);
5703            assert!(result.is_err(), "path '$.foo[0]' must be rejected");
5704        }
5705
5706        #[test]
5707        fn rejects_duplicates() {
5708            let result = validate_fts_property_paths(&["$.name".to_owned(), "$.name".to_owned()]);
5709            assert!(result.is_err(), "duplicate paths must be rejected");
5710        }
5711
5712        #[test]
5713        fn rejects_empty_list() {
5714            let result = validate_fts_property_paths(&[]);
5715            assert!(result.is_err(), "empty path list must be rejected");
5716        }
5717    }
5718
5719    // --- A-6: per-kind FTS table tests ---
5720
5721    #[test]
5722    fn register_fts_schema_writes_to_per_kind_table() {
5723        // After A-6: register_fts_property_schema writes rows to fts_props_<kind>,
5724        // NOT to fts_node_properties.
5725        let (db, service) = setup();
5726        {
5727            let conn = sqlite::open_connection(db.path()).expect("conn");
5728            // Insert a node before registering the schema.
5729            conn.execute(
5730                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5731                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5732                [],
5733            )
5734            .expect("insert node");
5735        }
5736
5737        // Register schema — this triggers eager rebuild which writes to per-kind table.
5738        service
5739            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
5740            .expect("register schema");
5741
5742        let conn = sqlite::open_connection(db.path()).expect("conn");
5743        let table = fathomdb_schema::fts_kind_table_name("Goal");
5744        // Per-kind table must have the row.
5745        let per_kind_count: i64 = conn
5746            .query_row(
5747                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
5748                [],
5749                |row| row.get(0),
5750            )
5751            .expect("per-kind count");
5752        assert_eq!(
5753            per_kind_count, 1,
5754            "per-kind table must have the row after registration"
5755        );
5756    }
5757
    #[test]
    fn remove_fts_schema_deletes_from_per_kind_table() {
        // After A-6: remove_fts_property_schema deletes rows from fts_props_<kind>.
        let (db, service) = setup();
        // Seed one active node of the target kind directly via SQL so that
        // registering the schema below has something to project.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
        }

        // Register (which populates the per-kind FTS table — see the sibling
        // registration test above), then immediately remove the schema.
        service
            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
            .expect("register schema");
        service
            .remove_fts_property_schema("Goal")
            .expect("remove schema");

        // The projected row must be gone from the per-kind table.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Goal");
        let per_kind_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 0,
            "per-kind table must be empty after schema removal"
        );
    }
5793
5794    // --- B-1: weight field tests ---
5795
5796    #[test]
5797    fn fts_path_spec_with_weight_builder() {
5798        let spec = FtsPropertyPathSpec::scalar("$.title").with_weight(5.0);
5799        assert_eq!(spec.weight, Some(5.0));
5800        assert_eq!(spec.path, "$.title");
5801        assert_eq!(spec.mode, FtsPropertyPathMode::Scalar);
5802    }
5803
5804    #[test]
5805    fn fts_path_spec_serialize_with_weight() {
5806        use super::serialize_property_paths_json;
5807        let entries = vec![
5808            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
5809            FtsPropertyPathSpec::scalar("$.body"),
5810        ];
5811        let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
5812        // Must use rich object format because a weight is present
5813        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
5814        let paths = v
5815            .get("paths")
5816            .expect("paths key")
5817            .as_array()
5818            .expect("array");
5819        assert_eq!(paths.len(), 2);
5820        // First entry has weight
5821        assert_eq!(
5822            paths[0].get("path").and_then(serde_json::Value::as_str),
5823            Some("$.title")
5824        );
5825        assert_eq!(
5826            paths[0].get("weight").and_then(serde_json::Value::as_f64),
5827            Some(2.0)
5828        );
5829        // Second entry has no weight field
5830        assert!(
5831            paths[1].get("weight").is_none(),
5832            "unweighted spec must omit weight field"
5833        );
5834    }
5835
5836    #[test]
5837    fn fts_path_spec_serialize_no_weights() {
5838        use super::serialize_property_paths_json;
5839        let entries = vec![
5840            FtsPropertyPathSpec::scalar("$.title"),
5841            FtsPropertyPathSpec::scalar("$.payload"),
5842        ];
5843        let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
5844        // Must use bare string array (backward compat)
5845        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
5846        assert!(
5847            v.is_array(),
5848            "all-scalar no-weight schema must serialize as bare string array"
5849        );
5850        let arr = v.as_array().expect("array");
5851        assert_eq!(arr.len(), 2);
5852        assert_eq!(arr[0].as_str(), Some("$.title"));
5853        assert_eq!(arr[1].as_str(), Some("$.payload"));
5854    }
5855
5856    #[test]
5857    fn fts_weight_validation_out_of_range() {
5858        let (_db, service) = setup();
5859        // weight = 0.0 must be rejected
5860        let entries_zero = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(0.0)];
5861        let result = service.register_fts_property_schema_with_entries(
5862            "Article",
5863            &entries_zero,
5864            None,
5865            &[],
5866            crate::rebuild_actor::RebuildMode::Eager,
5867        );
5868        assert!(result.is_err(), "weight 0.0 must be rejected");
5869        let err_msg = result.expect_err("weight 0.0 must be rejected").to_string();
5870        assert!(
5871            err_msg.contains("weight"),
5872            "error must mention weight: {err_msg}"
5873        );
5874
5875        // weight = 1001.0 must be rejected
5876        let entries_big = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(1001.0)];
5877        let result = service.register_fts_property_schema_with_entries(
5878            "Article",
5879            &entries_big,
5880            None,
5881            &[],
5882            crate::rebuild_actor::RebuildMode::Eager,
5883        );
5884        assert!(result.is_err(), "weight 1001.0 must be rejected");
5885    }
5886
5887    #[test]
5888    fn fts_weight_validation_valid() {
5889        let (_db, service) = setup();
5890        let entries = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(10.0)];
5891        let result = service.register_fts_property_schema_with_entries(
5892            "Article",
5893            &entries,
5894            None,
5895            &[],
5896            crate::rebuild_actor::RebuildMode::Eager,
5897        );
5898        assert!(
5899            result.is_ok(),
5900            "weight 10.0 must be accepted: {:?}",
5901            result.err()
5902        );
5903    }
5904
5905    // --- B-2: create_or_replace_fts_kind_table tests ---
5906
    #[test]
    fn create_or_replace_creates_multi_column_table() {
        // B-2: building the per-kind table from multiple path specs must
        // produce one FTS column per spec (plus node_logical_id), not a
        // single collapsed text column.
        use super::create_or_replace_fts_kind_table;
        let (db, _service) = setup();
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let specs = vec![
            FtsPropertyPathSpec::scalar("$.title"),
            FtsPropertyPathSpec::recursive("$.payload"),
        ];
        create_or_replace_fts_kind_table(
            &conn,
            "Article",
            &specs,
            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
        )
        .expect("create table");

        // Verify table exists and has the expected columns.
        let table = fathomdb_schema::fts_kind_table_name("Article");
        // node_logical_id column
        let count: i64 = conn
            .query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))
            .expect("count");
        assert_eq!(count, 0, "new table must be empty");

        // Verify columns exist by inserting a row with named columns
        // (SQLite rejects the INSERT if any named column is missing).
        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let payload_col = fathomdb_schema::fts_column_name("$.payload", true);
        conn.execute(
            &format!(
                "INSERT INTO {table} (node_logical_id, {title_col}, {payload_col}) VALUES ('id1', 'hello', 'world')"
            ),
            [],
        )
        .expect("insert with per-spec columns must succeed");
    }
5943
    #[test]
    fn create_or_replace_drops_and_recreates() {
        // B-2: a second call for the same kind must replace the table with
        // the new column layout rather than failing or keeping the old one.
        use super::create_or_replace_fts_kind_table;
        let (db, _service) = setup();
        let conn = sqlite::open_connection(db.path()).expect("conn");

        // First call: 1 spec
        let specs_v1 = vec![FtsPropertyPathSpec::scalar("$.title")];
        create_or_replace_fts_kind_table(
            &conn,
            "Post",
            &specs_v1,
            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
        )
        .expect("create v1");

        // Second call: 2 specs (different layout)
        let specs_v2 = vec![
            FtsPropertyPathSpec::scalar("$.title"),
            FtsPropertyPathSpec::scalar("$.summary"),
        ];
        create_or_replace_fts_kind_table(
            &conn,
            "Post",
            &specs_v2,
            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
        )
        .expect("create v2");

        // Verify new layout: summary column must exist. The INSERT would
        // fail against the v1 single-spec layout, so success proves the
        // table was dropped and recreated.
        let table = fathomdb_schema::fts_kind_table_name("Post");
        let summary_col = fathomdb_schema::fts_column_name("$.summary", false);
        conn.execute(
            &format!("INSERT INTO {table} (node_logical_id, {summary_col}) VALUES ('id1', 'text')"),
            [],
        )
        .expect("second layout must allow summary column");
    }
5982
5983    #[test]
5984    fn create_or_replace_invalid_tokenizer() {
5985        use super::create_or_replace_fts_kind_table;
5986        let (db, _service) = setup();
5987        let conn = sqlite::open_connection(db.path()).expect("conn");
5988        let specs = vec![FtsPropertyPathSpec::scalar("$.title")];
5989        let result = create_or_replace_fts_kind_table(&conn, "Post", &specs, "'; DROP TABLE --");
5990        assert!(result.is_err(), "invalid tokenizer must be rejected");
5991        let err_msg = result
5992            .expect_err("invalid tokenizer must be rejected")
5993            .to_string();
5994        assert!(
5995            err_msg.contains("tokenizer"),
5996            "error must mention tokenizer: {err_msg}"
5997        );
5998    }
5999
    #[test]
    fn register_with_weights_creates_per_column_table() {
        // When any entry carries a weight, registration must create the
        // per-spec column layout instead of the legacy single-column one.
        let (db, service) = setup();
        let entries = vec![
            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
            FtsPropertyPathSpec::scalar("$.body"),
        ];
        service
            .register_fts_property_schema_with_entries(
                "Article",
                &entries,
                None,
                &[],
                crate::rebuild_actor::RebuildMode::Eager,
            )
            .expect("register");

        // Per-kind table must have per-spec columns, not just text_content
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Article");
        let title_col = fathomdb_schema::fts_column_name("$.title", false);
        let body_col = fathomdb_schema::fts_column_name("$.body", false);
        // If the columns exist, insert must succeed
        conn.execute(
            &format!(
                "INSERT INTO {table} (node_logical_id, {title_col}, {body_col}) VALUES ('art-1', 'hello', 'world')"
            ),
            [],
        )
        .expect("per-spec columns must exist after registration with weights");
    }
6031
6032    #[test]
6033    fn weighted_to_unweighted_downgrade_recreates_table() {
6034        let (db, service) = setup();
6035
6036        // First register with weights (creates per-spec column layout).
6037        let weighted_entries = vec![
6038            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6039            FtsPropertyPathSpec::scalar("$.body"),
6040        ];
6041        service
6042            .register_fts_property_schema_with_entries(
6043                "Article",
6044                &weighted_entries,
6045                None,
6046                &[],
6047                crate::rebuild_actor::RebuildMode::Eager,
6048            )
6049            .expect("register weighted");
6050
6051        // Re-register the same kind WITHOUT weights.
6052        let unweighted_entries = vec![
6053            FtsPropertyPathSpec::scalar("$.title"),
6054            FtsPropertyPathSpec::scalar("$.body"),
6055        ];
6056        service
6057            .register_fts_property_schema_with_entries(
6058                "Article",
6059                &unweighted_entries,
6060                None,
6061                &[],
6062                crate::rebuild_actor::RebuildMode::Eager,
6063            )
6064            .expect("re-register unweighted");
6065
6066        // After downgrade, the table must have the text_content column
6067        // (legacy single-column layout), not the per-spec columns.
6068        let conn = sqlite::open_connection(db.path()).expect("conn");
6069        let table = fathomdb_schema::fts_kind_table_name("Article");
6070        let result = conn.execute(
6071            &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('art-1', 'hello world')"),
6072            [],
6073        );
6074        assert!(
6075            result.is_ok(),
6076            "text_content column must exist after weighted-to-unweighted downgrade"
6077        );
6078    }
6079
6080    // --- Pack A+G: profile CRUD + tokenizer presets ---
6081
6082    #[test]
6083    fn set_get_fts_profile_roundtrip() {
6084        let (_db, service) = setup();
6085        let profile = service
6086            .set_fts_profile("book", "unicode61")
6087            .expect("set_fts_profile");
6088        assert_eq!(profile.kind, "book");
6089        assert_eq!(profile.tokenizer, "unicode61");
6090
6091        let got = service
6092            .get_fts_profile("book")
6093            .expect("get_fts_profile")
6094            .expect("should be Some");
6095        assert_eq!(got.kind, "book");
6096        assert_eq!(got.tokenizer, "unicode61");
6097    }
6098
    #[test]
    fn fts_profile_upsert() {
        // Setting a profile twice for the same kind must overwrite the
        // tokenizer (upsert semantics), not error or keep the first value.
        let (_db, service) = setup();
        service
            .set_fts_profile("article", "unicode61")
            .expect("first set");
        service
            .set_fts_profile("article", "porter unicode61 remove_diacritics 2")
            .expect("second set");
        let got = service
            .get_fts_profile("article")
            .expect("get")
            .expect("Some");
        assert_eq!(got.tokenizer, "porter unicode61 remove_diacritics 2");
    }
6114
6115    #[test]
6116    fn invalid_tokenizer_rejected() {
6117        let (_db, service) = setup();
6118        let result = service.set_fts_profile("book", "'; DROP TABLE nodes --");
6119        assert!(result.is_err(), "invalid tokenizer must be rejected");
6120        let msg = result.expect_err("must be Err").to_string();
6121        assert!(
6122            msg.contains("tokenizer") || msg.contains("invalid"),
6123            "error must mention tokenizer or invalid: {msg}"
6124        );
6125    }
6126
6127    #[test]
6128    fn preset_recall_optimized_english() {
6129        assert_eq!(
6130            super::resolve_tokenizer_preset("recall-optimized-english"),
6131            "porter unicode61 remove_diacritics 2"
6132        );
6133    }
6134
6135    #[test]
6136    fn preset_precision_optimized() {
6137        assert_eq!(
6138            super::resolve_tokenizer_preset("precision-optimized"),
6139            "unicode61 remove_diacritics 2"
6140        );
6141    }
6142
6143    #[test]
6144    fn preset_global_cjk() {
6145        assert_eq!(super::resolve_tokenizer_preset("global-cjk"), "icu");
6146    }
6147
6148    #[test]
6149    fn preset_substring_trigram() {
6150        assert_eq!(
6151            super::resolve_tokenizer_preset("substring-trigram"),
6152            "trigram"
6153        );
6154    }
6155
6156    #[test]
6157    fn preset_source_code() {
6158        assert_eq!(
6159            super::resolve_tokenizer_preset("source-code"),
6160            "unicode61 tokenchars '._-$@'"
6161        );
6162    }
6163
    #[test]
    fn preview_fts_row_count() {
        // rows_to_rebuild must count only active (non-superseded) nodes of
        // the requested kind.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Five active nodes that must all be counted.
            for i in 0..5u32 {
                conn.execute(
                    "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                     VALUES (?1, ?2, 'book', '{}', 100, 'src')",
                    rusqlite::params![format!("r{i}"), format!("lg{i}")],
                )
                .expect("insert node");
            }
            // Insert one superseded node that must NOT count
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, superseded_at) \
                 VALUES ('r99', 'lg99', 'book', '{}', 100, 'src', 200)",
                [],
            )
            .expect("insert superseded");
        }
        let impact = service
            .preview_projection_impact("book", "fts")
            .expect("preview");
        assert_eq!(impact.rows_to_rebuild, 5);
    }
6190
6191    #[test]
6192    fn preview_populates_current_tokenizer() {
6193        let (_db, service) = setup();
6194        service
6195            .set_fts_profile("doc", "trigram")
6196            .expect("set profile");
6197        let impact = service
6198            .preview_projection_impact("doc", "fts")
6199            .expect("preview");
6200        assert_eq!(impact.current_tokenizer, Some("trigram".to_owned()));
6201        assert_eq!(impact.target_tokenizer, None);
6202    }
6203
6204    // --- Review fix: tokenizer allowlist alignment ---
6205
6206    #[test]
6207    fn create_or_replace_source_code_tokenizer_is_accepted() {
6208        // The source-code preset expands to "unicode61 tokenchars '._-$@'" which
6209        // contains `.`, `-`, `$`, `@`. The allowlist in create_or_replace_fts_kind_table
6210        // must accept these characters (matching set_fts_profile's allowlist).
6211        use super::create_or_replace_fts_kind_table;
6212        let (db, _service) = setup();
6213        let conn = sqlite::open_connection(db.path()).expect("conn");
6214        let specs = vec![FtsPropertyPathSpec::scalar("$.symbol")];
6215        let source_code_tokenizer = "unicode61 tokenchars '._-$@'";
6216        let result =
6217            create_or_replace_fts_kind_table(&conn, "Symbol", &specs, source_code_tokenizer);
6218        assert!(
6219            result.is_ok(),
6220            "source-code tokenizer string must be accepted by create_or_replace_fts_kind_table: {:?}",
6221            result.err()
6222        );
6223    }
6224
    #[test]
    fn source_code_profile_round_trip_through_register_fts_schema() {
        // Verify that set_fts_profile("source-code") followed by
        // register_fts_property_schema succeeds end-to-end.
        // Previously failed because set_fts_profile accepted "unicode61 tokenchars '._-$@'"
        // but create_or_replace_fts_kind_table rejected it (only allowed " '_").
        let db = tempfile::NamedTempFile::new().expect("temp file");
        let schema = Arc::new(fathomdb_schema::SchemaManager::new());

        // Bootstrap the schema (creates projection_profiles table via migration 20).
        // The coordinator is dropped at the end of this block; only its
        // migration side effects are needed.
        {
            let _coord = crate::ExecutionCoordinator::open(
                db.path(),
                Arc::clone(&schema),
                None,
                1,
                Arc::new(crate::TelemetryCounters::default()),
                None,
            )
            .expect("coordinator opens for bootstrap");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));

        // Set source-code profile (uses preset resolver, stores "unicode61 tokenchars '._-$@'").
        service
            .set_fts_profile("Symbol", "source-code")
            .expect("set_fts_profile with source-code preset must succeed");

        // Register an FTS schema for this kind — this calls create_or_replace_fts_kind_table
        // with the tokenizer from the profile row.
        let result = service.register_fts_property_schema("Symbol", &["$.name".to_owned()], None);
        assert!(
            result.is_ok(),
            "register_fts_property_schema must succeed when source-code profile is active: {:?}",
            result.err()
        );
    }
6263
6264    // --- 0.5.0 item 5: max_tokens() capacity ---
6265
    /// A stub embedder with `max_tokens=8192` can embed a pre-written chunk
    /// whose text exceeds 512 words without error. Verifies that `max_tokens()`
    /// advertises the correct capacity and that `regenerate_vector_embeddings`
    /// produces one vector row for one stored chunk, regardless of chunk length.
    /// (The engine does not re-chunk at regen time; splitting is the caller's
    /// responsibility at write time.)
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn embedder_max_tokens_8192_handles_chunk_exceeding_512_words() {
        // 600 space-separated words — comfortably longer than 512.
        let long_text = (0..600u32)
            .map(|i| format!("word{i}"))
            .collect::<Vec<_>>()
            .join(" ");

        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        // Seed one node carrying a single long chunk.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES (?1, 'doc-1', ?2, 100)",
                rusqlite::params!["chunk-long", long_text],
            )
            .expect("insert long chunk");
        }

        let embedder = LargeContextTestEmbedder::new("long-context-model", 4, 8192);
        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let report = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate with long-context embedder");

        assert_eq!(
            report.total_chunks, 1,
            "600-word text pre-written as one chunk must result in exactly one embedded row"
        );
        assert_eq!(report.regenerated_rows, 1);
        assert_eq!(
            embedder.max_tokens(),
            8192,
            "embedder must advertise 8192 token capacity"
        );
    }
6325
    /// Stub embedder with a configurable `max_tokens` for long-context tests.
    #[cfg(feature = "sqlite-vec")]
    #[derive(Debug)]
    struct LargeContextTestEmbedder {
        // Identity advertised via `QueryEmbedder::identity`.
        identity: QueryEmbedderIdentity,
        // Fixed vector returned for every `embed_query` call.
        vector: Vec<f32>,
        // Capacity advertised via `QueryEmbedder::max_tokens`.
        max_tokens: usize,
    }
6334
6335    #[cfg(feature = "sqlite-vec")]
6336    impl LargeContextTestEmbedder {
6337        fn new(model: &str, dimension: usize, max_tokens: usize) -> Self {
6338            Self {
6339                identity: QueryEmbedderIdentity {
6340                    model_identity: model.to_owned(),
6341                    model_version: "1.0.0".to_owned(),
6342                    dimension,
6343                    normalization_policy: "l2".to_owned(),
6344                },
6345                vector: vec![1.0; dimension],
6346                max_tokens,
6347            }
6348        }
6349    }
6350
    #[cfg(feature = "sqlite-vec")]
    impl QueryEmbedder for LargeContextTestEmbedder {
        // Returns the same fixed vector regardless of input text.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        fn max_tokens(&self) -> usize {
            self.max_tokens
        }
    }
6363
    /// Item 7 integration test: register schema, write nodes, call
    /// `regenerate_vector_embeddings_in_process`, verify contract row and
    /// that vec rows exist for every chunk.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_in_process_writes_contract_and_vec_rows() {
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        // Seed three nodes of kind 'Doc', each with exactly one chunk.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            for (row_id, logical_id, created_at, src) in [
                ("r1", "node-1", 100, "src1"),
                ("r2", "node-2", 101, "src2"),
                ("r3", "node-3", 102, "src3"),
            ] {
                conn.execute(
                    "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                     VALUES (?1, ?2, 'Doc', '{}', ?3, ?4)",
                    rusqlite::params![row_id, logical_id, created_at, src],
                )
                .expect("insert node");
            }
            for (chunk_id, node_id, text, created_at) in [
                ("c1", "node-1", "first document text", 100),
                ("c2", "node-2", "second document text", 101),
                ("c3", "node-3", "third document text", 102),
            ] {
                conn.execute(
                    "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                     VALUES (?1, ?2, ?3, ?4)",
                    rusqlite::params![chunk_id, node_id, text, created_at],
                )
                .expect("insert chunk");
            }
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("batch-test-model", 4);
        let config = VectorRegenerationConfig {
            kind: "Doc".to_owned(),
            profile: "default".to_owned(),
            chunking_policy: "per_chunk".to_owned(),
            preprocessing_policy: "trim".to_owned(),
        };
        let report = service
            .regenerate_vector_embeddings_in_process(&embedder, &config)
            .expect("in-process regen must succeed");

        // The report must account for every chunk and confirm the contract
        // row was persisted.
        assert_eq!(report.total_chunks, 3);
        assert_eq!(report.regenerated_rows, 3);
        assert!(report.contract_persisted);

        // Verify the vec table contents directly.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_doc", [], |row| row.get(0))
            .expect("vec_doc count");
        assert_eq!(vec_count, 3, "one vec row per chunk");

        // The persisted contract must carry the embedder's model identity.
        let model_identity: String = conn
            .query_row(
                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract row");
        assert_eq!(model_identity, "batch-test-model");
    }
6434
6435    // --- 0.5.0 item 6: per-kind vec regeneration ---
6436
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_targets_per_kind_table() {
        // Regeneration for a kind must write into vec_<kind> and must not
        // create the legacy global vec_nodes_active table.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        // Seed one 'Document' node with one chunk.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let report = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate vectors");

        // The report must name the per-kind table.
        assert_eq!(report.table_name, "vec_document");
        assert_eq!(report.regenerated_rows, 1);

        // Verify the row landed in vec_document.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec_document count");
        assert_eq!(vec_count, 1, "rows must be in vec_document");

        // The legacy global table must not have been created as a side effect.
        let old_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
                [],
                |r| r.get(0),
            )
            .expect("sqlite_master check");
        assert_eq!(
            old_count, 0,
            "vec_nodes_active must NOT be created for per-kind regen"
        );
    }
6496
6497    // --- 0.5.0 item 6 step 5: get_vec_profile reads per-kind key ---
6498
6499    #[test]
6500    fn get_vec_profile_returns_none_when_no_profile_exists() {
6501        let (db, service) = setup();
6502        let _ = db;
6503        let result = service.get_vec_profile("MyKind").expect("should not error");
6504        assert!(
6505            result.is_none(),
6506            "must return None when no profile registered"
6507        );
6508    }
6509
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn get_vec_profile_returns_profile_for_registered_kind() {
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());
        // Register a per-kind vec profile directly through the schema layer.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            schema
                .ensure_vec_kind_profile(&conn, "MyKind", 128)
                .expect("ensure_vec_kind_profile");
        }

        // The admin service must surface that profile, dimensions included.
        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let profile = service.get_vec_profile("MyKind").expect("should not error");
        assert!(profile.is_some(), "must return profile after registration");
        assert_eq!(profile.unwrap().dimensions, 128);
    }
6528
    #[test]
    fn get_vec_profile_does_not_return_global_sentinel_row() {
        let (db, service) = setup();
        // Seed only the global sentinel row ('*', 'vec'); no per-kind row.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
                 VALUES ('*', 'vec', '{\"model_identity\":\"old-model\",\"dimensions\":384}', 0, 0)",
                [],
            )
            .expect("insert global sentinel");
        }
        // A per-kind lookup must not fall back to the global sentinel.
        let result = service
            .get_vec_profile("SomeKind")
            .expect("should not error");
        assert!(
            result.is_none(),
            "per-kind query must not return global ('*', 'vec') row"
        );
    }
6549}