//! fathomdb_engine/admin/mod.rs
//!
//! Administrative operations for the engine: physical/structural integrity
//! checks, semantic consistency checks, safe exports, logical restores and
//! purges, provenance purging, and FTS/vector projection profile management.
1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::sync::mpsc::SyncSender;
4
5use fathomdb_schema::SchemaManager;
6use serde::{Deserialize, Serialize};
7
8use crate::rebuild_actor::{RebuildMode, RebuildRequest};
9
10use crate::{
11    EngineError, ProjectionRepairReport, ProjectionService, ids::new_id,
12    projection::ProjectionTarget, sqlite,
13};
14
15mod fts;
16mod operational;
17mod provenance;
18mod vector;
19
20pub use vector::load_vector_regeneration_config;
21
22#[cfg(test)]
23use fts::{
24    create_or_replace_fts_kind_table, serialize_property_paths_json, validate_fts_property_paths,
25};
26
/// Results of a physical and structural integrity check on the database.
///
/// Produced by [`AdminService::check_integrity`]; every non-zero counter also
/// contributes a human-readable entry to `warnings`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct IntegrityReport {
    /// True when `PRAGMA integrity_check` returned `"ok"`.
    pub physical_ok: bool,
    /// True when `pragma_foreign_key_check` reported zero violations.
    pub foreign_keys_ok: bool,
    /// Chunks of active (non-superseded) nodes with no `fts_nodes` row.
    pub missing_fts_rows: usize,
    /// Active nodes whose registered property-FTS projection row is missing.
    pub missing_property_fts_rows: usize,
    /// Logical IDs with more than one active (non-superseded) node row.
    pub duplicate_active_logical_ids: usize,
    /// Operational mutation/current rows referencing a missing collection.
    pub operational_missing_collections: usize,
    /// `operational_current` rows whose `last_mutation_id` no longer exists.
    pub operational_missing_last_mutations: usize,
    /// Human-readable warnings, one per non-zero counter above.
    pub warnings: Vec<String>,
}
39
/// A registered FTS property projection schema for a node kind.
///
/// Returned when reading back a schema registered through the FTS admin
/// operations; `entries` is the authoritative representation, while
/// `property_paths` is a flattened convenience view.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct FtsPropertySchemaRecord {
    /// The node kind this schema applies to.
    pub kind: String,
    /// Flat display list of registered JSON property paths
    /// (e.g. `["$.name", "$.title"]`). For recursive entries this lists
    /// only the root path; mode information is carried by
    /// [`Self::entries`].
    pub property_paths: Vec<String>,
    /// Full per-entry schema shape with mode
    /// ([`FtsPropertyPathMode::Scalar`] | [`FtsPropertyPathMode::Recursive`]).
    /// Read this field for mode-accurate round-trip of the registered
    /// schema.
    pub entries: Vec<FtsPropertyPathSpec>,
    /// Subtree paths excluded from recursive walks. Empty for
    /// scalar-only schemas or recursive schemas with no exclusions.
    pub exclude_paths: Vec<String>,
    /// Separator used when concatenating extracted values.
    pub separator: String,
    /// Schema format version.
    pub format_version: i64,
}
63
/// Extraction mode for a single registered FTS property path.
///
/// Serialized in `snake_case` (`"scalar"` / `"recursive"`); the default is
/// [`Self::Scalar`] for backward compatibility with pre-Phase-4 schemas.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FtsPropertyPathMode {
    /// Resolve the path and append the scalar value(s). Matches legacy
    /// pre-Phase-4 behaviour.
    #[default]
    Scalar,
    /// Recursively walk every scalar leaf rooted at the path. Each leaf
    /// contributes one entry to the position map.
    Recursive,
}
76
/// A single registered property-FTS path with its extraction mode.
///
/// `Eq` is implemented manually below because `weight: Option<f32>` blocks
/// the derive; see the comment on that impl for why this is sound.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct FtsPropertyPathSpec {
    /// JSON path to the property (must start with `$.`).
    pub path: String,
    /// Whether to treat this path as a scalar or recursively walk it.
    pub mode: FtsPropertyPathMode,
    /// Optional BM25 weight multiplier for this path (1.0 = default).
    /// Must satisfy `0.0 < weight <= 1000.0` when set.
    pub weight: Option<f32>,
}
89
// Manual `Eq`: f32 does not implement Eq (NaN breaks reflexivity), but
// weights in practice are always finite values set by callers — the
// registration-time check `0.0 < weight <= 1000.0` rejects NaN (all NaN
// comparisons are false) — so reflexivity holds for every stored value.
impl Eq for FtsPropertyPathSpec {}
93
94impl FtsPropertyPathSpec {
95    #[must_use]
96    pub fn scalar(path: impl Into<String>) -> Self {
97        Self {
98            path: path.into(),
99            mode: FtsPropertyPathMode::Scalar,
100            weight: None,
101        }
102    }
103
104    #[must_use]
105    pub fn recursive(path: impl Into<String>) -> Self {
106        Self {
107            path: path.into(),
108            mode: FtsPropertyPathMode::Recursive,
109            weight: None,
110        }
111    }
112
113    /// Set the BM25 weight multiplier for this path.
114    ///
115    /// The weight must satisfy `0.0 < weight <= 1000.0` at registration
116    /// time; this builder method does not validate — validation happens in
117    /// `register_fts_property_schema_with_entries`.
118    #[must_use]
119    pub fn with_weight(mut self, weight: f32) -> Self {
120        self.weight = Some(weight);
121        self
122    }
123}
124
/// Options controlling how a safe database export is performed.
///
/// Construct via [`Default`] for production behaviour (checkpoint forced).
#[derive(Clone, Copy, Debug)]
pub struct SafeExportOptions {
    /// When true, runs `PRAGMA wal_checkpoint(FULL)` before copying and fails if
    /// any WAL frames could not be applied (busy != 0). Set to false only in
    /// tests that seed a database without WAL mode.
    pub force_checkpoint: bool,
}
133
134impl Default for SafeExportOptions {
135    fn default() -> Self {
136        Self {
137            force_checkpoint: true,
138        }
139    }
140}
141
// Must match PROTOCOL_VERSION in fathomdb-admin-bridge.rs — bump both sides
// together whenever the export manifest format changes.
const EXPORT_PROTOCOL_VERSION: u32 = 1;
144
/// Manifest describing a completed safe export.
///
/// Written alongside the exported database file so consumers can verify
/// integrity (`sha256`) and compatibility (`schema_version`, `protocol_version`).
#[derive(Clone, Debug, Serialize)]
pub struct SafeExportManifest {
    /// Unix timestamp (seconds since epoch) when the export was created.
    pub exported_at: u64,
    /// SHA-256 hex digest of the exported database file.
    pub sha256: String,
    /// Schema version recorded in `fathom_schema_migrations` at export time.
    pub schema_version: u32,
    /// Bridge protocol version compiled into this binary.
    pub protocol_version: u32,
    /// Number of `SQLite` pages in the exported database file.
    pub page_count: u64,
}
159
/// Report from tracing all rows associated with a given `source_ref`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct TraceReport {
    /// The `source_ref` value that was traced.
    pub source_ref: String,
    /// Count of node rows carrying this `source_ref`.
    pub node_rows: usize,
    /// Count of edge rows carrying this `source_ref`.
    pub edge_rows: usize,
    /// Count of action rows carrying this `source_ref`.
    pub action_rows: usize,
    /// Count of operational mutation rows carrying this `source_ref`.
    pub operational_mutation_rows: usize,
    /// Logical IDs of the matched nodes.
    pub node_logical_ids: Vec<String>,
    /// IDs of the matched actions.
    pub action_ids: Vec<String>,
    /// IDs of the matched operational mutations.
    pub operational_mutation_ids: Vec<String>,
}
172
/// An edge that was skipped during a restore because an endpoint is missing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SkippedEdge {
    /// Logical ID of the edge that could not be restored.
    pub edge_logical_id: String,
    /// Logical ID of the endpoint node that has no active row.
    pub missing_endpoint: String,
}
179
/// Report from restoring a retired logical ID back to active state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalRestoreReport {
    /// The logical ID that was restored.
    pub logical_id: String,
    /// True when the operation made no changes.
    pub was_noop: bool,
    /// Node rows returned to active state.
    pub restored_node_rows: usize,
    /// Edge rows returned to active state (excluding `skipped_edges`).
    pub restored_edge_rows: usize,
    /// Chunk rows restored for the node.
    pub restored_chunk_rows: usize,
    /// FTS rows re-created for the restored chunks.
    pub restored_fts_rows: usize,
    /// Property-FTS rows re-created for the restored node.
    pub restored_property_fts_rows: usize,
    /// Vector rows re-created for the restored chunks.
    pub restored_vec_rows: usize,
    /// Edges left retired because an endpoint node is missing.
    pub skipped_edges: Vec<SkippedEdge>,
    /// Free-form notes emitted during the restore.
    pub notes: Vec<String>,
}
194
/// Report from permanently purging all rows for a logical ID.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct LogicalPurgeReport {
    /// The logical ID that was purged.
    pub logical_id: String,
    /// True when the operation made no changes.
    pub was_noop: bool,
    /// Node rows permanently deleted.
    pub deleted_node_rows: usize,
    /// Edge rows permanently deleted.
    pub deleted_edge_rows: usize,
    /// Chunk rows permanently deleted.
    pub deleted_chunk_rows: usize,
    /// FTS rows permanently deleted.
    pub deleted_fts_rows: usize,
    /// Vector rows permanently deleted.
    pub deleted_vec_rows: usize,
    /// Free-form notes emitted during the purge.
    pub notes: Vec<String>,
}
207
/// Options controlling provenance event purging behavior.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProvenancePurgeOptions {
    /// When true, report what would be purged without deleting anything.
    // NOTE(review): no `#[serde(default)]` here, so deserialization fails if
    // the field is absent — presumably deliberate for a destructive op
    // (callers must opt in explicitly); confirm before changing.
    pub dry_run: bool,
    /// Event types to preserve even when they would otherwise be purged.
    /// Defaults to empty when absent from the serialized form.
    #[serde(default)]
    pub preserve_event_types: Vec<String>,
}
215
/// Report from a provenance event purge operation.
#[derive(Clone, Debug, Serialize)]
pub struct ProvenancePurgeReport {
    /// Number of events deleted (or that would be deleted, in a dry run).
    pub events_deleted: u64,
    /// Number of events kept due to `preserve_event_types`.
    pub events_preserved: u64,
    /// Timestamp of the oldest event still stored, or `None` when none
    /// remain — assumed Unix seconds; TODO confirm against the purge impl.
    pub oldest_remaining: Option<i64>,
}
223
/// Service providing administrative operations (integrity checks, exports, restores, purges).
#[derive(Debug)]
pub struct AdminService {
    /// Filesystem path of the `SQLite` database this service operates on.
    pub(super) database_path: PathBuf,
    /// Shared schema manager; `connect` runs its bootstrap on every new connection.
    pub(super) schema_manager: Arc<SchemaManager>,
    /// Projection (FTS/vector) service bound to the same database path.
    pub(super) projections: ProjectionService,
    /// Sender side of the rebuild actor's channel.  `None` when the engine
    /// was opened without a rebuild actor (e.g. in tests that use
    /// [`AdminService::new`] directly).
    pub(super) rebuild_sender: Option<SyncSender<RebuildRequest>>,
}
235
/// Results of a semantic consistency check on the graph data.
///
/// Produced by [`AdminService::check_semantics`]; every non-zero counter also
/// contributes a human-readable entry to `warnings`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct SemanticReport {
    /// Chunks whose `node_logical_id` has no active node.
    pub orphaned_chunks: usize,
    /// Active nodes with a NULL `source_ref` (loss of provenance).
    pub null_source_ref_nodes: usize,
    /// Steps referencing a `run_id` that does not exist in the runs table.
    pub broken_step_fk: usize,
    /// Actions referencing a `step_id` that does not exist in the steps table.
    pub broken_action_fk: usize,
    /// FTS rows whose `chunk_id` does not exist in the chunks table.
    pub stale_fts_rows: usize,
    /// FTS rows whose node has been superseded (`superseded_at` IS NOT NULL on all active rows).
    pub fts_rows_for_superseded_nodes: usize,
    /// Property FTS rows whose node has been superseded or does not exist.
    pub stale_property_fts_rows: usize,
    /// Property FTS rows whose kind has no registered FTS property schema.
    pub orphaned_property_fts_rows: usize,
    /// Property FTS rows whose `kind` does not match the active node's actual kind.
    pub mismatched_kind_property_fts_rows: usize,
    /// Active logical IDs with more than one per-kind FTS property row.
    pub duplicate_property_fts_rows: usize,
    /// Property FTS rows whose `text_content` no longer matches the canonical extraction.
    pub drifted_property_fts_rows: usize,
    /// Active edges where at least one endpoint has no active node.
    pub dangling_edges: usize,
    /// `logical_ids` where every version has been superseded (no active row).
    pub orphaned_supersession_chains: usize,
    /// Vec rows whose backing chunk no longer exists in the chunks table.
    pub stale_vec_rows: usize,
    /// Compatibility counter for vec rows whose chunk points at missing node history.
    pub vec_rows_for_superseded_nodes: usize,
    /// Latest-state keys whose latest mutation is a `put` but no current row exists.
    pub missing_operational_current_rows: usize,
    /// Current rows that do not match the latest mutation state.
    pub stale_operational_current_rows: usize,
    /// Mutations written after the owning collection was disabled.
    pub disabled_collection_mutations: usize,
    /// Access metadata rows whose `logical_id` no longer has any node history.
    pub orphaned_last_access_metadata_rows: usize,
    /// Human-readable warnings, one per non-zero counter above.
    pub warnings: Vec<String>,
}
279
/// Configuration for regenerating vector embeddings.
///
/// 0.4.0 architectural invariant: vector identity is the embedder's
/// responsibility, not the regeneration config's. This struct carries only
/// WHERE the vectors live and HOW to chunk/preprocess them — never WHAT
/// model produced them. The embedder supplied at regen-call time is the
/// single source of truth for `model_identity`, `model_version`,
/// `dimension`, and `normalization_policy`; the resulting vector profile
/// is stamped directly from [`QueryEmbedder::identity`].
///
/// 0.5.0 breaking change: `table_name` is removed. The vec table name is now
/// derived from `kind` via [`fathomdb_schema::vec_kind_table_name`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct VectorRegenerationConfig {
    /// Node kind whose chunks are embedded; also determines the vec table
    /// name via [`fathomdb_schema::vec_kind_table_name`].
    pub kind: String,
    /// Vector profile name this regeneration targets.
    pub profile: String,
    /// Identifier of the chunking policy (the HOW of chunking).
    pub chunking_policy: String,
    /// Identifier of the preprocessing policy applied before embedding.
    pub preprocessing_policy: String,
}
300
/// Report from a vector embedding regeneration run.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct VectorRegenerationReport {
    /// Vector profile the run targeted.
    pub profile: String,
    /// Name of the per-kind vec table that was repopulated.
    pub table_name: String,
    /// Embedding dimension (supplied by the embedder at regen-call time).
    pub dimension: usize,
    /// Total chunks considered for regeneration.
    pub total_chunks: usize,
    /// Rows actually rewritten.
    pub regenerated_rows: usize,
    /// Whether the vector contract row was persisted.
    pub contract_persisted: bool,
    /// Free-form notes emitted during the run.
    pub notes: Vec<String>,
}
312
/// Stored FTS tokenizer profile for a node kind.
///
/// Created and updated by [`AdminService::set_fts_profile`]. Preset names
/// can be expanded to raw tokenizer strings via [`resolve_tokenizer_preset`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FtsProfile {
    /// Node kind this profile applies to (e.g. `"Article"`).
    pub kind: String,
    /// FTS5 tokenizer string (e.g. `"porter unicode61 remove_diacritics 2"`).
    pub tokenizer: String,
    /// Unix timestamp when the profile was last activated, or `None` if never.
    pub active_at: Option<i64>,
    /// Unix timestamp when the profile row was first created.
    pub created_at: i64,
}
327
/// Stored vector embedding profile (global, kind-agnostic).
///
/// Created and updated by [`AdminService::set_vec_profile`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VecProfile {
    /// Identifier for the embedding model (e.g. `"openai/text-embedding-3-small"`).
    pub model_identity: String,
    /// Optional version string for the model.
    pub model_version: Option<String>,
    /// Number of dimensions produced by the model.
    pub dimensions: u32,
    /// Unix timestamp when the profile was last activated, or `None` if never.
    pub active_at: Option<i64>,
    /// Unix timestamp when the profile row was first created.
    pub created_at: i64,
}
344
/// Estimated cost of rebuilding a projection (FTS table or vector embeddings).
///
/// Returned by [`AdminService::preview_projection_impact`]. All figures are
/// estimates intended for operator planning, not hard guarantees.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectionImpact {
    /// Number of rows that would be processed during a full rebuild.
    pub rows_to_rebuild: u64,
    /// Rough estimated rebuild time in seconds.
    pub estimated_seconds: u64,
    /// Estimated temporary disk space required during rebuild, in bytes.
    pub temp_db_size_bytes: u64,
    /// The tokenizer currently stored in `projection_profiles`, if any.
    pub current_tokenizer: Option<String>,
    /// Reserved for future use; always `None` currently.
    pub target_tokenizer: Option<String>,
}
361
/// Well-known tokenizer preset names mapped to their FTS5 tokenizer strings.
pub const TOKENIZER_PRESETS: &[(&str, &str)] = &[
    (
        "recall-optimized-english",
        "porter unicode61 remove_diacritics 2",
    ),
    ("precision-optimized", "unicode61 remove_diacritics 2"),
    ("global-cjk", "icu"),
    ("substring-trigram", "trigram"),
    ("source-code", "unicode61 tokenchars '._-$@'"),
];

/// Resolve a tokenizer preset name to its FTS5 tokenizer string.
///
/// If `input` matches a known preset name the preset value is returned.
/// Otherwise `input` is returned unchanged (treated as a raw tokenizer string).
pub fn resolve_tokenizer_preset(input: &str) -> &str {
    TOKENIZER_PRESETS
        .iter()
        .find(|&&(name, _)| name == input)
        .map_or(input, |&(_, value)| value)
}
386
/// Current format version stamped into persisted vector contracts.
pub(super) const CURRENT_VECTOR_CONTRACT_FORMAT_VERSION: i64 = 1;
/// Maximum accepted length of a profile name (enforced in the submodules).
pub(super) const MAX_PROFILE_LEN: usize = 128;
/// Maximum accepted length of a chunking/preprocessing policy string.
pub(super) const MAX_POLICY_LEN: usize = 128;
/// Upper bound on serialized vector-contract JSON size, in bytes.
pub(super) const MAX_CONTRACT_JSON_BYTES: usize = 32 * 1024;
/// Upper bound on audit metadata size, in bytes.
pub(super) const MAX_AUDIT_METADATA_BYTES: usize = 2048;
/// Page size used for operational reads when the caller gives no limit.
const DEFAULT_OPERATIONAL_READ_LIMIT: usize = 100;
/// Hard cap on caller-supplied operational read limits.
const MAX_OPERATIONAL_READ_LIMIT: usize = 1000;
394
/// Thread-safe handle to the shared [`AdminService`].
///
/// Cloning is cheap: it only bumps the inner `Arc`'s refcount.
#[derive(Clone, Debug)]
pub struct AdminHandle {
    /// Shared service instance, handed out via [`AdminHandle::service`].
    inner: Arc<AdminService>,
}
400
401impl AdminHandle {
402    /// Wrap an [`AdminService`] in a shared handle.
403    #[must_use]
404    pub fn new(service: AdminService) -> Self {
405        Self {
406            inner: Arc::new(service),
407        }
408    }
409
410    /// Clone the inner `Arc` to the [`AdminService`].
411    #[must_use]
412    pub fn service(&self) -> Arc<AdminService> {
413        Arc::clone(&self.inner)
414    }
415}
416
417impl AdminService {
418    /// Create a new admin service for the database at the given path.
419    #[must_use]
420    pub fn new(path: impl AsRef<Path>, schema_manager: Arc<SchemaManager>) -> Self {
421        let database_path = path.as_ref().to_path_buf();
422        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
423        Self {
424            database_path,
425            schema_manager,
426            projections,
427            rebuild_sender: None,
428        }
429    }
430
431    /// Create a new admin service wired to the background rebuild actor.
432    #[must_use]
433    pub fn new_with_rebuild(
434        path: impl AsRef<Path>,
435        schema_manager: Arc<SchemaManager>,
436        rebuild_sender: SyncSender<RebuildRequest>,
437    ) -> Self {
438        let database_path = path.as_ref().to_path_buf();
439        let projections = ProjectionService::new(&database_path, Arc::clone(&schema_manager));
440        Self {
441            database_path,
442            schema_manager,
443            projections,
444            rebuild_sender: Some(rebuild_sender),
445        }
446    }
447
    /// Open a connection to the database, loading the `sqlite-vec` extension
    /// when that feature is compiled in, then run schema bootstrap so every
    /// admin operation sees an up-to-date schema.
    ///
    /// # Errors
    /// Returns [`EngineError`] if opening the connection or bootstrapping fails.
    pub(super) fn connect(&self) -> Result<rusqlite::Connection, EngineError> {
        #[cfg(feature = "sqlite-vec")]
        let conn = sqlite::open_connection_with_vec(&self.database_path)?;
        #[cfg(not(feature = "sqlite-vec"))]
        let conn = sqlite::open_connection(&self.database_path)?;
        self.schema_manager.bootstrap(&conn)?;
        Ok(conn)
    }
456
    /// Run physical and structural integrity checks and summarize the
    /// results as an [`IntegrityReport`] with one warning per issue class.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
    pub fn check_integrity(&self) -> Result<IntegrityReport, EngineError> {
        let conn = self.connect()?;

        // "ok" is the only passing result; anything else describes corruption.
        let physical_result: String =
            conn.query_row("PRAGMA integrity_check", [], |row| row.get(0))?;
        // Every row returned by foreign_key_check is one FK violation.
        let foreign_key_count: i64 =
            conn.query_row("SELECT count(*) FROM pragma_foreign_key_check", [], |row| {
                row.get(0)
            })?;
        // Chunks of active (non-superseded) nodes with no FTS projection row.
        let missing_fts_rows: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM chunks c
            JOIN nodes n
              ON n.logical_id = c.node_logical_id
             AND n.superseded_at IS NULL
            WHERE NOT EXISTS (
                SELECT 1
                FROM fts_nodes f
                WHERE f.chunk_id = c.id
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Logical IDs with more than one active row (supersession invariant
        // violated).
        let duplicate_active: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM (
                SELECT logical_id
                FROM nodes
                WHERE superseded_at IS NULL
                GROUP BY logical_id
                HAVING count(*) > 1
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Mutation and current rows whose collection row no longer exists,
        // counted together in one round trip.
        let operational_missing_collections: i64 = conn.query_row(
            r"
            SELECT (
                SELECT count(*)
                FROM operational_mutations m
                LEFT JOIN operational_collections c ON c.name = m.collection_name
                WHERE c.name IS NULL
            ) + (
                SELECT count(*)
                FROM operational_current oc
                LEFT JOIN operational_collections c ON c.name = oc.collection_name
                WHERE c.name IS NULL
            )
            ",
            [],
            |row| row.get(0),
        )?;
        // Current rows pointing at a last_mutation_id that was deleted.
        let operational_missing_last_mutations: i64 = conn.query_row(
            r"
            SELECT count(*)
            FROM operational_current oc
            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
            WHERE m.id IS NULL
            ",
            [],
            |row| row.get(0),
        )?;

        // Count missing property FTS rows using the same extraction logic as
        // write/rebuild. A pure-SQL check would overcount: nodes whose declared
        // paths legitimately normalize to no values correctly have no row.
        let missing_property_fts_rows = count_missing_property_fts_rows(&conn)?;

        // One human-readable warning per non-zero counter; keeps the report
        // self-describing for operators reading serialized output.
        let mut warnings = Vec::new();
        if missing_fts_rows > 0 {
            warnings.push("missing FTS projections detected".to_owned());
        }
        if missing_property_fts_rows > 0 {
            warnings.push("missing property FTS projections detected".to_owned());
        }
        if duplicate_active > 0 {
            warnings.push("duplicate active logical_ids detected".to_owned());
        }
        if operational_missing_collections > 0 {
            warnings.push("operational rows reference missing collections".to_owned());
        }
        if operational_missing_last_mutations > 0 {
            warnings.push("operational current rows reference missing last mutations".to_owned());
        }

        // FIX(review): was `as usize` — unsound on 32-bit targets, wraps negatives silently.
        // Options: (A) try_from().unwrap_or(0) — masks corruption, (B) try_from().expect() —
        // panics on corruption, (C) propagate error. Chose (B) here: a negative count(*)
        // signals data corruption, and the integrity report would be meaningless anyway.
        Ok(IntegrityReport {
            physical_ok: physical_result == "ok",
            foreign_keys_ok: foreign_key_count == 0,
            missing_fts_rows: i64_to_usize(missing_fts_rows),
            missing_property_fts_rows: i64_to_usize(missing_property_fts_rows),
            duplicate_active_logical_ids: i64_to_usize(duplicate_active),
            operational_missing_collections: i64_to_usize(operational_missing_collections),
            operational_missing_last_mutations: i64_to_usize(operational_missing_last_mutations),
            warnings,
        })
    }
563
564    /// # Errors
565    /// Returns [`EngineError`] if the database connection fails or any SQL query fails.
566    #[allow(clippy::too_many_lines)]
567    pub fn check_semantics(&self) -> Result<SemanticReport, EngineError> {
568        let conn = self.connect()?;
569
570        let orphaned_chunks: i64 = conn.query_row(
571            r"
572            SELECT count(*)
573            FROM chunks c
574            WHERE NOT EXISTS (
575                SELECT 1 FROM nodes n
576                WHERE n.logical_id = c.node_logical_id
577            )
578            ",
579            [],
580            |row| row.get(0),
581        )?;
582
583        let null_source_ref_nodes: i64 = conn.query_row(
584            "SELECT count(*) FROM nodes WHERE source_ref IS NULL AND superseded_at IS NULL",
585            [],
586            |row| row.get(0),
587        )?;
588
589        let broken_step_fk: i64 = conn.query_row(
590            r"
591            SELECT count(*) FROM steps s
592            WHERE NOT EXISTS (SELECT 1 FROM runs r WHERE r.id = s.run_id)
593            ",
594            [],
595            |row| row.get(0),
596        )?;
597
598        let broken_action_fk: i64 = conn.query_row(
599            r"
600            SELECT count(*) FROM actions a
601            WHERE NOT EXISTS (SELECT 1 FROM steps s WHERE s.id = a.step_id)
602            ",
603            [],
604            |row| row.get(0),
605        )?;
606
607        let stale_fts_rows: i64 = conn.query_row(
608            r"
609            SELECT count(*) FROM fts_nodes f
610            WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = f.chunk_id)
611            ",
612            [],
613            |row| row.get(0),
614        )?;
615
616        let fts_rows_for_superseded_nodes: i64 = conn.query_row(
617            r"
618            SELECT count(*) FROM fts_nodes f
619            WHERE NOT EXISTS (
620                SELECT 1 FROM nodes n
621                WHERE n.logical_id = f.node_logical_id AND n.superseded_at IS NULL
622            )
623            ",
624            [],
625            |row| row.get(0),
626        )?;
627
628        let (
629            stale_property_fts_rows,
630            orphaned_property_fts_rows,
631            mismatched_kind_property_fts_rows,
632            duplicate_property_fts_rows,
633        ) = count_per_kind_property_fts_issues(&conn)?;
634
635        let drifted_property_fts_rows = count_drifted_property_fts_rows(&conn)?;
636
637        let dangling_edges: i64 = conn.query_row(
638            r"
639            SELECT count(*) FROM edges e
640            WHERE e.superseded_at IS NULL AND (
641                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.source_logical_id AND n.superseded_at IS NULL)
642                OR
643                NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = e.target_logical_id AND n.superseded_at IS NULL)
644            )
645            ",
646            [],
647            |row| row.get(0),
648        )?;
649
650        let orphaned_supersession_chains: i64 = conn.query_row(
651            r"
652            SELECT count(*) FROM (
653                SELECT logical_id FROM nodes
654                GROUP BY logical_id
655                HAVING count(*) > 0 AND sum(CASE WHEN superseded_at IS NULL THEN 1 ELSE 0 END) = 0
656            )
657            ",
658            [],
659            |row| row.get(0),
660        )?;
661
662        // Vec stale row detection — iterates per-kind vec tables from projection_profiles.
663        #[cfg(feature = "sqlite-vec")]
664        let (stale_vec_rows, vec_rows_for_superseded_nodes): (i64, i64) = {
665            let kinds: Vec<String> =
666                match conn.prepare("SELECT kind FROM projection_profiles WHERE facet = 'vec'") {
667                    Ok(mut stmt) => stmt
668                        .query_map([], |row| row.get(0))
669                        .map_err(EngineError::Sqlite)?
670                        .collect::<Result<Vec<_>, _>>()
671                        .map_err(EngineError::Sqlite)?,
672                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
673                        if msg.contains("no such table: projection_profiles") =>
674                    {
675                        vec![]
676                    }
677                    Err(e) => return Err(EngineError::Sqlite(e)),
678                };
679            let mut stale = 0i64;
680            let mut superseded = 0i64;
681            for kind in &kinds {
682                let table = fathomdb_schema::vec_kind_table_name(kind);
683                let stale_sql = format!(
684                    "SELECT count(*) FROM {table} v \
685                     WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.id = v.chunk_id)"
686                );
687                let superseded_sql = format!(
688                    "SELECT count(*) FROM {table} v \
689                     JOIN chunks c ON c.id = v.chunk_id \
690                     WHERE NOT EXISTS (SELECT 1 FROM nodes n WHERE n.logical_id = c.node_logical_id)"
691                );
692                stale += match conn.query_row(&stale_sql, [], |row| row.get(0)) {
693                    Ok(n) => n,
694                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
695                        if msg.contains("no such table:")
696                            || msg.contains("no such module: vec0") =>
697                    {
698                        0
699                    }
700                    Err(e) => return Err(EngineError::Sqlite(e)),
701                };
702                superseded += match conn.query_row(&superseded_sql, [], |row| row.get(0)) {
703                    Ok(n) => n,
704                    Err(rusqlite::Error::SqliteFailure(_, Some(ref msg)))
705                        if msg.contains("no such table:")
706                            || msg.contains("no such module: vec0") =>
707                    {
708                        0
709                    }
710                    Err(e) => return Err(EngineError::Sqlite(e)),
711                };
712            }
713            (stale, superseded)
714        };
715        #[cfg(not(feature = "sqlite-vec"))]
716        let stale_vec_rows: i64 = 0;
717        #[cfg(not(feature = "sqlite-vec"))]
718        let vec_rows_for_superseded_nodes: i64 = 0;
719        let missing_operational_current_rows: i64 = conn.query_row(
720            r"
721            SELECT count(*)
722            FROM operational_mutations m
723            JOIN operational_collections c
724              ON c.name = m.collection_name
725             AND c.kind = 'latest_state'
726            WHERE m.op_kind = 'put'
727              AND NOT EXISTS (
728                    SELECT 1
729                    FROM operational_mutations newer
730                    WHERE newer.collection_name = m.collection_name
731                      AND newer.record_key = m.record_key
732                      AND newer.mutation_order > m.mutation_order
733                )
734              AND NOT EXISTS (
735                    SELECT 1
736                    FROM operational_current oc
737                    WHERE oc.collection_name = m.collection_name
738                      AND oc.record_key = m.record_key
739                )
740            ",
741            [],
742            |row| row.get(0),
743        )?;
744        let stale_operational_current_rows: i64 = conn.query_row(
745            r"
746            SELECT count(*)
747            FROM operational_current oc
748            JOIN operational_collections c
749              ON c.name = oc.collection_name
750             AND c.kind = 'latest_state'
751            LEFT JOIN operational_mutations m ON m.id = oc.last_mutation_id
752            WHERE m.id IS NULL
753               OR m.collection_name != oc.collection_name
754               OR m.record_key != oc.record_key
755               OR m.op_kind != 'put'
756               OR m.payload_json != oc.payload_json
757               OR EXISTS (
758                    SELECT 1
759                    FROM operational_mutations newer
760                    WHERE newer.collection_name = oc.collection_name
761                      AND newer.record_key = oc.record_key
762                      AND newer.mutation_order > m.mutation_order
763                )
764            ",
765            [],
766            |row| row.get(0),
767        )?;
768        let disabled_collection_mutations: i64 = conn.query_row(
769            r"
770            SELECT count(*)
771            FROM operational_mutations m
772            JOIN operational_collections c ON c.name = m.collection_name
773            WHERE c.disabled_at IS NOT NULL AND m.created_at > c.disabled_at
774            ",
775            [],
776            |row| row.get(0),
777        )?;
778        let orphaned_last_access_metadata_rows: i64 = conn.query_row(
779            r"
780            SELECT count(*)
781            FROM node_access_metadata am
782            WHERE NOT EXISTS (
783                SELECT 1 FROM nodes n WHERE n.logical_id = am.logical_id
784            )
785            ",
786            [],
787            |row| row.get(0),
788        )?;
789
790        let mut warnings = Vec::new();
791        if orphaned_chunks > 0 {
792            warnings.push(format!(
793                "{orphaned_chunks} orphaned chunk(s) with no surviving node history"
794            ));
795        }
796        if null_source_ref_nodes > 0 {
797            warnings.push(format!(
798                "{null_source_ref_nodes} active node(s) with null source_ref"
799            ));
800        }
801        if broken_step_fk > 0 {
802            warnings.push(format!(
803                "{broken_step_fk} step(s) referencing non-existent run"
804            ));
805        }
806        if broken_action_fk > 0 {
807            warnings.push(format!(
808                "{broken_action_fk} action(s) referencing non-existent step"
809            ));
810        }
811        if stale_fts_rows > 0 {
812            warnings.push(format!(
813                "{stale_fts_rows} stale FTS row(s) referencing missing chunk"
814            ));
815        }
816        if fts_rows_for_superseded_nodes > 0 {
817            warnings.push(format!(
818                "{fts_rows_for_superseded_nodes} FTS row(s) for superseded node(s)"
819            ));
820        }
821        if stale_property_fts_rows > 0 {
822            warnings.push(format!(
823                "{stale_property_fts_rows} stale property FTS row(s) for superseded/missing node(s)"
824            ));
825        }
826        if orphaned_property_fts_rows > 0 {
827            warnings.push(format!(
828                "{orphaned_property_fts_rows} orphaned property FTS row(s) for unregistered kind(s)"
829            ));
830        }
831        if mismatched_kind_property_fts_rows > 0 {
832            warnings.push(format!(
833                "{mismatched_kind_property_fts_rows} property FTS row(s) whose kind does not match the active node"
834            ));
835        }
836        if duplicate_property_fts_rows > 0 {
837            warnings.push(format!(
838                "{duplicate_property_fts_rows} active logical ID(s) with duplicate property FTS rows"
839            ));
840        }
841        if drifted_property_fts_rows > 0 {
842            warnings.push(format!(
843                "{drifted_property_fts_rows} property FTS row(s) with stale text_content"
844            ));
845        }
846        if dangling_edges > 0 {
847            warnings.push(format!(
848                "{dangling_edges} active edge(s) with missing endpoint node"
849            ));
850        }
851        if orphaned_supersession_chains > 0 {
852            warnings.push(format!(
853                "{orphaned_supersession_chains} logical_id(s) with all versions superseded"
854            ));
855        }
856        if stale_vec_rows > 0 {
857            warnings.push(format!(
858                "{stale_vec_rows} stale vec row(s) referencing missing chunk"
859            ));
860        }
861        if vec_rows_for_superseded_nodes > 0 {
862            warnings.push(format!(
863                "{vec_rows_for_superseded_nodes} vec row(s) whose node history is missing"
864            ));
865        }
866        if missing_operational_current_rows > 0 {
867            warnings.push(format!(
868                "{missing_operational_current_rows} latest-state key(s) missing operational_current rows"
869            ));
870        }
871        if stale_operational_current_rows > 0 {
872            warnings.push(format!(
873                "{stale_operational_current_rows} stale operational_current row(s)"
874            ));
875        }
876        if disabled_collection_mutations > 0 {
877            warnings.push(format!(
878                "{disabled_collection_mutations} mutation(s) were written after collection disable"
879            ));
880        }
881        if orphaned_last_access_metadata_rows > 0 {
882            warnings.push(format!(
883                "{orphaned_last_access_metadata_rows} last_access metadata row(s) reference missing node history"
884            ));
885        }
886
887        Ok(SemanticReport {
888            orphaned_chunks: i64_to_usize(orphaned_chunks),
889            null_source_ref_nodes: i64_to_usize(null_source_ref_nodes),
890            broken_step_fk: i64_to_usize(broken_step_fk),
891            broken_action_fk: i64_to_usize(broken_action_fk),
892            stale_fts_rows: i64_to_usize(stale_fts_rows),
893            fts_rows_for_superseded_nodes: i64_to_usize(fts_rows_for_superseded_nodes),
894            stale_property_fts_rows: i64_to_usize(stale_property_fts_rows),
895            orphaned_property_fts_rows: i64_to_usize(orphaned_property_fts_rows),
896            mismatched_kind_property_fts_rows: i64_to_usize(mismatched_kind_property_fts_rows),
897            duplicate_property_fts_rows: i64_to_usize(duplicate_property_fts_rows),
898            drifted_property_fts_rows: i64_to_usize(drifted_property_fts_rows),
899            dangling_edges: i64_to_usize(dangling_edges),
900            orphaned_supersession_chains: i64_to_usize(orphaned_supersession_chains),
901            stale_vec_rows: i64_to_usize(stale_vec_rows),
902            vec_rows_for_superseded_nodes: i64_to_usize(vec_rows_for_superseded_nodes),
903            missing_operational_current_rows: i64_to_usize(missing_operational_current_rows),
904            stale_operational_current_rows: i64_to_usize(stale_operational_current_rows),
905            disabled_collection_mutations: i64_to_usize(disabled_collection_mutations),
906            orphaned_last_access_metadata_rows: i64_to_usize(orphaned_last_access_metadata_rows),
907            warnings,
908        })
909    }
910}
911
912/// Count per-kind FTS integrity issues across all registered per-kind tables.
913/// Returns (stale, orphaned, `mismatched_kind`, duplicate) counts.
914///
915/// - Stale: rows in a per-kind table whose node is superseded or missing.
916/// - Orphaned: rows in a per-kind table for a kind with no registered schema.
917/// - Mismatched kind: impossible with per-kind tables (always 0).
918/// - Duplicate: same `node_logical_id` appears more than once in any per-kind table.
919fn count_per_kind_property_fts_issues(
920    conn: &rusqlite::Connection,
921) -> Result<(i64, i64, i64, i64), EngineError> {
922    // Collect all per-kind virtual tables from sqlite_master.
923    // Filter by sql LIKE 'CREATE VIRTUAL TABLE%' to exclude FTS5 shadow tables
924    // (e.g. fts_props_goal_data, fts_props_goal_idx) which share the same prefix.
925    let per_kind_tables: Vec<String> = {
926        let mut stmt = conn.prepare(
927            "SELECT name FROM sqlite_master \
928             WHERE type='table' AND name LIKE 'fts_props_%' \
929             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
930        )?;
931        stmt.query_map([], |r| r.get::<_, String>(0))?
932            .collect::<Result<Vec<_>, _>>()?
933    };
934
935    let registered_kinds: std::collections::HashSet<String> = {
936        let mut stmt = conn.prepare("SELECT kind FROM fts_property_schemas")?;
937        stmt.query_map([], |r| r.get::<_, String>(0))?
938            .collect::<Result<std::collections::HashSet<_>, _>>()?
939    };
940
941    let mut stale = 0i64;
942    let mut orphaned = 0i64;
943    let mut duplicate = 0i64;
944
945    for table in &per_kind_tables {
946        // Stale: rows whose node_logical_id has no active node.
947        let kind_stale: i64 = conn.query_row(
948            &format!(
949                "SELECT count(*) FROM {table} fp \
950                 WHERE NOT EXISTS (\
951                     SELECT 1 FROM nodes n \
952                     WHERE n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL\
953                 )"
954            ),
955            [],
956            |r| r.get(0),
957        )?;
958        stale += kind_stale;
959
960        // Duplicate: same node_logical_id more than once.
961        let kind_dup: i64 = conn.query_row(
962            &format!(
963                "SELECT count(*) FROM (\
964                     SELECT node_logical_id FROM {table} \
965                     GROUP BY node_logical_id HAVING count(*) > 1\
966                 )"
967            ),
968            [],
969            |r| r.get(0),
970        )?;
971        duplicate += kind_dup;
972
973        // Orphaned: this per-kind table has no corresponding schema.
974        // Determine which kind this table corresponds to by checking all registered kinds.
975        let table_has_schema = registered_kinds
976            .iter()
977            .any(|k| fathomdb_schema::fts_kind_table_name(k) == *table);
978        if !table_has_schema {
979            let table_rows: i64 =
980                conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))?;
981            orphaned += table_rows;
982        }
983    }
984
985    // Mismatched kind is always 0 with per-kind tables.
986    Ok((stale, orphaned, 0, duplicate))
987}
988
989/// Count active nodes that should have a property FTS row (extraction yields a value)
990/// but don't. Uses the same extraction logic as write/rebuild to avoid false positives
991/// for nodes whose declared paths legitimately normalize to no values.
992fn count_missing_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
993    let schemas = crate::writer::load_fts_property_schemas(conn)?;
994    if schemas.is_empty() {
995        return Ok(0);
996    }
997
998    let mut missing = 0i64;
999    for (kind, schema) in &schemas {
1000        let table = fathomdb_schema::fts_kind_table_name(kind);
1001        // If the per-kind table doesn't exist yet, all nodes with extractable values are missing.
1002        let table_exists: bool = conn
1003            .query_row(
1004                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1005                [table.as_str()],
1006                |r| r.get::<_, i64>(0),
1007            )
1008            .unwrap_or(0)
1009            > 0;
1010
1011        if table_exists {
1012            let mut stmt = conn.prepare(&format!(
1013                "SELECT n.logical_id, n.properties FROM nodes n \
1014                 WHERE n.kind = ?1 AND n.superseded_at IS NULL \
1015                   AND NOT EXISTS (SELECT 1 FROM {table} fp WHERE fp.node_logical_id = n.logical_id)"
1016            ))?;
1017            let rows = stmt.query_map([kind.as_str()], |row| {
1018                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1019            })?;
1020            for row in rows {
1021                let (_logical_id, properties_str) = row?;
1022                let props: serde_json::Value =
1023                    serde_json::from_str(&properties_str).unwrap_or_default();
1024                if crate::writer::extract_property_fts(&props, schema)
1025                    .0
1026                    .is_some()
1027                {
1028                    missing += 1;
1029                }
1030            }
1031        } else {
1032            // Per-kind table doesn't exist yet — count all nodes with extractable values.
1033            let mut stmt = conn.prepare(
1034                "SELECT n.logical_id, n.properties FROM nodes n \
1035                 WHERE n.kind = ?1 AND n.superseded_at IS NULL",
1036            )?;
1037            let rows = stmt.query_map([kind.as_str()], |row| {
1038                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1039            })?;
1040            for row in rows {
1041                let (_logical_id, properties_str) = row?;
1042                let props: serde_json::Value =
1043                    serde_json::from_str(&properties_str).unwrap_or_default();
1044                if crate::writer::extract_property_fts(&props, schema)
1045                    .0
1046                    .is_some()
1047                {
1048                    missing += 1;
1049                }
1050            }
1051        }
1052    }
1053    Ok(missing)
1054}
1055
1056/// Count property FTS rows whose persisted text has drifted from the current
1057/// canonical value computed by the FTS extractors. Handles both per-kind
1058/// table shapes:
1059///
1060/// - Non-weighted (legacy / default): single `text_content` column per row.
1061///   Compared against `extract_property_fts(...)`.
1062/// - Weighted: one column per registered path (named by `fts_column_name`).
1063///   Compared against `extract_property_fts_columns(...)`; any per-column
1064///   mismatch counts the row as drifted exactly once.
1065///
1066/// This catches:
1067/// - rows whose text no longer matches the current node properties and schema
1068/// - rows that should have been removed (extraction now yields no value)
1069fn count_drifted_property_fts_rows(conn: &rusqlite::Connection) -> Result<i64, EngineError> {
1070    let schemas = crate::writer::load_fts_property_schemas(conn)?;
1071    if schemas.is_empty() {
1072        return Ok(0);
1073    }
1074
1075    let mut drifted = 0i64;
1076    for (kind, schema) in &schemas {
1077        let table = fathomdb_schema::fts_kind_table_name(kind);
1078        // If the per-kind table doesn't exist, no rows to check.
1079        let table_exists: bool = conn
1080            .query_row(
1081                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1",
1082                [table.as_str()],
1083                |r| r.get::<_, i64>(0),
1084            )
1085            .unwrap_or(0)
1086            > 0;
1087        if !table_exists {
1088            continue;
1089        }
1090
1091        // Dispatch on the persisted schema, not on the live table columns.
1092        // Registration (`register_fts_property_schema_with_entries` in
1093        // `admin/fts.rs`) chooses the per-kind table layout by checking
1094        // whether any entry carries a weight; mirroring that invariant here
1095        // keeps the two code paths in lockstep. A live-column probe would
1096        // misclassify a weighted schema whose registered paths happened to
1097        // include literal `$.text_content` (collapsing to a `text_content`
1098        // column), silently running the non-weighted comparator against
1099        // per-column storage.
1100        drifted += if schema.is_weighted() {
1101            count_drifted_weighted(conn, kind, &table, schema)?
1102        } else {
1103            count_drifted_non_weighted(conn, kind, &table, schema)?
1104        };
1105    }
1106    Ok(drifted)
1107}
1108
1109/// Drift count for the non-weighted (single `text_content` column) per-kind
1110/// FTS table. Preserves the historic query shape.
1111fn count_drifted_non_weighted(
1112    conn: &rusqlite::Connection,
1113    kind: &str,
1114    table: &str,
1115    schema: &crate::writer::PropertyFtsSchema,
1116) -> Result<i64, EngineError> {
1117    let mut drifted = 0i64;
1118    let mut stmt = conn.prepare(&format!(
1119        "SELECT fp.node_logical_id, fp.text_content, n.properties \
1120         FROM {table} fp \
1121         JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1122         WHERE n.kind = ?1"
1123    ))?;
1124    let rows = stmt.query_map([kind], |row| {
1125        Ok((
1126            row.get::<_, String>(0)?,
1127            row.get::<_, String>(1)?,
1128            row.get::<_, String>(2)?,
1129        ))
1130    })?;
1131    for row in rows {
1132        let (_logical_id, stored_text, properties_str) = row?;
1133        let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1134        let (expected, _positions, _stats) = crate::writer::extract_property_fts(&props, schema);
1135        match expected {
1136            Some(text) if text == stored_text => {}
1137            _ => drifted += 1,
1138        }
1139    }
1140    Ok(drifted)
1141}
1142
1143/// Drift count for a weighted (per-column) per-kind FTS table. One column per
1144/// registered path, named via `fts_column_name`. A row counts as drifted if
1145/// any per-column value differs from the canonical extraction.
1146fn count_drifted_weighted(
1147    conn: &rusqlite::Connection,
1148    kind: &str,
1149    table: &str,
1150    schema: &crate::writer::PropertyFtsSchema,
1151) -> Result<i64, EngineError> {
1152    // Column names come from `fts_column_name` and are restricted to
1153    // `[a-zA-Z0-9_]`, so direct interpolation + quoted identifiers are safe.
1154    let columns: Vec<String> = schema
1155        .paths
1156        .iter()
1157        .map(|entry| {
1158            let is_recursive = matches!(entry.mode, crate::writer::PropertyPathMode::Recursive);
1159            fathomdb_schema::fts_column_name(&entry.path, is_recursive)
1160        })
1161        .collect();
1162    if columns.is_empty() {
1163        // Weighted table with no registered columns is impossible in
1164        // practice (register always writes specs), but guard defensively:
1165        // nothing to compare against, so nothing drifts.
1166        return Ok(0);
1167    }
1168
1169    let select_cols: String = columns
1170        .iter()
1171        .map(|c| format!("fp.\"{c}\""))
1172        .collect::<Vec<_>>()
1173        .join(", ");
1174    let sql = format!(
1175        "SELECT fp.node_logical_id, {select_cols}, n.properties \
1176         FROM {table} fp \
1177         JOIN nodes n ON n.logical_id = fp.node_logical_id AND n.superseded_at IS NULL \
1178         WHERE n.kind = ?1"
1179    );
1180    let mut stmt = conn.prepare(&sql)?;
1181    // Column layout in the result set:
1182    //   [0] node_logical_id
1183    //   [1..1+columns.len()] per-spec stored text, in `columns` order
1184    //   [last] properties JSON
1185    let props_col_idx = columns.len() + 1;
1186    let rows = stmt.query_map([kind], |row| {
1187        let mut stored: Vec<String> = Vec::with_capacity(columns.len());
1188        for i in 0..columns.len() {
1189            stored.push(row.get::<_, String>(i + 1)?);
1190        }
1191        let properties: String = row.get(props_col_idx)?;
1192        Ok((stored, properties))
1193    })?;
1194
1195    let mut drifted = 0i64;
1196    for row in rows {
1197        let (stored, properties_str) = row?;
1198        let props: serde_json::Value = serde_json::from_str(&properties_str).unwrap_or_default();
1199        let expected = crate::writer::extract_property_fts_columns(&props, schema);
1200        // `extract_property_fts_columns` returns entries in schema-path order,
1201        // which matches `columns`. Compare per-column; any mismatch counts
1202        // the row as drifted exactly once.
1203        let row_drifted = if expected.len() == stored.len() {
1204            expected
1205                .iter()
1206                .zip(stored.iter())
1207                .any(|((_name, exp_text), stored_text)| exp_text != stored_text)
1208        } else {
1209            true
1210        };
1211        if row_drifted {
1212            drifted += 1;
1213        }
1214    }
1215    Ok(drifted)
1216}
1217
1218/// Convert a non-negative i64 count to usize, panicking on negative values
1219/// which would indicate data corruption.
1220#[allow(clippy::expect_used)]
1221pub(super) fn i64_to_usize(val: i64) -> usize {
1222    usize::try_from(val).expect("count(*) must be non-negative")
1223}
1224
1225pub(super) fn persist_simple_provenance_event(
1226    conn: &rusqlite::Connection,
1227    event_type: &str,
1228    subject: &str,
1229    metadata: Option<serde_json::Value>,
1230) -> Result<(), EngineError> {
1231    let metadata_json = metadata.map(|value| value.to_string()).unwrap_or_default();
1232    conn.execute(
1233        "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1234        rusqlite::params![new_id(), event_type, subject, metadata_json],
1235    )?;
1236    Ok(())
1237}
1238
1239pub(super) fn rebuild_operational_current_rows(
1240    tx: &rusqlite::Transaction<'_>,
1241    collections: &[String],
1242) -> Result<usize, EngineError> {
1243    let mut rebuilt_rows = 0usize;
1244    clear_operational_current_rows(tx, collections)?;
1245    let mut ins_current = tx.prepare_cached(
1246        "INSERT INTO operational_current \
1247         (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1248         VALUES (?1, ?2, ?3, ?4, ?5)",
1249    )?;
1250
1251    for collection in collections {
1252        let mut stmt = tx.prepare(
1253            "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
1254             FROM operational_mutations \
1255             WHERE collection_name = ?1 \
1256             ORDER BY record_key, mutation_order",
1257        )?;
1258        let mut latest_by_key: std::collections::HashMap<String, Option<(String, i64, String)>> =
1259            std::collections::HashMap::new();
1260        let rows = stmt.query_map([collection], operational::map_operational_mutation_row)?;
1261        for row in rows {
1262            let mutation = row?;
1263            match mutation.op_kind.as_str() {
1264                "put" => {
1265                    latest_by_key.insert(
1266                        mutation.record_key,
1267                        Some((mutation.payload_json, mutation.created_at, mutation.id)),
1268                    );
1269                }
1270                "delete" => {
1271                    latest_by_key.insert(mutation.record_key, None);
1272                }
1273                _ => {}
1274            }
1275        }
1276
1277        for (record_key, state) in latest_by_key {
1278            if let Some((payload_json, updated_at, last_mutation_id)) = state {
1279                ins_current.execute(rusqlite::params![
1280                    collection,
1281                    record_key,
1282                    payload_json,
1283                    updated_at,
1284                    last_mutation_id,
1285                ])?;
1286                rebuilt_rows += 1;
1287            }
1288        }
1289    }
1290
1291    drop(ins_current);
1292    Ok(rebuilt_rows)
1293}
1294
1295pub(super) fn clear_operational_current_rows(
1296    tx: &rusqlite::Transaction<'_>,
1297    collections: &[String],
1298) -> Result<(), EngineError> {
1299    let mut delete_current =
1300        tx.prepare_cached("DELETE FROM operational_current WHERE collection_name = ?1")?;
1301    let mut delete_secondary_current = tx.prepare_cached(
1302        "DELETE FROM operational_secondary_index_entries \
1303         WHERE collection_name = ?1 AND subject_kind = 'current'",
1304    )?;
1305    for collection in collections {
1306        delete_secondary_current.execute([collection])?;
1307        delete_current.execute([collection])?;
1308    }
1309    drop(delete_secondary_current);
1310    drop(delete_current);
1311    Ok(())
1312}
1313
1314#[cfg(test)]
1315#[allow(clippy::expect_used)]
1316mod tests {
1317    use std::fs;
1318    use std::sync::Arc;
1319
1320    use fathomdb_schema::SchemaManager;
1321    use tempfile::NamedTempFile;
1322
1323    use super::{
1324        AdminService, FtsPropertyPathMode, FtsPropertyPathSpec, SafeExportOptions,
1325        VectorRegenerationConfig,
1326    };
1327    use crate::embedder::{BatchEmbedder, EmbedderError, QueryEmbedder, QueryEmbedderIdentity};
1328    use crate::projection::ProjectionTarget;
1329    use crate::sqlite;
1330    use crate::{EngineError, OperationalCollectionKind, OperationalRegisterRequest};
1331
1332    #[cfg(feature = "sqlite-vec")]
1333    use crate::{ExecutionCoordinator, TelemetryCounters};
1334
1335    #[cfg(feature = "sqlite-vec")]
1336    use fathomdb_query::QueryBuilder;
1337
1338    #[cfg(feature = "sqlite-vec")]
1339    use super::load_vector_regeneration_config;
1340
    /// In-process embedder used by the regeneration test suite. The
    /// vector is parameterized so individual tests can distinguish which
    /// embedder produced which profile row.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct TestEmbedder {
        /// Identity reported by both the `QueryEmbedder` and `BatchEmbedder` impls.
        identity: QueryEmbedderIdentity,
        /// Constant vector returned for every embed call.
        vector: Vec<f32>,
    }
1350
1351    #[allow(dead_code)]
1352    impl TestEmbedder {
1353        fn new(model: &str, dimension: usize) -> Self {
1354            Self {
1355                identity: QueryEmbedderIdentity {
1356                    model_identity: model.to_owned(),
1357                    model_version: "1.0.0".to_owned(),
1358                    dimension,
1359                    normalization_policy: "l2".to_owned(),
1360                },
1361                vector: vec![1.0; dimension],
1362            }
1363        }
1364    }
1365
    impl QueryEmbedder for TestEmbedder {
        // Always returns the configured vector, regardless of input text.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        // Fixed token budget for the test suite.
        fn max_tokens(&self) -> usize {
            512
        }
    }
1377
1378    impl BatchEmbedder for TestEmbedder {
1379        fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, EmbedderError> {
1380            Ok(texts.iter().map(|_| self.vector.clone()).collect())
1381        }
1382        fn identity(&self) -> QueryEmbedderIdentity {
1383            self.identity.clone()
1384        }
1385        fn max_tokens(&self) -> usize {
1386            512
1387        }
1388    }
1389
    /// Embedder that always fails — used to exercise the post-request
    /// failure audit path without the complexity of subprocess machinery.
    #[derive(Debug)]
    #[allow(dead_code)]
    struct FailingEmbedder {
        /// Identity is still reported normally; only `embed_query` fails.
        identity: QueryEmbedderIdentity,
    }
1397
    impl QueryEmbedder for FailingEmbedder {
        // Every embed attempt fails with the same fixed test error.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Err(EmbedderError::Failed("test failure".to_owned()))
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        // Fixed token budget for the test suite.
        fn max_tokens(&self) -> usize {
            512
        }
    }
1409
1410    #[allow(dead_code)]
1411    #[cfg(unix)]
1412    fn set_file_mode(path: &std::path::Path, mode: u32) {
1413        use std::os::unix::fs::PermissionsExt;
1414
1415        let mut permissions = fs::metadata(path).expect("script metadata").permissions();
1416        permissions.set_mode(mode);
1417        fs::set_permissions(path, permissions).expect("chmod");
1418    }
1419
1420    #[allow(dead_code)]
1421    #[cfg(not(unix))]
1422    fn set_file_mode(_path: &std::path::Path, _mode: u32) {}
1423    fn setup() -> (NamedTempFile, AdminService) {
1424        let db = NamedTempFile::new().expect("temp file");
1425        let schema = Arc::new(SchemaManager::new());
1426        {
1427            let conn = sqlite::open_connection(db.path()).expect("connection");
1428            schema.bootstrap(&conn).expect("bootstrap");
1429        }
1430        let service = AdminService::new(db.path(), Arc::clone(&schema));
1431        (db, service)
1432    }
1433
    #[test]
    fn check_integrity_includes_active_uniqueness_count() {
        // A freshly bootstrapped database must report zero issues for the
        // uniqueness and operational counters exposed by the report.
        let (_db, service) = setup();
        let report = service.check_integrity().expect("integrity check");
        assert_eq!(report.duplicate_active_logical_ids, 0);
        assert_eq!(report.operational_missing_collections, 0);
        assert_eq!(report.operational_missing_last_mutations, 0);
    }
1442
    #[test]
    fn trace_source_returns_node_logical_ids() {
        let (db, service) = setup();
        // Seed a single active node attributed to 'source-1'.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
        }
        // Tracing that source must surface exactly that node's logical id.
        let report = service.trace_source("source-1").expect("trace");
        assert_eq!(report.node_rows, 1);
        assert_eq!(report.node_logical_ids, vec!["lg1"]);
    }
1459
    #[test]
    fn trace_source_includes_operational_mutations() {
        let (db, service) = setup();
        // Seed one operational collection and one mutation attributed to
        // 'source-1'.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"ok\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert mutation");
        }

        // Tracing the source must include the operational mutation id.
        let report = service.trace_source("source-1").expect("trace");
        assert_eq!(report.operational_mutation_rows, 1);
        assert_eq!(report.operational_mutation_ids, vec!["m1"]);
    }
1485
    #[test]
    fn excise_source_restores_prior_active_node() {
        let (db, service) = setup();
        // Seed two versions of the same logical id: v1 ('source-1') already
        // superseded at t=200, v2 ('source-2') currently active.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
                [],
            )
            .expect("insert v1 superseded");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
                [],
            )
            .expect("insert v2 active");
        }
        // Removing the source of the active version must reactivate v1.
        service.excise_source("source-2").expect("excise");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let active_row_id: String = conn
                .query_row(
                    "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
                    [],
                    |row| row.get(0),
                )
                .expect("active row exists after excise");
            assert_eq!(active_row_id, "r1");
        }
    }
1517
    #[test]
    fn excise_source_deletes_operational_mutations_and_repairs_latest_state_current() {
        let (db, service) = setup();
        // Seed a latest_state collection whose key 'gmail' has two puts:
        // m1 from 'source-1' and m2 from 'source-2'; the current row points
        // at m2.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections \
                 (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("insert collection");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'source-1', 100, 1)",
                [],
            )
            .expect("insert prior mutation");
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'source-2', 200, 2)",
                [],
            )
            .expect("insert excised mutation");
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 200, 'm2')",
                [],
            )
            .expect("insert current row");
        }

        // Sanity check: m2 is traceable before the excise.
        let traced = service
            .trace_source("source-2")
            .expect("trace before excise");
        assert_eq!(traced.operational_mutation_rows, 1);
        assert_eq!(traced.operational_mutation_ids, vec!["m2"]);

        // After excising 'source-2', its mutation is gone from the report.
        let excised = service.excise_source("source-2").expect("excise");
        assert_eq!(excised.operational_mutation_rows, 0);
        assert!(excised.operational_mutation_ids.is_empty());

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // The m2 mutation row was physically deleted.
            let remaining: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_mutations WHERE source_ref = 'source-2'",
                    [],
                    |row| row.get(0),
                )
                .expect("remaining count");
            assert_eq!(remaining, 0);

            // The current row was rebuilt to the surviving prior put (m1).
            let current: (String, String) = conn
                .query_row(
                    "SELECT payload_json, last_mutation_id FROM operational_current \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                    [],
                    |row| Ok((row.get(0)?, row.get(1)?)),
                )
                .expect("rebuilt current row");
            assert_eq!(current.0, "{\"status\":\"old\"}");
            assert_eq!(current.1, "m1");
        }
    }
1586
    /// Restoring a retired logical id reactivates the pre-retire node
    /// revision together with its attached edges, chunks, and FTS rows.
    #[test]
    fn restore_logical_id_reestablishes_last_pre_retire_content_and_attached_edges() {
        let (db, service) = setup();
        // Seed: a document node, a tag target, one chunk, and one edge; then
        // retire node and edge at t=200 (recording matching retire provenance
        // events) and wipe fts_nodes so restore has to repopulate it.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Provenance events record the retire of doc-1 and edge-1 at 200.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear fts");
        }

        // The restore report should count one row of each restored kind.
        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 1);
        assert_eq!(report.restored_chunk_rows, 1);
        assert_eq!(report.restored_fts_rows, 1);

        // Node and edge are active again (superseded_at IS NULL) and the
        // FTS entry for the chunk was rebuilt.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active node count");
        assert_eq!(active_node_count, 1);
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
        let fts_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("fts count");
        assert_eq!(fts_count, 1);
    }
1677
    /// An edge retired slightly after the node's retire event (201 vs 200)
    /// must still be brought back when the node's logical id is restored.
    #[test]
    fn restore_logical_id_restores_edges_retired_after_the_node_retire_event() {
        let (db, service) = setup();
        // Seed a node, its edge target, and one edge; the node retire event
        // is timestamped 200 while the edge retire event/UPDATE use 201.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-topic', 'topic-1', 'Topic', '{}', 100, 'seed')",
                [],
            )
            .expect("insert target node");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert node retire event");
            // Edge retire lands one tick later than the node retire.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 201, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
            conn.execute(
                "UPDATE edges SET superseded_at = 201 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_edge_rows, 1);

        // The later-retired edge must be active again after the restore.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1);
    }
1739
1740    #[test]
1741    fn restore_logical_id_prefers_latest_retired_revision_when_timestamps_tie() {
1742        let (db, service) = setup();
1743        {
1744            let conn = sqlite::open_connection(db.path()).expect("conn");
1745            conn.execute(
1746                "INSERT INTO nodes \
1747                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1748                 VALUES ('node-row-older', 'doc-1', 'Document', '{\"title\":\"older\"}', 100, 200, 'forget-1')",
1749                [],
1750            )
1751            .expect("insert older retired node");
1752            conn.execute(
1753                "INSERT INTO nodes \
1754                 (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1755                 VALUES ('node-row-newer', 'doc-1', 'Document', '{\"title\":\"newer\"}', 100, 200, 'forget-1')",
1756                [],
1757            )
1758            .expect("insert newer retired node");
1759            conn.execute(
1760                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1761                 VALUES ('evt-retire-older', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1762                [],
1763            )
1764            .expect("insert older retire event");
1765            conn.execute(
1766                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
1767                 VALUES ('evt-retire-newer', 'node_retire', 'doc-1', 'forget-1', 200, '')",
1768                [],
1769            )
1770            .expect("insert newer retire event");
1771        }
1772
1773        let report = service.restore_logical_id("doc-1").expect("restore");
1774
1775        assert!(!report.was_noop);
1776        let conn = sqlite::open_connection(db.path()).expect("conn");
1777        let active_row: (String, String) = conn
1778            .query_row(
1779                "SELECT row_id, properties FROM nodes \
1780                 WHERE logical_id = 'doc-1' AND superseded_at IS NULL",
1781                [],
1782                |row| Ok((row.get(0)?, row.get(1)?)),
1783            )
1784            .expect("restored active row");
1785        assert_eq!(active_row.0, "node-row-newer");
1786        assert_eq!(active_row.1, "{\"title\":\"newer\"}");
1787    }
1788
    /// Purging a retired logical id hard-deletes its node, edge, chunk, and
    /// FTS rows and records a single 'purge_logical_id' provenance tombstone.
    #[test]
    fn purge_logical_id_removes_retired_content_and_records_tombstone() {
        let (db, service) = setup();
        // Seed already-retired content (superseded_at = 200 on node and
        // edge) plus a chunk and its FTS row.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'topic-1', 'TAGGED', '{}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired edge");
            conn.execute(
                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
                 VALUES ('chunk-1', 'doc-1', 'Document', 'budget narrative')",
                [],
            )
            .expect("insert fts");
        }

        // The purge report must count one deleted row of each kind.
        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.logical_id, "doc-1");
        assert!(!report.was_noop);
        assert_eq!(report.deleted_node_rows, 1);
        assert_eq!(report.deleted_edge_rows, 1);
        assert_eq!(report.deleted_chunk_rows, 1);
        assert_eq!(report.deleted_fts_rows, 1);

        // Nothing for doc-1 remains in nodes, edges, or chunks...
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_nodes: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining nodes");
        assert_eq!(remaining_nodes, 0);
        let remaining_edges: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining edges");
        assert_eq!(remaining_edges, 0);
        let remaining_chunks: i64 = conn
            .query_row(
                "SELECT count(*) FROM chunks WHERE id = 'chunk-1'",
                [],
                |row| row.get(0),
            )
            .expect("remaining chunks");
        assert_eq!(remaining_chunks, 0);
        // ...but exactly one tombstone provenance event was recorded.
        let purge_events: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'purge_logical_id' AND subject = 'doc-1'",
                [],
                |row| row.get(0),
            )
            .expect("purge events");
        assert_eq!(purge_events, 1);
    }
1863
1864    #[test]
1865    fn check_semantics_accepts_preserved_retired_chunks() {
1866        let (db, service) = setup();
1867        {
1868            let conn = sqlite::open_connection(db.path()).expect("conn");
1869            conn.execute(
1870                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
1871                 VALUES ('node-row-1', 'doc-1', 'Document', '{}', 100, 200, 'seed')",
1872                [],
1873            )
1874            .expect("insert retired node");
1875            conn.execute(
1876                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1877                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
1878                [],
1879            )
1880            .expect("insert chunk");
1881        }
1882
1883        let report = service.check_semantics().expect("semantics");
1884        assert_eq!(report.orphaned_chunks, 0);
1885    }
1886
1887    #[test]
1888    fn check_semantics_detects_missing_retired_node_history_for_preserved_chunks() {
1889        let (db, service) = setup();
1890        {
1891            let conn = sqlite::open_connection(db.path()).expect("conn");
1892            conn.execute(
1893                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
1894                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
1895                [],
1896            )
1897            .expect("insert orphaned chunk");
1898        }
1899
1900        let report = service.check_semantics().expect("semantics");
1901        assert_eq!(report.orphaned_chunks, 1);
1902    }
1903
    /// (sqlite-vec only) A vec embedding row attached to a chunk whose
    /// logical id has no node history is flagged alongside the orphaned
    /// chunk in the semantics report.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn check_semantics_detects_missing_retired_node_history_for_preserved_vec_rows() {
        let (db, service) = setup();
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            // Creates the per-kind vec table ('vec_doc') with dimension 4;
            // zeroblob(16) below matches 4 f32 values.
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Doc", 4)
                .expect("ensure vec kind profile");
            // 'ghost-doc' has no row in the nodes table, so this chunk is an
            // orphan, and its vec row should be flagged too.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'ghost-doc', 'budget narrative', 100)",
                [],
            )
            .expect("insert orphaned chunk");
            conn.execute(
                "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
        }

        let report = service.check_semantics().expect("semantics");
        assert_eq!(report.orphaned_chunks, 1);
        assert_eq!(report.vec_rows_for_superseded_nodes, 1);
    }
1931
    /// (sqlite-vec only) Restoring a retired logical id makes its preserved
    /// vec rows visible to vector search again without any re-ingest.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_reestablishes_vector_search_without_reingest() {
        let (db, service) = setup();
        // Seed an already-retired node (superseded_at = 200) with a chunk,
        // its 4-dim embedding, and the retire provenance event.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            // zeroblob(16) = 4 f32 zeros, matching the profile dimension.
            conn.execute(
                "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // Run a real vector query through the execution coordinator to
        // confirm the restored embedding is reachable end-to-end.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restore should make the preserved vec row visible again without re-ingest"
        );
    }
1991
    /// (sqlite-vec only) Purging a retired logical id also deletes the vec
    /// embedding rows belonging to its chunks.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn purge_logical_id_deletes_vec_rows_for_retired_content() {
        let (db, service) = setup();
        // Seed a retired node with a chunk and a 4-dim embedding row.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            service
                .schema_manager
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 200, 'seed')",
                [],
            )
            .expect("insert retired node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
            conn.execute(
                "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("insert vec row");
        }

        let report = service.purge_logical_id("doc-1").expect("purge");
        assert_eq!(report.deleted_vec_rows, 1);

        // The vec table must be empty after the purge.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 0);
    }
2030
    /// (sqlite-vec only) Vectors created by the regeneration pipeline (not
    /// by direct ingest) must also become visible again after a restore.
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn restore_logical_id_restores_visibility_of_regenerated_vectors() {
        let (db, service) = setup();

        // Seed an active node plus one chunk; embeddings come later from
        // the regeneration pipeline rather than a direct INSERT.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-1', 'doc-1', 'Document', '{\"title\":\"Budget\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget narrative', 100)",
                [],
            )
            .expect("insert chunk");
        }

        // Generate embeddings for the chunk via the admin regeneration API,
        // using a deterministic 4-dim test embedder.
        let embedder = TestEmbedder::new("test-model", 4);
        service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate");

        // Now retire the node at t=200, with the matching provenance event.
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-node-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_vec_rows, 1);

        // Verify end-to-end: a vector query must surface doc-1 again.
        let coordinator = ExecutionCoordinator::open(
            db.path(),
            Arc::new(SchemaManager::new()),
            Some(4),
            1,
            Arc::new(TelemetryCounters::default()),
            None,
        )
        .expect("coordinator");
        let compiled = QueryBuilder::nodes("Document")
            .vector_search("[0.0, 0.0, 0.0, 0.0]", 5)
            .compile()
            .expect("compile");
        let rows = coordinator
            .execute_compiled_read(&compiled)
            .expect("vector read");
        assert!(
            rows.nodes.iter().any(|row| row.logical_id == "doc-1"),
            "restored logical_id should become visible through regenerated vectors"
        );
    }
2104
2105    #[test]
2106    fn check_semantics_clean_db_returns_zeros() {
2107        let (_db, service) = setup();
2108        let report = service.check_semantics().expect("semantics check");
2109        assert_eq!(report.orphaned_chunks, 0);
2110        assert_eq!(report.null_source_ref_nodes, 0);
2111        assert_eq!(report.broken_step_fk, 0);
2112        assert_eq!(report.broken_action_fk, 0);
2113        assert_eq!(report.stale_fts_rows, 0);
2114        assert_eq!(report.fts_rows_for_superseded_nodes, 0);
2115        assert_eq!(report.dangling_edges, 0);
2116        assert_eq!(report.orphaned_supersession_chains, 0);
2117        assert_eq!(report.stale_vec_rows, 0);
2118        assert_eq!(report.vec_rows_for_superseded_nodes, 0);
2119        assert_eq!(report.missing_operational_current_rows, 0);
2120        assert_eq!(report.stale_operational_current_rows, 0);
2121        assert_eq!(report.disabled_collection_mutations, 0);
2122        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
2123        assert_eq!(report.duplicate_property_fts_rows, 0);
2124        assert_eq!(report.drifted_property_fts_rows, 0);
2125        assert!(report.warnings.is_empty());
2126    }
2127
    /// Registering an operational collection persists a record readable via
    /// describe and emits exactly one registration provenance event.
    #[test]
    fn register_operational_collection_persists_and_emits_provenance() {
        let (db, service) = setup();
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        // The returned record echoes the request and is stamped at creation.
        assert_eq!(record.name, "connector_health");
        assert_eq!(record.kind, OperationalCollectionKind::LatestState);
        assert_eq!(record.schema_json, "{}");
        assert_eq!(record.retention_json, "{}");
        assert_eq!(record.filter_fields_json, "[]");
        assert!(record.created_at > 0);
        assert_eq!(record.disabled_at, None);

        // Describe must round-trip the exact persisted record.
        let described = service
            .describe_operational_collection("connector_health")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described, record);

        // Exactly one registration provenance event was written.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_registered' AND subject = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2169
    /// A collection registered without validation can later receive a
    /// validation schema; the update round-trips through describe and emits
    /// a 'validation_updated' provenance event.
    #[test]
    fn register_and_update_operational_collection_validation_round_trip() {
        let (db, service) = setup();
        // Register with an empty validation_json to start from.
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        assert_eq!(record.validation_json, "");

        // Apply an enforce-mode validation schema for the 'status' field.
        let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
        let updated = service
            .update_operational_collection_validation("connector_health", validation_json)
            .expect("update validation");
        assert_eq!(updated.validation_json, validation_json);

        // Describe must reflect the updated schema, not the registered one.
        let described = service
            .describe_operational_collection("connector_health")
            .expect("describe collection")
            .expect("collection exists");
        assert_eq!(described.validation_json, validation_json);

        // Exactly one validation-update provenance event was written.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_validation_updated' \
                   AND subject = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 1);
    }
2211
    /// Round-trips the secondary-index lifecycle on an append-only
    /// collection: register with no indexes, seed two appended rows,
    /// add an `append_only_field_time` index via the update path (which
    /// backfills entries), wipe the entry table by hand, then rebuild
    /// and confirm the entries are regenerated from the mutation log.
    #[test]
    fn register_update_and_rebuild_operational_secondary_indexes_round_trip() {
        let (db, service) = setup();
        // Register with an empty secondary-index list; the index is
        // added later through update_operational_collection_secondary_indexes.
        let record = service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        assert_eq!(record.secondary_indexes_json, "[]");

        // Seed two append mutations through the writer actor; the inner
        // scope drops the writer before the admin service continues.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "secondary-index-seed".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"bob","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("seed writes");
        }

        // Registering the index after the fact must backfill one entry
        // per pre-existing mutation (two here).
        let secondary_indexes_json = r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#;
        let updated = service
            .update_operational_collection_secondary_indexes("audit_log", secondary_indexes_json)
            .expect("update secondary indexes");
        assert_eq!(updated.secondary_indexes_json, secondary_indexes_json);

        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'audit_log' AND index_name = 'actor_ts'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count");
        assert_eq!(entry_count, 2);
        // Simulate index corruption by deleting the entries directly,
        // then release the raw connection before the service rebuilds.
        conn.execute(
            "DELETE FROM operational_secondary_index_entries WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear index entries");
        drop(conn);

        // Rebuild must restore both mutation-derived entries; append-only
        // collections have no "current" rows, so none are rebuilt.
        let rebuild = service
            .rebuild_operational_secondary_indexes("audit_log")
            .expect("rebuild secondary indexes");
        assert_eq!(rebuild.collection_name, "audit_log");
        assert_eq!(rebuild.mutation_entries_rebuilt, 2);
        assert_eq!(rebuild.current_entries_rebuilt, 0);
    }
2298
2299    #[test]
2300    fn register_operational_collection_rejects_invalid_validation_contract() {
2301        let (_db, service) = setup();
2302
2303        let error = service
2304            .register_operational_collection(&OperationalRegisterRequest {
2305                name: "connector_health".to_owned(),
2306                kind: OperationalCollectionKind::LatestState,
2307                schema_json: "{}".to_owned(),
2308                retention_json: "{}".to_owned(),
2309                filter_fields_json: "[]".to_owned(),
2310                validation_json: r#"{"format_version":1,"mode":"enforce","fields":[{"name":"status","type":"string","minimum":0}]}"#
2311                    .to_owned(),
2312                secondary_indexes_json: "[]".to_owned(),
2313                format_version: 1,
2314            })
2315            .expect_err("invalid validation contract should reject");
2316
2317        assert!(matches!(error, EngineError::InvalidWrite(_)));
2318        assert!(error.to_string().contains("minimum/maximum"));
2319    }
2320
    /// History validation must report rows that violate the registered
    /// contract (here an enum constraint, even though the contract's
    /// mode is "disabled" for write-time enforcement) while leaving the
    /// mutation log untouched and emitting no provenance event.
    #[test]
    fn validate_operational_collection_history_reports_invalid_rows_without_mutation() {
        let (db, service) = setup();
        // `mode: disabled` means writes are not validated on the way in,
        // so the bogus row below is accepted and only caught later by
        // the explicit history validation.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#
                    .to_owned(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Seed one valid row (status "ok") and one invalid row
        // (status "bogus", outside the enum).
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "history-validation".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"status":"ok"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"status":"bogus"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Exactly one of the two rows violates the enum constraint.
        let report = service
            .validate_operational_collection_history("audit_log")
            .expect("validate history");
        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.checked_rows, 2);
        assert_eq!(report.invalid_row_count, 1);
        assert_eq!(report.issues.len(), 1);
        assert_eq!(report.issues[0].record_key, "evt-2");
        assert!(report.issues[0].message.contains("must be one of"));

        // Validation is read-only: both mutations are still present.
        let trace = service
            .trace_operational_collection("audit_log", None)
            .expect("trace");
        assert_eq!(trace.mutation_count, 2);

        // And no provenance event was recorded for this validation run.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let provenance_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events \
                 WHERE event_type = 'operational_collection_history_validated' \
                   AND subject = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
2403
2404    #[test]
2405    fn trace_operational_collection_returns_mutations_and_current_rows() {
2406        let (db, service) = setup();
2407        service
2408            .register_operational_collection(&OperationalRegisterRequest {
2409                name: "connector_health".to_owned(),
2410                kind: OperationalCollectionKind::LatestState,
2411                schema_json: "{}".to_owned(),
2412                retention_json: "{}".to_owned(),
2413                filter_fields_json: "[]".to_owned(),
2414                validation_json: String::new(),
2415                secondary_indexes_json: "[]".to_owned(),
2416                format_version: 1,
2417            })
2418            .expect("register collection");
2419        {
2420            let writer = crate::WriterActor::start(
2421                db.path(),
2422                Arc::new(SchemaManager::new()),
2423                crate::ProvenanceMode::Warn,
2424                Arc::new(crate::TelemetryCounters::default()),
2425            )
2426            .expect("writer");
2427            writer
2428                .submit(crate::WriteRequest {
2429                    label: "operational".to_owned(),
2430                    nodes: vec![],
2431                    node_retires: vec![],
2432                    edges: vec![],
2433                    edge_retires: vec![],
2434                    chunks: vec![],
2435                    runs: vec![],
2436                    steps: vec![],
2437                    actions: vec![],
2438                    optional_backfills: vec![],
2439                    vec_inserts: vec![],
2440                    operational_writes: vec![crate::OperationalWrite::Put {
2441                        collection: "connector_health".to_owned(),
2442                        record_key: "gmail".to_owned(),
2443                        payload_json: r#"{"status":"ok"}"#.to_owned(),
2444                        source_ref: Some("src-1".to_owned()),
2445                    }],
2446                })
2447                .expect("write");
2448        }
2449
2450        let report = service
2451            .trace_operational_collection("connector_health", Some("gmail"))
2452            .expect("trace");
2453        assert_eq!(report.collection_name, "connector_health");
2454        assert_eq!(report.record_key.as_deref(), Some("gmail"));
2455        assert_eq!(report.mutation_count, 1);
2456        assert_eq!(report.current_count, 1);
2457        assert_eq!(report.mutations[0].op_kind, "put");
2458        assert_eq!(report.current_rows[0].payload_json, r#"{"status":"ok"}"#);
2459    }
2460
2461    #[test]
2462    fn trace_operational_collection_rejects_unknown_collection() {
2463        let (_db, service) = setup();
2464
2465        let error = service
2466            .trace_operational_collection("missing_collection", None)
2467            .expect_err("unknown collection should fail");
2468
2469        assert!(matches!(error, EngineError::InvalidWrite(_)));
2470        assert!(error.to_string().contains("is not registered"));
2471    }
2472
    /// A latest-state current row deleted out from under the engine is
    /// detected by the semantics check and restored (with its original
    /// payload) by `rebuild_operational_current`.
    #[test]
    fn rebuild_operational_current_repairs_missing_latest_state_rows() {
        let (db, service) = setup();
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Seed one `put` so both the mutation log and operational_current
        // hold the 'gmail' record; scope drops the writer afterwards.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        // Corrupt the store: drop only the current row, leaving the
        // mutation log intact. The scope releases the raw connection
        // before the service runs its checks.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        // The semantics check must flag exactly one missing current row.
        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 1);

        let repair = service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");
        assert_eq!(repair.collections_rebuilt, 1);
        assert_eq!(repair.current_rows_rebuilt, 1);

        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);

        // The restored row carries the payload from the original put.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2550
    /// After a current row is deleted, `rebuild_operational_current`
    /// must leave the `latest_state_field` secondary index with exactly
    /// one 'current'-subject entry again — no duplicates, no losses.
    #[test]
    fn rebuild_operational_current_restores_latest_state_secondary_index_entries() {
        let (db, service) = setup();
        // Register with a latest_state_field index over `status`, so a
        // put should produce one current-subject index entry.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: "[]".to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Seed a single put through the writer; scope drops the writer
        // before the rest of the test touches the database.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![crate::OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"ok"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    }],
                })
                .expect("write");
        }
        // Confirm the put created one current-subject entry, then drop
        // the current row directly; the scope releases this connection
        // before the service rebuilds.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            let entry_count: i64 = conn
                .query_row(
                    "SELECT count(*) FROM operational_secondary_index_entries \
                     WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                    [],
                    |row| row.get(0),
                )
                .expect("secondary index count before repair");
            assert_eq!(entry_count, 1);
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        // Rebuild must end with exactly one current-subject entry.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("secondary index count after repair");
        assert_eq!(entry_count, 1);
    }
2629
    /// Seeds the mutation log with ids whose lexicographic order
    /// ('m3', 'm2', 'm1') runs opposite to `mutation_order` (1, 2, 3)
    /// while every row shares `created_at = 100`. Both the semantics
    /// check and the rebuild must resolve "latest" by `mutation_order`
    /// alone, so the surviving state is the final put 'm1' ("new").
    #[test]
    fn operational_current_semantics_and_rebuild_follow_mutation_order() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('connector_health', 'latest_state', '{}', '{}', 1, 100)",
                [],
            )
            .expect("seed collection");
            // mutation_order 1: initial put with the "old" payload.
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m3', 'connector_health', 'gmail', 'put', '{\"status\":\"old\"}', 'src-1', 100, 1)",
                [],
            )
            .expect("seed first put");
            // mutation_order 2: delete — an id-ordered replay would
            // wrongly make this the final state.
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m2', 'connector_health', 'gmail', 'delete', '', 'src-2', 100, 2)",
                [],
            )
            .expect("seed delete");
            // mutation_order 3: final put with the "new" payload.
            conn.execute(
                "INSERT INTO operational_mutations \
                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                 VALUES ('m1', 'connector_health', 'gmail', 'put', '{\"status\":\"new\"}', 'src-3', 100, 3)",
                [],
            )
            .expect("seed final put");
            // Current row consistent with mutation_order: points at 'm1'.
            conn.execute(
                "INSERT INTO operational_current \
                 (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
                 VALUES ('connector_health', 'gmail', '{\"status\":\"new\"}', 100, 'm1')",
                [],
            )
            .expect("seed current");
        }

        // Consistent seed: nothing missing, nothing stale.
        let before = service.check_semantics().expect("semantics before rebuild");
        assert_eq!(before.missing_operational_current_rows, 0);
        assert_eq!(before.stale_operational_current_rows, 0);

        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "DELETE FROM operational_current WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
            )
            .expect("delete current row");
        }

        let missing = service.check_semantics().expect("semantics after delete");
        assert_eq!(missing.missing_operational_current_rows, 1);
        assert_eq!(missing.stale_operational_current_rows, 0);

        service
            .rebuild_operational_current(Some("connector_health"))
            .expect("rebuild current");

        let after = service.check_semantics().expect("semantics after rebuild");
        assert_eq!(after.missing_operational_current_rows, 0);
        assert_eq!(after.stale_operational_current_rows, 0);

        // The rebuilt row must reflect mutation_order 3 ('m1', "new"),
        // not the lexicographically-last id 'm3' ("old").
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("restored payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
2707
2708    #[test]
2709    fn disable_operational_collection_sets_disabled_at_and_emits_provenance() {
2710        let (db, service) = setup();
2711        service
2712            .register_operational_collection(&OperationalRegisterRequest {
2713                name: "audit_log".to_owned(),
2714                kind: OperationalCollectionKind::AppendOnlyLog,
2715                schema_json: "{}".to_owned(),
2716                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
2717                filter_fields_json: "[]".to_owned(),
2718                validation_json: String::new(),
2719                secondary_indexes_json: "[]".to_owned(),
2720                format_version: 1,
2721            })
2722            .expect("register collection");
2723
2724        let record = service
2725            .disable_operational_collection("audit_log")
2726            .expect("disable collection");
2727        assert_eq!(record.name, "audit_log");
2728        assert!(record.disabled_at.is_some());
2729
2730        let disabled_at = record.disabled_at.expect("disabled_at");
2731        let described = service
2732            .describe_operational_collection("audit_log")
2733            .expect("describe collection")
2734            .expect("collection exists");
2735        assert_eq!(described.disabled_at, Some(disabled_at));
2736
2737        let writer = crate::WriterActor::start(
2738            db.path(),
2739            Arc::new(SchemaManager::new()),
2740            crate::ProvenanceMode::Warn,
2741            Arc::new(crate::TelemetryCounters::default()),
2742        )
2743        .expect("writer");
2744        let error = writer
2745            .submit(crate::WriteRequest {
2746                label: "disabled-operational".to_owned(),
2747                nodes: vec![],
2748                node_retires: vec![],
2749                edges: vec![],
2750                edge_retires: vec![],
2751                chunks: vec![],
2752                runs: vec![],
2753                steps: vec![],
2754                actions: vec![],
2755                optional_backfills: vec![],
2756                vec_inserts: vec![],
2757                operational_writes: vec![crate::OperationalWrite::Append {
2758                    collection: "audit_log".to_owned(),
2759                    record_key: "evt-1".to_owned(),
2760                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2761                    source_ref: Some("src-1".to_owned()),
2762                }],
2763            })
2764            .expect_err("disabled collection should reject writes");
2765        assert!(matches!(error, EngineError::InvalidWrite(_)));
2766        assert!(error.to_string().contains("is disabled"));
2767
2768        let conn = sqlite::open_connection(db.path()).expect("conn");
2769        let provenance_count: i64 = conn
2770            .query_row(
2771                "SELECT count(*) FROM provenance_events \
2772                 WHERE event_type = 'operational_collection_disabled' AND subject = 'audit_log'",
2773                [],
2774                |row| row.get(0),
2775            )
2776            .expect("provenance count");
2777        assert_eq!(provenance_count, 1);
2778    }
2779
2780    #[test]
2781    fn purge_operational_collection_deletes_append_only_rows_before_cutoff() {
2782        let (db, service) = setup();
2783        {
2784            let conn = sqlite::open_connection(db.path()).expect("conn");
2785            conn.execute(
2786                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2787                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_all\"}', 1, 100)",
2788                [],
2789            )
2790            .expect("seed collection");
2791            conn.execute(
2792                "INSERT INTO operational_mutations \
2793                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2794                 VALUES ('evt-1', 'audit_log', 'evt-1', 'append', '{\"seq\":1}', 'src-1', 100, 1)",
2795                [],
2796            )
2797            .expect("seed event 1");
2798            conn.execute(
2799                "INSERT INTO operational_mutations \
2800                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2801                 VALUES ('evt-2', 'audit_log', 'evt-2', 'append', '{\"seq\":2}', 'src-2', 200, 2)",
2802                [],
2803            )
2804            .expect("seed event 2");
2805            conn.execute(
2806                "INSERT INTO operational_mutations \
2807                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2808                 VALUES ('evt-3', 'audit_log', 'evt-3', 'append', '{\"seq\":3}', 'src-3', 300, 3)",
2809                [],
2810            )
2811            .expect("seed event 3");
2812        }
2813
2814        let report = service
2815            .purge_operational_collection("audit_log", 250)
2816            .expect("purge collection");
2817        assert_eq!(report.collection_name, "audit_log");
2818        assert_eq!(report.deleted_mutations, 2);
2819        assert_eq!(report.before_timestamp, 250);
2820
2821        let conn = sqlite::open_connection(db.path()).expect("conn");
2822        let remaining: Vec<String> = {
2823            let mut stmt = conn
2824                .prepare(
2825                    "SELECT id FROM operational_mutations \
2826                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2827                )
2828                .expect("stmt");
2829            stmt.query_map([], |row| row.get(0))
2830                .expect("rows")
2831                .collect::<Result<_, _>>()
2832                .expect("collect")
2833        };
2834        assert_eq!(remaining, vec!["evt-3".to_owned()]);
2835        let provenance_count: i64 = conn
2836            .query_row(
2837                "SELECT count(*) FROM provenance_events \
2838                 WHERE event_type = 'operational_collection_purged' AND subject = 'audit_log'",
2839                [],
2840                |row| row.get(0),
2841            )
2842            .expect("provenance count");
2843        assert_eq!(provenance_count, 1);
2844    }
2845
2846    #[test]
2847    fn compact_operational_collection_dry_run_reports_without_mutation() {
2848        let (db, service) = setup();
2849        {
2850            let conn = sqlite::open_connection(db.path()).expect("conn");
2851            conn.execute(
2852                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2853                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2854                [],
2855            )
2856            .expect("seed collection");
2857            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2858                conn.execute(
2859                    "INSERT INTO operational_mutations \
2860                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2861                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2862                    rusqlite::params![
2863                        format!("evt-{index}"),
2864                        format!("{{\"seq\":{index}}}"),
2865                        created_at,
2866                        index,
2867                    ],
2868                )
2869                .expect("seed event");
2870            }
2871        }
2872
2873        let report = service
2874            .compact_operational_collection("audit_log", true)
2875            .expect("compact collection");
2876        assert_eq!(report.collection_name, "audit_log");
2877        assert_eq!(report.deleted_mutations, 1);
2878        assert!(report.dry_run);
2879        assert_eq!(report.before_timestamp, None);
2880
2881        let conn = sqlite::open_connection(db.path()).expect("conn");
2882        let remaining_count: i64 = conn
2883            .query_row(
2884                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2885                [],
2886                |row| row.get(0),
2887            )
2888            .expect("remaining count");
2889        assert_eq!(remaining_count, 3);
2890        let provenance_count: i64 = conn
2891            .query_row(
2892                "SELECT count(*) FROM provenance_events \
2893                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
2894                [],
2895                |row| row.get(0),
2896            )
2897            .expect("provenance count");
2898        assert_eq!(provenance_count, 0);
2899    }
2900
2901    #[test]
2902    fn compact_operational_collection_keep_last_deletes_oldest_rows() {
2903        let (db, service) = setup();
2904        {
2905            let conn = sqlite::open_connection(db.path()).expect("conn");
2906            conn.execute(
2907                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
2908                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
2909                [],
2910            )
2911            .expect("seed collection");
2912            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
2913                conn.execute(
2914                    "INSERT INTO operational_mutations \
2915                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2916                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
2917                    rusqlite::params![
2918                        format!("evt-{index}"),
2919                        format!("{{\"seq\":{index}}}"),
2920                        created_at,
2921                        index,
2922                    ],
2923                )
2924                .expect("seed event");
2925            }
2926        }
2927
2928        let report = service
2929            .compact_operational_collection("audit_log", false)
2930            .expect("compact collection");
2931        assert_eq!(report.deleted_mutations, 1);
2932        assert!(!report.dry_run);
2933
2934        let conn = sqlite::open_connection(db.path()).expect("conn");
2935        let remaining: Vec<String> = {
2936            let mut stmt = conn
2937                .prepare(
2938                    "SELECT id FROM operational_mutations \
2939                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
2940                )
2941                .expect("stmt");
2942            stmt.query_map([], |row| row.get(0))
2943                .expect("rows")
2944                .collect::<Result<_, _>>()
2945                .expect("collect")
2946        };
2947        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
2948        let provenance_count: i64 = conn
2949            .query_row(
2950                "SELECT count(*) FROM provenance_events \
2951                 WHERE event_type = 'operational_collection_compacted' AND subject = 'audit_log'",
2952                [],
2953                |row| row.get(0),
2954            )
2955            .expect("provenance count");
2956        assert_eq!(provenance_count, 1);
2957    }
2958
    /// Covers the full keep-last retention lifecycle: planning (read-only),
    /// a dry run (reports deletions but changes nothing and records no
    /// run), then real execution (deletes the oldest row and records the
    /// run timestamp).
    #[test]
    fn plan_and_run_operational_retention_keep_last() {
        let (db, service) = setup();
        // Seed a keep_last(2) collection with three ordered mutations so
        // retention has exactly one candidate deletion.
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
                 VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
                [],
            )
            .expect("seed collection");
            for (index, created_at) in [(1_i64, 100_i64), (2, 200), (3, 300)] {
                conn.execute(
                    "INSERT INTO operational_mutations \
                     (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
                     VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
                    rusqlite::params![
                        format!("evt-{index}"),
                        format!("{{\"seq\":{index}}}"),
                        created_at,
                        index,
                    ],
                )
                .expect("seed event");
            }
        }

        // Phase 1 — plan: reports the KeepLast action with one candidate
        // deletion and no prior run.
        let plan = service
            .plan_operational_retention(1_000, None, Some(10))
            .expect("plan retention");
        assert_eq!(plan.collections_examined, 1);
        assert_eq!(plan.items[0].collection_name, "audit_log");
        assert_eq!(
            plan.items[0].action_kind,
            crate::operational::OperationalRetentionActionKind::KeepLast
        );
        assert_eq!(plan.items[0].candidate_deletions, 1);
        assert_eq!(plan.items[0].max_rows, Some(2));
        assert_eq!(plan.items[0].last_run_at, None);

        // Phase 2 — dry run: reports what WOULD be deleted.
        let dry_run = service
            .run_operational_retention(1_000, None, Some(10), true)
            .expect("dry-run retention");
        assert!(dry_run.dry_run);
        assert_eq!(dry_run.collections_acted_on, 1);
        assert_eq!(dry_run.items[0].deleted_mutations, 1);
        assert_eq!(dry_run.items[0].rows_remaining, 2);

        // ...but must leave all three rows in place and record no
        // retention run.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("remaining count after dry run");
        assert_eq!(remaining_count, 3);
        let retention_run_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_retention_runs WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("retention run count");
        assert_eq!(retention_run_count, 0);
        drop(conn);

        // Phase 3 — real execution deletes the single oldest row.
        let executed = service
            .run_operational_retention(1_000, None, Some(10), false)
            .expect("execute retention");
        assert_eq!(executed.collections_acted_on, 1);
        assert_eq!(executed.items[0].deleted_mutations, 1);
        assert_eq!(executed.items[0].rows_remaining, 2);

        // The two newest mutations survive, and the run is recorded with
        // the caller-supplied timestamp (1_000).
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let remaining: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT id FROM operational_mutations \
                     WHERE collection_name = 'audit_log' ORDER BY mutation_order",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(remaining, vec!["evt-2".to_owned(), "evt-3".to_owned()]);
        let last_run_at: i64 = conn
            .query_row(
                "SELECT executed_at FROM operational_retention_runs \
                 WHERE collection_name = 'audit_log' ORDER BY executed_at DESC LIMIT 1",
                [],
                |row| row.get(0),
            )
            .expect("last run at");
        assert_eq!(last_run_at, 1_000);
    }
3057
3058    #[test]
3059    fn dry_run_operational_retention_does_not_mark_noop_collection_as_acted_on() {
3060        let (db, service) = setup();
3061        let conn = sqlite::open_connection(db.path()).expect("conn");
3062        conn.execute(
3063            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at) \
3064             VALUES ('audit_log', 'append_only_log', '{}', '{\"mode\":\"keep_last\",\"max_rows\":2}', 1, 100)",
3065            [],
3066        )
3067        .expect("seed collection");
3068        for (index, created_at) in [(1_i64, 100_i64), (2, 200)] {
3069            conn.execute(
3070                "INSERT INTO operational_mutations \
3071                 (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
3072                 VALUES (?1, 'audit_log', ?1, 'append', ?2, 'src', ?3, ?4)",
3073                rusqlite::params![
3074                    format!("evt-{index}"),
3075                    format!("{{\"seq\":{index}}}"),
3076                    created_at,
3077                    index,
3078                ],
3079            )
3080            .expect("seed event");
3081        }
3082        drop(conn);
3083
3084        let dry_run = service
3085            .run_operational_retention(1_000, None, Some(10), true)
3086            .expect("dry-run retention");
3087        assert!(dry_run.dry_run);
3088        assert_eq!(dry_run.collections_acted_on, 0);
3089        assert_eq!(dry_run.items[0].deleted_mutations, 0);
3090        assert_eq!(dry_run.items[0].rows_remaining, 2);
3091    }
3092
3093    #[test]
3094    fn compact_operational_collection_rejects_latest_state() {
3095        let (_db, service) = setup();
3096        service
3097            .register_operational_collection(&OperationalRegisterRequest {
3098                name: "connector_health".to_owned(),
3099                kind: OperationalCollectionKind::LatestState,
3100                schema_json: "{}".to_owned(),
3101                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3102                filter_fields_json: "[]".to_owned(),
3103                validation_json: String::new(),
3104                secondary_indexes_json: "[]".to_owned(),
3105                format_version: 1,
3106            })
3107            .expect("register collection");
3108
3109        let error = service
3110            .compact_operational_collection("connector_health", false)
3111            .expect_err("latest_state compaction should be rejected");
3112        assert!(matches!(error, EngineError::InvalidWrite(_)));
3113        assert!(error.to_string().contains("append_only_log"));
3114    }
3115
3116    #[test]
3117    fn register_operational_collection_persists_filter_fields_json() {
3118        let (_db, service) = setup();
3119
3120        let record = service
3121            .register_operational_collection(&OperationalRegisterRequest {
3122                name: "audit_log".to_owned(),
3123                kind: OperationalCollectionKind::AppendOnlyLog,
3124                schema_json: "{}".to_owned(),
3125                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
3126                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
3127                validation_json: String::new(),
3128                secondary_indexes_json: "[]".to_owned(),
3129                format_version: 1,
3130            })
3131            .expect("register collection");
3132
3133        assert_eq!(
3134            record.filter_fields_json,
3135            r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#
3136        );
3137    }
3138
    /// Filtered reads over an append-only collection must honor the
    /// declared filter contract: a `prefix` clause on `actor` combined
    /// with a `range` clause on `ts` selects exactly one of three
    /// appended events.
    #[test]
    fn read_operational_collection_filters_append_only_rows_by_declared_fields() {
        let (db, service) = setup();
        // Declare actor (exact/prefix), seq (exact/range) and ts
        // (exact/range) as filterable fields at registration time.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"seq","type":"integer","modes":["exact","range"]},{"name":"ts","type":"timestamp","modes":["exact","range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append three events through the writer actor so the regular
        // write path processes the operational writes (rather than
        // inserting rows by hand).
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","seq":1,"ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","seq":2,"ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-3".to_owned(),
                            payload_json: r#"{"actor":"bob","seq":3,"ts":300}"#.to_owned(),
                            source_ref: Some("src-3".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // actor prefix "alice" matches evt-1 and evt-2; ts in [150, 250]
        // then narrows the result to evt-2 alone.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("filtered read");

        assert_eq!(report.collection_name, "audit_log");
        assert_eq!(report.row_count, 1);
        // One matching row against a limit of 10 => not truncated.
        assert!(!report.was_limited);
        assert_eq!(report.rows.len(), 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
        assert_eq!(
            report.rows[0].payload_json,
            r#"{"actor":"alice-admin","seq":2,"ts":200}"#
        );
    }
3227
    /// When the derived rows in `operational_filter_values` are missing
    /// (deleted here on purpose), a filtered read must still answer
    /// correctly via the registered `append_only_field_time` secondary
    /// index.
    #[test]
    fn read_operational_collection_uses_secondary_index_when_filter_values_are_missing() {
        let (db, service) = setup();
        // Register with both a filter contract and an actor/ts secondary
        // index definition.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact","prefix"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#.to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: r#"[{"name":"actor_ts","kind":"append_only_field_time","field":"actor","value_type":"string","time_field":"ts"}]"#.to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append two events through the writer actor's normal write path.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice","ts":100}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-admin","ts":200}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }
        // Wipe the derived filter values so the read cannot rely on them.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        conn.execute(
            "DELETE FROM operational_filter_values WHERE collection_name = 'audit_log'",
            [],
        )
        .expect("clear filter values");
        drop(conn);

        // Prefix "alice" matches both events, but ts in [150, 250] keeps
        // only evt-2 (ts 200).
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![
                    crate::operational::OperationalFilterClause::Prefix {
                        field: "actor".to_owned(),
                        value: "alice".to_owned(),
                    },
                    crate::operational::OperationalFilterClause::Range {
                        field: "ts".to_owned(),
                        lower: Some(150),
                        upper: Some(250),
                    },
                ],
                limit: Some(10),
            })
            .expect("secondary-index read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3310
    /// Filtered reads are only supported for append-only logs: a
    /// `latest_state` collection is rejected outright, and even on an
    /// append-only collection a filter on a field outside the declared
    /// contract is rejected.
    #[test]
    fn read_operational_collection_rejects_undeclared_fields_and_latest_state_collections() {
        let (_db, service) = setup();
        // A latest_state collection — note it even declares a filter
        // field, which must not make filtered reads legal.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "connector_health".to_owned(),
                kind: OperationalCollectionKind::LatestState,
                schema_json: "{}".to_owned(),
                retention_json: "{}".to_owned(),
                filter_fields_json: r#"[{"name":"status","type":"string","modes":["exact"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");

        // The rejection message must point the caller at the supported
        // collection kind.
        let latest_state_error = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "connector_health".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "status".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("ok".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("latest_state filtered reads should be rejected");
        assert!(latest_state_error.to_string().contains("append_only_log"));

        // An append-only collection whose contract declares only `actor`.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["exact"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register append-only collection");

        // Filtering on a field the contract never declared must fail
        // with an "undeclared" error.
        let undeclared_error = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "missing".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("x".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("undeclared field should be rejected");
        assert!(undeclared_error.to_string().contains("undeclared"));
    }
3366
    /// A read whose matches exceed `limit` must truncate the result and
    /// report the truncation via `was_limited` / `applied_limit`.
    #[test]
    fn read_operational_collection_applies_limit_and_reports_truncation() {
        let (db, service) = setup();
        // Contract declares only prefix filtering on `actor`.
        service
            .register_operational_collection(&OperationalRegisterRequest {
                name: "audit_log".to_owned(),
                kind: OperationalCollectionKind::AppendOnlyLog,
                schema_json: "{}".to_owned(),
                retention_json: r#"{"mode":"keep_all"}"#.to_owned(),
                filter_fields_json: r#"[{"name":"actor","type":"string","modes":["prefix"]}]"#
                    .to_owned(),
                validation_json: String::new(),
                secondary_indexes_json: "[]".to_owned(),
                format_version: 1,
            })
            .expect("register collection");
        // Append two events that both match the prefix filter below.
        {
            let writer = crate::WriterActor::start(
                db.path(),
                Arc::new(SchemaManager::new()),
                crate::ProvenanceMode::Warn,
                Arc::new(crate::TelemetryCounters::default()),
            )
            .expect("writer");
            writer
                .submit(crate::WriteRequest {
                    label: "operational".to_owned(),
                    nodes: vec![],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-1".to_owned(),
                            payload_json: r#"{"actor":"alice-1"}"#.to_owned(),
                            source_ref: Some("src-1".to_owned()),
                        },
                        crate::OperationalWrite::Append {
                            collection: "audit_log".to_owned(),
                            record_key: "evt-2".to_owned(),
                            payload_json: r#"{"actor":"alice-2"}"#.to_owned(),
                            source_ref: Some("src-2".to_owned()),
                        },
                    ],
                })
                .expect("write");
        }

        // Two rows match the prefix but the limit is 1.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Prefix {
                    field: "actor".to_owned(),
                    value: "alice".to_owned(),
                }],
                limit: Some(1),
            })
            .expect("limited read");

        assert_eq!(report.row_count, 1);
        assert_eq!(report.applied_limit, 1);
        assert!(report.was_limited);
        // The kept row is the later event (evt-2) — results appear to be
        // ordered newest-first; confirm against the read implementation.
        assert_eq!(report.rows[0].record_key, "evt-2");
    }
3438
    /// A database whose operational tables predate the filter-contract
    /// columns (hand-built here; the seed is labelled "pre-v10") must
    /// still be usable: filtered reads are rejected as undeclared until
    /// `update_operational_collection_filters` installs a contract,
    /// after which declared filters work against the old data.
    #[test]
    fn preexisting_operational_collection_can_gain_filter_contract_after_upgrade() {
        let db = NamedTempFile::new().expect("temp db");
        let conn = sqlite::open_connection(db.path()).expect("conn");
        // Hand-create the legacy table shapes (no filter_fields_json /
        // validation / secondary-index columns) plus one collection and
        // one mutation.
        conn.execute_batch(
            r#"
            CREATE TABLE operational_collections (
                name TEXT PRIMARY KEY,
                kind TEXT NOT NULL,
                schema_json TEXT NOT NULL,
                retention_json TEXT NOT NULL,
                format_version INTEGER NOT NULL DEFAULT 1,
                created_at INTEGER NOT NULL DEFAULT 100,
                disabled_at INTEGER
            );
            CREATE TABLE operational_mutations (
                id TEXT PRIMARY KEY,
                collection_name TEXT NOT NULL,
                record_key TEXT NOT NULL,
                op_kind TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                source_ref TEXT,
                created_at INTEGER NOT NULL DEFAULT 100,
                mutation_order INTEGER NOT NULL DEFAULT 1
            );
            INSERT INTO operational_collections (name, kind, schema_json, retention_json, format_version, created_at)
            VALUES ('audit_log', 'append_only_log', '{}', '{"mode":"keep_all"}', 1, 100);
            INSERT INTO operational_mutations
                (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order)
            VALUES
                ('evt-1', 'audit_log', 'evt-1', 'append', '{"actor":"alice","ts":0}', 'src-1', 100, 1);
            "#,
        )
        .expect("seed pre-v10 schema");
        drop(conn);

        let service = AdminService::new(db.path(), Arc::new(SchemaManager::new()));
        // Before the contract update, even an exact filter on a field
        // present in the payload is treated as undeclared.
        let pre_update = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Exact {
                    field: "actor".to_owned(),
                    value: crate::operational::OperationalFilterValue::String("alice".to_owned()),
                }],
                limit: Some(10),
            })
            .expect_err("read should reject undeclared fields before migration update");
        assert!(pre_update.to_string().contains("undeclared"));

        // Install a filter contract on the legacy collection.
        let updated = service
            .update_operational_collection_filters(
                "audit_log",
                r#"[{"name":"actor","type":"string","modes":["exact"]},{"name":"ts","type":"timestamp","modes":["range"]}]"#,
            )
            .expect("update filter contract");
        assert!(updated.filter_fields_json.contains("\"actor\""));

        // A range filter on the newly declared `ts` field now matches
        // the seeded event.
        let report = service
            .read_operational_collection(&crate::operational::OperationalReadRequest {
                collection_name: "audit_log".to_owned(),
                filters: vec![crate::operational::OperationalFilterClause::Range {
                    field: "ts".to_owned(),
                    lower: Some(0),
                    upper: Some(0),
                }],
                limit: Some(10),
            })
            .expect("read after explicit filter update");
        assert_eq!(report.row_count, 1);
        assert_eq!(report.rows[0].record_key, "evt-1");
    }
3510
3511    #[cfg(feature = "sqlite-vec")]
3512    #[test]
3513    fn check_semantics_detects_stale_vec_rows() {
3514        use crate::sqlite::open_connection_with_vec;
3515
3516        let db = NamedTempFile::new().expect("temp file");
3517        let schema = Arc::new(SchemaManager::new());
3518        {
3519            let conn = open_connection_with_vec(db.path()).expect("vec conn");
3520            schema.bootstrap(&conn).expect("bootstrap");
3521            schema
3522                .ensure_vec_kind_profile(&conn, "Doc", 3)
3523                .expect("vec kind profile");
3524            // Insert a vec row whose chunk does not exist.
3525            let bytes: Vec<u8> = [0.1f32, 0.2f32, 0.3f32]
3526                .iter()
3527                .flat_map(|f| f.to_le_bytes())
3528                .collect();
3529            conn.execute(
3530                "INSERT INTO vec_doc (chunk_id, embedding) VALUES ('ghost-chunk', ?1)",
3531                rusqlite::params![bytes],
3532            )
3533            .expect("insert stale vec row");
3534        }
3535        let service = AdminService::new(db.path(), Arc::clone(&schema));
3536        let report = service.check_semantics().expect("semantics check");
3537        assert_eq!(report.stale_vec_rows, 1);
3538        assert!(
3539            report.warnings.iter().any(|w| w.contains("stale vec")),
3540            "warning must mention stale vec"
3541        );
3542    }
3543
3544    #[cfg(feature = "sqlite-vec")]
3545    #[test]
3546    fn restore_vector_profiles_recreates_vec_table_from_metadata() {
3547        let db = NamedTempFile::new().expect("temp file");
3548        let schema = Arc::new(SchemaManager::new());
3549        {
3550            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3551            schema.bootstrap(&conn).expect("bootstrap");
3552            conn.execute(
3553                "INSERT INTO vector_profiles (profile, table_name, dimension, enabled) \
3554                 VALUES ('default', 'vec_nodes_active', 3, 1)",
3555                [],
3556            )
3557            .expect("insert vector profile");
3558        }
3559
3560        let service = AdminService::new(db.path(), Arc::clone(&schema));
3561        let report = service
3562            .restore_vector_profiles()
3563            .expect("restore vector profiles");
3564        assert_eq!(
3565            report.targets,
3566            vec![crate::projection::ProjectionTarget::Vec]
3567        );
3568        assert_eq!(report.rebuilt_rows, 1);
3569
3570        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
3571        let count: i64 = conn
3572            .query_row(
3573                "SELECT count(*) FROM sqlite_schema WHERE name = 'vec_nodes_active'",
3574                [],
3575                |row| row.get(0),
3576            )
3577            .expect("vec schema count");
3578        assert_eq!(count, 1, "vec table should exist after restore");
3579    }
3580
3581    #[cfg(feature = "sqlite-vec")]
3582    #[test]
3583    fn load_vector_regeneration_config_supports_json_and_toml() {
3584        let dir = tempfile::tempdir().expect("temp dir");
3585        let json_path = dir.path().join("regen.json");
3586        let toml_path = dir.path().join("regen.toml");
3587
3588        let config = VectorRegenerationConfig {
3589            kind: "Document".to_owned(),
3590            profile: "default".to_owned(),
3591            chunking_policy: "per_chunk".to_owned(),
3592            preprocessing_policy: "trim".to_owned(),
3593        };
3594
3595        fs::write(&json_path, serde_json::to_string(&config).expect("json")).expect("write json");
3596        fs::write(&toml_path, toml::to_string(&config).expect("toml")).expect("write toml");
3597
3598        let parsed_json = load_vector_regeneration_config(&json_path).expect("json parse");
3599        let parsed_toml = load_vector_regeneration_config(&toml_path).expect("toml parse");
3600
3601        assert_eq!(parsed_json, config);
3602        assert_eq!(parsed_toml, config);
3603    }
3604
    /// The 0.4.0 rewrite removed the identity fields from the config.
    /// Any client that still serializes the pre-0.4 fields must be
    /// rejected AT THE SERDE BOUNDARY with a clear error — never
    /// silently accepted.
    ///
    /// NOTE(review): this doc comment says 0.4.0 but the inline comment
    /// below says "Pre-0.5.0" — confirm which release actually dropped
    /// the identity fields and align the two.
    #[test]
    fn regenerate_vector_embeddings_config_rejects_old_identity_fields() {
        // Pre-0.5.0 configs that include old fields (table_name, model_identity, etc.)
        // must be rejected at the serde boundary due to deny_unknown_fields.
        let legacy_json = r#"{
            "kind": "Document",
            "profile": "default",
            "table_name": "vec_nodes_active",
            "model_identity": "old-model",
            "model_version": "1.0",
            "dimension": 4,
            "normalization_policy": "l2",
            "chunking_policy": "per_chunk",
            "preprocessing_policy": "trim",
            "generator_command": ["/bin/echo"]
        }"#;
        // Any unknown key must make deserialization fail outright.
        let result: Result<VectorRegenerationConfig, _> = serde_json::from_str(legacy_json);
        assert!(
            result.is_err(),
            "legacy identity fields must be rejected at deserialization"
        );
    }
3631
    #[cfg(all(not(feature = "sqlite-vec"), unix))]
    #[test]
    fn regenerate_vector_embeddings_unsupported_vec_capability_writes_request_and_failed_audit() {
        // Built WITHOUT the `sqlite-vec` feature, regeneration must fail with
        // an "unsupported vec capability" error — but the attempt must still
        // leave a complete audit trail: one `vector_regeneration_requested`
        // event and one `vector_regeneration_failed` event carrying a
        // machine-readable failure_class in its metadata.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        {
            // Seed one node with one chunk so regeneration has work to refuse.
            let conn = sqlite::open_connection(db.path()).expect("connection");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("sqlite-vec capability should be required");

        assert!(error.to_string().contains("unsupported vec capability"));

        // The failure must be observable after the fact: one request event
        // and one failed event, both keyed by the profile as subject.
        let conn = sqlite::open_connection(db.path()).expect("connection");
        let request_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("request count");
        assert_eq!(request_count, 1);
        let failed_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failed count");
        assert_eq!(failed_count, 1);
        // The failed event's metadata classifies the failure for tooling.
        let metadata_json: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failed metadata");
        assert!(metadata_json.contains("\"failure_class\":\"unsupported vec capability\""));
    }
3697
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_rebuilds_embeddings_via_embedder() {
        // Happy path: with the sqlite-vec capability available, regeneration
        // embeds every chunk of the requested kind, persists a vector
        // contract derived from the embedder's own identity, and writes
        // request + apply provenance events.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        {
            // Seed one Document node with two chunks so the report's
            // total/regenerated counts are distinguishable from a single row.
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk 1");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-2', 'doc-1', 'travel plan', 101)",
                [],
            )
            .expect("insert chunk 2");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let report = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect("regenerate vectors");

        // Report reflects a full rebuild: both chunks embedded into the
        // per-kind table at the embedder's dimension.
        assert_eq!(report.profile, "default");
        assert_eq!(report.table_name, "vec_document");
        assert_eq!(report.dimension, 4);
        assert_eq!(report.total_chunks, 2);
        assert_eq!(report.regenerated_rows, 2);
        assert!(report.contract_persisted);

        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 2);

        // The persisted vector contract must reflect the embedder
        // identity — not any string the caller passed in, because the
        // caller never passes one.
        let (model_identity, model_version, dimension, normalization_policy): (
            String,
            String,
            i64,
            String,
        ) = conn
            .query_row(
                "SELECT model_identity, model_version, dimension, normalization_policy \
                 FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )
            .expect("contract row");
        assert_eq!(model_identity, "test-model");
        assert_eq!(model_version, "1.0.0");
        assert_eq!(dimension, 4);
        assert_eq!(normalization_policy, "l2");

        // A freshly written contract carries the current format version (1).
        let contract_format_version: i64 = conn
            .query_row(
                "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("contract_format_version");
        assert_eq!(contract_format_version, 1);
        // Provenance: exactly one request and one apply event for the
        // profile, with the apply metadata carrying profile, snapshot hash,
        // and the embedder's model identity.
        let request_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_requested' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("request audit count");
        assert_eq!(request_count, 1);
        let apply_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("apply audit count");
        assert_eq!(apply_count, 1);
        let apply_metadata: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_apply' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("apply metadata");
        assert!(apply_metadata.contains("\"profile\":\"default\""));
        assert!(apply_metadata.contains("\"snapshot_hash\":"));
        assert!(apply_metadata.contains("\"model_identity\":\"test-model\""));
    }
3811
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn regenerate_vector_embeddings_embedder_failure_leaves_contract_and_vec_rows_unchanged() {
        // Atomicity: if the embedder fails mid-regeneration, the previously
        // persisted contract and the existing vec rows must be untouched,
        // and a `vector_regeneration_failed` event must be recorded.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());

        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
            schema
                .ensure_vec_kind_profile(&conn, "Document", 4)
                .expect("ensure vec kind profile");
            // Seed a pre-existing contract ("old-model"/"old-snapshot") and
            // one vec row — these must survive the failed run unchanged.
            conn.execute(
                r"
                INSERT INTO vector_embedding_contracts (
                    profile,
                    table_name,
                    model_identity,
                    model_version,
                    dimension,
                    normalization_policy,
                    chunking_policy,
                    preprocessing_policy,
                    generator_command_json,
                    applied_at,
                    snapshot_hash
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
                ",
                rusqlite::params![
                    "default",
                    "vec_document",
                    "old-model",
                    "0.9.0",
                    4,
                    "l2",
                    "per_chunk",
                    "trim",
                    "[]",
                    111,
                    "old-snapshot"
                ],
            )
            .expect("seed contract");
            conn.execute(
                "INSERT INTO vec_document (chunk_id, embedding) VALUES ('chunk-1', zeroblob(16))",
                [],
            )
            .expect("seed vec row");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        // An embedder that advertises a valid identity but fails on use.
        let failing = FailingEmbedder {
            identity: QueryEmbedderIdentity {
                model_identity: "new-model".to_owned(),
                model_version: "1.0.0".to_owned(),
                dimension: 4,
                normalization_policy: "l2".to_owned(),
            },
        };
        let error = service
            .regenerate_vector_embeddings(
                &failing,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("embedder should fail");

        assert!(error.to_string().contains("embedder failure"));

        // The old contract identity and snapshot hash are intact — the
        // failed run wrote nothing over them.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let model_identity: String = conn
            .query_row(
                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("model identity");
        assert_eq!(model_identity, "old-model");
        let snapshot_hash: String = conn
            .query_row(
                "SELECT snapshot_hash FROM vector_embedding_contracts WHERE profile = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("snapshot hash");
        assert_eq!(snapshot_hash, "old-snapshot");
        // The seeded vec row was neither deleted nor duplicated.
        let vec_count: i64 = conn
            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
            .expect("vec count");
        assert_eq!(vec_count, 1);
        // Exactly one failure event with a machine-readable failure_class.
        let failure_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failure count");
        assert_eq!(failure_count, 1);
        let failure_metadata: String = conn
            .query_row(
                "SELECT metadata_json FROM provenance_events WHERE event_type = 'vector_regeneration_failed' AND subject = 'default'",
                [],
                |row| row.get(0),
            )
            .expect("failure metadata");
        assert!(failure_metadata.contains("\"failure_class\":\"embedder failure\""));
    }
3936
3937    // Subprocess generator tests (snapshot-drift-via-concurrent-writer,
3938    // timeout, stdout/stderr overflow, oversized input, excessive chunk
3939    // count, malformed JSON, world-writable executable, disallowed
3940    // executable root, environment preservation) were removed in 0.4.0
3941    // along with the subprocess generator pattern itself. The failure
3942    // modes they exercised belong to the deleted
3943    // `run_vector_generator_bounded` pipeline and have no equivalent in
3944    // the direct-embedder path. See
3945    // `.claude/memory/project_vector_identity_invariant.md`.
3946
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn regenerate_vector_embeddings_rejects_whitespace_only_profile_before_mutation() {
        // A profile that is only whitespace must be rejected during
        // validation — before any contract row or provenance event is
        // written to the database.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    // Whitespace-only profile — invalid by contract rules.
                    profile: "   ".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("whitespace profile should be rejected");

        assert!(error.to_string().contains("invalid contract"));
        // Nothing was persisted: no contract row, not even an audit event.
        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
        let contract_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vector_embedding_contracts",
                [],
                |row| row.get(0),
            )
            .expect("contract count");
        assert_eq!(contract_count, 0);
        let provenance_count: i64 = conn
            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
                row.get(0)
            })
            .expect("provenance count");
        assert_eq!(provenance_count, 0);
    }
4000
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn regenerate_vector_embeddings_rejects_future_contract_format_version() {
        // Forward-compatibility guard: an existing contract stamped with a
        // contract_format_version from the future (99) must make
        // regeneration refuse to proceed rather than reinterpret it.
        let db = NamedTempFile::new().expect("temp file");
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
                [],
            )
            .expect("insert node");
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
                [],
            )
            .expect("insert chunk");
            // Seed a contract whose format version (99) is newer than
            // anything this build understands.
            conn.execute(
                r"
                INSERT INTO vector_embedding_contracts (
                    profile,
                    table_name,
                    model_identity,
                    model_version,
                    dimension,
                    normalization_policy,
                    chunking_policy,
                    preprocessing_policy,
                    generator_command_json,
                    applied_at,
                    snapshot_hash,
                    contract_format_version,
                    updated_at
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
                ",
                rusqlite::params![
                    "default",
                    "vec_nodes_active",
                    "old-model",
                    "0.9.0",
                    4,
                    "l2",
                    "per_chunk",
                    "trim",
                    "[]",
                    111,
                    "old-snapshot",
                    99,
                    111,
                ],
            )
            .expect("seed future contract");
        }

        let service = AdminService::new(db.path(), Arc::clone(&schema));
        let embedder = TestEmbedder::new("test-model", 4);
        let error = service
            .regenerate_vector_embeddings(
                &embedder,
                &VectorRegenerationConfig {
                    kind: "Document".to_owned(),
                    profile: "default".to_owned(),
                    chunking_policy: "per_chunk".to_owned(),
                    preprocessing_policy: "trim".to_owned(),
                },
            )
            .expect_err("future contract version should be rejected");

        // The error names the problem rather than a generic failure.
        assert!(error.to_string().contains("unsupported"));
        assert!(error.to_string().contains("format version"));
    }
4075
4076    #[test]
4077    fn check_semantics_detects_orphaned_chunk() {
4078        let (db, service) = setup();
4079        {
4080            // Open without FK enforcement to insert chunk with no active node.
4081            let conn = sqlite::open_connection(db.path()).expect("conn");
4082            conn.execute(
4083                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4084                 VALUES ('c1', 'ghost-node', 'text', 100)",
4085                [],
4086            )
4087            .expect("insert orphaned chunk");
4088        }
4089        let report = service.check_semantics().expect("semantics check");
4090        assert_eq!(report.orphaned_chunks, 1);
4091    }
4092
4093    #[test]
4094    fn check_semantics_detects_null_source_ref() {
4095        let (db, service) = setup();
4096        {
4097            let conn = sqlite::open_connection(db.path()).expect("conn");
4098            conn.execute(
4099                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at) \
4100                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100)",
4101                [],
4102            )
4103            .expect("insert node with null source_ref");
4104        }
4105        let report = service.check_semantics().expect("semantics check");
4106        assert_eq!(report.null_source_ref_nodes, 1);
4107    }
4108
4109    #[test]
4110    fn check_semantics_detects_broken_step_fk() {
4111        let (db, service) = setup();
4112        {
4113            // Explicitly disable FK enforcement for this connection so we can insert
4114            // an orphaned step (ghost run_id) to simulate a partial-write failure.
4115            let conn = sqlite::open_connection(db.path()).expect("conn");
4116            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4117                .expect("disable FK");
4118            conn.execute(
4119                "INSERT INTO steps (id, run_id, kind, status, properties, created_at) \
4120                 VALUES ('s1', 'ghost-run', 'llm', 'completed', '{}', 100)",
4121                [],
4122            )
4123            .expect("insert step with ghost run_id");
4124        }
4125        let report = service.check_semantics().expect("semantics check");
4126        assert_eq!(report.broken_step_fk, 1);
4127    }
4128
4129    #[test]
4130    fn check_semantics_detects_broken_action_fk() {
4131        let (db, service) = setup();
4132        {
4133            let conn = sqlite::open_connection(db.path()).expect("conn");
4134            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4135                .expect("disable FK");
4136            conn.execute(
4137                "INSERT INTO actions (id, step_id, kind, status, properties, created_at) \
4138                 VALUES ('a1', 'ghost-step', 'emit', 'completed', '{}', 100)",
4139                [],
4140            )
4141            .expect("insert action with ghost step_id");
4142        }
4143        let report = service.check_semantics().expect("semantics check");
4144        assert_eq!(report.broken_action_fk, 1);
4145    }
4146
4147    #[test]
4148    fn check_semantics_detects_stale_fts_rows() {
4149        let (db, service) = setup();
4150        {
4151            let conn = sqlite::open_connection(db.path()).expect("conn");
4152            // FTS virtual tables have no FK constraints; insert a row referencing
4153            // a chunk_id that does not exist in the chunks table.
4154            conn.execute(
4155                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4156                 VALUES ('ghost-chunk', 'any-node', 'Meeting', 'stale content')",
4157                [],
4158            )
4159            .expect("insert stale FTS row");
4160        }
4161        let report = service.check_semantics().expect("semantics check");
4162        assert_eq!(report.stale_fts_rows, 1);
4163    }
4164
4165    #[test]
4166    fn check_semantics_detects_fts_rows_for_superseded_nodes() {
4167        let (db, service) = setup();
4168        {
4169            let conn = sqlite::open_connection(db.path()).expect("conn");
4170            // Insert a node that has been fully superseded (superseded_at IS NOT NULL).
4171            conn.execute(
4172                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4173                 VALUES ('r1', 'lg-sup', 'Meeting', '{}', 100, 200, 'src-1')",
4174                [],
4175            )
4176            .expect("insert superseded node");
4177            // Insert an FTS row for the superseded node's logical_id.
4178            conn.execute(
4179                "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
4180                 VALUES ('ck-x', 'lg-sup', 'Meeting', 'superseded content')",
4181                [],
4182            )
4183            .expect("insert FTS row for superseded node");
4184        }
4185        let report = service.check_semantics().expect("semantics check");
4186        assert_eq!(report.fts_rows_for_superseded_nodes, 1);
4187    }
4188
4189    #[test]
4190    fn check_semantics_detects_dangling_edges() {
4191        let (db, service) = setup();
4192        {
4193            let conn = sqlite::open_connection(db.path()).expect("conn");
4194            conn.execute_batch("PRAGMA foreign_keys = OFF;")
4195                .expect("disable FK");
4196            // One active node as source; target does not exist — edge is dangling.
4197            conn.execute(
4198                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4199                 VALUES ('r1', 'lg-src', 'Meeting', '{}', 100, 'src-1')",
4200                [],
4201            )
4202            .expect("insert source node");
4203            conn.execute(
4204                "INSERT INTO edges \
4205                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
4206                 VALUES ('e1', 'edge-1', 'lg-src', 'ghost-target', 'LINKS', '{}', 100, 'src-1')",
4207                [],
4208            )
4209            .expect("insert dangling edge");
4210        }
4211        let report = service.check_semantics().expect("semantics check");
4212        assert_eq!(report.dangling_edges, 1);
4213    }
4214
4215    #[test]
4216    fn check_semantics_detects_orphaned_supersession_chains() {
4217        let (db, service) = setup();
4218        {
4219            let conn = sqlite::open_connection(db.path()).expect("conn");
4220            // Every version of this logical_id is superseded — no active row remains.
4221            conn.execute(
4222                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4223                 VALUES ('r1', 'lg-orphaned', 'Meeting', '{}', 100, 200, 'src-1')",
4224                [],
4225            )
4226            .expect("insert fully superseded node");
4227        }
4228        let report = service.check_semantics().expect("semantics check");
4229        assert_eq!(report.orphaned_supersession_chains, 1);
4230    }
4231
    #[test]
    fn check_semantics_detects_mismatched_kind_property_fts_rows() {
        // With per-kind tables, mismatched_kind is always 0 — rows in fts_props_<kind>
        // must belong to that kind by construction. Orphaned rows (a per-kind table
        // with no registered schema) serve as the equivalent signal and are covered
        // by a separate test. NOTE(review): this comment previously pointed at
        // check_semantics_detects_fts_rows_for_superseded_nodes, which exercises a
        // different signal; check_semantics_detects_property_fts_row_that_should_not_exist
        // looks like the intended cross-reference — confirm. This test verifies
        // mismatched_kind is 0 even when per-kind table rows exist for a node.
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
                [],
            )
            .expect("insert node");
            // Create the per-kind table and insert a correctly-kind row.
            let table = fathomdb_schema::fts_kind_table_name("Goal");
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
            ))
            .expect("create per-kind table");
            conn.execute(
                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
                [],
            )
            .expect("insert per-kind FTS row");
        }
        let report = service.check_semantics().expect("semantics check");
        // Per-kind tables make mismatched_kind impossible — always 0.
        assert_eq!(report.mismatched_kind_property_fts_rows, 0);
    }
4271
4272    #[test]
4273    fn check_semantics_detects_duplicate_property_fts_rows() {
4274        let (db, service) = setup();
4275        {
4276            let conn = sqlite::open_connection(db.path()).expect("conn");
4277            conn.execute(
4278                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4279                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'src-1')",
4280                [],
4281            )
4282            .expect("insert node");
4283            // Create the per-kind table and insert two rows for the same logical ID.
4284            let table = fathomdb_schema::fts_kind_table_name("Goal");
4285            conn.execute_batch(&format!(
4286                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4287                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4288            ))
4289            .expect("create per-kind table");
4290            conn.execute(
4291                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2')"),
4292                [],
4293            )
4294            .expect("insert first property FTS row");
4295            conn.execute(
4296                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Ship v2 duplicate')"),
4297                [],
4298            )
4299            .expect("insert duplicate property FTS row");
4300        }
4301        let report = service.check_semantics().expect("semantics check");
4302        assert_eq!(report.duplicate_property_fts_rows, 1);
4303    }
4304
4305    #[test]
4306    fn check_semantics_detects_drifted_property_fts_text() {
4307        let (db, service) = setup();
4308        {
4309            let conn = sqlite::open_connection(db.path()).expect("conn");
4310            conn.execute(
4311                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4312                 VALUES ('Goal', '[\"$.name\"]', ' ')",
4313                [],
4314            )
4315            .expect("register schema");
4316            conn.execute(
4317                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4318                 VALUES ('r1', 'goal-1', 'Goal', '{\"name\":\"Current name\"}', 100, 'src-1')",
4319                [],
4320            )
4321            .expect("insert node");
4322            // Create per-kind table and insert a row with outdated text content.
4323            let table = fathomdb_schema::fts_kind_table_name("Goal");
4324            conn.execute_batch(&format!(
4325                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4326                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4327            ))
4328            .expect("create per-kind table");
4329            conn.execute(
4330                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'Old stale name')"),
4331                [],
4332            )
4333            .expect("insert stale property FTS row");
4334        }
4335        let report = service.check_semantics().expect("semantics check");
4336        assert_eq!(report.drifted_property_fts_rows, 1);
4337    }
4338
4339    #[test]
4340    fn check_semantics_detects_property_fts_row_that_should_not_exist() {
4341        let (db, service) = setup();
4342        {
4343            let conn = sqlite::open_connection(db.path()).expect("conn");
4344            conn.execute(
4345                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4346                 VALUES ('Goal', '[\"$.searchable\"]', ' ')",
4347                [],
4348            )
4349            .expect("register schema");
4350            // Node does NOT have $.searchable — extraction yields no value.
4351            conn.execute(
4352                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4353                 VALUES ('r1', 'goal-1', 'Goal', '{\"other\":\"field\"}', 100, 'src-1')",
4354                [],
4355            )
4356            .expect("insert node");
4357            // Create per-kind table and insert a phantom row that should not exist.
4358            let table = fathomdb_schema::fts_kind_table_name("Goal");
4359            conn.execute_batch(&format!(
4360                "CREATE VIRTUAL TABLE IF NOT EXISTS {table} \
4361                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4362            ))
4363            .expect("create per-kind table");
4364            conn.execute(
4365                &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('goal-1', 'phantom text')"),
4366                [],
4367            )
4368            .expect("insert phantom property FTS row");
4369        }
4370        let report = service.check_semantics().expect("semantics check");
4371        assert_eq!(
4372            report.drifted_property_fts_rows, 1,
4373            "row that should not exist must be counted as drifted"
4374        );
4375    }
4376
4377    /// Regression (0.5.2 Pack A): weighted (per-column) FTS property schemas
4378    /// used to crash `check_semantics` with
4379    /// `SqliteError: no such column: fp.text_content` because the drift
4380    /// counter hardcoded the non-weighted column shape. A clean DB with a
4381    /// weighted schema must now report 0 drift without panicking.
4382    #[test]
4383    fn check_semantics_clean_on_weighted_fts_schema_does_not_panic() {
4384        let (db, service) = setup();
4385        // Register a weighted schema (two specs: one scalar, one recursive).
4386        // Eager mode writes the schema row and creates the per-kind weighted
4387        // FTS table with per-path columns.
4388        // Weighted schemas (at least one entry with a weight) trigger the
4389        // per-column FTS table layout in `create_or_replace_fts_kind_table`.
4390        let entries = vec![
4391            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4392            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4393        ];
4394        service
4395            .register_fts_property_schema_with_entries(
4396                "Article",
4397                &entries,
4398                Some(" "),
4399                &[],
4400                crate::rebuild_actor::RebuildMode::Eager,
4401            )
4402            .expect("register weighted schema");
4403
4404        // Insert a node and a matching FTS row whose per-column values match
4405        // the canonical extraction (expected: 0 drift).
4406        {
4407            let conn = sqlite::open_connection(db.path()).expect("conn");
4408            let properties = r#"{"title":"Hello","body":{"text":"world"}}"#;
4409            conn.execute(
4410                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4411                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4412                [properties],
4413            )
4414            .expect("insert node");
4415
4416            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4417            let (_kind, schema) = schemas
4418                .iter()
4419                .find(|(k, _)| k == "Article")
4420                .expect("weighted schema present");
4421            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4422            let cols = crate::writer::extract_property_fts_columns(&props, schema);
4423
4424            let table = fathomdb_schema::fts_kind_table_name("Article");
4425            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4426            let placeholders: Vec<String> =
4427                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4428            let sql = format!(
4429                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4430                cols = col_names.join(", "),
4431                placeholders = placeholders.join(", "),
4432            );
4433            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4434            let params: Vec<&dyn rusqlite::ToSql> =
4435                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4436                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4437                    .collect();
4438            conn.execute(&sql, params.as_slice())
4439                .expect("insert weighted FTS row");
4440        }
4441
4442        let report = service
4443            .check_semantics()
4444            .expect("semantics check must not crash on weighted schema");
4445        assert_eq!(report.drifted_property_fts_rows, 0);
4446    }
4447
4448    /// Regression (0.5.2 Pack A): drift detection in weighted FTS tables.
4449    /// After tampering a per-column value, `check_semantics` must count the
4450    /// row as drifted (once per row, regardless of how many columns mismatch).
4451    #[test]
4452    fn check_semantics_detects_drifted_property_fts_text_weighted() {
4453        let (db, service) = setup();
4454        // Weighted schemas (at least one entry with a weight) trigger the
4455        // per-column FTS table layout in `create_or_replace_fts_kind_table`.
4456        let entries = vec![
4457            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4458            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4459        ];
4460        service
4461            .register_fts_property_schema_with_entries(
4462                "Article",
4463                &entries,
4464                Some(" "),
4465                &[],
4466                crate::rebuild_actor::RebuildMode::Eager,
4467            )
4468            .expect("register weighted schema");
4469
4470        let title_col = fathomdb_schema::fts_column_name("$.title", false);
4471
4472        {
4473            let conn = sqlite::open_connection(db.path()).expect("conn");
4474            let properties = r#"{"title":"Current","body":{"text":"body"}}"#;
4475            conn.execute(
4476                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4477                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4478                [properties],
4479            )
4480            .expect("insert node");
4481
4482            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4483            let (_kind, schema) = schemas
4484                .iter()
4485                .find(|(k, _)| k == "Article")
4486                .expect("weighted schema present");
4487            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4488            let cols = crate::writer::extract_property_fts_columns(&props, schema);
4489
4490            let table = fathomdb_schema::fts_kind_table_name("Article");
4491            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4492            let placeholders: Vec<String> =
4493                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4494            let sql = format!(
4495                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4496                cols = col_names.join(", "),
4497                placeholders = placeholders.join(", "),
4498            );
4499            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4500            let params: Vec<&dyn rusqlite::ToSql> =
4501                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4502                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4503                    .collect();
4504            conn.execute(&sql, params.as_slice())
4505                .expect("insert weighted FTS row");
4506
4507            // Tamper the title column so it no longer matches canonical extraction.
4508            conn.execute(
4509                &format!("UPDATE {table} SET {title_col} = 'tampered' WHERE node_logical_id = 'article-1'"),
4510                [],
4511            )
4512            .expect("tamper weighted FTS row");
4513        }
4514
4515        let report = service.check_semantics().expect("semantics check");
4516        assert_eq!(report.drifted_property_fts_rows, 1);
4517    }
4518
4519    /// Regression (0.5.2 Pack A): a DB with both a weighted and a non-weighted
4520    /// per-kind FTS table must report 0 drift when both are in sync. This
4521    /// exercises the dispatcher: weighted path for one kind, non-weighted
4522    /// path for the other, in a single `check_semantics` call.
4523    #[test]
4524    fn check_semantics_mixed_weighted_and_non_weighted_schemas() {
4525        let (db, service) = setup();
4526
4527        // Weighted schema for Article (per-column layout — requires weights).
4528        let weighted_entries = vec![
4529            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
4530            FtsPropertyPathSpec::recursive("$.body").with_weight(1.0),
4531        ];
4532        service
4533            .register_fts_property_schema_with_entries(
4534                "Article",
4535                &weighted_entries,
4536                Some(" "),
4537                &[],
4538                crate::rebuild_actor::RebuildMode::Eager,
4539            )
4540            .expect("register weighted schema");
4541
4542        // Non-weighted (single path) schema for Goal. The legacy JSON shape
4543        // (bare array of scalar paths) yields a non-weighted per-kind table.
4544        {
4545            let conn = sqlite::open_connection(db.path()).expect("conn");
4546            conn.execute(
4547                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
4548                 VALUES ('Goal', '[\"$.name\"]', ' ')",
4549                [],
4550            )
4551            .expect("register non-weighted schema");
4552            let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
4553            conn.execute_batch(&format!(
4554                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
4555                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
4556            ))
4557            .expect("create non-weighted per-kind table");
4558
4559            // Insert Article node + matching weighted FTS row.
4560            let article_props = r#"{"title":"Hello","body":{"text":"world"}}"#;
4561            conn.execute(
4562                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4563                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4564                [article_props],
4565            )
4566            .expect("insert article");
4567
4568            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4569            let (_k, article_schema) = schemas
4570                .iter()
4571                .find(|(k, _)| k == "Article")
4572                .expect("Article schema present");
4573            let props: serde_json::Value =
4574                serde_json::from_str(article_props).expect("parse article props");
4575            let cols = crate::writer::extract_property_fts_columns(&props, article_schema);
4576            let article_table = fathomdb_schema::fts_kind_table_name("Article");
4577            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4578            let placeholders: Vec<String> =
4579                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4580            let sql = format!(
4581                "INSERT INTO {article_table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4582                cols = col_names.join(", "),
4583                placeholders = placeholders.join(", "),
4584            );
4585            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4586            let params: Vec<&dyn rusqlite::ToSql> =
4587                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4588                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4589                    .collect();
4590            conn.execute(&sql, params.as_slice())
4591                .expect("insert weighted FTS row");
4592
4593            // Insert Goal node + matching non-weighted FTS row. Canonical
4594            // extraction for legacy schema on $.name yields the string
4595            // "Goal One".
4596            conn.execute(
4597                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4598                 VALUES ('r2', 'goal-1', 'Goal', '{\"name\":\"Goal One\"}', 100, 'src-2')",
4599                [],
4600            )
4601            .expect("insert goal node");
4602            conn.execute(
4603                &format!("INSERT INTO {goal_table} (node_logical_id, text_content) VALUES ('goal-1', 'Goal One')"),
4604                [],
4605            )
4606            .expect("insert non-weighted FTS row");
4607        }
4608
4609        let report = service
4610            .check_semantics()
4611            .expect("semantics check must handle both shapes");
4612        assert_eq!(
4613            report.drifted_property_fts_rows, 0,
4614            "clean mixed weighted + non-weighted DB must report 0 drift"
4615        );
4616    }
4617
4618    /// Regression (0.5.2 follow-up, review note): a weighted schema whose
4619    /// path set includes literal `$.text_content` collapses via
4620    /// `fts_column_name` to a `text_content` column. A live-column probe
4621    /// would then misclassify this weighted table as non-weighted and run
4622    /// the single-blob comparator against per-column storage — no crash,
4623    /// but silently incorrect drift counts (a clean DB would report drift).
4624    /// Dispatching on the persisted schema shape (any entry with
4625    /// `weight.is_some()` ⇒ weighted) avoids the collision.
4626    ///
4627    /// This test writes a CLEAN weighted row (per-column values matching
4628    /// canonical extraction) and asserts zero drift. Under the old
4629    /// live-column dispatcher the non-weighted comparator would read the
4630    /// single-path column value, compare it against the multi-path blob
4631    /// concatenation produced by `extract_property_fts`, and spuriously
4632    /// report drift.
4633    #[test]
4634    fn check_semantics_weighted_schema_with_text_content_path() {
4635        let (db, service) = setup();
4636        let entries = vec![
4637            FtsPropertyPathSpec::scalar("$.text_content").with_weight(2.0),
4638            FtsPropertyPathSpec::scalar("$.title").with_weight(1.0),
4639        ];
4640        service
4641            .register_fts_property_schema_with_entries(
4642                "Article",
4643                &entries,
4644                Some(" "),
4645                &[],
4646                crate::rebuild_actor::RebuildMode::Eager,
4647            )
4648            .expect("register weighted schema with $.text_content path");
4649
4650        {
4651            let conn = sqlite::open_connection(db.path()).expect("conn");
4652            // Two distinct, non-empty scalar values. The non-weighted
4653            // comparator would expect their joined blob ("canonical body
4654            // Hello") as the single `text_content` column value; the
4655            // weighted (per-column) layout stores them separately, so on a
4656            // live-column probe the dispatcher would misclassify and
4657            // spuriously report drift.
4658            let properties = r#"{"text_content":"canonical body","title":"Hello"}"#;
4659            conn.execute(
4660                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4661                 VALUES ('r1', 'article-1', 'Article', ?1, 100, 'src-1')",
4662                [properties],
4663            )
4664            .expect("insert node");
4665
4666            let schemas = crate::writer::load_fts_property_schemas(&conn).expect("load schemas");
4667            let (_kind, schema) = schemas
4668                .iter()
4669                .find(|(k, _)| k == "Article")
4670                .expect("weighted schema present");
4671            let props: serde_json::Value = serde_json::from_str(properties).expect("parse props");
4672            let cols = crate::writer::extract_property_fts_columns(&props, schema);
4673
4674            let table = fathomdb_schema::fts_kind_table_name("Article");
4675            let col_names: Vec<String> = cols.iter().map(|(n, _)| n.clone()).collect();
4676            let placeholders: Vec<String> =
4677                (2..=col_names.len() + 1).map(|i| format!("?{i}")).collect();
4678            let sql = format!(
4679                "INSERT INTO {table} (node_logical_id, {cols}) VALUES (?1, {placeholders})",
4680                cols = col_names.join(", "),
4681                placeholders = placeholders.join(", "),
4682            );
4683            let values: Vec<String> = cols.iter().map(|(_, v)| v.clone()).collect();
4684            let params: Vec<&dyn rusqlite::ToSql> =
4685                std::iter::once(&"article-1" as &dyn rusqlite::ToSql)
4686                    .chain(values.iter().map(|v| v as &dyn rusqlite::ToSql))
4687                    .collect();
4688            conn.execute(&sql, params.as_slice())
4689                .expect("insert weighted FTS row");
4690        }
4691
4692        let report = service.check_semantics().expect("semantics check");
4693        assert_eq!(
4694            report.drifted_property_fts_rows, 0,
4695            "weighted schema whose path collapses to `text_content` must be \
4696             dispatched as weighted (per-column comparator); a clean DB \
4697             must report 0 drift"
4698        );
4699    }
4700
4701    #[test]
4702    fn safe_export_writes_manifest_with_sha256() {
4703        let (_db, service) = setup();
4704        let export_dir = tempfile::TempDir::new().expect("temp dir");
4705        let export_path = export_dir.path().join("backup.db");
4706
4707        let manifest = service
4708            .safe_export(
4709                &export_path,
4710                SafeExportOptions {
4711                    force_checkpoint: false,
4712                },
4713            )
4714            .expect("export");
4715
4716        assert!(export_path.exists(), "exported db should exist");
4717        let manifest_path = export_dir.path().join("backup.db.export-manifest.json");
4718        assert!(
4719            manifest_path.exists(),
4720            "manifest file should exist at {}",
4721            manifest_path.display()
4722        );
4723        assert_eq!(manifest.sha256.len(), 64, "sha256 should be 64 hex chars");
4724        assert!(
4725            manifest.exported_at > 0,
4726            "exported_at should be a unix timestamp"
4727        );
4728        assert_eq!(
4729            manifest.schema_version,
4730            SchemaManager::new().current_version().0,
4731            "schema_version should match the live schema version"
4732        );
4733        assert_eq!(manifest.protocol_version, 1, "protocol_version should be 1");
4734        assert!(manifest.page_count > 0, "page_count should be positive");
4735    }
4736
4737    #[test]
4738    fn safe_export_preserves_operational_validation_contracts() {
4739        let (_db, service) = setup();
4740        let validation_json = r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;
4741        service
4742            .register_operational_collection(&OperationalRegisterRequest {
4743                name: "connector_health".to_owned(),
4744                kind: OperationalCollectionKind::LatestState,
4745                schema_json: "{}".to_owned(),
4746                retention_json: "{}".to_owned(),
4747                filter_fields_json: "[]".to_owned(),
4748                validation_json: validation_json.to_owned(),
4749                secondary_indexes_json: "[]".to_owned(),
4750                format_version: 1,
4751            })
4752            .expect("register collection");
4753
4754        let export_dir = tempfile::TempDir::new().expect("temp dir");
4755        let export_path = export_dir.path().join("backup.db");
4756        service
4757            .safe_export(
4758                &export_path,
4759                SafeExportOptions {
4760                    force_checkpoint: false,
4761                },
4762            )
4763            .expect("export");
4764
4765        let exported = sqlite::open_connection(&export_path).expect("exported conn");
4766        let exported_validation_json: String = exported
4767            .query_row(
4768                "SELECT validation_json FROM operational_collections WHERE name = 'connector_health'",
4769                [],
4770                |row| row.get(0),
4771            )
4772            .expect("validation_json");
4773        assert_eq!(exported_validation_json, validation_json);
4774    }
4775
4776    #[test]
4777    fn safe_export_force_checkpoint_false_skips_wal_pragma() {
4778        let (_db, service) = setup();
4779        let export_dir = tempfile::TempDir::new().expect("temp dir");
4780        let export_path = export_dir.path().join("no-wal.db");
4781
4782        // force_checkpoint: false must not error even on a non-WAL database
4783        let manifest = service
4784            .safe_export(
4785                &export_path,
4786                SafeExportOptions {
4787                    force_checkpoint: false,
4788                },
4789            )
4790            .expect("export with no checkpoint");
4791
4792        assert!(
4793            manifest.page_count > 0,
4794            "page_count must be populated regardless of checkpoint mode"
4795        );
4796        assert_eq!(
4797            manifest.schema_version,
4798            SchemaManager::new().current_version().0
4799        );
4800        assert_eq!(manifest.protocol_version, 1);
4801    }
4802
    #[test]
    fn safe_export_force_checkpoint_false_still_captures_wal_backed_changes() {
        let (db, service) = setup();
        // NOTE(review): this connection intentionally stays open across the
        // export — presumably so closing it cannot checkpoint the WAL back
        // into the main file before safe_export runs; confirm against
        // safe_export's connection handling.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        // Switch to WAL journaling; the pragma returns the resulting mode.
        let journal_mode: String = conn
            .query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))
            .expect("enable wal");
        assert_eq!(journal_mode.to_lowercase(), "wal");
        // Disable automatic checkpointing so the row inserted below stays
        // resident in the WAL rather than the main database file.
        let auto_checkpoint_pages: i64 = conn
            .query_row("PRAGMA wal_autocheckpoint=0", [], |row| row.get(0))
            .expect("disable auto checkpoint");
        assert_eq!(auto_checkpoint_pages, 0);
        // Commit a row that now lives only in the WAL.
        conn.execute(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
             VALUES ('r-wal', 'lg-wal', 'Meeting', '{}', 100, 'src-wal')",
            [],
        )
        .expect("insert wal-backed node");

        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("wal-backed.db");
        // Export WITHOUT forcing a checkpoint; the snapshot must still
        // contain the WAL-resident row.
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export wal-backed db");

        // Verify the WAL-resident row made it into the exported file.
        let exported = sqlite::open_connection(&export_path).expect("open exported db");
        let exported_count: i64 = exported
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-wal'",
                [],
                |row| row.get(0),
            )
            .expect("count exported nodes");
        assert_eq!(
            exported_count, 1,
            "safe_export must include committed rows that are still resident in the WAL"
        );
    }
4846
4847    #[test]
4848    fn excise_source_removes_searchable_content_after_excision() {
4849        let (db, service) = setup();
4850        {
4851            let conn = sqlite::open_connection(db.path()).expect("conn");
4852            conn.execute(
4853                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4854                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
4855                [],
4856            )
4857            .expect("insert v1");
4858            conn.execute(
4859                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4860                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
4861                [],
4862            )
4863            .expect("insert v2");
4864            conn.execute(
4865                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4866                 VALUES ('ck1', 'lg1', 'hello world', 100)",
4867                [],
4868            )
4869            .expect("insert chunk");
4870        }
4871        service.excise_source("source-2").expect("excise");
4872        {
4873            let conn = sqlite::open_connection(db.path()).expect("conn");
4874            let fts_count: i64 = conn
4875                .query_row(
4876                    "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'ck1'",
4877                    [],
4878                    |row| row.get(0),
4879                )
4880                .expect("fts count");
4881            assert_eq!(
4882                fts_count, 0,
4883                "excised content should not remain searchable after excise"
4884            );
4885        }
4886    }
4887
4888    #[cfg(feature = "sqlite-vec")]
4889    #[test]
4890    fn excise_source_cleans_chunks_and_vec_rows_for_excised_version() {
4891        let (db, service) = setup();
4892        {
4893            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4894            service
4895                .schema_manager
4896                .ensure_vec_kind_profile(&conn, "Meeting", 4)
4897                .expect("ensure vec kind profile");
4898            conn.execute(
4899                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, superseded_at, source_ref) \
4900                 VALUES ('r1', 'lg1', 'Meeting', '{}', 100, 200, 'source-1')",
4901                [],
4902            )
4903            .expect("insert v1");
4904            conn.execute(
4905                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
4906                 VALUES ('r2', 'lg1', 'Meeting', '{}', 200, 'source-2')",
4907                [],
4908            )
4909            .expect("insert v2");
4910            conn.execute(
4911                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
4912                 VALUES ('ck1', 'lg1', 'new content', 200)",
4913                [],
4914            )
4915            .expect("insert chunk");
4916            conn.execute(
4917                "INSERT INTO vec_meeting (chunk_id, embedding) VALUES ('ck1', zeroblob(16))",
4918                [],
4919            )
4920            .expect("insert vec row");
4921        }
4922
4923        service.excise_source("source-2").expect("excise");
4924
4925        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
4926        let active_row: String = conn
4927            .query_row(
4928                "SELECT row_id FROM nodes WHERE logical_id = 'lg1' AND superseded_at IS NULL",
4929                [],
4930                |row| row.get(0),
4931            )
4932            .expect("restored active row");
4933        assert_eq!(active_row, "r1");
4934        let chunk_count: i64 = conn
4935            .query_row(
4936                "SELECT count(*) FROM chunks WHERE node_logical_id = 'lg1'",
4937                [],
4938                |row| row.get(0),
4939            )
4940            .expect("chunk count");
4941        assert_eq!(
4942            chunk_count, 0,
4943            "excised source content must not survive as chunks"
4944        );
4945        let vec_count: i64 = conn
4946            .query_row("SELECT count(*) FROM vec_meeting", [], |row| row.get(0))
4947            .expect("vec count");
4948        assert_eq!(vec_count, 0, "excised source vec rows must be removed");
4949        let fts_count: i64 = conn
4950            .query_row(
4951                "SELECT count(*) FROM fts_nodes WHERE node_logical_id = 'lg1'",
4952                [],
4953                |row| row.get(0),
4954            )
4955            .expect("fts count");
4956        assert_eq!(
4957            fts_count, 0,
4958            "excised source content must not remain searchable"
4959        );
4960    }
4961
4962    #[test]
4963    fn export_page_count_matches_exported_file() {
4964        let (_db, service) = setup();
4965        let export_dir = tempfile::TempDir::new().expect("temp dir");
4966        let export_path = export_dir.path().join("page-count.db");
4967
4968        let manifest = service
4969            .safe_export(
4970                &export_path,
4971                SafeExportOptions {
4972                    force_checkpoint: false,
4973                },
4974            )
4975            .expect("export");
4976
4977        let exported = sqlite::open_connection(&export_path).expect("open exported db");
4978        let actual_page_count: u64 = exported
4979            .query_row("PRAGMA page_count", [], |row| row.get(0))
4980            .expect("page_count from exported file");
4981
4982        assert_eq!(
4983            manifest.page_count, actual_page_count,
4984            "manifest page_count must match the exported file's PRAGMA page_count"
4985        );
4986    }
4987
4988    #[test]
4989    fn no_temp_file_after_successful_export() {
4990        let (_db, service) = setup();
4991        let export_dir = tempfile::TempDir::new().expect("temp dir");
4992        let export_path = export_dir.path().join("no-tmp.db");
4993
4994        service
4995            .safe_export(
4996                &export_path,
4997                SafeExportOptions {
4998                    force_checkpoint: false,
4999                },
5000            )
5001            .expect("export");
5002
5003        let tmp_files: Vec<_> = fs::read_dir(export_dir.path())
5004            .expect("read export dir")
5005            .filter_map(Result::ok)
5006            .filter(|e| e.path().extension().is_some_and(|ext| ext == "tmp"))
5007            .collect();
5008
5009        assert!(
5010            tmp_files.is_empty(),
5011            "no .tmp files should remain after a successful export, found: {tmp_files:?}"
5012        );
5013    }
5014
5015    #[test]
5016    fn export_manifest_is_valid_json() {
5017        let (_db, service) = setup();
5018        let export_dir = tempfile::TempDir::new().expect("temp dir");
5019        let export_path = export_dir.path().join("valid-json.db");
5020
5021        service
5022            .safe_export(
5023                &export_path,
5024                SafeExportOptions {
5025                    force_checkpoint: false,
5026                },
5027            )
5028            .expect("export");
5029
5030        let manifest_path = export_dir.path().join("valid-json.db.export-manifest.json");
5031        let manifest_contents = fs::read_to_string(&manifest_path).expect("read manifest");
5032        let parsed: serde_json::Value =
5033            serde_json::from_str(&manifest_contents).expect("manifest must be valid JSON");
5034
5035        assert!(
5036            parsed.get("exported_at").is_some(),
5037            "manifest must contain exported_at"
5038        );
5039        assert!(
5040            parsed.get("sha256").is_some(),
5041            "manifest must contain sha256"
5042        );
5043        assert!(
5044            parsed.get("schema_version").is_some(),
5045            "manifest must contain schema_version"
5046        );
5047        assert!(
5048            parsed.get("protocol_version").is_some(),
5049            "manifest must contain protocol_version"
5050        );
5051        assert!(
5052            parsed.get("page_count").is_some(),
5053            "manifest must contain page_count"
5054        );
5055    }
5056
5057    #[test]
5058    fn provenance_purge_dry_run_reports_counts() {
5059        let (db, service) = setup();
5060        {
5061            let conn = sqlite::open_connection(db.path()).expect("conn");
5062            conn.execute(
5063                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5064                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5065                [],
5066            )
5067            .expect("insert p1");
5068            conn.execute(
5069                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5070                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5071                [],
5072            )
5073            .expect("insert p2");
5074            conn.execute(
5075                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5076                 VALUES ('p3', 'excise', 'lg3', 'src-1', 300)",
5077                [],
5078            )
5079            .expect("insert p3");
5080        }
5081
5082        let options = super::ProvenancePurgeOptions {
5083            dry_run: true,
5084            preserve_event_types: Vec::new(),
5085        };
5086        let report = service
5087            .purge_provenance_events(250, &options)
5088            .expect("dry run purge");
5089
5090        assert_eq!(report.events_deleted, 2);
5091        assert_eq!(report.events_preserved, 1);
5092        assert!(report.oldest_remaining.is_some());
5093
5094        let conn = sqlite::open_connection(db.path()).expect("conn");
5095        let total: i64 = conn
5096            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5097                row.get(0)
5098            })
5099            .expect("count");
5100        assert_eq!(total, 3, "dry_run must not delete any events");
5101    }
5102
5103    #[test]
5104    fn provenance_purge_deletes_old_events() {
5105        let (db, service) = setup();
5106        {
5107            let conn = sqlite::open_connection(db.path()).expect("conn");
5108            conn.execute(
5109                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5110                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5111                [],
5112            )
5113            .expect("insert p1");
5114            conn.execute(
5115                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5116                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 200)",
5117                [],
5118            )
5119            .expect("insert p2");
5120        }
5121
5122        let options = super::ProvenancePurgeOptions {
5123            dry_run: false,
5124            preserve_event_types: Vec::new(),
5125        };
5126        let report = service
5127            .purge_provenance_events(150, &options)
5128            .expect("purge");
5129
5130        assert_eq!(report.events_deleted, 1);
5131        assert_eq!(report.events_preserved, 1);
5132        assert_eq!(report.oldest_remaining, Some(200));
5133
5134        let conn = sqlite::open_connection(db.path()).expect("conn");
5135        let remaining: i64 = conn
5136            .query_row("SELECT count(*) FROM provenance_events", [], |row| {
5137                row.get(0)
5138            })
5139            .expect("count");
5140        assert_eq!(remaining, 1);
5141    }
5142
5143    #[test]
5144    fn provenance_purge_preserves_specified_types() {
5145        let (db, service) = setup();
5146        {
5147            let conn = sqlite::open_connection(db.path()).expect("conn");
5148            conn.execute(
5149                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5150                 VALUES ('p1', 'excise', 'lg1', 'src-1', 100)",
5151                [],
5152            )
5153            .expect("insert p1");
5154            conn.execute(
5155                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5156                 VALUES ('p2', 'node_insert', 'lg2', 'src-1', 100)",
5157                [],
5158            )
5159            .expect("insert p2");
5160            conn.execute(
5161                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5162                 VALUES ('p3', 'node_insert', 'lg3', 'src-1', 100)",
5163                [],
5164            )
5165            .expect("insert p3");
5166        }
5167
5168        let options = super::ProvenancePurgeOptions {
5169            dry_run: false,
5170            preserve_event_types: Vec::new(),
5171        };
5172        let report = service
5173            .purge_provenance_events(500, &options)
5174            .expect("purge");
5175
5176        assert_eq!(report.events_deleted, 2);
5177        assert_eq!(report.events_preserved, 1);
5178
5179        let conn = sqlite::open_connection(db.path()).expect("conn");
5180        let remaining_type: String = conn
5181            .query_row("SELECT event_type FROM provenance_events", [], |row| {
5182                row.get(0)
5183            })
5184            .expect("remaining event type");
5185        assert_eq!(remaining_type, "excise");
5186    }
5187
5188    #[test]
5189    fn provenance_purge_noop_with_zero_timestamp() {
5190        let (db, service) = setup();
5191        {
5192            let conn = sqlite::open_connection(db.path()).expect("conn");
5193            conn.execute(
5194                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at) \
5195                 VALUES ('p1', 'node_insert', 'lg1', 'src-1', 100)",
5196                [],
5197            )
5198            .expect("insert p1");
5199        }
5200
5201        let options = super::ProvenancePurgeOptions {
5202            dry_run: false,
5203            preserve_event_types: Vec::new(),
5204        };
5205        let report = service.purge_provenance_events(0, &options).expect("purge");
5206
5207        assert_eq!(report.events_deleted, 0);
5208        assert_eq!(report.events_preserved, 1);
5209        assert_eq!(report.oldest_remaining, Some(100));
5210    }
5211
    /// Scenario: nodes A (`doc-1`) and B (`doc-2`) are linked by an edge;
    /// all three are retired, then B's node rows are physically deleted while
    /// the edge row is deliberately left behind (the dangling-edge case).
    /// Restoring A must reactivate A but must NOT reactivate the edge — its
    /// `doc-2` endpoint has no rows left — and must report the skip via
    /// `skipped_edges`.
    #[test]
    fn restore_skips_edge_when_counterpart_purged() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A (doc-1) and node B (doc-2)
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire both A and B, and the edge
            // (provenance events record the retire so restore can find it).
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
            // Simulate purge of B: delete node rows but leave the edge intact
            // to reproduce the dangling-edge scenario the validation guards against.
            conn.execute("DELETE FROM nodes WHERE logical_id = 'doc-2'", [])
                .expect("purge node B rows");
        }

        // Restore A — the edge should be skipped because B has no active node
        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert_eq!(report.restored_edge_rows, 0, "edge should not be restored");
        // The skip must name both the edge and the missing endpoint.
        assert_eq!(report.skipped_edges.len(), 1);
        assert_eq!(report.skipped_edges[0].edge_logical_id, "edge-1");
        assert_eq!(report.skipped_edges[0].missing_endpoint, "doc-2");

        // Verify the edge is still retired in the database
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 0, "edge must remain retired");
    }
5292
    /// Scenario: node A (`doc-1`) is retired while node B (`doc-2`) stays
    /// active, with an edge between them also retired. Restoring A should
    /// reactivate the edge as well, since both endpoints end up active, and
    /// report no skipped edges.
    #[test]
    fn restore_restores_edges_to_active_nodes() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A and node B (B stays active)
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire only A
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore A — B is active, so the edge should be restored normally
        let report = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report.was_noop);
        assert_eq!(report.restored_node_rows, 1);
        assert!(report.restored_edge_rows > 0, "edge should be restored");
        assert!(
            report.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        // The edge's superseded_at must have been cleared in the database.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(active_edge_count, 1, "edge must be active");
    }
5364
    /// Scenario: both endpoint nodes and the edge between them are retired.
    /// Restoring B first leaves the edge retired (A is still down); restoring
    /// A afterwards — with B now active — must also reactivate the edge.
    #[test]
    fn restore_restores_edges_when_both_restored() {
        let (db, service) = setup();
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Create node A and node B
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-a', 'doc-1', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node A");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('node-row-b', 'doc-2', 'Document', '{}', 100, 'seed')",
                [],
            )
            .expect("insert node B");
            // Create edge between A and B
            conn.execute(
                "INSERT INTO edges \
                 (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('edge-row-1', 'edge-1', 'doc-1', 'doc-2', 'RELATED', '{}', 100, 'seed')",
                [],
            )
            .expect("insert edge");
            // Retire both A and B
            // (one node_retire event per node plus one edge_retire event).
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-a', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event A");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire-b', 'node_retire', 'doc-2', 'forget-1', 200, '')",
                [],
            )
            .expect("insert retire event B");
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-edge-retire', 'edge_retire', 'edge-1', 'forget-1', 200, '')",
                [],
            )
            .expect("insert edge retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("retire node A");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-2'",
                [],
            )
            .expect("retire node B");
            conn.execute(
                "UPDATE edges SET superseded_at = 200 WHERE logical_id = 'edge-1'",
                [],
            )
            .expect("retire edge");
        }

        // Restore B first — edge is skipped because A is still retired
        let report_b = service.restore_logical_id("doc-2").expect("restore B");
        assert!(!report_b.was_noop);

        // Restore A — B is now active, so the edge should be restored
        let report_a = service.restore_logical_id("doc-1").expect("restore A");
        assert!(!report_a.was_noop);
        assert_eq!(report_a.restored_node_rows, 1);
        assert!(
            report_a.restored_edge_rows > 0,
            "edge should be restored when both endpoints active"
        );
        assert!(
            report_a.skipped_edges.is_empty(),
            "no edges should be skipped"
        );

        // Confirm in the database that the edge ended up active.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let active_edge_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM edges WHERE logical_id = 'edge-1' AND superseded_at IS NULL",
                [],
                |row| row.get(0),
            )
            .expect("active edge count");
        assert_eq!(
            active_edge_count, 1,
            "edge must be active after both endpoints restored"
        );
    }
5457
5458    // ── FTS property schema end-to-end tests ──────────────────────────
5459
5460    #[test]
5461    fn fts_property_schema_crud_round_trip() {
5462        let (_db, service) = setup();
5463
5464        // Register
5465        let record = service
5466            .register_fts_property_schema(
5467                "Meeting",
5468                &["$.title".to_owned(), "$.summary".to_owned()],
5469                None,
5470            )
5471            .expect("register");
5472        assert_eq!(record.kind, "Meeting");
5473        assert_eq!(record.property_paths, vec!["$.title", "$.summary"]);
5474        assert_eq!(record.separator, " ");
5475        assert_eq!(record.format_version, 1);
5476
5477        // Describe
5478        let described = service
5479            .describe_fts_property_schema("Meeting")
5480            .expect("describe")
5481            .expect("should exist");
5482        assert_eq!(described, record);
5483
5484        // Describe missing kind
5485        let missing = service
5486            .describe_fts_property_schema("NoSuchKind")
5487            .expect("describe missing");
5488        assert!(missing.is_none());
5489
5490        // List
5491        let list = service.list_fts_property_schemas().expect("list");
5492        assert_eq!(list.len(), 1);
5493        assert_eq!(list[0].kind, "Meeting");
5494
5495        // Update (idempotent upsert)
5496        let updated = service
5497            .register_fts_property_schema(
5498                "Meeting",
5499                &["$.title".to_owned(), "$.notes".to_owned()],
5500                Some("\n"),
5501            )
5502            .expect("update");
5503        assert_eq!(updated.property_paths, vec!["$.title", "$.notes"]);
5504        assert_eq!(updated.separator, "\n");
5505
5506        // Remove
5507        service
5508            .remove_fts_property_schema("Meeting")
5509            .expect("remove");
5510        let after_remove = service
5511            .describe_fts_property_schema("Meeting")
5512            .expect("describe after remove");
5513        assert!(after_remove.is_none());
5514
5515        // Remove non-existent is an error
5516        let err = service.remove_fts_property_schema("Meeting");
5517        assert!(err.is_err());
5518    }
5519
5520    #[test]
5521    fn describe_fts_property_schema_round_trips_recursive_entries() {
5522        let (_db, service) = setup();
5523
5524        let entries = vec![
5525            FtsPropertyPathSpec::scalar("$.title"),
5526            FtsPropertyPathSpec::recursive("$.payload"),
5527        ];
5528        let exclude = vec!["$.payload.private".to_owned()];
5529        let registered = service
5530            .register_fts_property_schema_with_entries(
5531                "KnowledgeItem",
5532                &entries,
5533                Some(" "),
5534                &exclude,
5535                crate::rebuild_actor::RebuildMode::Eager,
5536            )
5537            .expect("register recursive");
5538
5539        // The register entry point now echoes back the fully-populated
5540        // record via the same load helper used by describe/list.
5541        assert_eq!(registered.entries, entries);
5542        assert_eq!(registered.exclude_paths, exclude);
5543        assert_eq!(registered.property_paths, vec!["$.title", "$.payload"]);
5544
5545        let described = service
5546            .describe_fts_property_schema("KnowledgeItem")
5547            .expect("describe")
5548            .expect("should exist");
5549        assert_eq!(described.kind, "KnowledgeItem");
5550        assert_eq!(described.entries, entries);
5551        assert_eq!(described.exclude_paths, exclude);
5552        assert_eq!(described.property_paths, vec!["$.title", "$.payload"]);
5553        assert_eq!(described.separator, " ");
5554        assert_eq!(described.format_version, 1);
5555    }
5556
5557    #[test]
5558    fn list_fts_property_schemas_round_trips_recursive_entries() {
5559        let (_db, service) = setup();
5560
5561        let entries = vec![
5562            FtsPropertyPathSpec::scalar("$.title"),
5563            FtsPropertyPathSpec::recursive("$.payload"),
5564        ];
5565        let exclude = vec!["$.payload.secret".to_owned()];
5566        service
5567            .register_fts_property_schema_with_entries(
5568                "KnowledgeItem",
5569                &entries,
5570                Some(" "),
5571                &exclude,
5572                crate::rebuild_actor::RebuildMode::Eager,
5573            )
5574            .expect("register recursive");
5575
5576        let listed = service.list_fts_property_schemas().expect("list");
5577        assert_eq!(listed.len(), 1);
5578        let record = &listed[0];
5579        assert_eq!(record.kind, "KnowledgeItem");
5580        assert_eq!(record.entries, entries);
5581        assert_eq!(record.exclude_paths, exclude);
5582        assert_eq!(record.property_paths, vec!["$.title", "$.payload"]);
5583    }
5584
5585    #[test]
5586    fn describe_fts_property_schema_round_trips_scalar_only_entries() {
5587        let (_db, service) = setup();
5588
5589        service
5590            .register_fts_property_schema(
5591                "Meeting",
5592                &["$.title".to_owned(), "$.summary".to_owned()],
5593                None,
5594            )
5595            .expect("register scalar");
5596
5597        let described = service
5598            .describe_fts_property_schema("Meeting")
5599            .expect("describe")
5600            .expect("should exist");
5601        assert_eq!(described.property_paths, vec!["$.title", "$.summary"]);
5602        assert_eq!(described.entries.len(), 2);
5603        for entry in &described.entries {
5604            assert_eq!(
5605                entry.mode,
5606                FtsPropertyPathMode::Scalar,
5607                "scalar-only schema should deserialize every entry as Scalar"
5608            );
5609        }
5610        assert!(described.exclude_paths.is_empty());
5611    }
5612
    /// Scenario: a Document node with a registered property-FTS schema is
    /// retired and its chunk- and property-FTS rows are cleared. Restoring
    /// the node must re-derive the per-kind property FTS row from the node's
    /// canonical `properties` JSON, making it searchable again.
    #[test]
    fn restore_reestablishes_property_fts_visibility() {
        let (db, service) = setup();
        let doc_table = fathomdb_schema::fts_kind_table_name("Document");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            // Register a property schema for Document kind.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Document', '[\"$.title\", \"$.body\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Create the per-kind FTS table.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {doc_table} USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create per-kind table");
            // Insert an active node with extractable properties.
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'doc-1', 'Document', '{\"title\":\"Budget\",\"body\":\"Q3 forecast\"}', 100, 'seed')",
                [],
            )
            .expect("insert node");
            // Insert a chunk so restore has something to work with for FTS.
            conn.execute(
                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
                 VALUES ('chunk-1', 'doc-1', 'budget text', 100)",
                [],
            )
            .expect("insert chunk");
            // Insert property FTS row into per-kind table (as write path would).
            conn.execute(
                &format!(
                    "INSERT INTO {doc_table} (node_logical_id, text_content) \
                     VALUES ('doc-1', 'Budget Q3 forecast')"
                ),
                [],
            )
            .expect("insert property fts");
            // Simulate retire: supersede node, clear FTS.
            conn.execute(
                "INSERT INTO provenance_events (id, event_type, subject, source_ref, created_at, metadata_json) \
                 VALUES ('evt-retire', 'node_retire', 'doc-1', 'forget-1', 200, '')",
                [],
            )
            .expect("retire event");
            conn.execute(
                "UPDATE nodes SET superseded_at = 200 WHERE logical_id = 'doc-1'",
                [],
            )
            .expect("supersede");
            conn.execute("DELETE FROM fts_nodes", [])
                .expect("clear chunk fts");
            conn.execute(&format!("DELETE FROM {doc_table}"), [])
                .expect("clear property fts");
        }

        let report = service.restore_logical_id("doc-1").expect("restore");
        assert_eq!(report.restored_property_fts_rows, 1);

        // Verify the property FTS row was recreated in the per-kind table.
        let conn = sqlite::open_connection(db.path()).expect("conn");
        let prop_fts_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts count");
        assert_eq!(prop_fts_count, 1, "property FTS must be restored");

        // The restored text must be the schema-extracted property text,
        // i.e. title and body joined by the registered ' ' separator.
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {doc_table} WHERE node_logical_id = 'doc-1'"),
                [],
                |row| row.get(0),
            )
            .expect("prop fts text");
        assert_eq!(text, "Budget Q3 forecast");
    }
5698
5699    #[test]
5700    fn safe_export_preserves_fts_property_schemas() {
5701        let (_db, service) = setup();
5702        service
5703            .register_fts_property_schema(
5704                "Goal",
5705                &["$.name".to_owned(), "$.rationale".to_owned()],
5706                None,
5707            )
5708            .expect("register schema");
5709
5710        let export_dir = tempfile::TempDir::new().expect("temp dir");
5711        let export_path = export_dir.path().join("backup.db");
5712        service
5713            .safe_export(
5714                &export_path,
5715                SafeExportOptions {
5716                    force_checkpoint: false,
5717                },
5718            )
5719            .expect("export");
5720
5721        // Open the exported DB and verify the schema survived.
5722        let exported_conn = rusqlite::Connection::open(&export_path).expect("open exported db");
5723        let kind: String = exported_conn
5724            .query_row(
5725                "SELECT kind FROM fts_property_schemas WHERE kind = 'Goal'",
5726                [],
5727                |row| row.get(0),
5728            )
5729            .expect("schema must exist in export");
5730        assert_eq!(kind, "Goal");
5731        let paths_json: String = exported_conn
5732            .query_row(
5733                "SELECT property_paths_json FROM fts_property_schemas WHERE kind = 'Goal'",
5734                [],
5735                |row| row.get(0),
5736            )
5737            .expect("paths must exist");
5738        let paths: Vec<String> = serde_json::from_str(&paths_json).expect("valid json");
5739        assert_eq!(paths, vec!["$.name", "$.rationale"]);
5740    }
5741
    #[test]
    #[allow(clippy::too_many_lines)]
    fn export_recovery_rebuilds_property_fts_from_canonical_state() {
        let (db, service) = setup();
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        // Register a schema and insert two nodes with extractable properties.
        service
            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
            .expect("register");
        {
            let conn = sqlite::open_connection(db.path()).expect("conn");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 1");
            // Matching derived FTS row for goal-1 (correct at export time).
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'Ship v2')"
                ),
                [],
            )
            .expect("insert property FTS row 1");
            conn.execute(
                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
                 VALUES ('row-2', 'goal-2', 'Goal', '{\"name\":\"Launch redesign\"}', 100, 'seed')",
                [],
            )
            .expect("insert node 2");
            // Matching derived FTS row for goal-2 (correct at export time).
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-2', 'Launch redesign')"
                ),
                [],
            )
            .expect("insert property FTS row 2");
        }

        // Export.
        let export_dir = tempfile::TempDir::new().expect("temp dir");
        let export_path = export_dir.path().join("backup.db");
        service
            .safe_export(
                &export_path,
                SafeExportOptions {
                    force_checkpoint: false,
                },
            )
            .expect("export");

        // Corrupt the derived rows: replace correct text with wrong text for
        // goal-1, and delete the row for goal-2 entirely. This exercises both
        // corrupted-but-present rows and missing rows in the same recovery.
        {
            let conn = rusqlite::Connection::open(&export_path).expect("open export");
            // Bootstrap the exported DB to get per-kind tables.
            SchemaManager::new()
                .bootstrap(&conn)
                .expect("bootstrap export");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
            )
            .expect("delete old row");
            conn.execute(
                &format!(
                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
                     VALUES ('goal-1', 'completely wrong stale text')"
                ),
                [],
            )
            .expect("insert corrupted row");
            conn.execute(
                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
            )
            .expect("delete goal-2 row");
        }

        // Open the exported DB and rebuild projections from canonical state.
        let schema = Arc::new(SchemaManager::new());
        let exported_service = AdminService::new(&export_path, Arc::clone(&schema));
        exported_service
            .rebuild_projections(ProjectionTarget::Fts)
            .expect("rebuild");

        // Verify the per-kind table has the correct rows after recovery.
        let conn = rusqlite::Connection::open(&export_path).expect("open export for verify");
        let goal1_text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-1 text after rebuild");
        assert_eq!(
            goal1_text, "Ship v2",
            "goal-1 text must be corrected by rebuild"
        );

        let goal2_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-2'"),
                [],
                |r| r.get(0),
            )
            .expect("goal-2 count");
        assert_eq!(goal2_count, 1, "goal-2 row must be restored by rebuild");

        // No trace of the injected stale text may survive anywhere in the table.
        let stale_count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE text_content = 'completely wrong stale text'"),
                [],
                |r| r.get(0),
            )
            .expect("stale count");
        assert_eq!(stale_count, 0, "corrupted text must be gone after rebuild");

        // Verify integrity and semantics are clean after recovery.
        let integrity = exported_service.check_integrity().expect("integrity");
        assert_eq!(integrity.missing_property_fts_rows, 0);
        let semantics = exported_service.check_semantics().expect("semantics");
        assert_eq!(semantics.drifted_property_fts_rows, 0);
        assert_eq!(semantics.orphaned_property_fts_rows, 0);
        assert_eq!(semantics.duplicate_property_fts_rows, 0);
    }
5871
5872    #[test]
5873    fn check_integrity_no_false_positives_for_empty_extraction() {
5874        let (db, service) = setup();
5875        {
5876            let conn = sqlite::open_connection(db.path()).expect("conn");
5877            // Register a schema that looks for $.searchable
5878            conn.execute(
5879                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5880                 VALUES ('Ticket', '[\"$.searchable\"]', ' ')",
5881                [],
5882            )
5883            .expect("register schema");
5884            // Insert a node whose properties do NOT contain $.searchable —
5885            // correctly has no property FTS row.
5886            conn.execute(
5887                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5888                 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"status\":\"open\"}', 100, 'seed')",
5889                [],
5890            )
5891            .expect("insert node");
5892        }
5893
5894        let report = service.check_integrity().expect("integrity");
5895        assert_eq!(
5896            report.missing_property_fts_rows, 0,
5897            "node with no extractable values must not be counted as missing"
5898        );
5899    }
5900
5901    #[test]
5902    fn check_integrity_detects_genuinely_missing_property_fts_rows() {
5903        let (db, service) = setup();
5904        {
5905            let conn = sqlite::open_connection(db.path()).expect("conn");
5906            conn.execute(
5907                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5908                 VALUES ('Ticket', '[\"$.title\"]', ' ')",
5909                [],
5910            )
5911            .expect("register schema");
5912            // Insert a node WITH an extractable $.title but no property FTS row.
5913            conn.execute(
5914                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5915                 VALUES ('row-1', 'ticket-1', 'Ticket', '{\"title\":\"fix login bug\"}', 100, 'seed')",
5916                [],
5917            )
5918            .expect("insert node");
5919        }
5920
5921        let report = service.check_integrity().expect("integrity");
5922        assert_eq!(
5923            report.missing_property_fts_rows, 1,
5924            "node with extractable values but no property FTS row must be detected"
5925        );
5926    }
5927
5928    #[test]
5929    fn rebuild_projections_fts_restores_missing_property_fts_rows() {
5930        let (db, service) = setup();
5931        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5932        {
5933            let conn = sqlite::open_connection(db.path()).expect("conn");
5934            conn.execute(
5935                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5936                 VALUES ('Goal', '[\"$.name\"]', ' ')",
5937                [],
5938            )
5939            .expect("register schema");
5940            conn.execute(
5941                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5942                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5943                [],
5944            )
5945            .expect("insert node");
5946            // Deliberately do NOT insert a property FTS row.
5947        }
5948
5949        let report = service
5950            .rebuild_projections(ProjectionTarget::Fts)
5951            .expect("rebuild");
5952        assert!(
5953            report.rebuilt_rows >= 1,
5954            "rebuild must insert at least one property FTS row"
5955        );
5956
5957        let conn = sqlite::open_connection(db.path()).expect("conn");
5958        let text: String = conn
5959            .query_row(
5960                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
5961                [],
5962                |row| row.get(0),
5963            )
5964            .expect("property FTS row must exist after rebuild");
5965        assert_eq!(text, "Ship v2");
5966    }
5967
5968    #[test]
5969    fn rebuild_missing_projections_fills_gap_for_deleted_property_fts_row() {
5970        let (db, service) = setup();
5971        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
5972        {
5973            let conn = sqlite::open_connection(db.path()).expect("conn");
5974            conn.execute(
5975                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
5976                 VALUES ('Goal', '[\"$.name\"]', ' ')",
5977                [],
5978            )
5979            .expect("register schema");
5980            conn.execute(
5981                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
5982                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
5983                [],
5984            )
5985            .expect("insert node");
5986            // Create per-kind table and insert then delete to simulate corruption.
5987            conn.execute_batch(&format!(
5988                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
5989                    node_logical_id UNINDEXED, text_content, \
5990                    tokenize = 'porter unicode61 remove_diacritics 2'\
5991                )"
5992            ))
5993            .expect("create per-kind table");
5994            conn.execute(
5995                &format!(
5996                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
5997                     VALUES ('goal-1', 'Ship v2')"
5998                ),
5999                [],
6000            )
6001            .expect("insert property fts");
6002            conn.execute(
6003                &format!("DELETE FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6004                [],
6005            )
6006            .expect("delete property fts");
6007        }
6008
6009        let report = service
6010            .rebuild_missing_projections()
6011            .expect("rebuild missing");
6012        assert!(
6013            report.rebuilt_rows >= 1,
6014            "missing rebuild must insert the gap-fill row"
6015        );
6016
6017        let conn = sqlite::open_connection(db.path()).expect("conn");
6018        let count: i64 = conn
6019            .query_row(
6020                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6021                [],
6022                |row| row.get(0),
6023            )
6024            .expect("count");
6025        assert_eq!(
6026            count, 1,
6027            "gap-fill must restore exactly one property FTS row"
6028        );
6029    }
6030
6031    #[test]
6032    fn remove_schema_then_rebuild_cleans_stale_property_fts_rows() {
6033        // This test verifies that a full FTS rebuild clears per-kind tables whose
6034        // schema has been removed (orphaned state). We create the orphaned state
6035        // directly via SQL (bypassing the service API, which now eagerly deletes rows
6036        // on schema removal) to simulate a table that was left populated from a
6037        // previous registration cycle.
6038        let (db, service) = setup();
6039        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
6040        {
6041            let conn = sqlite::open_connection(db.path()).expect("conn");
6042            conn.execute(
6043                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6044                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6045                [],
6046            )
6047            .expect("insert node");
6048            // Create per-kind table WITHOUT registering a schema — simulates orphaned rows
6049            // that remain after schema removal (or pre-existing table from a previous cycle).
6050            conn.execute_batch(&format!(
6051                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} \
6052                 USING fts5(node_logical_id UNINDEXED, text_content, tokenize = 'porter unicode61 remove_diacritics 2')"
6053            ))
6054            .expect("create per-kind table");
6055            conn.execute(
6056                &format!(
6057                    "INSERT INTO {goal_table} (node_logical_id, text_content) \
6058                     VALUES ('goal-1', 'Ship v2')"
6059                ),
6060                [],
6061            )
6062            .expect("insert property fts");
6063        }
6064
6065        // No schema registered — per-kind table has orphaned rows.
6066        let semantics = service.check_semantics().expect("semantics");
6067        assert_eq!(
6068            semantics.orphaned_property_fts_rows, 1,
6069            "orphaned property FTS rows must be detected with no registered schema"
6070        );
6071
6072        // Full rebuild should clean them (no schema means nothing to rebuild).
6073        service
6074            .rebuild_projections(ProjectionTarget::Fts)
6075            .expect("rebuild");
6076
6077        let conn = sqlite::open_connection(db.path()).expect("conn");
6078        let count: i64 = conn
6079            .query_row(
6080                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
6081                [],
6082                |row| row.get(0),
6083            )
6084            .expect("count");
6085        assert_eq!(
6086            count, 0,
6087            "rebuild must delete rows from per-kind tables with no registered schema"
6088        );
6089    }
6090
6091    mod validate_fts_property_paths_tests {
6092        use super::super::validate_fts_property_paths;
6093
6094        #[test]
6095        fn valid_simple_path() {
6096            assert!(validate_fts_property_paths(&["$.name".to_owned()]).is_ok());
6097        }
6098
6099        #[test]
6100        fn valid_nested_path() {
6101            assert!(validate_fts_property_paths(&["$.address.city".to_owned()]).is_ok());
6102        }
6103
6104        #[test]
6105        fn valid_underscore_segment() {
6106            assert!(validate_fts_property_paths(&["$.a_b".to_owned()]).is_ok());
6107        }
6108
6109        #[test]
6110        fn rejects_bare_prefix() {
6111            let result = validate_fts_property_paths(&["$.".to_owned()]);
6112            assert!(result.is_err(), "path '$.' must be rejected");
6113        }
6114
6115        #[test]
6116        fn rejects_double_dot() {
6117            let result = validate_fts_property_paths(&["$..x".to_owned()]);
6118            assert!(result.is_err(), "path '$..x' must be rejected");
6119        }
6120
6121        #[test]
6122        fn rejects_trailing_dot() {
6123            let result = validate_fts_property_paths(&["$.foo.".to_owned()]);
6124            assert!(result.is_err(), "path '$.foo.' must be rejected");
6125        }
6126
6127        #[test]
6128        fn rejects_space_in_segment() {
6129            let result = validate_fts_property_paths(&["$.foo bar".to_owned()]);
6130            assert!(result.is_err(), "path '$.foo bar' must be rejected");
6131        }
6132
6133        #[test]
6134        fn rejects_bracket_syntax() {
6135            let result = validate_fts_property_paths(&["$.foo[0]".to_owned()]);
6136            assert!(result.is_err(), "path '$.foo[0]' must be rejected");
6137        }
6138
6139        #[test]
6140        fn rejects_duplicates() {
6141            let result = validate_fts_property_paths(&["$.name".to_owned(), "$.name".to_owned()]);
6142            assert!(result.is_err(), "duplicate paths must be rejected");
6143        }
6144
6145        #[test]
6146        fn rejects_empty_list() {
6147            let result = validate_fts_property_paths(&[]);
6148            assert!(result.is_err(), "empty path list must be rejected");
6149        }
6150    }
6151
6152    // --- A-6: per-kind FTS table tests ---
6153
6154    #[test]
6155    fn register_fts_schema_writes_to_per_kind_table() {
6156        // After A-6: register_fts_property_schema writes rows to fts_props_<kind>,
6157        // NOT to fts_node_properties.
6158        let (db, service) = setup();
6159        {
6160            let conn = sqlite::open_connection(db.path()).expect("conn");
6161            // Insert a node before registering the schema.
6162            conn.execute(
6163                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6164                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6165                [],
6166            )
6167            .expect("insert node");
6168        }
6169
6170        // Register schema — this triggers eager rebuild which writes to per-kind table.
6171        service
6172            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6173            .expect("register schema");
6174
6175        let conn = sqlite::open_connection(db.path()).expect("conn");
6176        let table = fathomdb_schema::fts_kind_table_name("Goal");
6177        // Per-kind table must have the row.
6178        let per_kind_count: i64 = conn
6179            .query_row(
6180                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6181                [],
6182                |row| row.get(0),
6183            )
6184            .expect("per-kind count");
6185        assert_eq!(
6186            per_kind_count, 1,
6187            "per-kind table must have the row after registration"
6188        );
6189    }
6190
6191    #[test]
6192    fn remove_fts_schema_deletes_from_per_kind_table() {
6193        // After A-6: remove_fts_property_schema deletes rows from fts_props_<kind>.
6194        let (db, service) = setup();
6195        {
6196            let conn = sqlite::open_connection(db.path()).expect("conn");
6197            conn.execute(
6198                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6199                 VALUES ('row-1', 'goal-1', 'Goal', '{\"name\":\"Ship v2\"}', 100, 'seed')",
6200                [],
6201            )
6202            .expect("insert node");
6203        }
6204
6205        service
6206            .register_fts_property_schema("Goal", &["$.name".to_owned()], None)
6207            .expect("register schema");
6208        service
6209            .remove_fts_property_schema("Goal")
6210            .expect("remove schema");
6211
6212        let conn = sqlite::open_connection(db.path()).expect("conn");
6213        let table = fathomdb_schema::fts_kind_table_name("Goal");
6214        let per_kind_count: i64 = conn
6215            .query_row(
6216                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
6217                [],
6218                |row| row.get(0),
6219            )
6220            .expect("per-kind count");
6221        assert_eq!(
6222            per_kind_count, 0,
6223            "per-kind table must be empty after schema removal"
6224        );
6225    }
6226
6227    // --- B-1: weight field tests ---
6228
6229    #[test]
6230    fn fts_path_spec_with_weight_builder() {
6231        let spec = FtsPropertyPathSpec::scalar("$.title").with_weight(5.0);
6232        assert_eq!(spec.weight, Some(5.0));
6233        assert_eq!(spec.path, "$.title");
6234        assert_eq!(spec.mode, FtsPropertyPathMode::Scalar);
6235    }
6236
6237    #[test]
6238    fn fts_path_spec_serialize_with_weight() {
6239        use super::serialize_property_paths_json;
6240        let entries = vec![
6241            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6242            FtsPropertyPathSpec::scalar("$.body"),
6243        ];
6244        let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6245        // Must use rich object format because a weight is present
6246        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6247        let paths = v
6248            .get("paths")
6249            .expect("paths key")
6250            .as_array()
6251            .expect("array");
6252        assert_eq!(paths.len(), 2);
6253        // First entry has weight
6254        assert_eq!(
6255            paths[0].get("path").and_then(serde_json::Value::as_str),
6256            Some("$.title")
6257        );
6258        assert_eq!(
6259            paths[0].get("weight").and_then(serde_json::Value::as_f64),
6260            Some(2.0)
6261        );
6262        // Second entry has no weight field
6263        assert!(
6264            paths[1].get("weight").is_none(),
6265            "unweighted spec must omit weight field"
6266        );
6267    }
6268
6269    #[test]
6270    fn fts_path_spec_serialize_no_weights() {
6271        use super::serialize_property_paths_json;
6272        let entries = vec![
6273            FtsPropertyPathSpec::scalar("$.title"),
6274            FtsPropertyPathSpec::scalar("$.payload"),
6275        ];
6276        let json = serialize_property_paths_json(&entries, &[]).expect("serialize");
6277        // Must use bare string array (backward compat)
6278        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
6279        assert!(
6280            v.is_array(),
6281            "all-scalar no-weight schema must serialize as bare string array"
6282        );
6283        let arr = v.as_array().expect("array");
6284        assert_eq!(arr.len(), 2);
6285        assert_eq!(arr[0].as_str(), Some("$.title"));
6286        assert_eq!(arr[1].as_str(), Some("$.payload"));
6287    }
6288
6289    #[test]
6290    fn fts_weight_validation_out_of_range() {
6291        let (_db, service) = setup();
6292        // weight = 0.0 must be rejected
6293        let entries_zero = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(0.0)];
6294        let result = service.register_fts_property_schema_with_entries(
6295            "Article",
6296            &entries_zero,
6297            None,
6298            &[],
6299            crate::rebuild_actor::RebuildMode::Eager,
6300        );
6301        assert!(result.is_err(), "weight 0.0 must be rejected");
6302        let err_msg = result.expect_err("weight 0.0 must be rejected").to_string();
6303        assert!(
6304            err_msg.contains("weight"),
6305            "error must mention weight: {err_msg}"
6306        );
6307
6308        // weight = 1001.0 must be rejected
6309        let entries_big = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(1001.0)];
6310        let result = service.register_fts_property_schema_with_entries(
6311            "Article",
6312            &entries_big,
6313            None,
6314            &[],
6315            crate::rebuild_actor::RebuildMode::Eager,
6316        );
6317        assert!(result.is_err(), "weight 1001.0 must be rejected");
6318    }
6319
6320    #[test]
6321    fn fts_weight_validation_valid() {
6322        let (_db, service) = setup();
6323        let entries = vec![FtsPropertyPathSpec::scalar("$.title").with_weight(10.0)];
6324        let result = service.register_fts_property_schema_with_entries(
6325            "Article",
6326            &entries,
6327            None,
6328            &[],
6329            crate::rebuild_actor::RebuildMode::Eager,
6330        );
6331        assert!(
6332            result.is_ok(),
6333            "weight 10.0 must be accepted: {:?}",
6334            result.err()
6335        );
6336    }
6337
6338    // --- B-2: create_or_replace_fts_kind_table tests ---
6339
6340    #[test]
6341    fn create_or_replace_creates_multi_column_table() {
6342        use super::create_or_replace_fts_kind_table;
6343        let (db, _service) = setup();
6344        let conn = sqlite::open_connection(db.path()).expect("conn");
6345        let specs = vec![
6346            FtsPropertyPathSpec::scalar("$.title"),
6347            FtsPropertyPathSpec::recursive("$.payload"),
6348        ];
6349        create_or_replace_fts_kind_table(
6350            &conn,
6351            "Article",
6352            &specs,
6353            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6354        )
6355        .expect("create table");
6356
6357        // Verify table exists and has the expected columns.
6358        let table = fathomdb_schema::fts_kind_table_name("Article");
6359        // node_logical_id column
6360        let count: i64 = conn
6361            .query_row(&format!("SELECT count(*) FROM {table}"), [], |r| r.get(0))
6362            .expect("count");
6363        assert_eq!(count, 0, "new table must be empty");
6364
6365        // Verify columns exist by inserting a row with named columns
6366        let title_col = fathomdb_schema::fts_column_name("$.title", false);
6367        let payload_col = fathomdb_schema::fts_column_name("$.payload", true);
6368        conn.execute(
6369            &format!(
6370                "INSERT INTO {table} (node_logical_id, {title_col}, {payload_col}) VALUES ('id1', 'hello', 'world')"
6371            ),
6372            [],
6373        )
6374        .expect("insert with per-spec columns must succeed");
6375    }
6376
6377    #[test]
6378    fn create_or_replace_drops_and_recreates() {
6379        use super::create_or_replace_fts_kind_table;
6380        let (db, _service) = setup();
6381        let conn = sqlite::open_connection(db.path()).expect("conn");
6382
6383        // First call: 1 spec
6384        let specs_v1 = vec![FtsPropertyPathSpec::scalar("$.title")];
6385        create_or_replace_fts_kind_table(
6386            &conn,
6387            "Post",
6388            &specs_v1,
6389            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6390        )
6391        .expect("create v1");
6392
6393        // Second call: 2 specs (different layout)
6394        let specs_v2 = vec![
6395            FtsPropertyPathSpec::scalar("$.title"),
6396            FtsPropertyPathSpec::scalar("$.summary"),
6397        ];
6398        create_or_replace_fts_kind_table(
6399            &conn,
6400            "Post",
6401            &specs_v2,
6402            fathomdb_schema::DEFAULT_FTS_TOKENIZER,
6403        )
6404        .expect("create v2");
6405
6406        // Verify new layout: summary column must exist
6407        let table = fathomdb_schema::fts_kind_table_name("Post");
6408        let summary_col = fathomdb_schema::fts_column_name("$.summary", false);
6409        conn.execute(
6410            &format!("INSERT INTO {table} (node_logical_id, {summary_col}) VALUES ('id1', 'text')"),
6411            [],
6412        )
6413        .expect("second layout must allow summary column");
6414    }
6415
6416    #[test]
6417    fn create_or_replace_invalid_tokenizer() {
6418        use super::create_or_replace_fts_kind_table;
6419        let (db, _service) = setup();
6420        let conn = sqlite::open_connection(db.path()).expect("conn");
6421        let specs = vec![FtsPropertyPathSpec::scalar("$.title")];
6422        let result = create_or_replace_fts_kind_table(&conn, "Post", &specs, "'; DROP TABLE --");
6423        assert!(result.is_err(), "invalid tokenizer must be rejected");
6424        let err_msg = result
6425            .expect_err("invalid tokenizer must be rejected")
6426            .to_string();
6427        assert!(
6428            err_msg.contains("tokenizer"),
6429            "error must mention tokenizer: {err_msg}"
6430        );
6431    }
6432
6433    #[test]
6434    fn register_with_weights_creates_per_column_table() {
6435        let (db, service) = setup();
6436        let entries = vec![
6437            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6438            FtsPropertyPathSpec::scalar("$.body"),
6439        ];
6440        service
6441            .register_fts_property_schema_with_entries(
6442                "Article",
6443                &entries,
6444                None,
6445                &[],
6446                crate::rebuild_actor::RebuildMode::Eager,
6447            )
6448            .expect("register");
6449
6450        // Per-kind table must have per-spec columns, not just text_content
6451        let conn = sqlite::open_connection(db.path()).expect("conn");
6452        let table = fathomdb_schema::fts_kind_table_name("Article");
6453        let title_col = fathomdb_schema::fts_column_name("$.title", false);
6454        let body_col = fathomdb_schema::fts_column_name("$.body", false);
6455        // If the columns exist, insert must succeed
6456        conn.execute(
6457            &format!(
6458                "INSERT INTO {table} (node_logical_id, {title_col}, {body_col}) VALUES ('art-1', 'hello', 'world')"
6459            ),
6460            [],
6461        )
6462        .expect("per-spec columns must exist after registration with weights");
6463    }
6464
6465    #[test]
6466    fn weighted_to_unweighted_downgrade_recreates_table() {
6467        let (db, service) = setup();
6468
6469        // First register with weights (creates per-spec column layout).
6470        let weighted_entries = vec![
6471            FtsPropertyPathSpec::scalar("$.title").with_weight(2.0),
6472            FtsPropertyPathSpec::scalar("$.body"),
6473        ];
6474        service
6475            .register_fts_property_schema_with_entries(
6476                "Article",
6477                &weighted_entries,
6478                None,
6479                &[],
6480                crate::rebuild_actor::RebuildMode::Eager,
6481            )
6482            .expect("register weighted");
6483
6484        // Re-register the same kind WITHOUT weights.
6485        let unweighted_entries = vec![
6486            FtsPropertyPathSpec::scalar("$.title"),
6487            FtsPropertyPathSpec::scalar("$.body"),
6488        ];
6489        service
6490            .register_fts_property_schema_with_entries(
6491                "Article",
6492                &unweighted_entries,
6493                None,
6494                &[],
6495                crate::rebuild_actor::RebuildMode::Eager,
6496            )
6497            .expect("re-register unweighted");
6498
6499        // After downgrade, the table must have the text_content column
6500        // (legacy single-column layout), not the per-spec columns.
6501        let conn = sqlite::open_connection(db.path()).expect("conn");
6502        let table = fathomdb_schema::fts_kind_table_name("Article");
6503        let result = conn.execute(
6504            &format!("INSERT INTO {table} (node_logical_id, text_content) VALUES ('art-1', 'hello world')"),
6505            [],
6506        );
6507        assert!(
6508            result.is_ok(),
6509            "text_content column must exist after weighted-to-unweighted downgrade"
6510        );
6511    }
6512
6513    // --- Pack A+G: profile CRUD + tokenizer presets ---
6514
6515    #[test]
6516    fn set_get_fts_profile_roundtrip() {
6517        let (_db, service) = setup();
6518        let profile = service
6519            .set_fts_profile("book", "unicode61")
6520            .expect("set_fts_profile");
6521        assert_eq!(profile.kind, "book");
6522        assert_eq!(profile.tokenizer, "unicode61");
6523
6524        let got = service
6525            .get_fts_profile("book")
6526            .expect("get_fts_profile")
6527            .expect("should be Some");
6528        assert_eq!(got.kind, "book");
6529        assert_eq!(got.tokenizer, "unicode61");
6530    }
6531
6532    #[test]
6533    fn fts_profile_upsert() {
6534        let (_db, service) = setup();
6535        service
6536            .set_fts_profile("article", "unicode61")
6537            .expect("first set");
6538        service
6539            .set_fts_profile("article", "porter unicode61 remove_diacritics 2")
6540            .expect("second set");
6541        let got = service
6542            .get_fts_profile("article")
6543            .expect("get")
6544            .expect("Some");
6545        assert_eq!(got.tokenizer, "porter unicode61 remove_diacritics 2");
6546    }
6547
6548    #[test]
6549    fn invalid_tokenizer_rejected() {
6550        let (_db, service) = setup();
6551        let result = service.set_fts_profile("book", "'; DROP TABLE nodes --");
6552        assert!(result.is_err(), "invalid tokenizer must be rejected");
6553        let msg = result.expect_err("must be Err").to_string();
6554        assert!(
6555            msg.contains("tokenizer") || msg.contains("invalid"),
6556            "error must mention tokenizer or invalid: {msg}"
6557        );
6558    }
6559
6560    #[test]
6561    fn preset_recall_optimized_english() {
6562        assert_eq!(
6563            super::resolve_tokenizer_preset("recall-optimized-english"),
6564            "porter unicode61 remove_diacritics 2"
6565        );
6566    }
6567
6568    #[test]
6569    fn preset_precision_optimized() {
6570        assert_eq!(
6571            super::resolve_tokenizer_preset("precision-optimized"),
6572            "unicode61 remove_diacritics 2"
6573        );
6574    }
6575
6576    #[test]
6577    fn preset_global_cjk() {
6578        assert_eq!(super::resolve_tokenizer_preset("global-cjk"), "icu");
6579    }
6580
6581    #[test]
6582    fn preset_substring_trigram() {
6583        assert_eq!(
6584            super::resolve_tokenizer_preset("substring-trigram"),
6585            "trigram"
6586        );
6587    }
6588
6589    #[test]
6590    fn preset_source_code() {
6591        assert_eq!(
6592            super::resolve_tokenizer_preset("source-code"),
6593            "unicode61 tokenchars '._-$@'"
6594        );
6595    }
6596
6597    #[test]
6598    fn preview_fts_row_count() {
6599        let (db, service) = setup();
6600        {
6601            let conn = sqlite::open_connection(db.path()).expect("conn");
6602            for i in 0..5u32 {
6603                conn.execute(
6604                    "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6605                     VALUES (?1, ?2, 'book', '{}', 100, 'src')",
6606                    rusqlite::params![format!("r{i}"), format!("lg{i}")],
6607                )
6608                .expect("insert node");
6609            }
6610            // Insert one superseded node that must NOT count
6611            conn.execute(
6612                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, superseded_at) \
6613                 VALUES ('r99', 'lg99', 'book', '{}', 100, 'src', 200)",
6614                [],
6615            )
6616            .expect("insert superseded");
6617        }
6618        let impact = service
6619            .preview_projection_impact("book", "fts")
6620            .expect("preview");
6621        assert_eq!(impact.rows_to_rebuild, 5);
6622    }
6623
6624    #[test]
6625    fn preview_populates_current_tokenizer() {
6626        let (_db, service) = setup();
6627        service
6628            .set_fts_profile("doc", "trigram")
6629            .expect("set profile");
6630        let impact = service
6631            .preview_projection_impact("doc", "fts")
6632            .expect("preview");
6633        assert_eq!(impact.current_tokenizer, Some("trigram".to_owned()));
6634        assert_eq!(impact.target_tokenizer, None);
6635    }
6636
6637    // --- Review fix: tokenizer allowlist alignment ---
6638
6639    #[test]
6640    fn create_or_replace_source_code_tokenizer_is_accepted() {
6641        // The source-code preset expands to "unicode61 tokenchars '._-$@'" which
6642        // contains `.`, `-`, `$`, `@`. The allowlist in create_or_replace_fts_kind_table
6643        // must accept these characters (matching set_fts_profile's allowlist).
6644        use super::create_or_replace_fts_kind_table;
6645        let (db, _service) = setup();
6646        let conn = sqlite::open_connection(db.path()).expect("conn");
6647        let specs = vec![FtsPropertyPathSpec::scalar("$.symbol")];
6648        let source_code_tokenizer = "unicode61 tokenchars '._-$@'";
6649        let result =
6650            create_or_replace_fts_kind_table(&conn, "Symbol", &specs, source_code_tokenizer);
6651        assert!(
6652            result.is_ok(),
6653            "source-code tokenizer string must be accepted by create_or_replace_fts_kind_table: {:?}",
6654            result.err()
6655        );
6656    }
6657
6658    #[test]
6659    fn source_code_profile_round_trip_through_register_fts_schema() {
6660        // Verify that set_fts_profile("source-code") followed by
6661        // register_fts_property_schema succeeds end-to-end.
6662        // Previously failed because set_fts_profile accepted "unicode61 tokenchars '._-$@'"
6663        // but create_or_replace_fts_kind_table rejected it (only allowed " '_").
6664        let db = tempfile::NamedTempFile::new().expect("temp file");
6665        let schema = Arc::new(fathomdb_schema::SchemaManager::new());
6666
6667        // Bootstrap the schema (creates projection_profiles table via migration 20).
6668        {
6669            let _coord = crate::ExecutionCoordinator::open(
6670                db.path(),
6671                Arc::clone(&schema),
6672                None,
6673                1,
6674                Arc::new(crate::TelemetryCounters::default()),
6675                None,
6676            )
6677            .expect("coordinator opens for bootstrap");
6678        }
6679
6680        let service = AdminService::new(db.path(), Arc::clone(&schema));
6681
6682        // Set source-code profile (uses preset resolver, stores "unicode61 tokenchars '._-$@'").
6683        service
6684            .set_fts_profile("Symbol", "source-code")
6685            .expect("set_fts_profile with source-code preset must succeed");
6686
6687        // Register an FTS schema for this kind — this calls create_or_replace_fts_kind_table
6688        // with the tokenizer from the profile row.
6689        let result = service.register_fts_property_schema("Symbol", &["$.name".to_owned()], None);
6690        assert!(
6691            result.is_ok(),
6692            "register_fts_property_schema must succeed when source-code profile is active: {:?}",
6693            result.err()
6694        );
6695    }
6696
6697    // --- 0.5.0 item 5: max_tokens() capacity ---
6698
6699    /// A stub embedder with `max_tokens=8192` can embed a pre-written chunk
6700    /// whose text exceeds 512 words without error. Verifies that `max_tokens()`
6701    /// advertises the correct capacity and that `regenerate_vector_embeddings`
6702    /// produces one vector row for one stored chunk, regardless of chunk length.
6703    /// (The engine does not re-chunk at regen time; splitting is the caller's
6704    /// responsibility at write time.)
6705    #[cfg(feature = "sqlite-vec")]
6706    #[test]
6707    fn embedder_max_tokens_8192_handles_chunk_exceeding_512_words() {
6708        let long_text = (0..600u32)
6709            .map(|i| format!("word{i}"))
6710            .collect::<Vec<_>>()
6711            .join(" ");
6712
6713        let db = NamedTempFile::new().expect("temp file");
6714        let schema = Arc::new(SchemaManager::new());
6715
6716        {
6717            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6718            schema.bootstrap(&conn).expect("bootstrap");
6719            conn.execute(
6720                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6721                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'src-1')",
6722                [],
6723            )
6724            .expect("insert node");
6725            conn.execute(
6726                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6727                 VALUES (?1, 'doc-1', ?2, 100)",
6728                rusqlite::params!["chunk-long", long_text],
6729            )
6730            .expect("insert long chunk");
6731        }
6732
6733        let embedder = LargeContextTestEmbedder::new("long-context-model", 4, 8192);
6734        let service = AdminService::new(db.path(), Arc::clone(&schema));
6735        let report = service
6736            .regenerate_vector_embeddings(
6737                &embedder,
6738                &VectorRegenerationConfig {
6739                    kind: "Document".to_owned(),
6740                    profile: "default".to_owned(),
6741                    chunking_policy: "per_chunk".to_owned(),
6742                    preprocessing_policy: "trim".to_owned(),
6743                },
6744            )
6745            .expect("regenerate with long-context embedder");
6746
6747        assert_eq!(
6748            report.total_chunks, 1,
6749            "600-word text pre-written as one chunk must result in exactly one embedded row"
6750        );
6751        assert_eq!(report.regenerated_rows, 1);
6752        assert_eq!(
6753            embedder.max_tokens(),
6754            8192,
6755            "embedder must advertise 8192 token capacity"
6756        );
6757    }
6758
    /// Stub embedder with a configurable `max_tokens` for long-context tests.
    #[cfg(feature = "sqlite-vec")]
    #[derive(Debug)]
    struct LargeContextTestEmbedder {
        /// Identity reported via `QueryEmbedder::identity`.
        identity: QueryEmbedderIdentity,
        /// Fixed vector returned from every `embed_query` call.
        vector: Vec<f32>,
        /// Capacity advertised by `max_tokens()`.
        max_tokens: usize,
    }
6767
6768    #[cfg(feature = "sqlite-vec")]
6769    impl LargeContextTestEmbedder {
6770        fn new(model: &str, dimension: usize, max_tokens: usize) -> Self {
6771            Self {
6772                identity: QueryEmbedderIdentity {
6773                    model_identity: model.to_owned(),
6774                    model_version: "1.0.0".to_owned(),
6775                    dimension,
6776                    normalization_policy: "l2".to_owned(),
6777                },
6778                vector: vec![1.0; dimension],
6779                max_tokens,
6780            }
6781        }
6782    }
6783
    #[cfg(feature = "sqlite-vec")]
    impl QueryEmbedder for LargeContextTestEmbedder {
        // Deterministic: every query embeds to the same fixed vector.
        fn embed_query(&self, _text: &str) -> Result<Vec<f32>, EmbedderError> {
            Ok(self.vector.clone())
        }
        fn identity(&self) -> QueryEmbedderIdentity {
            self.identity.clone()
        }
        // Reports the configurable capacity set at construction.
        fn max_tokens(&self) -> usize {
            self.max_tokens
        }
    }
6796
6797    /// Item 7 integration test: register schema, write nodes, call
6798    /// `regenerate_vector_embeddings_in_process`, verify contract row and
6799    /// that vec rows exist for every chunk.
6800    #[cfg(feature = "sqlite-vec")]
6801    #[test]
6802    #[allow(clippy::too_many_lines)]
6803    fn regenerate_vector_embeddings_in_process_writes_contract_and_vec_rows() {
6804        let db = NamedTempFile::new().expect("temp file");
6805        let schema = Arc::new(SchemaManager::new());
6806
6807        {
6808            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6809            schema.bootstrap(&conn).expect("bootstrap");
6810            for (row_id, logical_id, created_at, src) in [
6811                ("r1", "node-1", 100, "src1"),
6812                ("r2", "node-2", 101, "src2"),
6813                ("r3", "node-3", 102, "src3"),
6814            ] {
6815                conn.execute(
6816                    "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6817                     VALUES (?1, ?2, 'Doc', '{}', ?3, ?4)",
6818                    rusqlite::params![row_id, logical_id, created_at, src],
6819                )
6820                .expect("insert node");
6821            }
6822            for (chunk_id, node_id, text, created_at) in [
6823                ("c1", "node-1", "first document text", 100),
6824                ("c2", "node-2", "second document text", 101),
6825                ("c3", "node-3", "third document text", 102),
6826            ] {
6827                conn.execute(
6828                    "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6829                     VALUES (?1, ?2, ?3, ?4)",
6830                    rusqlite::params![chunk_id, node_id, text, created_at],
6831                )
6832                .expect("insert chunk");
6833            }
6834        }
6835
6836        let service = AdminService::new(db.path(), Arc::clone(&schema));
6837        let embedder = TestEmbedder::new("batch-test-model", 4);
6838        let config = VectorRegenerationConfig {
6839            kind: "Doc".to_owned(),
6840            profile: "default".to_owned(),
6841            chunking_policy: "per_chunk".to_owned(),
6842            preprocessing_policy: "trim".to_owned(),
6843        };
6844        let report = service
6845            .regenerate_vector_embeddings_in_process(&embedder, &config)
6846            .expect("in-process regen must succeed");
6847
6848        assert_eq!(report.total_chunks, 3);
6849        assert_eq!(report.regenerated_rows, 3);
6850        assert!(report.contract_persisted);
6851
6852        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6853        let vec_count: i64 = conn
6854            .query_row("SELECT count(*) FROM vec_doc", [], |row| row.get(0))
6855            .expect("vec_doc count");
6856        assert_eq!(vec_count, 3, "one vec row per chunk");
6857
6858        let model_identity: String = conn
6859            .query_row(
6860                "SELECT model_identity FROM vector_embedding_contracts WHERE profile = 'default'",
6861                [],
6862                |row| row.get(0),
6863            )
6864            .expect("contract row");
6865        assert_eq!(model_identity, "batch-test-model");
6866    }
6867
6868    // --- 0.5.0 item 6: per-kind vec regeneration ---
6869
6870    #[cfg(feature = "sqlite-vec")]
6871    #[test]
6872    #[allow(clippy::too_many_lines)]
6873    fn regenerate_vector_embeddings_targets_per_kind_table() {
6874        let db = NamedTempFile::new().expect("temp file");
6875        let schema = Arc::new(SchemaManager::new());
6876
6877        {
6878            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6879            schema.bootstrap(&conn).expect("bootstrap");
6880            conn.execute(
6881                "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref) \
6882                 VALUES ('row-1', 'doc-1', 'Document', '{}', 100, 'source-1')",
6883                [],
6884            )
6885            .expect("insert node");
6886            conn.execute(
6887                "INSERT INTO chunks (id, node_logical_id, text_content, created_at) \
6888                 VALUES ('chunk-1', 'doc-1', 'budget discussion', 100)",
6889                [],
6890            )
6891            .expect("insert chunk");
6892        }
6893
6894        let service = AdminService::new(db.path(), Arc::clone(&schema));
6895        let embedder = TestEmbedder::new("test-model", 4);
6896        let report = service
6897            .regenerate_vector_embeddings(
6898                &embedder,
6899                &VectorRegenerationConfig {
6900                    kind: "Document".to_owned(),
6901                    profile: "default".to_owned(),
6902                    chunking_policy: "per_chunk".to_owned(),
6903                    preprocessing_policy: "trim".to_owned(),
6904                },
6905            )
6906            .expect("regenerate vectors");
6907
6908        assert_eq!(report.table_name, "vec_document");
6909        assert_eq!(report.regenerated_rows, 1);
6910
6911        let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6912        let vec_count: i64 = conn
6913            .query_row("SELECT count(*) FROM vec_document", [], |row| row.get(0))
6914            .expect("vec_document count");
6915        assert_eq!(vec_count, 1, "rows must be in vec_document");
6916
6917        let old_count: i64 = conn
6918            .query_row(
6919                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='vec_nodes_active'",
6920                [],
6921                |r| r.get(0),
6922            )
6923            .expect("sqlite_master check");
6924        assert_eq!(
6925            old_count, 0,
6926            "vec_nodes_active must NOT be created for per-kind regen"
6927        );
6928    }
6929
6930    // --- 0.5.0 item 6 step 5: get_vec_profile reads per-kind key ---
6931
6932    #[test]
6933    fn get_vec_profile_returns_none_when_no_profile_exists() {
6934        let (db, service) = setup();
6935        let _ = db;
6936        let result = service.get_vec_profile("MyKind").expect("should not error");
6937        assert!(
6938            result.is_none(),
6939            "must return None when no profile registered"
6940        );
6941    }
6942
6943    #[cfg(feature = "sqlite-vec")]
6944    #[test]
6945    fn get_vec_profile_returns_profile_for_registered_kind() {
6946        let db = NamedTempFile::new().expect("temp file");
6947        let schema = Arc::new(SchemaManager::new());
6948        {
6949            let conn = crate::sqlite::open_connection_with_vec(db.path()).expect("vec conn");
6950            schema.bootstrap(&conn).expect("bootstrap");
6951            schema
6952                .ensure_vec_kind_profile(&conn, "MyKind", 128)
6953                .expect("ensure_vec_kind_profile");
6954        }
6955
6956        let service = AdminService::new(db.path(), Arc::clone(&schema));
6957        let profile = service.get_vec_profile("MyKind").expect("should not error");
6958        assert!(profile.is_some(), "must return profile after registration");
6959        assert_eq!(profile.unwrap().dimensions, 128);
6960    }
6961
6962    #[test]
6963    fn get_vec_profile_does_not_return_global_sentinel_row() {
6964        let (db, service) = setup();
6965        {
6966            let conn = sqlite::open_connection(db.path()).expect("conn");
6967            conn.execute(
6968                "INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at) \
6969                 VALUES ('*', 'vec', '{\"model_identity\":\"old-model\",\"dimensions\":384}', 0, 0)",
6970                [],
6971            )
6972            .expect("insert global sentinel");
6973        }
6974        let result = service
6975            .get_vec_profile("SomeKind")
6976            .expect("should not error");
6977        assert!(
6978            result.is_none(),
6979            "per-kind query must not return global ('*', 'vec') row"
6980        );
6981    }
6982}