Skip to main content

fathomdb_engine/writer/
mod.rs

// Pack G: the `VecInsert` struct carries a `#[deprecated]` attribute aimed
// at external callers; the writer module itself is the supported
// implementation surface for the managed vector projection and uses the
// type internally.
#![allow(deprecated)]

mod fts_extract;

// Re-export fts_extract items at the writer module level so that external
// callers (projection.rs, admin.rs, coordinator.rs, rebuild_actor.rs) can
// still access them via `crate::writer::*`.
// LEAF_SEPARATOR is used in coordinator.rs and in the test submodule below;
// the allow attr silences the per-file unused-import lint since Rust does not
// cross-file-lint pub(crate) re-exports.
#[allow(unused_imports)]
pub(crate) use fts_extract::{
    ExtractStats, LEAF_SEPARATOR, PositionEntry, PropertyFtsSchema, PropertyPathMode,
    extract_property_fts, extract_property_fts_columns, load_fts_property_schemas,
    parse_property_schema_json,
};

// Re-export items that are only needed within this module's test submodules.
#[cfg(test)]
#[allow(unused_imports)]
pub(crate) use fts_extract::{
    MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PropertyPathEntry, extract_json_path,
};

use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::panic::AssertUnwindSafe;
use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc::{self, Sender, SyncSender};
use std::thread;
use std::time::Duration;

use fathomdb_schema::SchemaManager;
use rusqlite::{OptionalExtension, TransactionBehavior, params};

use crate::operational::{
    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
    OperationalValidationMode, extract_secondary_index_entries_for_current,
    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
    parse_operational_validation_contract, validate_operational_payload_against_contract,
};
use crate::telemetry::TelemetryCounters;
use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
50
/// A deferred projection backfill task submitted alongside a write.
///
/// Carried in [`WriteRequest::optional_backfills`]; see also
/// [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
59
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is `true`; `Preserve` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
69
/// Controls how missing `source_ref` values are handled at write time.
///
/// Enforced by `prepare_write` and `prepare_touch_last_accessed` before a
/// request is handed to the writer thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
80
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Id of this specific row (version); must be non-empty and unique
    /// within the request.
    pub row_id: String,
    /// Stable identity shared by all versions of the node; must be non-empty.
    pub logical_id: String,
    /// Node kind label.
    pub kind: String,
    /// Node properties; presumably a JSON document — TODO confirm against the writer SQL.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
97
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Id of this specific row (version); must be non-empty and unique
    /// within the request.
    pub row_id: String,
    /// Stable identity shared by all versions of the edge; must be non-empty.
    pub logical_id: String,
    /// `logical_id` of the node this edge originates from.
    pub source_logical_id: String,
    /// `logical_id` of the node this edge points to.
    pub target_logical_id: String,
    /// Edge kind label.
    pub kind: String,
    /// Edge properties; presumably a JSON document — TODO confirm against the writer SQL.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
112
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Stable identity of the node to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
119
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Stable identity of the edge to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
126
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk id; must be non-empty.
    pub id: String,
    /// `logical_id` of the node this chunk belongs to.
    pub node_logical_id: String,
    /// Chunk text; `prepare_write` rejects empty chunks.
    pub text_content: String,
    // Optional byte offsets locating the chunk in its source content.
    // NOTE(review): inclusive/half-open semantics are not visible here — confirm.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
138
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
#[deprecated(
    since = "0.6.0",
    note = "raw VecInsert is the managed vector projection's internal wire format. Callers should configure embedding + configure_vec_kind and write nodes/chunks normally; the projection actor will produce vec rows automatically. This type will remain public for a transition window but may become crate-private in a future release."
)]
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Chunk the embedding belongs to; validated non-empty by `prepare_write`.
    pub chunk_id: String,
    /// Embedding values; validated non-empty by `prepare_write`.  The `f32`
    /// payload is why this struct (and [`WriteRequest`]) cannot derive `Eq`.
    pub embedding: Vec<f32>,
}
154
/// A mutation to an operational collection submitted as part of a write request.
///
/// `collection` and `record_key` must be non-empty for every variant;
/// `payload_json` must be non-empty for `Append` and `Put` (enforced by
/// `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record to a collection.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put a record. NOTE(review): presumed insert-or-replace semantics —
    /// confirm against the writer's SQL.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete a record; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
176
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run id; must be non-empty.
    pub id: String,
    /// Run kind label.
    pub kind: String,
    /// Run status label.
    pub status: String,
    /// Properties; presumably a JSON document — TODO confirm against the writer SQL.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    // Upsert/supersession flags — presumably analogous to `NodeInsert::upsert`;
    // confirm in the apply path, which is not visible here.
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
188
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step id; must be non-empty.
    pub id: String,
    /// Id of the parent run.
    pub run_id: String,
    /// Step kind label.
    pub kind: String,
    /// Step status label.
    pub status: String,
    /// Properties; presumably a JSON document — TODO confirm against the writer SQL.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    // Upsert/supersession flags — presumably analogous to `NodeInsert::upsert`;
    // confirm in the apply path, which is not visible here.
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
201
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action id; must be non-empty.
    pub id: String,
    /// Id of the parent step.
    pub step_id: String,
    /// Action kind label.
    pub kind: String,
    /// Action status label.
    pub status: String,
    /// Properties; presumably a JSON document — TODO confirm against the writer SQL.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    // Upsert/supersession flags — presumably analogous to `NodeInsert::upsert`;
    // confirm in the apply path, which is not visible here.
    pub upsert: bool,
    pub supersedes_id: Option<String>,
}
214
// --- Write-request size limits ---
// Per-request caps enforced by `validate_request_size` before a request is
// handed to the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Shared cap across node retires + edge retires combined.
const MAX_RETIRES: usize = 10_000;
// Shared cap across runs + steps + actions combined.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
231
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
///
/// Derives `PartialEq` but not `Eq` because `vec_inserts` carries `f32` values.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Label for the batch (see [`WriteReceipt::label`]).
    pub label: String,
    /// Nodes to insert or upsert.
    pub nodes: Vec<NodeInsert>,
    /// Nodes to retire (soft-delete).
    pub node_retires: Vec<NodeRetire>,
    /// Edges to insert or upsert.
    pub edges: Vec<EdgeInsert>,
    /// Edges to retire (soft-delete).
    pub edge_retires: Vec<EdgeRetire>,
    /// Text chunks to insert.
    pub chunks: Vec<ChunkInsert>,
    /// Runtime records: runs, their steps, and the steps' actions.
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfill tasks submitted alongside the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    /// Operational-collection mutations applied in the same transaction.
    pub operational_writes: Vec<OperationalWrite>,
}
250
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label of the request this receipt answers.
    pub label: String,
    /// Number of optional backfill tasks accepted.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-`source_ref` warnings emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
259
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Nodes to touch; must be non-empty with no blank entries
    /// (enforced by `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    /// Timestamp to stamp. NOTE(review): units/epoch are not visible in this
    /// module — confirm against the writer SQL.
    pub touched_at: i64,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
267
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical ids touched.
    pub touched_logical_ids: usize,
    /// Timestamp that was applied.
    pub touched_at: i64,
}
274
/// One chunk-level FTS row, resolved on the writer thread before the
/// transaction begins (see `PreparedWrite::required_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    kind: String,
    text_content: String,
}
282
/// One property-level FTS row, resolved on the writer thread before the
/// transaction begins (see `PreparedWrite::required_property_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    text_content: String,
    /// Position entries extracted alongside the text.
    positions: Vec<PositionEntry>,
    /// Per-column text extracted for weighted schemas. Empty for non-weighted schemas.
    /// When non-empty, the writer uses per-column INSERT instead of single `text_content`.
    columns: Vec<(String, String)>,
}
293
/// A validated [`WriteRequest`] plus derived data, produced by `prepare_write`
/// on the caller's thread and shipped to the writer thread for application.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Per-collection kind metadata resolved during preparation.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Per-collection filter-field definitions resolved during preparation.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Non-fatal validation findings gathered while preparing operational writes.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
321
/// Messages handled by the writer thread's loop; each variant carries a reply
/// channel back to the calling thread.
enum WriteMessage {
    /// Apply a prepared write batch — sent by [`WriterActor::submit`].
    Submit {
        /// Boxed to keep the enum variant small on the channel.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` — sent by [`WriterActor::touch_last_accessed`].
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
    /// Claim pending vector work — sent by [`WriterActor::claim_vector_projection`].
    ClaimVectorProjection {
        request: VectorProjectionClaimRequest,
        reply: Sender<Result<Vec<VectorWorkClaim>, EngineError>>,
    },
    /// Finalize claimed vector work — sent by [`WriterActor::apply_vector_projection`].
    ApplyVectorProjection {
        request: VectorProjectionApplyRequest,
        reply: Sender<Result<(), EngineError>>,
    },
}
340
/// A request to claim (transition pending → inflight) a batch of pending
/// `vector_projection_work` rows inside a single writer transaction.
///
/// Serviced by [`WriterActor::claim_vector_projection`].
#[derive(Clone, Debug)]
pub struct VectorProjectionClaimRequest {
    /// Minimum priority (inclusive). Use 1000 for incremental-only,
    /// `i64::MIN` for any row.
    pub min_priority: i64,
    /// Maximum batch size.
    pub limit: usize,
}
351
/// A claimed pending row, with the chunk text content needed to feed an
/// embedder and recompute the canonical hash.
///
/// Returned by [`WriterActor::claim_vector_projection`].
#[derive(Clone, Debug)]
pub struct VectorWorkClaim {
    /// `vector_projection_work.work_id`.
    pub work_id: i64,
    /// Node kind.
    pub kind: String,
    /// Chunk id.
    pub chunk_id: String,
    /// The canonical hash stored in the work row at enqueue time.
    pub canonical_hash: String,
    /// The `embedding_profile_id` stored in the work row.
    pub embedding_profile_id: i64,
    /// Priority on the work row.
    pub priority: i64,
    /// Current chunk text fetched from `chunks.text_content`. Empty if the
    /// chunk has been deleted since enqueue.
    pub text_content: String,
    /// True if the chunk row no longer exists.
    pub chunk_missing: bool,
}
374
/// A request to finalize a claimed batch of projection work rows.
///
/// Serviced by [`WriterActor::apply_vector_projection`].
#[derive(Debug, Default)]
pub struct VectorProjectionApplyRequest {
    /// Work rows whose embedding succeeded; apply into `vec_<kind>` and delete
    /// the work row.
    pub successes: Vec<VectorProjectionSuccess>,
    /// Work rows to mark `state = 'discarded'` (hash mismatch, profile
    /// change, etc.).  Each entry carries an optional error note.
    pub discards: Vec<VectorProjectionDiscard>,
    /// Work rows to revert to `state = 'pending'` with `attempt_count + 1`
    /// and `last_error` set.
    pub reverts: Vec<i64>,
    /// Error message stamped on reverted rows.
    pub revert_error: Option<String>,
}
390
/// Successful embedding to persist.
#[derive(Clone, Debug)]
pub struct VectorProjectionSuccess {
    /// Work row id to delete after applying.
    pub work_id: i64,
    /// Node kind (used to derive the per-kind table name).
    pub kind: String,
    /// Chunk id (primary key in `vec_<kind>`).
    pub chunk_id: String,
    /// Embedding values.
    pub embedding: Vec<f32>,
}
403
/// Work row to mark `discarded`.
#[derive(Clone, Debug)]
pub struct VectorProjectionDiscard {
    /// Work row id.
    pub work_id: i64,
    /// Optional reason stored in `last_error`.
    pub reason: Option<String>,
}
412
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Wrapped in `ManuallyDrop` so the `Drop` impl can close the channel
    /// explicitly before joining the thread.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// `Some` until `Drop` takes it to join the writer thread.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied while preparing requests.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
425
426impl WriterActor {
427    /// # Errors
428    /// Returns [`EngineError`] if the writer thread cannot be spawned.
429    pub fn start(
430        path: impl AsRef<Path>,
431        schema_manager: Arc<SchemaManager>,
432        provenance_mode: ProvenanceMode,
433        telemetry: Arc<TelemetryCounters>,
434    ) -> Result<Self, EngineError> {
435        let database_path = path.as_ref().to_path_buf();
436        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
437
438        let writer_telemetry = Arc::clone(&telemetry);
439        let handle = thread::Builder::new()
440            .name("fathomdb-writer".to_owned())
441            .spawn(move || {
442                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
443            })
444            .map_err(EngineError::Io)?;
445
446        Ok(Self {
447            sender: ManuallyDrop::new(sender),
448            thread_handle: Some(handle),
449            provenance_mode,
450            _telemetry: telemetry,
451        })
452    }
453
454    /// Returns `true` if the writer thread is still running.
455    fn is_thread_alive(&self) -> bool {
456        self.thread_handle
457            .as_ref()
458            .is_some_and(|h| !h.is_finished())
459    }
460
461    /// Returns `WriterRejected` if the writer thread has exited.
462    fn check_thread_alive(&self) -> Result<(), EngineError> {
463        if self.is_thread_alive() {
464            Ok(())
465        } else {
466            Err(EngineError::WriterRejected(
467                "writer thread has exited".to_owned(),
468            ))
469        }
470    }
471
472    /// # Errors
473    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
474    /// down, or the underlying `SQLite` transaction fails.
475    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
476        self.check_thread_alive()?;
477        let prepared = prepare_write(request, self.provenance_mode)?;
478        let (reply_tx, reply_rx) = mpsc::channel();
479        self.sender
480            .send(WriteMessage::Submit {
481                prepared: Box::new(prepared),
482                reply: reply_tx,
483            })
484            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
485
486        recv_with_timeout(&reply_rx)
487    }
488
489    /// # Errors
490    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
491    /// or the underlying `SQLite` transaction fails.
492    pub fn touch_last_accessed(
493        &self,
494        request: LastAccessTouchRequest,
495    ) -> Result<LastAccessTouchReport, EngineError> {
496        self.check_thread_alive()?;
497        prepare_touch_last_accessed(&request, self.provenance_mode)?;
498        let (reply_tx, reply_rx) = mpsc::channel();
499        self.sender
500            .send(WriteMessage::TouchLastAccessed {
501                request,
502                reply: reply_tx,
503            })
504            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
505
506        recv_with_timeout(&reply_rx)
507    }
508
509    /// Claim up to `request.limit` pending `vector_projection_work` rows with
510    /// `priority >= request.min_priority`, transitioning them to
511    /// `state = 'inflight'` inside a single writer transaction. Returns the
512    /// claimed rows with their current chunk text.
513    ///
514    /// # Errors
515    /// Returns [`EngineError`] if the writer actor has shut down or the
516    /// underlying SQL fails.
517    pub fn claim_vector_projection(
518        &self,
519        request: VectorProjectionClaimRequest,
520    ) -> Result<Vec<VectorWorkClaim>, EngineError> {
521        self.check_thread_alive()?;
522        let (reply_tx, reply_rx) = mpsc::channel();
523        self.sender
524            .send(WriteMessage::ClaimVectorProjection {
525                request,
526                reply: reply_tx,
527            })
528            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
529        recv_with_timeout(&reply_rx)
530    }
531
532    /// Apply the result of an embed call: insert successful embeddings into
533    /// per-kind vec tables and delete the work rows; mark others discarded;
534    /// revert others to pending with an incremented `attempt_count`.
535    ///
536    /// # Errors
537    /// Returns [`EngineError`] if the writer actor has shut down or the
538    /// underlying SQL fails.
539    pub fn apply_vector_projection(
540        &self,
541        request: VectorProjectionApplyRequest,
542    ) -> Result<(), EngineError> {
543        self.check_thread_alive()?;
544        let (reply_tx, reply_rx) = mpsc::channel();
545        self.sender
546            .send(WriteMessage::ApplyVectorProjection {
547                request,
548                reply: reply_tx,
549            })
550            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
551        recv_with_timeout(&reply_rx)
552    }
553}
554
/// Fallback shutdown-panic notice for builds without the `tracing` feature;
/// called from the `Drop` impl below when the writer thread panicked.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
560
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // Re-throw the writer thread's panic payload — unless we are
                    // already unwinding, in which case a second panic would abort.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
590
591/// Wait for a reply from the writer thread, with a timeout.
592fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
593    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
594        .map_err(|error| match error {
595            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
596                "write timed out waiting for writer thread reply — the write may still commit"
597                    .to_owned(),
598            ),
599            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
600        })
601        .and_then(|result| result)
602}
603
604fn prepare_touch_last_accessed(
605    request: &LastAccessTouchRequest,
606    mode: ProvenanceMode,
607) -> Result<(), EngineError> {
608    if request.logical_ids.is_empty() {
609        return Err(EngineError::InvalidWrite(
610            "touch_last_accessed requires at least one logical_id".to_owned(),
611        ));
612    }
613    for logical_id in &request.logical_ids {
614        if logical_id.trim().is_empty() {
615            return Err(EngineError::InvalidWrite(
616                "touch_last_accessed requires non-empty logical_ids".to_owned(),
617            ));
618        }
619    }
620    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
621        return Err(EngineError::InvalidWrite(
622            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
623                .to_owned(),
624        ));
625    }
626    Ok(())
627}
628
629fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
630    let missing: Vec<String> = request
631        .nodes
632        .iter()
633        .filter(|n| n.source_ref.is_none())
634        .map(|n| format!("node '{}'", n.logical_id))
635        .chain(
636            request
637                .node_retires
638                .iter()
639                .filter(|r| r.source_ref.is_none())
640                .map(|r| format!("node retire '{}'", r.logical_id)),
641        )
642        .chain(
643            request
644                .edges
645                .iter()
646                .filter(|e| e.source_ref.is_none())
647                .map(|e| format!("edge '{}'", e.logical_id)),
648        )
649        .chain(
650            request
651                .edge_retires
652                .iter()
653                .filter(|r| r.source_ref.is_none())
654                .map(|r| format!("edge retire '{}'", r.logical_id)),
655        )
656        .chain(
657            request
658                .runs
659                .iter()
660                .filter(|r| r.source_ref.is_none())
661                .map(|r| format!("run '{}'", r.id)),
662        )
663        .chain(
664            request
665                .steps
666                .iter()
667                .filter(|s| s.source_ref.is_none())
668                .map(|s| format!("step '{}'", s.id)),
669        )
670        .chain(
671            request
672                .actions
673                .iter()
674                .filter(|a| a.source_ref.is_none())
675                .map(|a| format!("action '{}'", a.id)),
676        )
677        .chain(
678            request
679                .operational_writes
680                .iter()
681                .filter(|write| operational_write_source_ref(write).is_none())
682                .map(|write| {
683                    format!(
684                        "operational {} '{}:{}'",
685                        operational_write_kind(write),
686                        operational_write_collection(write),
687                        operational_write_record_key(write)
688                    )
689                }),
690        )
691        .collect();
692
693    if missing.is_empty() {
694        Ok(())
695    } else {
696        Err(EngineError::InvalidWrite(format!(
697            "ProvenanceMode::Require: missing source_ref on: {}",
698            missing.join(", ")
699        )))
700    }
701}
702
703fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
704    if request.nodes.len() > MAX_NODES {
705        return Err(EngineError::InvalidWrite(format!(
706            "too many nodes: {} exceeds limit of {MAX_NODES}",
707            request.nodes.len()
708        )));
709    }
710    if request.edges.len() > MAX_EDGES {
711        return Err(EngineError::InvalidWrite(format!(
712            "too many edges: {} exceeds limit of {MAX_EDGES}",
713            request.edges.len()
714        )));
715    }
716    if request.chunks.len() > MAX_CHUNKS {
717        return Err(EngineError::InvalidWrite(format!(
718            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
719            request.chunks.len()
720        )));
721    }
722    let retires = request.node_retires.len() + request.edge_retires.len();
723    if retires > MAX_RETIRES {
724        return Err(EngineError::InvalidWrite(format!(
725            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
726        )));
727    }
728    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
729    if runtime_items > MAX_RUNTIME_ITEMS {
730        return Err(EngineError::InvalidWrite(format!(
731            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
732        )));
733    }
734    if request.operational_writes.len() > MAX_OPERATIONAL {
735        return Err(EngineError::InvalidWrite(format!(
736            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
737            request.operational_writes.len()
738        )));
739    }
740    let total = request.nodes.len()
741        + request.node_retires.len()
742        + request.edges.len()
743        + request.edge_retires.len()
744        + request.chunks.len()
745        + request.runs.len()
746        + request.steps.len()
747        + request.actions.len()
748        + request.vec_inserts.len()
749        + request.operational_writes.len();
750    if total > MAX_TOTAL_ITEMS {
751        return Err(EngineError::InvalidWrite(format!(
752            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
753        )));
754    }
755    Ok(())
756}
757
758#[allow(clippy::too_many_lines)]
759fn prepare_write(
760    request: WriteRequest,
761    mode: ProvenanceMode,
762) -> Result<PreparedWrite, EngineError> {
763    validate_request_size(&request)?;
764
765    // --- ID validation: reject empty IDs ---
766    for node in &request.nodes {
767        if node.row_id.is_empty() {
768            return Err(EngineError::InvalidWrite(
769                "NodeInsert has empty row_id".to_owned(),
770            ));
771        }
772        if node.logical_id.is_empty() {
773            return Err(EngineError::InvalidWrite(
774                "NodeInsert has empty logical_id".to_owned(),
775            ));
776        }
777    }
778    for edge in &request.edges {
779        if edge.row_id.is_empty() {
780            return Err(EngineError::InvalidWrite(
781                "EdgeInsert has empty row_id".to_owned(),
782            ));
783        }
784        if edge.logical_id.is_empty() {
785            return Err(EngineError::InvalidWrite(
786                "EdgeInsert has empty logical_id".to_owned(),
787            ));
788        }
789    }
790    for chunk in &request.chunks {
791        if chunk.id.is_empty() {
792            return Err(EngineError::InvalidWrite(
793                "ChunkInsert has empty id".to_owned(),
794            ));
795        }
796        if chunk.text_content.is_empty() {
797            return Err(EngineError::InvalidWrite(format!(
798                "chunk '{}' has empty text_content; empty chunks are not allowed",
799                chunk.id
800            )));
801        }
802    }
803    for run in &request.runs {
804        if run.id.is_empty() {
805            return Err(EngineError::InvalidWrite(
806                "RunInsert has empty id".to_owned(),
807            ));
808        }
809    }
810    for step in &request.steps {
811        if step.id.is_empty() {
812            return Err(EngineError::InvalidWrite(
813                "StepInsert has empty id".to_owned(),
814            ));
815        }
816    }
817    for action in &request.actions {
818        if action.id.is_empty() {
819            return Err(EngineError::InvalidWrite(
820                "ActionInsert has empty id".to_owned(),
821            ));
822        }
823    }
824    for vi in &request.vec_inserts {
825        if vi.chunk_id.is_empty() {
826            return Err(EngineError::InvalidWrite(
827                "VecInsert has empty chunk_id".to_owned(),
828            ));
829        }
830        if vi.embedding.is_empty() {
831            return Err(EngineError::InvalidWrite(format!(
832                "VecInsert for chunk '{}' has empty embedding",
833                vi.chunk_id
834            )));
835        }
836    }
837    for operational in &request.operational_writes {
838        if operational_write_collection(operational).is_empty() {
839            return Err(EngineError::InvalidWrite(
840                "OperationalWrite has empty collection".to_owned(),
841            ));
842        }
843        if operational_write_record_key(operational).is_empty() {
844            return Err(EngineError::InvalidWrite(format!(
845                "OperationalWrite for collection '{}' has empty record_key",
846                operational_write_collection(operational)
847            )));
848        }
849        match operational {
850            OperationalWrite::Append { payload_json, .. }
851            | OperationalWrite::Put { payload_json, .. } => {
852                if payload_json.is_empty() {
853                    return Err(EngineError::InvalidWrite(format!(
854                        "OperationalWrite {} '{}:{}' has empty payload_json",
855                        operational_write_kind(operational),
856                        operational_write_collection(operational),
857                        operational_write_record_key(operational)
858                    )));
859                }
860            }
861            OperationalWrite::Delete { .. } => {}
862        }
863    }
864
865    // --- ID validation: reject duplicate row_ids within the request ---
866    {
867        let mut seen = std::collections::HashSet::new();
868        for node in &request.nodes {
869            if !seen.insert(node.row_id.as_str()) {
870                return Err(EngineError::InvalidWrite(format!(
871                    "duplicate row_id '{}' within the same WriteRequest",
872                    node.row_id
873                )));
874            }
875        }
876        for edge in &request.edges {
877            if !seen.insert(edge.row_id.as_str()) {
878                return Err(EngineError::InvalidWrite(format!(
879                    "duplicate row_id '{}' within the same WriteRequest",
880                    edge.row_id
881                )));
882            }
883        }
884    }
885
886    // --- ProvenanceMode::Require enforcement ---
887    if mode == ProvenanceMode::Require {
888        check_require_provenance(&request)?;
889    }
890
891    // --- Runtime table upsert validation ---
892    for run in &request.runs {
893        if run.upsert && run.supersedes_id.is_none() {
894            return Err(EngineError::InvalidWrite(format!(
895                "run '{}': upsert=true requires supersedes_id to be set",
896                run.id
897            )));
898        }
899    }
900    for step in &request.steps {
901        if step.upsert && step.supersedes_id.is_none() {
902            return Err(EngineError::InvalidWrite(format!(
903                "step '{}': upsert=true requires supersedes_id to be set",
904                step.id
905            )));
906        }
907    }
908    for action in &request.actions {
909        if action.upsert && action.supersedes_id.is_none() {
910            return Err(EngineError::InvalidWrite(format!(
911                "action '{}': upsert=true requires supersedes_id to be set",
912                action.id
913            )));
914        }
915    }
916
917    let node_kinds = request
918        .nodes
919        .iter()
920        .map(|node| (node.logical_id.clone(), node.kind.clone()))
921        .collect::<HashMap<_, _>>();
922
923    Ok(PreparedWrite {
924        label: request.label,
925        nodes: request.nodes,
926        node_retires: request.node_retires,
927        edges: request.edges,
928        edge_retires: request.edge_retires,
929        chunks: request.chunks,
930        runs: request.runs,
931        steps: request.steps,
932        actions: request.actions,
933        vec_inserts: request.vec_inserts,
934        operational_writes: request.operational_writes,
935        operational_collection_kinds: HashMap::new(),
936        operational_collection_filter_fields: HashMap::new(),
937        operational_validation_warnings: Vec::new(),
938        node_kinds,
939        required_fts_rows: Vec::new(),
940        required_property_fts_rows: Vec::new(),
941        optional_backfills: request.optional_backfills,
942    })
943}
944
/// Long-running loop for the dedicated writer thread.
///
/// Opens the database connection, runs schema bootstrap, then serves
/// `WriteMessage`s from `receiver` until the channel closes (all senders
/// dropped). If connection or bootstrap fails, every queued and future
/// message is answered with `WriterRejected` via `reject_all` and the
/// thread exits.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // No connection means no writes can ever succeed: drain the
            // channel, rejecting everything, then stop the thread.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` exists only under the `tracing` feature; it is
                // consumed by the trace_info! call below, which presumably
                // compiles away without the feature — TODO confirm.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write cannot kill the writer
                // thread and strand every sender still waiting on a reply.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        // Telemetry row count covers nodes + edges + chunks.
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Receiver may have hung up; a failed send is harmless.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches are counted as a write event with zero rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
            WriteMessage::ClaimVectorProjection { request, reply } => {
                let result = apply_claim_vector_projection(&mut conn, &request);
                let _ = reply.send(result);
            }
            WriteMessage::ApplyVectorProjection { request, reply } => {
                let result = apply_apply_vector_projection(&mut conn, schema_manager, &request);
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
1035
1036fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1037    for message in receiver {
1038        match message {
1039            WriteMessage::Submit { reply, .. } => {
1040                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1041            }
1042            WriteMessage::TouchLastAccessed { reply, .. } => {
1043                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1044            }
1045            WriteMessage::ClaimVectorProjection { reply, .. } => {
1046                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1047            }
1048            WriteMessage::ApplyVectorProjection { reply, .. } => {
1049                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1050            }
1051        }
1052    }
1053}
1054
1055fn apply_claim_vector_projection(
1056    conn: &mut rusqlite::Connection,
1057    request: &VectorProjectionClaimRequest,
1058) -> Result<Vec<VectorWorkClaim>, EngineError> {
1059    let limit = i64::try_from(request.limit).unwrap_or(i64::MAX);
1060    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1061
1062    // Select candidate work_ids under the given min_priority filter.
1063    let ids: Vec<i64> = {
1064        let mut stmt = tx.prepare(
1065            "SELECT work_id FROM vector_projection_work \
1066             WHERE state = 'pending' AND priority >= ?1 \
1067             ORDER BY priority DESC, created_at ASC, work_id ASC \
1068             LIMIT ?2",
1069        )?;
1070        stmt.query_map(rusqlite::params![request.min_priority, limit], |r| {
1071            r.get::<_, i64>(0)
1072        })?
1073        .collect::<Result<Vec<_>, _>>()?
1074    };
1075
1076    if ids.is_empty() {
1077        tx.commit()?;
1078        return Ok(Vec::new());
1079    }
1080
1081    // Mark inflight.
1082    {
1083        let mut upd = tx.prepare(
1084            "UPDATE vector_projection_work \
1085             SET state = 'inflight', updated_at = unixepoch() \
1086             WHERE work_id = ?1",
1087        )?;
1088        for id in &ids {
1089            upd.execute(rusqlite::params![id])?;
1090        }
1091    }
1092
1093    // Fetch details + chunk text.
1094    let mut claims: Vec<VectorWorkClaim> = Vec::with_capacity(ids.len());
1095    {
1096        let mut sel = tx.prepare(
1097            "SELECT w.work_id, w.kind, w.chunk_id, w.canonical_hash, w.embedding_profile_id, \
1098                    w.priority, c.text_content \
1099             FROM vector_projection_work w \
1100             LEFT JOIN chunks c ON c.id = w.chunk_id \
1101             WHERE w.work_id = ?1",
1102        )?;
1103        for id in &ids {
1104            let row = sel.query_row(rusqlite::params![id], |r| {
1105                let text_opt: Option<String> = r.get(6)?;
1106                Ok(VectorWorkClaim {
1107                    work_id: r.get(0)?,
1108                    kind: r.get(1)?,
1109                    chunk_id: r.get(2)?,
1110                    canonical_hash: r.get(3)?,
1111                    embedding_profile_id: r.get(4)?,
1112                    priority: r.get(5)?,
1113                    chunk_missing: text_opt.is_none(),
1114                    text_content: text_opt.unwrap_or_default(),
1115                })
1116            })?;
1117            claims.push(row);
1118        }
1119    }
1120
1121    tx.commit()?;
1122    Ok(claims)
1123}
1124
/// Apply the results of an embedding batch to the vector projection.
///
/// Successes insert the embedding into the per-kind `vec_<kind>` table
/// (sqlite-vec builds only) and delete the work row; discards mark the
/// work row `discarded` with a reason; reverts return rows to `pending`
/// with an incremented attempt counter. All changes happen in a single
/// IMMEDIATE transaction.
fn apply_apply_vector_projection(
    conn: &mut rusqlite::Connection,
    #[allow(unused_variables)] schema_manager: &Arc<SchemaManager>,
    request: &VectorProjectionApplyRequest,
) -> Result<(), EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Apply successful embeddings to vec_<kind> tables and delete work rows.
    #[cfg(feature = "sqlite-vec")]
    {
        for success in &request.successes {
            let table_name = fathomdb_schema::vec_kind_table_name(&success.kind);
            let dimension = success.embedding.len();
            // Serialize the f32 embedding as little-endian bytes for vec0.
            let bytes: Vec<u8> = success
                .embedding
                .iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            // Create the per-kind virtual table lazily; its dimension is
            // taken from this embedding's length.
            tx.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
                    chunk_id TEXT PRIMARY KEY,\
                    embedding float[{dimension}]\
                )"
            ))?;
            tx.execute(
                &format!(
                    "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
                ),
                rusqlite::params![success.chunk_id, bytes],
            )?;
            tx.execute(
                "DELETE FROM vector_projection_work WHERE work_id = ?1",
                rusqlite::params![success.work_id],
            )?;
        }
    }
    #[cfg(not(feature = "sqlite-vec"))]
    {
        // Without sqlite-vec the vec_<kind> table cannot exist; still delete
        // the work rows so the queue drains.
        for success in &request.successes {
            tx.execute(
                "DELETE FROM vector_projection_work WHERE work_id = ?1",
                rusqlite::params![success.work_id],
            )?;
        }
    }

    // Permanently discard work rows, recording why.
    for discard in &request.discards {
        tx.execute(
            "UPDATE vector_projection_work \
             SET state = 'discarded', last_error = ?1, updated_at = unixepoch() \
             WHERE work_id = ?2",
            rusqlite::params![discard.reason, discard.work_id],
        )?;
    }

    // Return reverted rows to the pending queue; all reverts in a request
    // share a single optional error message.
    if !request.reverts.is_empty() {
        let err = request.revert_error.as_deref();
        for work_id in &request.reverts {
            tx.execute(
                "UPDATE vector_projection_work \
                 SET state = 'pending', \
                     attempt_count = attempt_count + 1, \
                     last_error = ?1, \
                     updated_at = unixepoch() \
                 WHERE work_id = ?2",
                rusqlite::params![err, work_id],
            )?;
        }
    }

    tx.commit()?;
    Ok(())
}
1200
1201/// Resolve FTS projection rows before the write transaction begins.
1202///
1203/// For each chunk in the prepared write, determines the node `kind` needed
1204/// to populate the FTS index. If the node was co-submitted in the same
1205/// request its kind is taken from `prepared.node_kinds` without a DB query.
1206/// If the node is pre-existing it is looked up in the database. If the node
1207/// cannot be found in either place, an `InvalidWrite` error is returned.
1208fn resolve_fts_rows(
1209    conn: &rusqlite::Connection,
1210    prepared: &mut PreparedWrite,
1211) -> Result<(), EngineError> {
1212    let retiring_ids: std::collections::HashSet<&str> = prepared
1213        .node_retires
1214        .iter()
1215        .map(|r| r.logical_id.as_str())
1216        .collect();
1217    for chunk in &prepared.chunks {
1218        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1219            return Err(EngineError::InvalidWrite(format!(
1220                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1221                 WriteRequest; retire and chunk insertion for the same node must not be combined",
1222                chunk.id, chunk.node_logical_id
1223            )));
1224        }
1225    }
1226    for chunk in &prepared.chunks {
1227        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1228            k.clone()
1229        } else {
1230            match conn.query_row(
1231                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1232                params![chunk.node_logical_id],
1233                |row| row.get::<_, String>(0),
1234            ) {
1235                Ok(kind) => kind,
1236                Err(rusqlite::Error::QueryReturnedNoRows) => {
1237                    return Err(EngineError::InvalidWrite(format!(
1238                        "chunk '{}' references node_logical_id '{}' that is not present in this \
1239                         write request or the database \
1240                         (v1 limitation: chunks and their nodes must be submitted together or the \
1241                         node must already exist)",
1242                        chunk.id, chunk.node_logical_id
1243                    )));
1244                }
1245                Err(e) => return Err(EngineError::Sqlite(e)),
1246            }
1247        };
1248        prepared.required_fts_rows.push(FtsProjectionRow {
1249            chunk_id: chunk.id.clone(),
1250            node_logical_id: chunk.node_logical_id.clone(),
1251            kind,
1252            text_content: chunk.text_content.clone(),
1253        });
1254    }
1255    trace_debug!(
1256        fts_rows = prepared.required_fts_rows.len(),
1257        chunks_processed = prepared.chunks.len(),
1258        "fts row resolution completed"
1259    );
1260    Ok(())
1261}
1262
1263/// Load FTS property schemas from the database and extract property values from
1264/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
1265fn resolve_property_fts_rows(
1266    conn: &rusqlite::Connection,
1267    prepared: &mut PreparedWrite,
1268) -> Result<(), EngineError> {
1269    if prepared.nodes.is_empty() {
1270        return Ok(());
1271    }
1272
1273    let schemas: HashMap<String, PropertyFtsSchema> =
1274        load_fts_property_schemas(conn)?.into_iter().collect();
1275
1276    if schemas.is_empty() {
1277        return Ok(());
1278    }
1279
1280    let mut combined_stats = ExtractStats::default();
1281    for node in &prepared.nodes {
1282        let Some(schema) = schemas.get(&node.kind) else {
1283            continue;
1284        };
1285        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1286        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
1287        let (text_content, positions, stats) = extract_property_fts(&props, schema);
1288        combined_stats.merge(stats);
1289        if let Some(text_content) = text_content {
1290            let columns = if has_weights {
1291                extract_property_fts_columns(&props, schema)
1292            } else {
1293                Vec::new()
1294            };
1295            prepared
1296                .required_property_fts_rows
1297                .push(FtsPropertyProjectionRow {
1298                    node_logical_id: node.logical_id.clone(),
1299                    kind: node.kind.clone(),
1300                    text_content,
1301                    positions,
1302                    columns,
1303                });
1304        }
1305    }
1306    if combined_stats != ExtractStats::default() {
1307        trace_debug!(
1308            depth_cap_hit = combined_stats.depth_cap_hit,
1309            byte_cap_reached = combined_stats.byte_cap_reached,
1310            excluded_subtree = combined_stats.excluded_subtree,
1311            "property fts recursive extraction guardrails engaged"
1312        );
1313    }
1314    trace_debug!(
1315        property_fts_rows = prepared.required_property_fts_rows.len(),
1316        nodes_processed = prepared.nodes.len(),
1317        "property fts row resolution completed"
1318    );
1319    Ok(())
1320}
1321
/// Resolve and validate operational writes against registered collections.
///
/// For each referenced collection (queried once per request, then cached
/// locally): confirms the collection is registered and not disabled,
/// checks that the write variant matches the collection kind (Append for
/// append_only_log; Put/Delete for latest_state), and runs the
/// prepare-time validation contract. The resolved kinds and filter fields
/// are stored on `prepared` for the apply phase.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    // Contracts are only used for this prepare-time pass; they are not
    // stored on `prepared`.
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // First write for this collection: load and cache its metadata.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // Always populated by the branch above; defensive error regardless.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Enforce the write-variant/collection-kind pairing.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Prepare-time contract check: hard failures propagate via `?`;
        // the Ok(Option<String>) warning payload is deliberately dropped
        // here (warnings are collected later against live contracts).
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1389
1390fn parse_operational_filter_fields(
1391    filter_fields_json: &str,
1392) -> Result<Vec<OperationalFilterField>, EngineError> {
1393    let fields: Vec<OperationalFilterField> =
1394        serde_json::from_str(filter_fields_json).map_err(|error| {
1395            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1396        })?;
1397    let mut seen = std::collections::HashSet::new();
1398    for field in &fields {
1399        if field.name.trim().is_empty() {
1400            return Err(EngineError::InvalidWrite(
1401                "filter_fields_json field names must not be empty".to_owned(),
1402            ));
1403        }
1404        if !seen.insert(field.name.as_str()) {
1405            return Err(EngineError::InvalidWrite(format!(
1406                "filter_fields_json contains duplicate field '{}'",
1407                field.name
1408            )));
1409        }
1410        if field.modes.is_empty() {
1411            return Err(EngineError::InvalidWrite(format!(
1412                "filter_fields_json field '{}' must declare at least one mode",
1413                field.name
1414            )));
1415        }
1416        if field.modes.contains(&OperationalFilterMode::Prefix)
1417            && field.field_type != OperationalFilterFieldType::String
1418        {
1419            return Err(EngineError::InvalidWrite(format!(
1420                "filter field '{}' only supports prefix for string types",
1421                field.name
1422            )));
1423        }
1424    }
1425    Ok(fields)
1426}
1427
/// One extracted filter value for an operational write payload.
///
/// At most one of `string_value` / `integer_value` is populated, matching
/// the declared `OperationalFilterFieldType` of the field (see
/// `extract_operational_filter_values`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    /// Name of the declared filter field this value belongs to.
    field_name: String,
    /// Set for string-typed fields; `None` otherwise.
    string_value: Option<String>,
    /// Set for integer- and timestamp-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1434
1435fn extract_operational_filter_values(
1436    filter_fields: &[OperationalFilterField],
1437    payload_json: &str,
1438) -> Vec<OperationalFilterValueRow> {
1439    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1440        return Vec::new();
1441    };
1442    let Some(object) = parsed.as_object() else {
1443        return Vec::new();
1444    };
1445
1446    filter_fields
1447        .iter()
1448        .filter_map(|field| {
1449            let value = object.get(&field.name)?;
1450            match field.field_type {
1451                OperationalFilterFieldType::String => {
1452                    value
1453                        .as_str()
1454                        .map(|string_value| OperationalFilterValueRow {
1455                            field_name: field.name.clone(),
1456                            string_value: Some(string_value.to_owned()),
1457                            integer_value: None,
1458                        })
1459                }
1460                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1461                    value
1462                        .as_i64()
1463                        .map(|integer_value| OperationalFilterValueRow {
1464                            field_name: field.name.clone(),
1465                            string_value: None,
1466                            integer_value: Some(integer_value),
1467                        })
1468                }
1469            }
1470        })
1471        .collect()
1472}
1473
/// Full write pipeline for one submitted request: resolve FTS rows,
/// property-FTS rows, and operational-collection metadata against the
/// current database state, then apply the prepared write.
///
/// Any resolution error aborts before `apply_write` runs.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1483
1484fn apply_touch_last_accessed(
1485    conn: &mut rusqlite::Connection,
1486    request: &LastAccessTouchRequest,
1487) -> Result<LastAccessTouchReport, EngineError> {
1488    let mut seen = std::collections::HashSet::new();
1489    let logical_ids = request
1490        .logical_ids
1491        .iter()
1492        .filter(|logical_id| seen.insert(logical_id.as_str()))
1493        .cloned()
1494        .collect::<Vec<_>>();
1495    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1496
1497    for logical_id in &logical_ids {
1498        let exists = tx
1499            .query_row(
1500                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1501                params![logical_id],
1502                |row| row.get::<_, i64>(0),
1503            )
1504            .optional()?
1505            .is_some();
1506        if !exists {
1507            return Err(EngineError::InvalidWrite(format!(
1508                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1509            )));
1510        }
1511    }
1512
1513    {
1514        let mut upsert_metadata = tx.prepare_cached(
1515            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1516             VALUES (?1, ?2, ?2) \
1517             ON CONFLICT(logical_id) DO UPDATE SET \
1518                 last_accessed_at = excluded.last_accessed_at, \
1519                 updated_at = excluded.updated_at",
1520        )?;
1521        let mut insert_provenance = tx.prepare_cached(
1522            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1523             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1524        )?;
1525        for logical_id in &logical_ids {
1526            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1527            insert_provenance.execute(params![
1528                new_id(),
1529                logical_id,
1530                request.source_ref.as_deref(),
1531                format!("{{\"touched_at\":{}}}", request.touched_at),
1532            ])?;
1533        }
1534    }
1535
1536    tx.commit()?;
1537    Ok(LastAccessTouchReport {
1538        touched_logical_ids: logical_ids.len(),
1539        touched_at: request.touched_at,
1540    })
1541}
1542
1543fn ensure_operational_collections_writable(
1544    tx: &rusqlite::Transaction<'_>,
1545    prepared: &PreparedWrite,
1546) -> Result<(), EngineError> {
1547    for collection in prepared.operational_collection_kinds.keys() {
1548        let disabled_at: Option<Option<i64>> = tx
1549            .query_row(
1550                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1551                params![collection],
1552                |row| row.get::<_, Option<i64>>(0),
1553            )
1554            .optional()?;
1555        match disabled_at {
1556            Some(Some(_)) => {
1557                return Err(EngineError::InvalidWrite(format!(
1558                    "operational collection '{collection}' is disabled"
1559                )));
1560            }
1561            Some(None) => {}
1562            None => {
1563                return Err(EngineError::InvalidWrite(format!(
1564                    "operational collection '{collection}' is not registered"
1565                )));
1566            }
1567        }
1568    }
1569    Ok(())
1570}
1571
1572fn validate_operational_writes_against_live_contracts(
1573    tx: &rusqlite::Transaction<'_>,
1574    prepared: &PreparedWrite,
1575) -> Result<Vec<String>, EngineError> {
1576    let mut collection_validation_contracts =
1577        HashMap::<String, Option<OperationalValidationContract>>::new();
1578    for collection in prepared.operational_collection_kinds.keys() {
1579        let validation_json: String = tx
1580            .query_row(
1581                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1582                params![collection],
1583                |row| row.get(0),
1584            )
1585            .map_err(EngineError::Sqlite)?;
1586        let validation_contract = parse_operational_validation_contract(&validation_json)
1587            .map_err(EngineError::InvalidWrite)?;
1588        collection_validation_contracts.insert(collection.clone(), validation_contract);
1589    }
1590
1591    let mut warnings = Vec::new();
1592    for write in &prepared.operational_writes {
1593        if let Some(Some(contract)) =
1594            collection_validation_contracts.get(operational_write_collection(write))
1595            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1596        {
1597            warnings.push(warning);
1598        }
1599    }
1600
1601    Ok(warnings)
1602}
1603
1604fn load_live_operational_secondary_indexes(
1605    tx: &rusqlite::Transaction<'_>,
1606    prepared: &PreparedWrite,
1607) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1608    let mut collection_indexes = HashMap::new();
1609    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1610        let secondary_indexes_json: String = tx
1611            .query_row(
1612                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1613                params![collection],
1614                |row| row.get(0),
1615            )
1616            .map_err(EngineError::Sqlite)?;
1617        let indexes =
1618            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1619                .map_err(EngineError::InvalidWrite)?;
1620        collection_indexes.insert(collection.clone(), indexes);
1621    }
1622    Ok(collection_indexes)
1623}
1624
1625fn check_operational_write_against_contract(
1626    write: &OperationalWrite,
1627    contract: &OperationalValidationContract,
1628) -> Result<Option<String>, EngineError> {
1629    if contract.mode == OperationalValidationMode::Disabled {
1630        return Ok(None);
1631    }
1632
1633    let (payload_json, collection, record_key) = match write {
1634        OperationalWrite::Append {
1635            collection,
1636            record_key,
1637            payload_json,
1638            ..
1639        }
1640        | OperationalWrite::Put {
1641            collection,
1642            record_key,
1643            payload_json,
1644            ..
1645        } => (
1646            payload_json.as_str(),
1647            collection.as_str(),
1648            record_key.as_str(),
1649        ),
1650        OperationalWrite::Delete { .. } => return Ok(None),
1651    };
1652
1653    match validate_operational_payload_against_contract(contract, payload_json) {
1654        Ok(()) => Ok(None),
1655        Err(message) => match contract.mode {
1656            OperationalValidationMode::Disabled => Ok(None),
1657            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1658                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1659                kind = operational_write_kind(write)
1660            ))),
1661            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1662                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1663                kind = operational_write_kind(write)
1664            ))),
1665        },
1666    }
1667}
1668
1669#[allow(clippy::too_many_lines)]
1670fn apply_write(
1671    conn: &mut rusqlite::Connection,
1672    prepared: &mut PreparedWrite,
1673) -> Result<WriteReceipt, EngineError> {
1674    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1675
1676    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
1677    // preserve chunks/vec for possible restore, mark superseded, record audit event.
1678    {
1679        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1680        let mut del_prop_positions = tx
1681            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1682        // Double-write cleanup: also remove staging rows for this node so that a
1683        // rebuild-in-progress does not re-insert a retired node at swap time.
1684        let mut del_staging = tx.prepare_cached(
1685            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1686        )?;
1687        let mut sup_node = tx.prepare_cached(
1688            "UPDATE nodes SET superseded_at = unixepoch() \
1689             WHERE logical_id = ?1 AND superseded_at IS NULL",
1690        )?;
1691        let mut ins_event = tx.prepare_cached(
1692            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1693             VALUES (?1, 'node_retire', ?2, ?3)",
1694        )?;
1695        for retire in &prepared.node_retires {
1696            del_fts.execute(params![retire.logical_id])?;
1697            // Delete from the per-kind FTS table: look up the node's kind first.
1698            let kind_opt: Option<String> = tx
1699                .query_row(
1700                    "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1701                    params![retire.logical_id],
1702                    |r| r.get(0),
1703                )
1704                .optional()?;
1705            if let Some(kind) = kind_opt {
1706                let table = fathomdb_schema::fts_kind_table_name(&kind);
1707                // Only delete if the per-kind table exists (may not exist if no schema registered).
1708                let table_exists: bool = tx
1709                    .query_row(
1710                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1711                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1712                        rusqlite::params![table],
1713                        |r| r.get::<_, i64>(0),
1714                    )
1715                    .unwrap_or(0)
1716                    > 0;
1717                if table_exists {
1718                    tx.execute(
1719                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1720                        params![retire.logical_id],
1721                    )?;
1722                }
1723            }
1724            del_prop_positions.execute(params![retire.logical_id])?;
1725            del_staging.execute(params![retire.logical_id])?;
1726            // Pack E: remove any pending vector-projection work for chunks
1727            // owned by the retired node so retired chunks are not processed.
1728            tx.execute(
1729                "DELETE FROM vector_projection_work \
1730                 WHERE state = 'pending' AND chunk_id IN \
1731                     (SELECT id FROM chunks WHERE node_logical_id = ?1)",
1732                params![retire.logical_id],
1733            )?;
1734            sup_node.execute(params![retire.logical_id])?;
1735            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1736        }
1737    }
1738
1739    // Edge retires: mark superseded, record audit event.
1740    {
1741        let mut sup_edge = tx.prepare_cached(
1742            "UPDATE edges SET superseded_at = unixepoch() \
1743             WHERE logical_id = ?1 AND superseded_at IS NULL",
1744        )?;
1745        let mut ins_event = tx.prepare_cached(
1746            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1747             VALUES (?1, 'edge_retire', ?2, ?3)",
1748        )?;
1749        for retire in &prepared.edge_retires {
1750            sup_edge.execute(params![retire.logical_id])?;
1751            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1752        }
1753    }
1754
1755    // Node inserts (with optional upsert + chunk-policy handling).
1756    {
1757        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1758        let mut del_prop_positions = tx
1759            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1760        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1761        let mut sup_node = tx.prepare_cached(
1762            "UPDATE nodes SET superseded_at = unixepoch() \
1763             WHERE logical_id = ?1 AND superseded_at IS NULL",
1764        )?;
1765        let mut ins_node = tx.prepare_cached(
1766            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1767             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1768        )?;
1769        for node in &prepared.nodes {
1770            if node.upsert {
1771                // Property FTS rows are always replaced on upsert since properties change.
1772                // Delete from the per-kind table (table name is dynamic).
1773                let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1774                // Only delete if the per-kind table exists (may not exist if no schema registered).
1775                let table_exists: bool = tx
1776                    .query_row(
1777                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1778                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1779                        rusqlite::params![table],
1780                        |r| r.get::<_, i64>(0),
1781                    )
1782                    .unwrap_or(0)
1783                    > 0;
1784                if table_exists {
1785                    tx.execute(
1786                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1787                        params![node.logical_id],
1788                    )?;
1789                }
1790                del_prop_positions.execute(params![node.logical_id])?;
1791                if node.chunk_policy == ChunkPolicy::Replace {
1792                    // 0.5.0: delete from the per-kind vec table instead of the global sentinel.
1793                    #[cfg(feature = "sqlite-vec")]
1794                    {
1795                        let vec_table = fathomdb_schema::vec_kind_table_name(&node.kind);
1796                        let del_sql = format!(
1797                            "DELETE FROM {vec_table} WHERE chunk_id IN \
1798                             (SELECT id FROM chunks WHERE node_logical_id = ?1)"
1799                        );
1800                        match tx.execute(&del_sql, params![node.logical_id]) {
1801                            Ok(_) => {}
1802                            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {}
1803                            Err(e) => return Err(e.into()),
1804                        }
1805                    }
1806                    del_fts.execute(params![node.logical_id])?;
1807                    // Pack E: drop pending vector-projection work for the
1808                    // chunks we're about to delete — they no longer exist.
1809                    tx.execute(
1810                        "DELETE FROM vector_projection_work \
1811                         WHERE state = 'pending' AND chunk_id IN \
1812                             (SELECT id FROM chunks WHERE node_logical_id = ?1)",
1813                        params![node.logical_id],
1814                    )?;
1815                    del_chunks.execute(params![node.logical_id])?;
1816                }
1817                sup_node.execute(params![node.logical_id])?;
1818            }
1819            ins_node.execute(params![
1820                node.row_id,
1821                node.logical_id,
1822                node.kind,
1823                node.properties,
1824                node.source_ref,
1825                node.content_ref,
1826            ])?;
1827        }
1828    }
1829
1830    // Edge inserts (with optional upsert).
1831    {
1832        let mut sup_edge = tx.prepare_cached(
1833            "UPDATE edges SET superseded_at = unixepoch() \
1834             WHERE logical_id = ?1 AND superseded_at IS NULL",
1835        )?;
1836        let mut ins_edge = tx.prepare_cached(
1837            "INSERT INTO edges \
1838             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1839             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1840        )?;
1841        for edge in &prepared.edges {
1842            if edge.upsert {
1843                sup_edge.execute(params![edge.logical_id])?;
1844            }
1845            ins_edge.execute(params![
1846                edge.row_id,
1847                edge.logical_id,
1848                edge.source_logical_id,
1849                edge.target_logical_id,
1850                edge.kind,
1851                edge.properties,
1852                edge.source_ref,
1853            ])?;
1854        }
1855    }
1856
1857    // Chunk inserts.
1858    {
1859        let mut ins_chunk = tx.prepare_cached(
1860            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
1861             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1862        )?;
1863        for chunk in &prepared.chunks {
1864            ins_chunk.execute(params![
1865                chunk.id,
1866                chunk.node_logical_id,
1867                chunk.text_content,
1868                chunk.byte_start,
1869                chunk.byte_end,
1870                chunk.content_hash,
1871            ])?;
1872        }
1873    }
1874
1875    // Pack E: enqueue incremental vector projection work for inserted chunks
1876    // whose kind is vector-enabled and where an active embedding profile exists.
1877    // This runs in the same transaction as the canonical commit.
1878    maybe_enqueue_vector_projection_work(&tx, prepared)?;
1879
1880    // Run inserts (with optional upsert).
1881    {
1882        let mut sup_run = tx.prepare_cached(
1883            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1884        )?;
1885        let mut ins_run = tx.prepare_cached(
1886            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
1887             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
1888        )?;
1889        for run in &prepared.runs {
1890            if run.upsert
1891                && let Some(ref prior_id) = run.supersedes_id
1892            {
1893                sup_run.execute(params![prior_id])?;
1894            }
1895            ins_run.execute(params![
1896                run.id,
1897                run.kind,
1898                run.status,
1899                run.properties,
1900                run.source_ref
1901            ])?;
1902        }
1903    }
1904
1905    // Step inserts (with optional upsert).
1906    {
1907        let mut sup_step = tx.prepare_cached(
1908            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1909        )?;
1910        let mut ins_step = tx.prepare_cached(
1911            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
1912             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1913        )?;
1914        for step in &prepared.steps {
1915            if step.upsert
1916                && let Some(ref prior_id) = step.supersedes_id
1917            {
1918                sup_step.execute(params![prior_id])?;
1919            }
1920            ins_step.execute(params![
1921                step.id,
1922                step.run_id,
1923                step.kind,
1924                step.status,
1925                step.properties,
1926                step.source_ref,
1927            ])?;
1928        }
1929    }
1930
1931    // Action inserts (with optional upsert).
1932    {
1933        let mut sup_action = tx.prepare_cached(
1934            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
1935        )?;
1936        let mut ins_action = tx.prepare_cached(
1937            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
1938             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
1939        )?;
1940        for action in &prepared.actions {
1941            if action.upsert
1942                && let Some(ref prior_id) = action.supersedes_id
1943            {
1944                sup_action.execute(params![prior_id])?;
1945            }
1946            ins_action.execute(params![
1947                action.id,
1948                action.step_id,
1949                action.kind,
1950                action.status,
1951                action.properties,
1952                action.source_ref,
1953            ])?;
1954        }
1955    }
1956
1957    // Operational mutation log writes and latest-state current rows.
1958    {
1959        ensure_operational_collections_writable(&tx, prepared)?;
1960        prepared.operational_validation_warnings =
1961            validate_operational_writes_against_live_contracts(&tx, prepared)?;
1962        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
1963
1964        let mut next_mutation_order: i64 = tx.query_row(
1965            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
1966            [],
1967            |row| row.get(0),
1968        )?;
1969        let mut ins_mutation = tx.prepare_cached(
1970            "INSERT INTO operational_mutations \
1971             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
1972             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1973        )?;
1974        let mut ins_filter_value = tx.prepare_cached(
1975            "INSERT INTO operational_filter_values \
1976             (mutation_id, collection_name, field_name, string_value, integer_value) \
1977             VALUES (?1, ?2, ?3, ?4, ?5)",
1978        )?;
1979        let mut upsert_current = tx.prepare_cached(
1980            "INSERT INTO operational_current \
1981             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
1982             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
1983             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
1984                 payload_json = excluded.payload_json, \
1985                 updated_at = excluded.updated_at, \
1986                 last_mutation_id = excluded.last_mutation_id",
1987        )?;
1988        let mut del_current = tx.prepare_cached(
1989            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
1990        )?;
1991        let mut del_current_secondary_indexes = tx.prepare_cached(
1992            "DELETE FROM operational_secondary_index_entries \
1993             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
1994        )?;
1995        let mut ins_secondary_index = tx.prepare_cached(
1996            "INSERT INTO operational_secondary_index_entries \
1997             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
1998              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
1999             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
2000        )?;
2001        let mut current_row_stmt = tx.prepare_cached(
2002            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
2003             WHERE collection_name = ?1 AND record_key = ?2",
2004        )?;
2005
2006        for write in &prepared.operational_writes {
2007            let collection = operational_write_collection(write);
2008            let record_key = operational_write_record_key(write);
2009            let mutation_id = new_id();
2010            next_mutation_order += 1;
2011            let payload_json = operational_write_payload(write);
2012            ins_mutation.execute(params![
2013                &mutation_id,
2014                collection,
2015                record_key,
2016                operational_write_kind(write),
2017                payload_json,
2018                operational_write_source_ref(write),
2019                next_mutation_order,
2020            ])?;
2021            if let Some(indexes) = collection_secondary_indexes.get(collection) {
2022                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
2023                    ins_secondary_index.execute(params![
2024                        collection,
2025                        entry.index_name,
2026                        "mutation",
2027                        &mutation_id,
2028                        record_key,
2029                        entry.sort_timestamp,
2030                        entry.slot1_text,
2031                        entry.slot1_integer,
2032                        entry.slot2_text,
2033                        entry.slot2_integer,
2034                        entry.slot3_text,
2035                        entry.slot3_integer,
2036                    ])?;
2037                }
2038            }
2039            if let Some(filter_fields) = prepared
2040                .operational_collection_filter_fields
2041                .get(collection)
2042            {
2043                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
2044                    ins_filter_value.execute(params![
2045                        &mutation_id,
2046                        collection,
2047                        filter_value.field_name,
2048                        filter_value.string_value,
2049                        filter_value.integer_value,
2050                    ])?;
2051                }
2052            }
2053
2054            if prepared.operational_collection_kinds.get(collection)
2055                == Some(&OperationalCollectionKind::LatestState)
2056            {
2057                del_current_secondary_indexes.execute(params![collection, record_key])?;
2058                match write {
2059                    OperationalWrite::Put { payload_json, .. } => {
2060                        upsert_current.execute(params![
2061                            collection,
2062                            record_key,
2063                            payload_json,
2064                            &mutation_id,
2065                        ])?;
2066                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
2067                            let (current_payload_json, updated_at, last_mutation_id): (
2068                                String,
2069                                i64,
2070                                String,
2071                            ) = current_row_stmt
2072                                .query_row(params![collection, record_key], |row| {
2073                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
2074                                })?;
2075                            for entry in extract_secondary_index_entries_for_current(
2076                                indexes,
2077                                &current_payload_json,
2078                                updated_at,
2079                            ) {
2080                                ins_secondary_index.execute(params![
2081                                    collection,
2082                                    entry.index_name,
2083                                    "current",
2084                                    last_mutation_id.as_str(),
2085                                    record_key,
2086                                    entry.sort_timestamp,
2087                                    entry.slot1_text,
2088                                    entry.slot1_integer,
2089                                    entry.slot2_text,
2090                                    entry.slot2_integer,
2091                                    entry.slot3_text,
2092                                    entry.slot3_integer,
2093                                ])?;
2094                            }
2095                        }
2096                    }
2097                    OperationalWrite::Delete { .. } => {
2098                        del_current.execute(params![collection, record_key])?;
2099                    }
2100                    OperationalWrite::Append { .. } => {}
2101                }
2102            }
2103        }
2104    }
2105
2106    // FTS row inserts.
2107    {
2108        let mut ins_fts = tx.prepare_cached(
2109            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
2110             VALUES (?1, ?2, ?3, ?4)",
2111        )?;
2112        for fts_row in &prepared.required_fts_rows {
2113            ins_fts.execute(params![
2114                fts_row.chunk_id,
2115                fts_row.node_logical_id,
2116                fts_row.kind,
2117                fts_row.text_content,
2118            ])?;
2119        }
2120    }
2121
2122    // Property FTS row inserts.
2123    if !prepared.required_property_fts_rows.is_empty() {
2124        let mut ins_positions = tx.prepare_cached(
2125            "INSERT INTO fts_node_property_positions \
2126             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
2127             VALUES (?1, ?2, ?3, ?4, ?5)",
2128        )?;
2129        // Delete any stale position rows for these nodes; writer already
2130        // deleted the per-kind FTS rows in the upsert block above.
2131        let mut del_positions = tx
2132            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
2133        for row in &prepared.required_property_fts_rows {
2134            del_positions.execute(params![row.node_logical_id])?;
2135            // Insert into per-kind FTS table (table name is dynamic).
2136            let table = fathomdb_schema::fts_kind_table_name(&row.kind);
2137            ensure_property_fts_table_for_row(&tx, &row.kind, &row.columns)?;
2138            if row.columns.is_empty() {
2139                tx.prepare(&format!(
2140                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
2141                ))?
2142                .execute(params![row.node_logical_id, row.text_content])?;
2143            } else {
2144                // Weighted schema: per-column INSERT.
2145                let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
2146                let placeholders: Vec<String> = (2..=row.columns.len() + 1)
2147                    .map(|i| format!("?{i}"))
2148                    .collect();
2149                let sql = format!(
2150                    "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
2151                    cols = col_names.join(", "),
2152                    placeholders = placeholders.join(", "),
2153                );
2154                let mut stmt = tx.prepare(&sql)?;
2155                stmt.execute(rusqlite::params_from_iter(
2156                    std::iter::once(row.node_logical_id.as_str())
2157                        .chain(row.columns.iter().map(|(_, v)| v.as_str())),
2158                ))?;
2159            }
2160            for pos in &row.positions {
2161                ins_positions.execute(params![
2162                    row.node_logical_id,
2163                    row.kind,
2164                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
2165                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
2166                    pos.leaf_path,
2167                ])?;
2168            }
2169        }
2170    }
2171
2172    // Double-write to staging: for nodes whose kind has an active rebuild
2173    // (state IN ('PENDING','BUILDING','SWAPPING')), upsert the precomputed
2174    // text_content into fts_property_rebuild_staging.  Both this upsert and
2175    // the per-kind FTS insert above happen inside the same transaction.
2176    if !prepared.required_property_fts_rows.is_empty() {
2177        let mut ins_staging_simple = tx.prepare_cached(
2178            "INSERT INTO fts_property_rebuild_staging \
2179             (kind, node_logical_id, text_content) \
2180             VALUES (?1, ?2, ?3) \
2181             ON CONFLICT(kind, node_logical_id) DO UPDATE \
2182             SET text_content = excluded.text_content",
2183        )?;
2184        let mut ins_staging_columns = tx.prepare_cached(
2185            "INSERT INTO fts_property_rebuild_staging \
2186             (kind, node_logical_id, text_content, columns_json) \
2187             VALUES (?1, ?2, ?3, ?4) \
2188             ON CONFLICT(kind, node_logical_id) DO UPDATE \
2189             SET text_content = excluded.text_content, \
2190                 columns_json = excluded.columns_json",
2191        )?;
2192        let mut check_rebuild = tx.prepare_cached(
2193            "SELECT 1 FROM fts_property_rebuild_state \
2194             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
2195        )?;
2196        for row in &prepared.required_property_fts_rows {
2197            let in_rebuild: bool = check_rebuild
2198                .query_row(params![row.kind], |_| Ok(true))
2199                .optional()?
2200                .unwrap_or(false);
2201            if in_rebuild {
2202                if row.columns.is_empty() {
2203                    ins_staging_simple.execute(params![
2204                        row.kind,
2205                        row.node_logical_id,
2206                        row.text_content
2207                    ])?;
2208                } else {
2209                    let json_map: serde_json::Map<String, serde_json::Value> = row
2210                        .columns
2211                        .iter()
2212                        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
2213                        .collect();
2214                    let columns_json =
2215                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
2216                    ins_staging_columns.execute(params![
2217                        row.kind,
2218                        row.node_logical_id,
2219                        row.text_content,
2220                        columns_json
2221                    ])?;
2222                }
2223            }
2224        }
2225    }
2226
2227    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
2228    // 0.5.0: inserts target per-kind tables (vec_<sanitized_kind>) instead of vec_nodes_active.
2229    #[cfg(feature = "sqlite-vec")]
2230    {
2231        // Build chunk_id → node_logical_id from the current request's chunks.
2232        let chunk_to_node: std::collections::HashMap<&str, &str> = prepared
2233            .chunks
2234            .iter()
2235            .map(|c| (c.id.as_str(), c.node_logical_id.as_str()))
2236            .collect();
2237
2238        for vi in &prepared.vec_inserts {
2239            // Resolve kind: prefer in-memory lookup; fall back to DB for pre-existing chunks.
2240            let kind: Option<String> = chunk_to_node
2241                .get(vi.chunk_id.as_str())
2242                .and_then(|lid| prepared.node_kinds.get(*lid))
2243                .cloned()
2244                .or_else(|| {
2245                    tx.query_row(
2246                        "SELECT n.kind FROM chunks c \
2247                         JOIN nodes n ON n.logical_id = c.node_logical_id \
2248                         WHERE c.id = ?1 LIMIT 1",
2249                        rusqlite::params![vi.chunk_id],
2250                        |row| row.get::<_, String>(0),
2251                    )
2252                    .optional()
2253                    .unwrap_or(None)
2254                });
2255
2256            let Some(kind) = kind else {
2257                // No kind found — chunk not yet inserted; skip silently.
2258                continue;
2259            };
2260
2261            let table_name = fathomdb_schema::vec_kind_table_name(&kind);
2262            let dimension = vi.embedding.len();
2263            let bytes: Vec<u8> = vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
2264            // Auto-create the per-kind vec table if it does not exist yet.
2265            tx.execute_batch(&format!(
2266                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
2267                    chunk_id TEXT PRIMARY KEY,\
2268                    embedding float[{dimension}]\
2269                )"
2270            ))?;
2271            tx.execute(
2272                &format!(
2273                    "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
2274                ),
2275                rusqlite::params![vi.chunk_id, bytes],
2276            )?;
2277        }
2278    }
2279
2280    tx.commit()?;
2281
2282    let provenance_warnings: Vec<String> = prepared
2283        .nodes
2284        .iter()
2285        .filter(|node| node.source_ref.is_none())
2286        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
2287        .chain(
2288            prepared
2289                .node_retires
2290                .iter()
2291                .filter(|r| r.source_ref.is_none())
2292                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
2293        )
2294        .chain(
2295            prepared
2296                .edges
2297                .iter()
2298                .filter(|e| e.source_ref.is_none())
2299                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
2300        )
2301        .chain(
2302            prepared
2303                .edge_retires
2304                .iter()
2305                .filter(|r| r.source_ref.is_none())
2306                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
2307        )
2308        .chain(
2309            prepared
2310                .runs
2311                .iter()
2312                .filter(|r| r.source_ref.is_none())
2313                .map(|r| format!("run '{}' has no source_ref", r.id)),
2314        )
2315        .chain(
2316            prepared
2317                .steps
2318                .iter()
2319                .filter(|s| s.source_ref.is_none())
2320                .map(|s| format!("step '{}' has no source_ref", s.id)),
2321        )
2322        .chain(
2323            prepared
2324                .actions
2325                .iter()
2326                .filter(|a| a.source_ref.is_none())
2327                .map(|a| format!("action '{}' has no source_ref", a.id)),
2328        )
2329        .chain(
2330            prepared
2331                .operational_writes
2332                .iter()
2333                .filter(|write| operational_write_source_ref(write).is_none())
2334                .map(|write| {
2335                    format!(
2336                        "operational {} '{}:{}' has no source_ref",
2337                        operational_write_kind(write),
2338                        operational_write_collection(write),
2339                        operational_write_record_key(write)
2340                    )
2341                }),
2342        )
2343        .collect();
2344
2345    let mut warnings = provenance_warnings.clone();
2346    warnings.extend(prepared.operational_validation_warnings.clone());
2347
2348    Ok(WriteReceipt {
2349        label: prepared.label.clone(),
2350        optional_backfill_count: prepared.optional_backfills.len(),
2351        warnings,
2352        provenance_warnings,
2353    })
2354}
2355
2356/// Pack E: enqueue `vector_projection_work` rows for chunks inserted in this
2357/// write.  A row is enqueued with `priority = 1000` (incremental) when:
2358///
2359/// * an active embedding profile exists in `vector_embedding_profiles`, AND
2360/// * the chunk's node kind has an enabled row in `vector_index_schemas`.
2361///
2362/// If a pending row already exists for `(chunk_id, embedding_profile_id)` its
2363/// priority is promoted to `max(existing, 1000)`; otherwise a fresh row is
2364/// inserted.  When no active profile exists or the kind is not enabled, this
2365/// is a no-op.  Runs inside the same transaction as the canonical commit.
2366fn maybe_enqueue_vector_projection_work(
2367    tx: &rusqlite::Transaction<'_>,
2368    prepared: &PreparedWrite,
2369) -> Result<(), EngineError> {
2370    if prepared.chunks.is_empty() {
2371        return Ok(());
2372    }
2373
2374    // Resolve the active embedding profile once.
2375    let active_profile: Option<i64> = tx
2376        .query_row(
2377            "SELECT profile_id FROM vector_embedding_profiles WHERE active = 1",
2378            [],
2379            |row| row.get::<_, i64>(0),
2380        )
2381        .optional()?;
2382    let Some(profile_id) = active_profile else {
2383        return Ok(());
2384    };
2385
2386    // Cache enabled-kind lookups to avoid repeated probes.
2387    let mut enabled_cache: HashMap<String, bool> = HashMap::new();
2388
2389    let mut is_kind_enabled = |kind: &str| -> Result<bool, EngineError> {
2390        if let Some(v) = enabled_cache.get(kind) {
2391            return Ok(*v);
2392        }
2393        let enabled: bool = tx
2394            .query_row(
2395                "SELECT 1 FROM vector_index_schemas WHERE kind = ?1 AND enabled = 1",
2396                params![kind],
2397                |_| Ok(true),
2398            )
2399            .optional()?
2400            .unwrap_or(false);
2401        enabled_cache.insert(kind.to_owned(), enabled);
2402        Ok(enabled)
2403    };
2404
2405    let mut ins_stmt = tx.prepare(
2406        "INSERT INTO vector_projection_work \
2407         (kind, node_logical_id, chunk_id, canonical_hash, priority, \
2408          embedding_profile_id, state, created_at, updated_at) \
2409         SELECT ?1, ?2, ?3, ?4, 1000, ?5, 'pending', unixepoch(), unixepoch() \
2410         WHERE NOT EXISTS ( \
2411             SELECT 1 FROM vector_projection_work \
2412             WHERE chunk_id = ?3 AND embedding_profile_id = ?5 AND state = 'pending' \
2413         )",
2414    )?;
2415    let mut promote_stmt = tx.prepare(
2416        "UPDATE vector_projection_work \
2417         SET priority = 1000, updated_at = unixepoch() \
2418         WHERE chunk_id = ?1 AND embedding_profile_id = ?2 AND state = 'pending' \
2419           AND priority < 1000",
2420    )?;
2421
2422    for chunk in &prepared.chunks {
2423        // Resolve kind: prefer the in-memory cache from co-submitted nodes;
2424        // fall back to the DB for pre-existing nodes.
2425        let kind_owned: Option<String> =
2426            if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
2427                Some(k.clone())
2428            } else {
2429                tx.query_row(
2430                    "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
2431                    params![chunk.node_logical_id],
2432                    |row| row.get::<_, String>(0),
2433                )
2434                .optional()?
2435            };
2436        let Some(kind) = kind_owned else {
2437            continue;
2438        };
2439        if !is_kind_enabled(&kind)? {
2440            continue;
2441        }
2442        let canonical_hash = crate::admin::canonical_chunk_hash(&chunk.id, &chunk.text_content);
2443        let inserted = ins_stmt.execute(params![
2444            kind,
2445            chunk.node_logical_id,
2446            chunk.id,
2447            canonical_hash,
2448            profile_id,
2449        ])?;
2450        if inserted == 0 {
2451            // Existing pending row: promote priority to 1000 if lower.
2452            promote_stmt.execute(params![chunk.id, profile_id])?;
2453        }
2454    }
2455    Ok(())
2456}
2457
2458fn ensure_property_fts_table_for_row(
2459    conn: &rusqlite::Connection,
2460    kind: &str,
2461    columns: &[(String, String)],
2462) -> Result<(), rusqlite::Error> {
2463    let table = fathomdb_schema::fts_kind_table_name(kind);
2464    let exists: bool = conn
2465        .query_row(
2466            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 \
2467             AND sql LIKE 'CREATE VIRTUAL TABLE%'",
2468            rusqlite::params![table],
2469            |_| Ok(true),
2470        )
2471        .optional()?
2472        .unwrap_or(false);
2473    if exists {
2474        return Ok(());
2475    }
2476
2477    let tokenizer = fathomdb_schema::resolve_fts_tokenizer(conn, kind)
2478        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
2479    let tokenizer_sql = tokenizer.replace('\'', "''");
2480    let cols: Vec<String> = if columns.is_empty() {
2481        vec![
2482            "node_logical_id UNINDEXED".to_owned(),
2483            "text_content".to_owned(),
2484        ]
2485    } else {
2486        std::iter::once("node_logical_id UNINDEXED".to_owned())
2487            .chain(columns.iter().map(|(name, _)| name.clone()))
2488            .collect()
2489    };
2490    conn.execute_batch(&format!(
2491        "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5({cols}, tokenize='{tokenizer_sql}')",
2492        cols = cols.join(", "),
2493    ))?;
2494    Ok(())
2495}
2496
2497fn operational_write_collection(write: &OperationalWrite) -> &str {
2498    match write {
2499        OperationalWrite::Append { collection, .. }
2500        | OperationalWrite::Put { collection, .. }
2501        | OperationalWrite::Delete { collection, .. } => collection,
2502    }
2503}
2504
2505fn operational_write_record_key(write: &OperationalWrite) -> &str {
2506    match write {
2507        OperationalWrite::Append { record_key, .. }
2508        | OperationalWrite::Put { record_key, .. }
2509        | OperationalWrite::Delete { record_key, .. } => record_key,
2510    }
2511}
2512
2513fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2514    match write {
2515        OperationalWrite::Append { .. } => "append",
2516        OperationalWrite::Put { .. } => "put",
2517        OperationalWrite::Delete { .. } => "delete",
2518    }
2519}
2520
2521fn operational_write_payload(write: &OperationalWrite) -> &str {
2522    match write {
2523        OperationalWrite::Append { payload_json, .. }
2524        | OperationalWrite::Put { payload_json, .. } => payload_json,
2525        OperationalWrite::Delete { .. } => "null",
2526    }
2527}
2528
2529fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2530    match write {
2531        OperationalWrite::Append { source_ref, .. }
2532        | OperationalWrite::Put { source_ref, .. }
2533        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2534    }
2535}
2536
2537#[cfg(test)]
2538#[allow(clippy::expect_used)]
2539mod tests {
2540    use std::sync::Arc;
2541
2542    use fathomdb_schema::SchemaManager;
2543    use tempfile::NamedTempFile;
2544
2545    use super::{apply_write, prepare_write, resolve_operational_writes};
2546    use crate::{
2547        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2548        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2549        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2550        projection::ProjectionTarget,
2551    };
2552
2553    #[test]
2554    fn writer_executes_runtime_table_rows() {
2555        let db = NamedTempFile::new().expect("temporary db");
2556        let writer = WriterActor::start(
2557            db.path(),
2558            Arc::new(SchemaManager::new()),
2559            ProvenanceMode::Warn,
2560            Arc::new(TelemetryCounters::default()),
2561        )
2562        .expect("writer");
2563
2564        let receipt = writer
2565            .submit(WriteRequest {
2566                label: "runtime".to_owned(),
2567                nodes: vec![],
2568                node_retires: vec![],
2569                edges: vec![],
2570                edge_retires: vec![],
2571                chunks: vec![],
2572                runs: vec![RunInsert {
2573                    id: "run-1".to_owned(),
2574                    kind: "session".to_owned(),
2575                    status: "completed".to_owned(),
2576                    properties: "{}".to_owned(),
2577                    source_ref: Some("src-1".to_owned()),
2578                    upsert: false,
2579                    supersedes_id: None,
2580                }],
2581                steps: vec![StepInsert {
2582                    id: "step-1".to_owned(),
2583                    run_id: "run-1".to_owned(),
2584                    kind: "llm".to_owned(),
2585                    status: "completed".to_owned(),
2586                    properties: "{}".to_owned(),
2587                    source_ref: Some("src-1".to_owned()),
2588                    upsert: false,
2589                    supersedes_id: None,
2590                }],
2591                actions: vec![ActionInsert {
2592                    id: "action-1".to_owned(),
2593                    step_id: "step-1".to_owned(),
2594                    kind: "emit".to_owned(),
2595                    status: "completed".to_owned(),
2596                    properties: "{}".to_owned(),
2597                    source_ref: Some("src-1".to_owned()),
2598                    upsert: false,
2599                    supersedes_id: None,
2600                }],
2601                optional_backfills: vec![],
2602                vec_inserts: vec![],
2603                operational_writes: vec![],
2604            })
2605            .expect("write receipt");
2606
2607        assert_eq!(receipt.label, "runtime");
2608    }
2609    #[test]
2610    fn writer_put_operational_write_updates_current_and_mutations() {
2611        let db = NamedTempFile::new().expect("temporary db");
2612        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2613        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2614        conn.execute(
2615            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2616             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2617            [],
2618        )
2619        .expect("seed collection");
2620        drop(conn);
2621        let writer = WriterActor::start(
2622            db.path(),
2623            Arc::new(SchemaManager::new()),
2624            ProvenanceMode::Warn,
2625            Arc::new(TelemetryCounters::default()),
2626        )
2627        .expect("writer");
2628
2629        writer
2630            .submit(WriteRequest {
2631                label: "node-and-operational".to_owned(),
2632                nodes: vec![NodeInsert {
2633                    row_id: "row-1".to_owned(),
2634                    logical_id: "lg-1".to_owned(),
2635                    kind: "Meeting".to_owned(),
2636                    properties: "{}".to_owned(),
2637                    source_ref: Some("src-1".to_owned()),
2638                    upsert: false,
2639                    chunk_policy: ChunkPolicy::Preserve,
2640                    content_ref: None,
2641                }],
2642                node_retires: vec![],
2643                edges: vec![],
2644                edge_retires: vec![],
2645                chunks: vec![],
2646                runs: vec![],
2647                steps: vec![],
2648                actions: vec![],
2649                optional_backfills: vec![],
2650                vec_inserts: vec![],
2651                operational_writes: vec![OperationalWrite::Put {
2652                    collection: "connector_health".to_owned(),
2653                    record_key: "gmail".to_owned(),
2654                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2655                    source_ref: Some("src-1".to_owned()),
2656                }],
2657            })
2658            .expect("write receipt");
2659
2660        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2661        let node_count: i64 = conn
2662            .query_row(
2663                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2664                [],
2665                |row| row.get(0),
2666            )
2667            .expect("node count");
2668        assert_eq!(node_count, 1);
2669        let mutation_count: i64 = conn
2670            .query_row(
2671                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
2672                 AND record_key = 'gmail'",
2673                [],
2674                |row| row.get(0),
2675            )
2676            .expect("mutation count");
2677        assert_eq!(mutation_count, 1);
2678        let payload: String = conn
2679            .query_row(
2680                "SELECT payload_json FROM operational_current \
2681                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2682                [],
2683                |row| row.get(0),
2684            )
2685            .expect("current payload");
2686        assert_eq!(payload, r#"{"status":"ok"}"#);
2687    }
2688
2689    #[test]
2690    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
2691        let db = NamedTempFile::new().expect("temporary db");
2692        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2693        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2694        conn.execute(
2695            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2696             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2697            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2698        )
2699        .expect("seed collection");
2700        drop(conn);
2701        let writer = WriterActor::start(
2702            db.path(),
2703            Arc::new(SchemaManager::new()),
2704            ProvenanceMode::Warn,
2705            Arc::new(TelemetryCounters::default()),
2706        )
2707        .expect("writer");
2708
2709        writer
2710            .submit(WriteRequest {
2711                label: "disabled-validation".to_owned(),
2712                nodes: vec![],
2713                node_retires: vec![],
2714                edges: vec![],
2715                edge_retires: vec![],
2716                chunks: vec![],
2717                runs: vec![],
2718                steps: vec![],
2719                actions: vec![],
2720                optional_backfills: vec![],
2721                vec_inserts: vec![],
2722                operational_writes: vec![OperationalWrite::Put {
2723                    collection: "connector_health".to_owned(),
2724                    record_key: "gmail".to_owned(),
2725                    payload_json: r#"{"bogus":true}"#.to_owned(),
2726                    source_ref: Some("src-1".to_owned()),
2727                }],
2728            })
2729            .expect("write receipt");
2730
2731        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2732        let payload: String = conn
2733            .query_row(
2734                "SELECT payload_json FROM operational_current \
2735                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2736                [],
2737                |row| row.get(0),
2738            )
2739            .expect("current payload");
2740        assert_eq!(payload, r#"{"bogus":true}"#);
2741    }
2742
2743    #[test]
2744    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
2745        let db = NamedTempFile::new().expect("temporary db");
2746        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2747        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2748        conn.execute(
2749            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2750             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2751            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2752        )
2753        .expect("seed collection");
2754        drop(conn);
2755        let writer = WriterActor::start(
2756            db.path(),
2757            Arc::new(SchemaManager::new()),
2758            ProvenanceMode::Warn,
2759            Arc::new(TelemetryCounters::default()),
2760        )
2761        .expect("writer");
2762
2763        let receipt = writer
2764            .submit(WriteRequest {
2765                label: "report-only-validation".to_owned(),
2766                nodes: vec![],
2767                node_retires: vec![],
2768                edges: vec![],
2769                edge_retires: vec![],
2770                chunks: vec![],
2771                runs: vec![],
2772                steps: vec![],
2773                actions: vec![],
2774                optional_backfills: vec![],
2775                vec_inserts: vec![],
2776                operational_writes: vec![OperationalWrite::Put {
2777                    collection: "connector_health".to_owned(),
2778                    record_key: "gmail".to_owned(),
2779                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2780                    source_ref: Some("src-1".to_owned()),
2781                }],
2782            })
2783            .expect("report_only write should succeed");
2784
2785        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
2786        assert_eq!(receipt.warnings.len(), 1);
2787        assert!(
2788            receipt.warnings[0].contains("connector_health"),
2789            "warning should identify collection"
2790        );
2791        assert!(
2792            receipt.warnings[0].contains("must be one of"),
2793            "warning should explain validation failure"
2794        );
2795
2796        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2797        let payload: String = conn
2798            .query_row(
2799                "SELECT payload_json FROM operational_current \
2800                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2801                [],
2802                |row| row.get(0),
2803            )
2804            .expect("current payload");
2805        assert_eq!(payload, r#"{"status":"bogus"}"#);
2806    }
2807
2808    #[test]
2809    fn writer_rejects_operational_write_for_missing_collection() {
2810        let db = NamedTempFile::new().expect("temporary db");
2811        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2812        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2813        drop(conn);
2814        let writer = WriterActor::start(
2815            db.path(),
2816            Arc::new(SchemaManager::new()),
2817            ProvenanceMode::Warn,
2818            Arc::new(TelemetryCounters::default()),
2819        )
2820        .expect("writer");
2821
2822        let result = writer.submit(WriteRequest {
2823            label: "missing-operational-collection".to_owned(),
2824            nodes: vec![],
2825            node_retires: vec![],
2826            edges: vec![],
2827            edge_retires: vec![],
2828            chunks: vec![],
2829            runs: vec![],
2830            steps: vec![],
2831            actions: vec![],
2832            optional_backfills: vec![],
2833            vec_inserts: vec![],
2834            operational_writes: vec![OperationalWrite::Put {
2835                collection: "connector_health".to_owned(),
2836                record_key: "gmail".to_owned(),
2837                payload_json: r#"{"status":"ok"}"#.to_owned(),
2838                source_ref: Some("src-1".to_owned()),
2839            }],
2840        });
2841
2842        assert!(
2843            matches!(result, Err(EngineError::InvalidWrite(_))),
2844            "missing operational collection must return InvalidWrite"
2845        );
2846    }
2847
2848    #[test]
2849    fn writer_append_operational_write_records_history_without_current_row() {
2850        let db = NamedTempFile::new().expect("temporary db");
2851        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2852        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2853        conn.execute(
2854            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2855             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2856            [],
2857        )
2858        .expect("seed collection");
2859        drop(conn);
2860        let writer = WriterActor::start(
2861            db.path(),
2862            Arc::new(SchemaManager::new()),
2863            ProvenanceMode::Warn,
2864            Arc::new(TelemetryCounters::default()),
2865        )
2866        .expect("writer");
2867
2868        writer
2869            .submit(WriteRequest {
2870                label: "append-operational".to_owned(),
2871                nodes: vec![],
2872                node_retires: vec![],
2873                edges: vec![],
2874                edge_retires: vec![],
2875                chunks: vec![],
2876                runs: vec![],
2877                steps: vec![],
2878                actions: vec![],
2879                optional_backfills: vec![],
2880                vec_inserts: vec![],
2881                operational_writes: vec![OperationalWrite::Append {
2882                    collection: "audit_log".to_owned(),
2883                    record_key: "evt-1".to_owned(),
2884                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2885                    source_ref: Some("src-1".to_owned()),
2886                }],
2887            })
2888            .expect("write receipt");
2889
2890        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2891        let mutation: (String, String) = conn
2892            .query_row(
2893                "SELECT op_kind, payload_json FROM operational_mutations \
2894                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2895                [],
2896                |row| Ok((row.get(0)?, row.get(1)?)),
2897            )
2898            .expect("mutation row");
2899        assert_eq!(mutation.0, "append");
2900        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2901        let current_count: i64 = conn
2902            .query_row(
2903                "SELECT count(*) FROM operational_current \
2904                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2905                [],
2906                |row| row.get(0),
2907            )
2908            .expect("current count");
2909        assert_eq!(current_count, 0);
2910    }
2911
2912    #[test]
2913    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
2914        let db = NamedTempFile::new().expect("temporary db");
2915        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2916        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2917        conn.execute(
2918            "INSERT INTO operational_collections \
2919             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
2920             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
2921                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
2922            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2923        )
2924        .expect("seed collection");
2925        drop(conn);
2926        let writer = WriterActor::start(
2927            db.path(),
2928            Arc::new(SchemaManager::new()),
2929            ProvenanceMode::Warn,
2930            Arc::new(TelemetryCounters::default()),
2931        )
2932        .expect("writer");
2933
2934        let error = writer
2935            .submit(WriteRequest {
2936                label: "invalid-append".to_owned(),
2937                nodes: vec![],
2938                node_retires: vec![],
2939                edges: vec![],
2940                edge_retires: vec![],
2941                chunks: vec![],
2942                runs: vec![],
2943                steps: vec![],
2944                actions: vec![],
2945                optional_backfills: vec![],
2946                vec_inserts: vec![],
2947                operational_writes: vec![OperationalWrite::Append {
2948                    collection: "audit_log".to_owned(),
2949                    record_key: "evt-1".to_owned(),
2950                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2951                    source_ref: Some("src-1".to_owned()),
2952                }],
2953            })
2954            .expect_err("invalid append must reject");
2955        assert!(matches!(error, EngineError::InvalidWrite(_)));
2956        assert!(error.to_string().contains("must be one of"));
2957
2958        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2959        let mutation_count: i64 = conn
2960            .query_row(
2961                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2962                [],
2963                |row| row.get(0),
2964            )
2965            .expect("mutation count");
2966        assert_eq!(mutation_count, 0);
2967        let filter_count: i64 = conn
2968            .query_row(
2969                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
2970                [],
2971                |row| row.get(0),
2972            )
2973            .expect("filter count");
2974        assert_eq!(filter_count, 0);
2975    }
2976
2977    #[test]
2978    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2979        let db = NamedTempFile::new().expect("temporary db");
2980        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2981        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2982        conn.execute(
2983            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2984             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2985            [],
2986        )
2987        .expect("seed collection");
2988        drop(conn);
2989        let writer = WriterActor::start(
2990            db.path(),
2991            Arc::new(SchemaManager::new()),
2992            ProvenanceMode::Warn,
2993            Arc::new(TelemetryCounters::default()),
2994        )
2995        .expect("writer");
2996
2997        writer
2998            .submit(WriteRequest {
2999                label: "put-operational".to_owned(),
3000                nodes: vec![],
3001                node_retires: vec![],
3002                edges: vec![],
3003                edge_retires: vec![],
3004                chunks: vec![],
3005                runs: vec![],
3006                steps: vec![],
3007                actions: vec![],
3008                optional_backfills: vec![],
3009                vec_inserts: vec![],
3010                operational_writes: vec![OperationalWrite::Put {
3011                    collection: "connector_health".to_owned(),
3012                    record_key: "gmail".to_owned(),
3013                    payload_json: r#"{"status":"ok"}"#.to_owned(),
3014                    source_ref: Some("src-1".to_owned()),
3015                }],
3016            })
3017            .expect("put receipt");
3018
3019        writer
3020            .submit(WriteRequest {
3021                label: "delete-operational".to_owned(),
3022                nodes: vec![],
3023                node_retires: vec![],
3024                edges: vec![],
3025                edge_retires: vec![],
3026                chunks: vec![],
3027                runs: vec![],
3028                steps: vec![],
3029                actions: vec![],
3030                optional_backfills: vec![],
3031                vec_inserts: vec![],
3032                operational_writes: vec![OperationalWrite::Delete {
3033                    collection: "connector_health".to_owned(),
3034                    record_key: "gmail".to_owned(),
3035                    source_ref: Some("src-2".to_owned()),
3036                }],
3037            })
3038            .expect("delete receipt");
3039
3040        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3041        let mutation_kinds: Vec<String> = {
3042            let mut stmt = conn
3043                .prepare(
3044                    "SELECT op_kind FROM operational_mutations \
3045                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3046                     ORDER BY mutation_order ASC",
3047                )
3048                .expect("stmt");
3049            stmt.query_map([], |row| row.get(0))
3050                .expect("rows")
3051                .collect::<Result<_, _>>()
3052                .expect("collect")
3053        };
3054        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
3055        let current_count: i64 = conn
3056            .query_row(
3057                "SELECT count(*) FROM operational_current \
3058                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3059                [],
3060                |row| row.get(0),
3061            )
3062            .expect("current count");
3063        assert_eq!(current_count, 0);
3064    }
3065
3066    #[test]
3067    fn writer_delete_bypasses_validation_contract() {
3068        let db = NamedTempFile::new().expect("temporary db");
3069        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3070        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3071        conn.execute(
3072            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3073             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3074            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3075        )
3076        .expect("seed collection");
3077        drop(conn);
3078        let writer = WriterActor::start(
3079            db.path(),
3080            Arc::new(SchemaManager::new()),
3081            ProvenanceMode::Warn,
3082            Arc::new(TelemetryCounters::default()),
3083        )
3084        .expect("writer");
3085
3086        writer
3087            .submit(WriteRequest {
3088                label: "valid-put".to_owned(),
3089                nodes: vec![],
3090                node_retires: vec![],
3091                edges: vec![],
3092                edge_retires: vec![],
3093                chunks: vec![],
3094                runs: vec![],
3095                steps: vec![],
3096                actions: vec![],
3097                optional_backfills: vec![],
3098                vec_inserts: vec![],
3099                operational_writes: vec![OperationalWrite::Put {
3100                    collection: "connector_health".to_owned(),
3101                    record_key: "gmail".to_owned(),
3102                    payload_json: r#"{"status":"ok"}"#.to_owned(),
3103                    source_ref: Some("src-1".to_owned()),
3104                }],
3105            })
3106            .expect("put receipt");
3107        writer
3108            .submit(WriteRequest {
3109                label: "delete-after-put".to_owned(),
3110                nodes: vec![],
3111                node_retires: vec![],
3112                edges: vec![],
3113                edge_retires: vec![],
3114                chunks: vec![],
3115                runs: vec![],
3116                steps: vec![],
3117                actions: vec![],
3118                optional_backfills: vec![],
3119                vec_inserts: vec![],
3120                operational_writes: vec![OperationalWrite::Delete {
3121                    collection: "connector_health".to_owned(),
3122                    record_key: "gmail".to_owned(),
3123                    source_ref: Some("src-2".to_owned()),
3124                }],
3125            })
3126            .expect("delete receipt");
3127
3128        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3129        let current_count: i64 = conn
3130            .query_row(
3131                "SELECT count(*) FROM operational_current \
3132                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3133                [],
3134                |row| row.get(0),
3135            )
3136            .expect("current count");
3137        assert_eq!(current_count, 0);
3138    }
3139
3140    #[test]
3141    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
3142        let db = NamedTempFile::new().expect("temporary db");
3143        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3144        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3145        conn.execute(
3146            "INSERT INTO operational_collections \
3147             (name, kind, schema_json, retention_json, secondary_indexes_json) \
3148             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3149            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
3150        )
3151        .expect("seed collection");
3152        drop(conn);
3153        let writer = WriterActor::start(
3154            db.path(),
3155            Arc::new(SchemaManager::new()),
3156            ProvenanceMode::Warn,
3157            Arc::new(TelemetryCounters::default()),
3158        )
3159        .expect("writer");
3160
3161        writer
3162            .submit(WriteRequest {
3163                label: "secondary-index-put".to_owned(),
3164                nodes: vec![],
3165                node_retires: vec![],
3166                edges: vec![],
3167                edge_retires: vec![],
3168                chunks: vec![],
3169                runs: vec![],
3170                steps: vec![],
3171                actions: vec![],
3172                optional_backfills: vec![],
3173                vec_inserts: vec![],
3174                operational_writes: vec![OperationalWrite::Put {
3175                    collection: "connector_health".to_owned(),
3176                    record_key: "gmail".to_owned(),
3177                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
3178                        .to_owned(),
3179                    source_ref: Some("src-1".to_owned()),
3180                }],
3181            })
3182            .expect("put receipt");
3183
3184        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3185        let current_entry_count: i64 = conn
3186            .query_row(
3187                "SELECT count(*) FROM operational_secondary_index_entries \
3188                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3189                [],
3190                |row| row.get(0),
3191            )
3192            .expect("current secondary index count");
3193        assert_eq!(current_entry_count, 2);
3194        drop(conn);
3195
3196        writer
3197            .submit(WriteRequest {
3198                label: "secondary-index-delete".to_owned(),
3199                nodes: vec![],
3200                node_retires: vec![],
3201                edges: vec![],
3202                edge_retires: vec![],
3203                chunks: vec![],
3204                runs: vec![],
3205                steps: vec![],
3206                actions: vec![],
3207                optional_backfills: vec![],
3208                vec_inserts: vec![],
3209                operational_writes: vec![OperationalWrite::Delete {
3210                    collection: "connector_health".to_owned(),
3211                    record_key: "gmail".to_owned(),
3212                    source_ref: Some("src-2".to_owned()),
3213                }],
3214            })
3215            .expect("delete receipt");
3216
3217        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3218        let current_entry_count: i64 = conn
3219            .query_row(
3220                "SELECT count(*) FROM operational_secondary_index_entries \
3221                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3222                [],
3223                |row| row.get(0),
3224            )
3225            .expect("current secondary index count");
3226        assert_eq!(current_entry_count, 0);
3227    }
3228
3229    #[test]
3230    fn writer_latest_state_operational_writes_persist_mutation_order() {
3231        let db = NamedTempFile::new().expect("temporary db");
3232        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3233        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3234        conn.execute(
3235            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3236             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3237            [],
3238        )
3239        .expect("seed collection");
3240        drop(conn);
3241
3242        let writer = WriterActor::start(
3243            db.path(),
3244            Arc::new(SchemaManager::new()),
3245            ProvenanceMode::Warn,
3246            Arc::new(TelemetryCounters::default()),
3247        )
3248        .expect("writer");
3249
3250        writer
3251            .submit(WriteRequest {
3252                label: "ordered-operational-batch".to_owned(),
3253                nodes: vec![],
3254                node_retires: vec![],
3255                edges: vec![],
3256                edge_retires: vec![],
3257                chunks: vec![],
3258                runs: vec![],
3259                steps: vec![],
3260                actions: vec![],
3261                optional_backfills: vec![],
3262                vec_inserts: vec![],
3263                operational_writes: vec![
3264                    OperationalWrite::Put {
3265                        collection: "connector_health".to_owned(),
3266                        record_key: "gmail".to_owned(),
3267                        payload_json: r#"{"status":"old"}"#.to_owned(),
3268                        source_ref: Some("src-1".to_owned()),
3269                    },
3270                    OperationalWrite::Delete {
3271                        collection: "connector_health".to_owned(),
3272                        record_key: "gmail".to_owned(),
3273                        source_ref: Some("src-2".to_owned()),
3274                    },
3275                    OperationalWrite::Put {
3276                        collection: "connector_health".to_owned(),
3277                        record_key: "gmail".to_owned(),
3278                        payload_json: r#"{"status":"new"}"#.to_owned(),
3279                        source_ref: Some("src-3".to_owned()),
3280                    },
3281                ],
3282            })
3283            .expect("write receipt");
3284
3285        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3286        let rows: Vec<(String, i64)> = {
3287            let mut stmt = conn
3288                .prepare(
3289                    "SELECT op_kind, mutation_order FROM operational_mutations \
3290                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3291                     ORDER BY mutation_order ASC",
3292                )
3293                .expect("stmt");
3294            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
3295                .expect("rows")
3296                .collect::<Result<_, _>>()
3297                .expect("collect")
3298        };
3299        assert_eq!(
3300            rows,
3301            vec![
3302                ("put".to_owned(), 1),
3303                ("delete".to_owned(), 2),
3304                ("put".to_owned(), 3),
3305            ]
3306        );
3307        let payload: String = conn
3308            .query_row(
3309                "SELECT payload_json FROM operational_current \
3310                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3311                [],
3312                |row| row.get(0),
3313            )
3314            .expect("current payload");
3315        assert_eq!(payload, r#"{"status":"new"}"#);
3316    }
3317
3318    #[test]
3319    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
3320        let db = NamedTempFile::new().expect("temporary db");
3321        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
3322        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3323        conn.execute(
3324            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3325             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3326            [],
3327        )
3328        .expect("seed collection");
3329
3330        let request = WriteRequest {
3331            label: "disabled-race".to_owned(),
3332            nodes: vec![],
3333            node_retires: vec![],
3334            edges: vec![],
3335            edge_retires: vec![],
3336            chunks: vec![],
3337            runs: vec![],
3338            steps: vec![],
3339            actions: vec![],
3340            optional_backfills: vec![],
3341            vec_inserts: vec![],
3342            operational_writes: vec![OperationalWrite::Put {
3343                collection: "connector_health".to_owned(),
3344                record_key: "gmail".to_owned(),
3345                payload_json: r#"{"status":"ok"}"#.to_owned(),
3346                source_ref: Some("src-1".to_owned()),
3347            }],
3348        };
3349        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
3350        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
3351
3352        conn.execute(
3353            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
3354            [],
3355        )
3356        .expect("disable collection after preflight");
3357
3358        let error =
3359            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
3360        assert!(matches!(error, EngineError::InvalidWrite(_)));
3361        assert!(error.to_string().contains("is disabled"));
3362
3363        let mutation_count: i64 = conn
3364            .query_row(
3365                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3366                [],
3367                |row| row.get(0),
3368            )
3369            .expect("mutation count");
3370        assert_eq!(mutation_count, 0);
3371
3372        let current_count: i64 = conn
3373            .query_row(
3374                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3375                [],
3376                |row| row.get(0),
3377            )
3378            .expect("current count");
3379        assert_eq!(current_count, 0);
3380    }
3381
3382    #[test]
3383    fn writer_enforce_validation_rejects_invalid_put_atomically() {
3384        let db = NamedTempFile::new().expect("temporary db");
3385        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3386        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3387        conn.execute(
3388            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3389             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3390            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3391        )
3392        .expect("seed collection");
3393        drop(conn);
3394        let writer = WriterActor::start(
3395            db.path(),
3396            Arc::new(SchemaManager::new()),
3397            ProvenanceMode::Warn,
3398            Arc::new(TelemetryCounters::default()),
3399        )
3400        .expect("writer");
3401
3402        let error = writer
3403            .submit(WriteRequest {
3404                label: "invalid-put".to_owned(),
3405                nodes: vec![NodeInsert {
3406                    row_id: "row-1".to_owned(),
3407                    logical_id: "lg-1".to_owned(),
3408                    kind: "Meeting".to_owned(),
3409                    properties: "{}".to_owned(),
3410                    source_ref: Some("src-1".to_owned()),
3411                    upsert: false,
3412                    chunk_policy: ChunkPolicy::Preserve,
3413                    content_ref: None,
3414                }],
3415                node_retires: vec![],
3416                edges: vec![],
3417                edge_retires: vec![],
3418                chunks: vec![],
3419                runs: vec![],
3420                steps: vec![],
3421                actions: vec![],
3422                optional_backfills: vec![],
3423                vec_inserts: vec![],
3424                operational_writes: vec![OperationalWrite::Put {
3425                    collection: "connector_health".to_owned(),
3426                    record_key: "gmail".to_owned(),
3427                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
3428                    source_ref: Some("src-1".to_owned()),
3429                }],
3430            })
3431            .expect_err("invalid put must reject");
3432        assert!(matches!(error, EngineError::InvalidWrite(_)));
3433        assert!(error.to_string().contains("must be one of"));
3434
3435        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3436        let node_count: i64 = conn
3437            .query_row(
3438                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3439                [],
3440                |row| row.get(0),
3441            )
3442            .expect("node count");
3443        assert_eq!(node_count, 0);
3444        let mutation_count: i64 = conn
3445            .query_row(
3446                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3447                [],
3448                |row| row.get(0),
3449            )
3450            .expect("mutation count");
3451        assert_eq!(mutation_count, 0);
3452        let current_count: i64 = conn
3453            .query_row(
3454                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3455                [],
3456                |row| row.get(0),
3457            )
3458            .expect("current count");
3459        assert_eq!(current_count, 0);
3460    }
3461
3462    #[test]
3463    fn writer_rejects_append_against_latest_state_collection() {
3464        let db = NamedTempFile::new().expect("temporary db");
3465        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3466        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3467        conn.execute(
3468            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3469             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3470            [],
3471        )
3472        .expect("seed collection");
3473        drop(conn);
3474        let writer = WriterActor::start(
3475            db.path(),
3476            Arc::new(SchemaManager::new()),
3477            ProvenanceMode::Warn,
3478            Arc::new(TelemetryCounters::default()),
3479        )
3480        .expect("writer");
3481
3482        let result = writer.submit(WriteRequest {
3483            label: "bad-append".to_owned(),
3484            nodes: vec![],
3485            node_retires: vec![],
3486            edges: vec![],
3487            edge_retires: vec![],
3488            chunks: vec![],
3489            runs: vec![],
3490            steps: vec![],
3491            actions: vec![],
3492            optional_backfills: vec![],
3493            vec_inserts: vec![],
3494            operational_writes: vec![OperationalWrite::Append {
3495                collection: "connector_health".to_owned(),
3496                record_key: "gmail".to_owned(),
3497                payload_json: r#"{"status":"ok"}"#.to_owned(),
3498                source_ref: Some("src-1".to_owned()),
3499            }],
3500        });
3501
3502        assert!(
3503            matches!(result, Err(EngineError::InvalidWrite(_))),
3504            "latest_state collection must reject Append"
3505        );
3506    }
3507
3508    #[test]
3509    fn writer_upsert_supersedes_prior_active_node() {
3510        let db = NamedTempFile::new().expect("temporary db");
3511        let writer = WriterActor::start(
3512            db.path(),
3513            Arc::new(SchemaManager::new()),
3514            ProvenanceMode::Warn,
3515            Arc::new(TelemetryCounters::default()),
3516        )
3517        .expect("writer");
3518
3519        writer
3520            .submit(WriteRequest {
3521                label: "v1".to_owned(),
3522                nodes: vec![NodeInsert {
3523                    row_id: "row-1".to_owned(),
3524                    logical_id: "lg-1".to_owned(),
3525                    kind: "Meeting".to_owned(),
3526                    properties: r#"{"version":1}"#.to_owned(),
3527                    source_ref: Some("src-1".to_owned()),
3528                    upsert: false,
3529                    chunk_policy: ChunkPolicy::Preserve,
3530                    content_ref: None,
3531                }],
3532                node_retires: vec![],
3533                edges: vec![],
3534                edge_retires: vec![],
3535                chunks: vec![],
3536                runs: vec![],
3537                steps: vec![],
3538                actions: vec![],
3539                optional_backfills: vec![],
3540                vec_inserts: vec![],
3541                operational_writes: vec![],
3542            })
3543            .expect("v1 write");
3544
3545        writer
3546            .submit(WriteRequest {
3547                label: "v2".to_owned(),
3548                nodes: vec![NodeInsert {
3549                    row_id: "row-2".to_owned(),
3550                    logical_id: "lg-1".to_owned(),
3551                    kind: "Meeting".to_owned(),
3552                    properties: r#"{"version":2}"#.to_owned(),
3553                    source_ref: Some("src-2".to_owned()),
3554                    upsert: true,
3555                    chunk_policy: ChunkPolicy::Preserve,
3556                    content_ref: None,
3557                }],
3558                node_retires: vec![],
3559                edges: vec![],
3560                edge_retires: vec![],
3561                chunks: vec![],
3562                runs: vec![],
3563                steps: vec![],
3564                actions: vec![],
3565                optional_backfills: vec![],
3566                vec_inserts: vec![],
3567                operational_writes: vec![],
3568            })
3569            .expect("v2 upsert write");
3570
3571        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3572        let (active_row_id, props): (String, String) = conn
3573            .query_row(
3574                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3575                [],
3576                |row| Ok((row.get(0)?, row.get(1)?)),
3577            )
3578            .expect("active row");
3579        assert_eq!(active_row_id, "row-2");
3580        assert!(props.contains("\"version\":2"));
3581
3582        let superseded: i64 = conn
3583            .query_row(
3584                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3585                [],
3586                |row| row.get(0),
3587            )
3588            .expect("superseded count");
3589        assert_eq!(superseded, 1);
3590    }
3591
3592    #[test]
3593    fn writer_inserts_edge_between_two_nodes() {
3594        let db = NamedTempFile::new().expect("temporary db");
3595        let writer = WriterActor::start(
3596            db.path(),
3597            Arc::new(SchemaManager::new()),
3598            ProvenanceMode::Warn,
3599            Arc::new(TelemetryCounters::default()),
3600        )
3601        .expect("writer");
3602
3603        writer
3604            .submit(WriteRequest {
3605                label: "nodes-and-edge".to_owned(),
3606                nodes: vec![
3607                    NodeInsert {
3608                        row_id: "row-meeting".to_owned(),
3609                        logical_id: "meeting-1".to_owned(),
3610                        kind: "Meeting".to_owned(),
3611                        properties: "{}".to_owned(),
3612                        source_ref: Some("src-1".to_owned()),
3613                        upsert: false,
3614                        chunk_policy: ChunkPolicy::Preserve,
3615                        content_ref: None,
3616                    },
3617                    NodeInsert {
3618                        row_id: "row-task".to_owned(),
3619                        logical_id: "task-1".to_owned(),
3620                        kind: "Task".to_owned(),
3621                        properties: "{}".to_owned(),
3622                        source_ref: Some("src-1".to_owned()),
3623                        upsert: false,
3624                        chunk_policy: ChunkPolicy::Preserve,
3625                        content_ref: None,
3626                    },
3627                ],
3628                node_retires: vec![],
3629                edges: vec![EdgeInsert {
3630                    row_id: "edge-1".to_owned(),
3631                    logical_id: "edge-lg-1".to_owned(),
3632                    source_logical_id: "meeting-1".to_owned(),
3633                    target_logical_id: "task-1".to_owned(),
3634                    kind: "HAS_TASK".to_owned(),
3635                    properties: "{}".to_owned(),
3636                    source_ref: Some("src-1".to_owned()),
3637                    upsert: false,
3638                }],
3639                edge_retires: vec![],
3640                chunks: vec![],
3641                runs: vec![],
3642                steps: vec![],
3643                actions: vec![],
3644                optional_backfills: vec![],
3645                vec_inserts: vec![],
3646                operational_writes: vec![],
3647            })
3648            .expect("write receipt");
3649
3650        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3651        let (src, tgt, kind): (String, String, String) = conn
3652            .query_row(
3653                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3654                [],
3655                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3656            )
3657            .expect("edge row");
3658        assert_eq!(src, "meeting-1");
3659        assert_eq!(tgt, "task-1");
3660        assert_eq!(kind, "HAS_TASK");
3661    }
3662
3663    #[test]
3664    #[allow(clippy::too_many_lines)]
3665    fn writer_upsert_supersedes_prior_active_edge() {
3666        let db = NamedTempFile::new().expect("temporary db");
3667        let writer = WriterActor::start(
3668            db.path(),
3669            Arc::new(SchemaManager::new()),
3670            ProvenanceMode::Warn,
3671            Arc::new(TelemetryCounters::default()),
3672        )
3673        .expect("writer");
3674
3675        // Write two nodes
3676        writer
3677            .submit(WriteRequest {
3678                label: "nodes".to_owned(),
3679                nodes: vec![
3680                    NodeInsert {
3681                        row_id: "row-a".to_owned(),
3682                        logical_id: "node-a".to_owned(),
3683                        kind: "Meeting".to_owned(),
3684                        properties: "{}".to_owned(),
3685                        source_ref: Some("src-1".to_owned()),
3686                        upsert: false,
3687                        chunk_policy: ChunkPolicy::Preserve,
3688                        content_ref: None,
3689                    },
3690                    NodeInsert {
3691                        row_id: "row-b".to_owned(),
3692                        logical_id: "node-b".to_owned(),
3693                        kind: "Task".to_owned(),
3694                        properties: "{}".to_owned(),
3695                        source_ref: Some("src-1".to_owned()),
3696                        upsert: false,
3697                        chunk_policy: ChunkPolicy::Preserve,
3698                        content_ref: None,
3699                    },
3700                ],
3701                node_retires: vec![],
3702                edges: vec![],
3703                edge_retires: vec![],
3704                chunks: vec![],
3705                runs: vec![],
3706                steps: vec![],
3707                actions: vec![],
3708                optional_backfills: vec![],
3709                vec_inserts: vec![],
3710                operational_writes: vec![],
3711            })
3712            .expect("nodes write");
3713
3714        // Write v1 edge
3715        writer
3716            .submit(WriteRequest {
3717                label: "edge-v1".to_owned(),
3718                nodes: vec![],
3719                node_retires: vec![],
3720                edges: vec![EdgeInsert {
3721                    row_id: "edge-row-1".to_owned(),
3722                    logical_id: "edge-lg-1".to_owned(),
3723                    source_logical_id: "node-a".to_owned(),
3724                    target_logical_id: "node-b".to_owned(),
3725                    kind: "HAS_TASK".to_owned(),
3726                    properties: r#"{"weight":1}"#.to_owned(),
3727                    source_ref: Some("src-1".to_owned()),
3728                    upsert: false,
3729                }],
3730                edge_retires: vec![],
3731                chunks: vec![],
3732                runs: vec![],
3733                steps: vec![],
3734                actions: vec![],
3735                optional_backfills: vec![],
3736                vec_inserts: vec![],
3737                operational_writes: vec![],
3738            })
3739            .expect("edge v1 write");
3740
3741        // Upsert v2 edge
3742        writer
3743            .submit(WriteRequest {
3744                label: "edge-v2".to_owned(),
3745                nodes: vec![],
3746                node_retires: vec![],
3747                edges: vec![EdgeInsert {
3748                    row_id: "edge-row-2".to_owned(),
3749                    logical_id: "edge-lg-1".to_owned(),
3750                    source_logical_id: "node-a".to_owned(),
3751                    target_logical_id: "node-b".to_owned(),
3752                    kind: "HAS_TASK".to_owned(),
3753                    properties: r#"{"weight":2}"#.to_owned(),
3754                    source_ref: Some("src-2".to_owned()),
3755                    upsert: true,
3756                }],
3757                edge_retires: vec![],
3758                chunks: vec![],
3759                runs: vec![],
3760                steps: vec![],
3761                actions: vec![],
3762                optional_backfills: vec![],
3763                vec_inserts: vec![],
3764                operational_writes: vec![],
3765            })
3766            .expect("edge v2 upsert");
3767
3768        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3769        let (active_row_id, props): (String, String) = conn
3770            .query_row(
3771                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3772                [],
3773                |row| Ok((row.get(0)?, row.get(1)?)),
3774            )
3775            .expect("active edge");
3776        assert_eq!(active_row_id, "edge-row-2");
3777        assert!(props.contains("\"weight\":2"));
3778
3779        let superseded: i64 = conn
3780            .query_row(
3781                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3782                [],
3783                |row| row.get(0),
3784            )
3785            .expect("superseded count");
3786        assert_eq!(superseded, 1);
3787    }
3788
3789    #[test]
3790    fn writer_fts_rows_are_written_to_database() {
3791        let db = NamedTempFile::new().expect("temporary db");
3792        let writer = WriterActor::start(
3793            db.path(),
3794            Arc::new(SchemaManager::new()),
3795            ProvenanceMode::Warn,
3796            Arc::new(TelemetryCounters::default()),
3797        )
3798        .expect("writer");
3799
3800        writer
3801            .submit(WriteRequest {
3802                label: "seed".to_owned(),
3803                nodes: vec![NodeInsert {
3804                    row_id: "row-1".to_owned(),
3805                    logical_id: "logical-1".to_owned(),
3806                    kind: "Meeting".to_owned(),
3807                    properties: "{}".to_owned(),
3808                    source_ref: Some("src-1".to_owned()),
3809                    upsert: false,
3810                    chunk_policy: ChunkPolicy::Preserve,
3811                    content_ref: None,
3812                }],
3813                node_retires: vec![],
3814                edges: vec![],
3815                edge_retires: vec![],
3816                chunks: vec![ChunkInsert {
3817                    id: "chunk-1".to_owned(),
3818                    node_logical_id: "logical-1".to_owned(),
3819                    text_content: "budget discussion".to_owned(),
3820                    byte_start: None,
3821                    byte_end: None,
3822                    content_hash: None,
3823                }],
3824                runs: vec![],
3825                steps: vec![],
3826                actions: vec![],
3827                optional_backfills: vec![],
3828                vec_inserts: vec![],
3829                operational_writes: vec![],
3830            })
3831            .expect("write receipt");
3832
3833        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3834        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3835            conn.query_row(
3836                "SELECT chunk_id, node_logical_id, kind, text_content \
3837                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3838                [],
3839                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3840            )
3841            .expect("fts row");
3842        assert_eq!(chunk_id, "chunk-1");
3843        assert_eq!(node_logical_id, "logical-1");
3844        assert_eq!(kind, "Meeting");
3845        assert_eq!(text_content, "budget discussion");
3846    }
3847
3848    #[test]
3849    fn writer_receipt_warns_on_nodes_without_source_ref() {
3850        let db = NamedTempFile::new().expect("temporary db");
3851        let writer = WriterActor::start(
3852            db.path(),
3853            Arc::new(SchemaManager::new()),
3854            ProvenanceMode::Warn,
3855            Arc::new(TelemetryCounters::default()),
3856        )
3857        .expect("writer");
3858
3859        let receipt = writer
3860            .submit(WriteRequest {
3861                label: "no-source".to_owned(),
3862                nodes: vec![NodeInsert {
3863                    row_id: "row-1".to_owned(),
3864                    logical_id: "logical-1".to_owned(),
3865                    kind: "Meeting".to_owned(),
3866                    properties: "{}".to_owned(),
3867                    source_ref: None,
3868                    upsert: false,
3869                    chunk_policy: ChunkPolicy::Preserve,
3870                    content_ref: None,
3871                }],
3872                node_retires: vec![],
3873                edges: vec![],
3874                edge_retires: vec![],
3875                chunks: vec![],
3876                runs: vec![],
3877                steps: vec![],
3878                actions: vec![],
3879                optional_backfills: vec![],
3880                vec_inserts: vec![],
3881                operational_writes: vec![],
3882            })
3883            .expect("write receipt");
3884
3885        assert_eq!(receipt.provenance_warnings.len(), 1);
3886        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3887    }
3888
3889    #[test]
3890    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3891        let db = NamedTempFile::new().expect("temporary db");
3892        let writer = WriterActor::start(
3893            db.path(),
3894            Arc::new(SchemaManager::new()),
3895            ProvenanceMode::Warn,
3896            Arc::new(TelemetryCounters::default()),
3897        )
3898        .expect("writer");
3899
3900        let receipt = writer
3901            .submit(WriteRequest {
3902                label: "with-source".to_owned(),
3903                nodes: vec![NodeInsert {
3904                    row_id: "row-1".to_owned(),
3905                    logical_id: "logical-1".to_owned(),
3906                    kind: "Meeting".to_owned(),
3907                    properties: "{}".to_owned(),
3908                    source_ref: Some("src-1".to_owned()),
3909                    upsert: false,
3910                    chunk_policy: ChunkPolicy::Preserve,
3911                    content_ref: None,
3912                }],
3913                node_retires: vec![],
3914                edges: vec![],
3915                edge_retires: vec![],
3916                chunks: vec![],
3917                runs: vec![],
3918                steps: vec![],
3919                actions: vec![],
3920                optional_backfills: vec![],
3921                vec_inserts: vec![],
3922                operational_writes: vec![],
3923            })
3924            .expect("write receipt");
3925
3926        assert!(receipt.provenance_warnings.is_empty());
3927    }
3928
3929    #[test]
3930    fn writer_accepts_chunk_for_pre_existing_node() {
3931        let db = NamedTempFile::new().expect("temporary db");
3932        let writer = WriterActor::start(
3933            db.path(),
3934            Arc::new(SchemaManager::new()),
3935            ProvenanceMode::Warn,
3936            Arc::new(TelemetryCounters::default()),
3937        )
3938        .expect("writer");
3939
3940        // Request 1: submit node only
3941        writer
3942            .submit(WriteRequest {
3943                label: "r1".to_owned(),
3944                nodes: vec![NodeInsert {
3945                    row_id: "row-1".to_owned(),
3946                    logical_id: "logical-1".to_owned(),
3947                    kind: "Meeting".to_owned(),
3948                    properties: "{}".to_owned(),
3949                    source_ref: Some("src-1".to_owned()),
3950                    upsert: false,
3951                    chunk_policy: ChunkPolicy::Preserve,
3952                    content_ref: None,
3953                }],
3954                node_retires: vec![],
3955                edges: vec![],
3956                edge_retires: vec![],
3957                chunks: vec![],
3958                runs: vec![],
3959                steps: vec![],
3960                actions: vec![],
3961                optional_backfills: vec![],
3962                vec_inserts: vec![],
3963                operational_writes: vec![],
3964            })
3965            .expect("r1 write");
3966
3967        // Request 2: submit chunk for pre-existing node
3968        writer
3969            .submit(WriteRequest {
3970                label: "r2".to_owned(),
3971                nodes: vec![],
3972                node_retires: vec![],
3973                edges: vec![],
3974                edge_retires: vec![],
3975                chunks: vec![ChunkInsert {
3976                    id: "chunk-1".to_owned(),
3977                    node_logical_id: "logical-1".to_owned(),
3978                    text_content: "budget discussion".to_owned(),
3979                    byte_start: None,
3980                    byte_end: None,
3981                    content_hash: None,
3982                }],
3983                runs: vec![],
3984                steps: vec![],
3985                actions: vec![],
3986                optional_backfills: vec![],
3987                vec_inserts: vec![],
3988                operational_writes: vec![],
3989            })
3990            .expect("r2 write — chunk for pre-existing node");
3991
3992        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3993        let count: i64 = conn
3994            .query_row(
3995                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3996                [],
3997                |row| row.get(0),
3998            )
3999            .expect("fts count");
4000        assert_eq!(
4001            count, 1,
4002            "FTS row must exist for chunk attached to pre-existing node"
4003        );
4004    }
4005
4006    #[test]
4007    fn writer_rejects_chunk_for_completely_unknown_node() {
4008        let db = NamedTempFile::new().expect("temporary db");
4009        let writer = WriterActor::start(
4010            db.path(),
4011            Arc::new(SchemaManager::new()),
4012            ProvenanceMode::Warn,
4013            Arc::new(TelemetryCounters::default()),
4014        )
4015        .expect("writer");
4016
4017        let result = writer.submit(WriteRequest {
4018            label: "bad".to_owned(),
4019            nodes: vec![],
4020            node_retires: vec![],
4021            edges: vec![],
4022            edge_retires: vec![],
4023            chunks: vec![ChunkInsert {
4024                id: "chunk-1".to_owned(),
4025                node_logical_id: "nonexistent".to_owned(),
4026                text_content: "some text".to_owned(),
4027                byte_start: None,
4028                byte_end: None,
4029                content_hash: None,
4030            }],
4031            runs: vec![],
4032            steps: vec![],
4033            actions: vec![],
4034            optional_backfills: vec![],
4035            vec_inserts: vec![],
4036            operational_writes: vec![],
4037        });
4038
4039        assert!(
4040            matches!(result, Err(EngineError::InvalidWrite(_))),
4041            "completely unknown node must return InvalidWrite"
4042        );
4043    }
4044
4045    #[test]
4046    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
4047        let db = NamedTempFile::new().expect("temporary db");
4048        let writer = WriterActor::start(
4049            db.path(),
4050            Arc::new(SchemaManager::new()),
4051            ProvenanceMode::Warn,
4052            Arc::new(TelemetryCounters::default()),
4053        )
4054        .expect("writer");
4055
4056        let receipt = writer
4057            .submit(WriteRequest {
4058                label: "seed".to_owned(),
4059                nodes: vec![NodeInsert {
4060                    row_id: "row-1".to_owned(),
4061                    logical_id: "logical-1".to_owned(),
4062                    kind: "Meeting".to_owned(),
4063                    properties: "{}".to_owned(),
4064                    source_ref: None,
4065                    upsert: false,
4066                    chunk_policy: ChunkPolicy::Preserve,
4067                    content_ref: None,
4068                }],
4069                node_retires: vec![],
4070                edges: vec![],
4071                edge_retires: vec![],
4072                chunks: vec![ChunkInsert {
4073                    id: "chunk-1".to_owned(),
4074                    node_logical_id: "logical-1".to_owned(),
4075                    text_content: "budget discussion".to_owned(),
4076                    byte_start: None,
4077                    byte_end: None,
4078                    content_hash: None,
4079                }],
4080                runs: vec![],
4081                steps: vec![],
4082                actions: vec![],
4083                optional_backfills: vec![],
4084                vec_inserts: vec![],
4085                operational_writes: vec![],
4086            })
4087            .expect("write receipt");
4088
4089        assert_eq!(receipt.label, "seed");
4090    }
4091
4092    #[test]
4093    fn writer_node_retire_supersedes_active_node() {
4094        let db = NamedTempFile::new().expect("temporary db");
4095        let writer = WriterActor::start(
4096            db.path(),
4097            Arc::new(SchemaManager::new()),
4098            ProvenanceMode::Warn,
4099            Arc::new(TelemetryCounters::default()),
4100        )
4101        .expect("writer");
4102
4103        writer
4104            .submit(WriteRequest {
4105                label: "seed".to_owned(),
4106                nodes: vec![NodeInsert {
4107                    row_id: "row-1".to_owned(),
4108                    logical_id: "meeting-1".to_owned(),
4109                    kind: "Meeting".to_owned(),
4110                    properties: "{}".to_owned(),
4111                    source_ref: Some("src-1".to_owned()),
4112                    upsert: false,
4113                    chunk_policy: ChunkPolicy::Preserve,
4114                    content_ref: None,
4115                }],
4116                node_retires: vec![],
4117                edges: vec![],
4118                edge_retires: vec![],
4119                chunks: vec![],
4120                runs: vec![],
4121                steps: vec![],
4122                actions: vec![],
4123                optional_backfills: vec![],
4124                vec_inserts: vec![],
4125                operational_writes: vec![],
4126            })
4127            .expect("seed write");
4128
4129        writer
4130            .submit(WriteRequest {
4131                label: "retire".to_owned(),
4132                nodes: vec![],
4133                node_retires: vec![NodeRetire {
4134                    logical_id: "meeting-1".to_owned(),
4135                    source_ref: Some("src-2".to_owned()),
4136                }],
4137                edges: vec![],
4138                edge_retires: vec![],
4139                chunks: vec![],
4140                runs: vec![],
4141                steps: vec![],
4142                actions: vec![],
4143                optional_backfills: vec![],
4144                vec_inserts: vec![],
4145                operational_writes: vec![],
4146            })
4147            .expect("retire write");
4148
4149        let conn = rusqlite::Connection::open(db.path()).expect("open");
4150        let active: i64 = conn
4151            .query_row(
4152                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
4153                [],
4154                |r| r.get(0),
4155            )
4156            .expect("count active");
4157        let historical: i64 = conn
4158            .query_row(
4159                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
4160                [],
4161                |r| r.get(0),
4162            )
4163            .expect("count historical");
4164
4165        assert_eq!(active, 0, "active count must be 0 after retire");
4166        assert_eq!(historical, 1, "historical count must be 1 after retire");
4167    }
4168
4169    #[test]
4170    fn writer_node_retire_preserves_chunks_and_clears_fts() {
4171        let db = NamedTempFile::new().expect("temporary db");
4172        let writer = WriterActor::start(
4173            db.path(),
4174            Arc::new(SchemaManager::new()),
4175            ProvenanceMode::Warn,
4176            Arc::new(TelemetryCounters::default()),
4177        )
4178        .expect("writer");
4179
4180        writer
4181            .submit(WriteRequest {
4182                label: "seed".to_owned(),
4183                nodes: vec![NodeInsert {
4184                    row_id: "row-1".to_owned(),
4185                    logical_id: "meeting-1".to_owned(),
4186                    kind: "Meeting".to_owned(),
4187                    properties: "{}".to_owned(),
4188                    source_ref: Some("src-1".to_owned()),
4189                    upsert: false,
4190                    chunk_policy: ChunkPolicy::Preserve,
4191                    content_ref: None,
4192                }],
4193                node_retires: vec![],
4194                edges: vec![],
4195                edge_retires: vec![],
4196                chunks: vec![ChunkInsert {
4197                    id: "chunk-1".to_owned(),
4198                    node_logical_id: "meeting-1".to_owned(),
4199                    text_content: "budget discussion".to_owned(),
4200                    byte_start: None,
4201                    byte_end: None,
4202                    content_hash: None,
4203                }],
4204                runs: vec![],
4205                steps: vec![],
4206                actions: vec![],
4207                optional_backfills: vec![],
4208                vec_inserts: vec![],
4209                operational_writes: vec![],
4210            })
4211            .expect("seed write");
4212
4213        writer
4214            .submit(WriteRequest {
4215                label: "retire".to_owned(),
4216                nodes: vec![],
4217                node_retires: vec![NodeRetire {
4218                    logical_id: "meeting-1".to_owned(),
4219                    source_ref: Some("src-2".to_owned()),
4220                }],
4221                edges: vec![],
4222                edge_retires: vec![],
4223                chunks: vec![],
4224                runs: vec![],
4225                steps: vec![],
4226                actions: vec![],
4227                optional_backfills: vec![],
4228                vec_inserts: vec![],
4229                operational_writes: vec![],
4230            })
4231            .expect("retire write");
4232
4233        let conn = rusqlite::Connection::open(db.path()).expect("open");
4234        let chunk_count: i64 = conn
4235            .query_row(
4236                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4237                [],
4238                |r| r.get(0),
4239            )
4240            .expect("chunk count");
4241        let fts_count: i64 = conn
4242            .query_row(
4243                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4244                [],
4245                |r| r.get(0),
4246            )
4247            .expect("fts count");
4248
4249        assert_eq!(
4250            chunk_count, 1,
4251            "chunks must remain after node retire so restore can re-establish content"
4252        );
4253        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4254    }
4255
4256    #[test]
4257    fn writer_edge_retire_supersedes_active_edge() {
4258        let db = NamedTempFile::new().expect("temporary db");
4259        let writer = WriterActor::start(
4260            db.path(),
4261            Arc::new(SchemaManager::new()),
4262            ProvenanceMode::Warn,
4263            Arc::new(TelemetryCounters::default()),
4264        )
4265        .expect("writer");
4266
4267        writer
4268            .submit(WriteRequest {
4269                label: "seed".to_owned(),
4270                nodes: vec![
4271                    NodeInsert {
4272                        row_id: "row-a".to_owned(),
4273                        logical_id: "node-a".to_owned(),
4274                        kind: "Meeting".to_owned(),
4275                        properties: "{}".to_owned(),
4276                        source_ref: Some("src-1".to_owned()),
4277                        upsert: false,
4278                        chunk_policy: ChunkPolicy::Preserve,
4279                        content_ref: None,
4280                    },
4281                    NodeInsert {
4282                        row_id: "row-b".to_owned(),
4283                        logical_id: "node-b".to_owned(),
4284                        kind: "Task".to_owned(),
4285                        properties: "{}".to_owned(),
4286                        source_ref: Some("src-1".to_owned()),
4287                        upsert: false,
4288                        chunk_policy: ChunkPolicy::Preserve,
4289                        content_ref: None,
4290                    },
4291                ],
4292                node_retires: vec![],
4293                edges: vec![EdgeInsert {
4294                    row_id: "edge-1".to_owned(),
4295                    logical_id: "edge-lg-1".to_owned(),
4296                    source_logical_id: "node-a".to_owned(),
4297                    target_logical_id: "node-b".to_owned(),
4298                    kind: "HAS_TASK".to_owned(),
4299                    properties: "{}".to_owned(),
4300                    source_ref: Some("src-1".to_owned()),
4301                    upsert: false,
4302                }],
4303                edge_retires: vec![],
4304                chunks: vec![],
4305                runs: vec![],
4306                steps: vec![],
4307                actions: vec![],
4308                optional_backfills: vec![],
4309                vec_inserts: vec![],
4310                operational_writes: vec![],
4311            })
4312            .expect("seed write");
4313
4314        writer
4315            .submit(WriteRequest {
4316                label: "retire-edge".to_owned(),
4317                nodes: vec![],
4318                node_retires: vec![],
4319                edges: vec![],
4320                edge_retires: vec![EdgeRetire {
4321                    logical_id: "edge-lg-1".to_owned(),
4322                    source_ref: Some("src-2".to_owned()),
4323                }],
4324                chunks: vec![],
4325                runs: vec![],
4326                steps: vec![],
4327                actions: vec![],
4328                optional_backfills: vec![],
4329                vec_inserts: vec![],
4330                operational_writes: vec![],
4331            })
4332            .expect("retire edge write");
4333
4334        let conn = rusqlite::Connection::open(db.path()).expect("open");
4335        let active: i64 = conn
4336            .query_row(
4337                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4338                [],
4339                |r| r.get(0),
4340            )
4341            .expect("active edge count");
4342
4343        assert_eq!(active, 0, "active edge count must be 0 after retire");
4344    }
4345
4346    #[test]
4347    fn writer_retire_without_source_ref_emits_provenance_warning() {
4348        let db = NamedTempFile::new().expect("temporary db");
4349        let writer = WriterActor::start(
4350            db.path(),
4351            Arc::new(SchemaManager::new()),
4352            ProvenanceMode::Warn,
4353            Arc::new(TelemetryCounters::default()),
4354        )
4355        .expect("writer");
4356
4357        writer
4358            .submit(WriteRequest {
4359                label: "seed".to_owned(),
4360                nodes: vec![NodeInsert {
4361                    row_id: "row-1".to_owned(),
4362                    logical_id: "meeting-1".to_owned(),
4363                    kind: "Meeting".to_owned(),
4364                    properties: "{}".to_owned(),
4365                    source_ref: Some("src-1".to_owned()),
4366                    upsert: false,
4367                    chunk_policy: ChunkPolicy::Preserve,
4368                    content_ref: None,
4369                }],
4370                node_retires: vec![],
4371                edges: vec![],
4372                edge_retires: vec![],
4373                chunks: vec![],
4374                runs: vec![],
4375                steps: vec![],
4376                actions: vec![],
4377                optional_backfills: vec![],
4378                vec_inserts: vec![],
4379                operational_writes: vec![],
4380            })
4381            .expect("seed write");
4382
4383        let receipt = writer
4384            .submit(WriteRequest {
4385                label: "retire-no-src".to_owned(),
4386                nodes: vec![],
4387                node_retires: vec![NodeRetire {
4388                    logical_id: "meeting-1".to_owned(),
4389                    source_ref: None,
4390                }],
4391                edges: vec![],
4392                edge_retires: vec![],
4393                chunks: vec![],
4394                runs: vec![],
4395                steps: vec![],
4396                actions: vec![],
4397                optional_backfills: vec![],
4398                vec_inserts: vec![],
4399                operational_writes: vec![],
4400            })
4401            .expect("retire write");
4402
4403        assert!(
4404            !receipt.provenance_warnings.is_empty(),
4405            "retire without source_ref must emit a provenance warning"
4406        );
4407    }
4408
4409    #[test]
4410    #[allow(clippy::too_many_lines)]
4411    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4412        let db = NamedTempFile::new().expect("temporary db");
4413        let writer = WriterActor::start(
4414            db.path(),
4415            Arc::new(SchemaManager::new()),
4416            ProvenanceMode::Warn,
4417            Arc::new(TelemetryCounters::default()),
4418        )
4419        .expect("writer");
4420
4421        writer
4422            .submit(WriteRequest {
4423                label: "v1".to_owned(),
4424                nodes: vec![NodeInsert {
4425                    row_id: "row-1".to_owned(),
4426                    logical_id: "meeting-1".to_owned(),
4427                    kind: "Meeting".to_owned(),
4428                    properties: "{}".to_owned(),
4429                    source_ref: Some("src-1".to_owned()),
4430                    upsert: false,
4431                    chunk_policy: ChunkPolicy::Preserve,
4432                    content_ref: None,
4433                }],
4434                node_retires: vec![],
4435                edges: vec![],
4436                edge_retires: vec![],
4437                chunks: vec![ChunkInsert {
4438                    id: "chunk-old".to_owned(),
4439                    node_logical_id: "meeting-1".to_owned(),
4440                    text_content: "old text".to_owned(),
4441                    byte_start: None,
4442                    byte_end: None,
4443                    content_hash: None,
4444                }],
4445                runs: vec![],
4446                steps: vec![],
4447                actions: vec![],
4448                optional_backfills: vec![],
4449                vec_inserts: vec![],
4450                operational_writes: vec![],
4451            })
4452            .expect("v1 write");
4453
4454        writer
4455            .submit(WriteRequest {
4456                label: "v2".to_owned(),
4457                nodes: vec![NodeInsert {
4458                    row_id: "row-2".to_owned(),
4459                    logical_id: "meeting-1".to_owned(),
4460                    kind: "Meeting".to_owned(),
4461                    properties: "{}".to_owned(),
4462                    source_ref: Some("src-2".to_owned()),
4463                    upsert: true,
4464                    chunk_policy: ChunkPolicy::Replace,
4465                    content_ref: None,
4466                }],
4467                node_retires: vec![],
4468                edges: vec![],
4469                edge_retires: vec![],
4470                chunks: vec![ChunkInsert {
4471                    id: "chunk-new".to_owned(),
4472                    node_logical_id: "meeting-1".to_owned(),
4473                    text_content: "new text".to_owned(),
4474                    byte_start: None,
4475                    byte_end: None,
4476                    content_hash: None,
4477                }],
4478                runs: vec![],
4479                steps: vec![],
4480                actions: vec![],
4481                optional_backfills: vec![],
4482                vec_inserts: vec![],
4483                operational_writes: vec![],
4484            })
4485            .expect("v2 write");
4486
4487        let conn = rusqlite::Connection::open(db.path()).expect("open");
4488        let old_chunk: i64 = conn
4489            .query_row(
4490                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4491                [],
4492                |r| r.get(0),
4493            )
4494            .expect("old chunk count");
4495        let new_chunk: i64 = conn
4496            .query_row(
4497                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4498                [],
4499                |r| r.get(0),
4500            )
4501            .expect("new chunk count");
4502        let fts_old: i64 = conn
4503            .query_row(
4504                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4505                [],
4506                |r| r.get(0),
4507            )
4508            .expect("old fts count");
4509
4510        assert_eq!(
4511            old_chunk, 0,
4512            "old chunk must be deleted by ChunkPolicy::Replace"
4513        );
4514        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4515        assert_eq!(
4516            fts_old, 0,
4517            "old FTS row must be deleted by ChunkPolicy::Replace"
4518        );
4519    }
4520
4521    #[test]
4522    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4523        let db = NamedTempFile::new().expect("temporary db");
4524        let writer = WriterActor::start(
4525            db.path(),
4526            Arc::new(SchemaManager::new()),
4527            ProvenanceMode::Warn,
4528            Arc::new(TelemetryCounters::default()),
4529        )
4530        .expect("writer");
4531
4532        writer
4533            .submit(WriteRequest {
4534                label: "v1".to_owned(),
4535                nodes: vec![NodeInsert {
4536                    row_id: "row-1".to_owned(),
4537                    logical_id: "meeting-1".to_owned(),
4538                    kind: "Meeting".to_owned(),
4539                    properties: "{}".to_owned(),
4540                    source_ref: Some("src-1".to_owned()),
4541                    upsert: false,
4542                    chunk_policy: ChunkPolicy::Preserve,
4543                    content_ref: None,
4544                }],
4545                node_retires: vec![],
4546                edges: vec![],
4547                edge_retires: vec![],
4548                chunks: vec![ChunkInsert {
4549                    id: "chunk-old".to_owned(),
4550                    node_logical_id: "meeting-1".to_owned(),
4551                    text_content: "old text".to_owned(),
4552                    byte_start: None,
4553                    byte_end: None,
4554                    content_hash: None,
4555                }],
4556                runs: vec![],
4557                steps: vec![],
4558                actions: vec![],
4559                optional_backfills: vec![],
4560                vec_inserts: vec![],
4561                operational_writes: vec![],
4562            })
4563            .expect("v1 write");
4564
4565        writer
4566            .submit(WriteRequest {
4567                label: "v2-props-only".to_owned(),
4568                nodes: vec![NodeInsert {
4569                    row_id: "row-2".to_owned(),
4570                    logical_id: "meeting-1".to_owned(),
4571                    kind: "Meeting".to_owned(),
4572                    properties: r#"{"status":"updated"}"#.to_owned(),
4573                    source_ref: Some("src-2".to_owned()),
4574                    upsert: true,
4575                    chunk_policy: ChunkPolicy::Preserve,
4576                    content_ref: None,
4577                }],
4578                node_retires: vec![],
4579                edges: vec![],
4580                edge_retires: vec![],
4581                chunks: vec![],
4582                runs: vec![],
4583                steps: vec![],
4584                actions: vec![],
4585                optional_backfills: vec![],
4586                vec_inserts: vec![],
4587                operational_writes: vec![],
4588            })
4589            .expect("v2 preserve write");
4590
4591        let conn = rusqlite::Connection::open(db.path()).expect("open");
4592        let old_chunk: i64 = conn
4593            .query_row(
4594                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4595                [],
4596                |r| r.get(0),
4597            )
4598            .expect("old chunk count");
4599
4600        assert_eq!(
4601            old_chunk, 1,
4602            "old chunk must be preserved by ChunkPolicy::Preserve"
4603        );
4604    }
4605
4606    #[test]
4607    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4608        let db = NamedTempFile::new().expect("temporary db");
4609        let writer = WriterActor::start(
4610            db.path(),
4611            Arc::new(SchemaManager::new()),
4612            ProvenanceMode::Warn,
4613            Arc::new(TelemetryCounters::default()),
4614        )
4615        .expect("writer");
4616
4617        writer
4618            .submit(WriteRequest {
4619                label: "v1".to_owned(),
4620                nodes: vec![NodeInsert {
4621                    row_id: "row-1".to_owned(),
4622                    logical_id: "meeting-1".to_owned(),
4623                    kind: "Meeting".to_owned(),
4624                    properties: "{}".to_owned(),
4625                    source_ref: Some("src-1".to_owned()),
4626                    upsert: false,
4627                    chunk_policy: ChunkPolicy::Preserve,
4628                    content_ref: None,
4629                }],
4630                node_retires: vec![],
4631                edges: vec![],
4632                edge_retires: vec![],
4633                chunks: vec![ChunkInsert {
4634                    id: "chunk-existing".to_owned(),
4635                    node_logical_id: "meeting-1".to_owned(),
4636                    text_content: "existing text".to_owned(),
4637                    byte_start: None,
4638                    byte_end: None,
4639                    content_hash: None,
4640                }],
4641                runs: vec![],
4642                steps: vec![],
4643                actions: vec![],
4644                optional_backfills: vec![],
4645                vec_inserts: vec![],
4646                operational_writes: vec![],
4647            })
4648            .expect("v1 write");
4649
4650        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4651        writer
4652            .submit(WriteRequest {
4653                label: "insert-no-upsert".to_owned(),
4654                nodes: vec![NodeInsert {
4655                    row_id: "row-2".to_owned(),
4656                    logical_id: "meeting-2".to_owned(),
4657                    kind: "Meeting".to_owned(),
4658                    properties: "{}".to_owned(),
4659                    source_ref: Some("src-2".to_owned()),
4660                    upsert: false,
4661                    chunk_policy: ChunkPolicy::Replace,
4662                    content_ref: None,
4663                }],
4664                node_retires: vec![],
4665                edges: vec![],
4666                edge_retires: vec![],
4667                chunks: vec![],
4668                runs: vec![],
4669                steps: vec![],
4670                actions: vec![],
4671                optional_backfills: vec![],
4672                vec_inserts: vec![],
4673                operational_writes: vec![],
4674            })
4675            .expect("insert no-upsert write");
4676
4677        let conn = rusqlite::Connection::open(db.path()).expect("open");
4678        let existing_chunk: i64 = conn
4679            .query_row(
4680                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4681                [],
4682                |r| r.get(0),
4683            )
4684            .expect("chunk count");
4685
4686        assert_eq!(
4687            existing_chunk, 1,
4688            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4689        );
4690    }
4691
4692    #[test]
4693    fn writer_run_upsert_supersedes_prior_active_run() {
4694        let db = NamedTempFile::new().expect("temporary db");
4695        let writer = WriterActor::start(
4696            db.path(),
4697            Arc::new(SchemaManager::new()),
4698            ProvenanceMode::Warn,
4699            Arc::new(TelemetryCounters::default()),
4700        )
4701        .expect("writer");
4702
4703        writer
4704            .submit(WriteRequest {
4705                label: "v1".to_owned(),
4706                nodes: vec![],
4707                node_retires: vec![],
4708                edges: vec![],
4709                edge_retires: vec![],
4710                chunks: vec![],
4711                runs: vec![RunInsert {
4712                    id: "run-v1".to_owned(),
4713                    kind: "session".to_owned(),
4714                    status: "completed".to_owned(),
4715                    properties: "{}".to_owned(),
4716                    source_ref: Some("src-1".to_owned()),
4717                    upsert: false,
4718                    supersedes_id: None,
4719                }],
4720                steps: vec![],
4721                actions: vec![],
4722                optional_backfills: vec![],
4723                vec_inserts: vec![],
4724                operational_writes: vec![],
4725            })
4726            .expect("v1 run write");
4727
4728        writer
4729            .submit(WriteRequest {
4730                label: "v2".to_owned(),
4731                nodes: vec![],
4732                node_retires: vec![],
4733                edges: vec![],
4734                edge_retires: vec![],
4735                chunks: vec![],
4736                runs: vec![RunInsert {
4737                    id: "run-v2".to_owned(),
4738                    kind: "session".to_owned(),
4739                    status: "completed".to_owned(),
4740                    properties: "{}".to_owned(),
4741                    source_ref: Some("src-2".to_owned()),
4742                    upsert: true,
4743                    supersedes_id: Some("run-v1".to_owned()),
4744                }],
4745                steps: vec![],
4746                actions: vec![],
4747                optional_backfills: vec![],
4748                vec_inserts: vec![],
4749                operational_writes: vec![],
4750            })
4751            .expect("v2 run write");
4752
4753        let conn = rusqlite::Connection::open(db.path()).expect("open");
4754        let v1_historical: i64 = conn
4755            .query_row(
4756                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4757                [],
4758                |r| r.get(0),
4759            )
4760            .expect("v1 historical count");
4761        let v2_active: i64 = conn
4762            .query_row(
4763                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4764                [],
4765                |r| r.get(0),
4766            )
4767            .expect("v2 active count");
4768
4769        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4770        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4771    }
4772
4773    #[test]
4774    fn writer_step_upsert_supersedes_prior_active_step() {
4775        let db = NamedTempFile::new().expect("temporary db");
4776        let writer = WriterActor::start(
4777            db.path(),
4778            Arc::new(SchemaManager::new()),
4779            ProvenanceMode::Warn,
4780            Arc::new(TelemetryCounters::default()),
4781        )
4782        .expect("writer");
4783
4784        writer
4785            .submit(WriteRequest {
4786                label: "v1".to_owned(),
4787                nodes: vec![],
4788                node_retires: vec![],
4789                edges: vec![],
4790                edge_retires: vec![],
4791                chunks: vec![],
4792                runs: vec![RunInsert {
4793                    id: "run-1".to_owned(),
4794                    kind: "session".to_owned(),
4795                    status: "completed".to_owned(),
4796                    properties: "{}".to_owned(),
4797                    source_ref: Some("src-1".to_owned()),
4798                    upsert: false,
4799                    supersedes_id: None,
4800                }],
4801                steps: vec![StepInsert {
4802                    id: "step-v1".to_owned(),
4803                    run_id: "run-1".to_owned(),
4804                    kind: "llm".to_owned(),
4805                    status: "completed".to_owned(),
4806                    properties: "{}".to_owned(),
4807                    source_ref: Some("src-1".to_owned()),
4808                    upsert: false,
4809                    supersedes_id: None,
4810                }],
4811                actions: vec![],
4812                optional_backfills: vec![],
4813                vec_inserts: vec![],
4814                operational_writes: vec![],
4815            })
4816            .expect("v1 step write");
4817
4818        writer
4819            .submit(WriteRequest {
4820                label: "v2".to_owned(),
4821                nodes: vec![],
4822                node_retires: vec![],
4823                edges: vec![],
4824                edge_retires: vec![],
4825                chunks: vec![],
4826                runs: vec![],
4827                steps: vec![StepInsert {
4828                    id: "step-v2".to_owned(),
4829                    run_id: "run-1".to_owned(),
4830                    kind: "llm".to_owned(),
4831                    status: "completed".to_owned(),
4832                    properties: "{}".to_owned(),
4833                    source_ref: Some("src-2".to_owned()),
4834                    upsert: true,
4835                    supersedes_id: Some("step-v1".to_owned()),
4836                }],
4837                actions: vec![],
4838                optional_backfills: vec![],
4839                vec_inserts: vec![],
4840                operational_writes: vec![],
4841            })
4842            .expect("v2 step write");
4843
4844        let conn = rusqlite::Connection::open(db.path()).expect("open");
4845        let v1_historical: i64 = conn
4846            .query_row(
4847                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4848                [],
4849                |r| r.get(0),
4850            )
4851            .expect("v1 historical count");
4852        let v2_active: i64 = conn
4853            .query_row(
4854                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4855                [],
4856                |r| r.get(0),
4857            )
4858            .expect("v2 active count");
4859
4860        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4861        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4862    }
4863
4864    #[test]
4865    fn writer_action_upsert_supersedes_prior_active_action() {
4866        let db = NamedTempFile::new().expect("temporary db");
4867        let writer = WriterActor::start(
4868            db.path(),
4869            Arc::new(SchemaManager::new()),
4870            ProvenanceMode::Warn,
4871            Arc::new(TelemetryCounters::default()),
4872        )
4873        .expect("writer");
4874
4875        writer
4876            .submit(WriteRequest {
4877                label: "v1".to_owned(),
4878                nodes: vec![],
4879                node_retires: vec![],
4880                edges: vec![],
4881                edge_retires: vec![],
4882                chunks: vec![],
4883                runs: vec![RunInsert {
4884                    id: "run-1".to_owned(),
4885                    kind: "session".to_owned(),
4886                    status: "completed".to_owned(),
4887                    properties: "{}".to_owned(),
4888                    source_ref: Some("src-1".to_owned()),
4889                    upsert: false,
4890                    supersedes_id: None,
4891                }],
4892                steps: vec![StepInsert {
4893                    id: "step-1".to_owned(),
4894                    run_id: "run-1".to_owned(),
4895                    kind: "llm".to_owned(),
4896                    status: "completed".to_owned(),
4897                    properties: "{}".to_owned(),
4898                    source_ref: Some("src-1".to_owned()),
4899                    upsert: false,
4900                    supersedes_id: None,
4901                }],
4902                actions: vec![ActionInsert {
4903                    id: "action-v1".to_owned(),
4904                    step_id: "step-1".to_owned(),
4905                    kind: "emit".to_owned(),
4906                    status: "completed".to_owned(),
4907                    properties: "{}".to_owned(),
4908                    source_ref: Some("src-1".to_owned()),
4909                    upsert: false,
4910                    supersedes_id: None,
4911                }],
4912                optional_backfills: vec![],
4913                vec_inserts: vec![],
4914                operational_writes: vec![],
4915            })
4916            .expect("v1 action write");
4917
4918        writer
4919            .submit(WriteRequest {
4920                label: "v2".to_owned(),
4921                nodes: vec![],
4922                node_retires: vec![],
4923                edges: vec![],
4924                edge_retires: vec![],
4925                chunks: vec![],
4926                runs: vec![],
4927                steps: vec![],
4928                actions: vec![ActionInsert {
4929                    id: "action-v2".to_owned(),
4930                    step_id: "step-1".to_owned(),
4931                    kind: "emit".to_owned(),
4932                    status: "completed".to_owned(),
4933                    properties: "{}".to_owned(),
4934                    source_ref: Some("src-2".to_owned()),
4935                    upsert: true,
4936                    supersedes_id: Some("action-v1".to_owned()),
4937                }],
4938                optional_backfills: vec![],
4939                vec_inserts: vec![],
4940                operational_writes: vec![],
4941            })
4942            .expect("v2 action write");
4943
4944        let conn = rusqlite::Connection::open(db.path()).expect("open");
4945        let v1_historical: i64 = conn
4946            .query_row(
4947                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4948                [],
4949                |r| r.get(0),
4950            )
4951            .expect("v1 historical count");
4952        let v2_active: i64 = conn
4953            .query_row(
4954                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4955                [],
4956                |r| r.get(0),
4957            )
4958            .expect("v2 active count");
4959
4960        assert_eq!(
4961            v1_historical, 1,
4962            "action-v1 must be historical after upsert"
4963        );
4964        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4965    }
4966
4967    // P0: runtime upsert without supersedes_id must be rejected
4968
4969    #[test]
4970    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4971        let db = NamedTempFile::new().expect("temporary db");
4972        let writer = WriterActor::start(
4973            db.path(),
4974            Arc::new(SchemaManager::new()),
4975            ProvenanceMode::Warn,
4976            Arc::new(TelemetryCounters::default()),
4977        )
4978        .expect("writer");
4979
4980        let result = writer.submit(WriteRequest {
4981            label: "bad".to_owned(),
4982            nodes: vec![],
4983            node_retires: vec![],
4984            edges: vec![],
4985            edge_retires: vec![],
4986            chunks: vec![],
4987            runs: vec![RunInsert {
4988                id: "run-1".to_owned(),
4989                kind: "session".to_owned(),
4990                status: "completed".to_owned(),
4991                properties: "{}".to_owned(),
4992                source_ref: None,
4993                upsert: true,
4994                supersedes_id: None,
4995            }],
4996            steps: vec![],
4997            actions: vec![],
4998            optional_backfills: vec![],
4999            vec_inserts: vec![],
5000            operational_writes: vec![],
5001        });
5002
5003        assert!(
5004            matches!(result, Err(EngineError::InvalidWrite(_))),
5005            "run upsert=true without supersedes_id must return InvalidWrite"
5006        );
5007    }
5008
5009    #[test]
5010    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
5011        let db = NamedTempFile::new().expect("temporary db");
5012        let writer = WriterActor::start(
5013            db.path(),
5014            Arc::new(SchemaManager::new()),
5015            ProvenanceMode::Warn,
5016            Arc::new(TelemetryCounters::default()),
5017        )
5018        .expect("writer");
5019
5020        let result = writer.submit(WriteRequest {
5021            label: "bad".to_owned(),
5022            nodes: vec![],
5023            node_retires: vec![],
5024            edges: vec![],
5025            edge_retires: vec![],
5026            chunks: vec![],
5027            runs: vec![],
5028            steps: vec![StepInsert {
5029                id: "step-1".to_owned(),
5030                run_id: "run-1".to_owned(),
5031                kind: "llm".to_owned(),
5032                status: "completed".to_owned(),
5033                properties: "{}".to_owned(),
5034                source_ref: None,
5035                upsert: true,
5036                supersedes_id: None,
5037            }],
5038            actions: vec![],
5039            optional_backfills: vec![],
5040            vec_inserts: vec![],
5041            operational_writes: vec![],
5042        });
5043
5044        assert!(
5045            matches!(result, Err(EngineError::InvalidWrite(_))),
5046            "step upsert=true without supersedes_id must return InvalidWrite"
5047        );
5048    }
5049
5050    #[test]
5051    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
5052        let db = NamedTempFile::new().expect("temporary db");
5053        let writer = WriterActor::start(
5054            db.path(),
5055            Arc::new(SchemaManager::new()),
5056            ProvenanceMode::Warn,
5057            Arc::new(TelemetryCounters::default()),
5058        )
5059        .expect("writer");
5060
5061        let result = writer.submit(WriteRequest {
5062            label: "bad".to_owned(),
5063            nodes: vec![],
5064            node_retires: vec![],
5065            edges: vec![],
5066            edge_retires: vec![],
5067            chunks: vec![],
5068            runs: vec![],
5069            steps: vec![],
5070            actions: vec![ActionInsert {
5071                id: "action-1".to_owned(),
5072                step_id: "step-1".to_owned(),
5073                kind: "emit".to_owned(),
5074                status: "completed".to_owned(),
5075                properties: "{}".to_owned(),
5076                source_ref: None,
5077                upsert: true,
5078                supersedes_id: None,
5079            }],
5080            optional_backfills: vec![],
5081            vec_inserts: vec![],
5082            operational_writes: vec![],
5083        });
5084
5085        assert!(
5086            matches!(result, Err(EngineError::InvalidWrite(_))),
5087            "action upsert=true without supersedes_id must return InvalidWrite"
5088        );
5089    }
5090
5091    // P1a/b: provenance warnings for edge inserts and runtime table inserts
5092
5093    #[test]
5094    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
5095        let db = NamedTempFile::new().expect("temporary db");
5096        let writer = WriterActor::start(
5097            db.path(),
5098            Arc::new(SchemaManager::new()),
5099            ProvenanceMode::Warn,
5100            Arc::new(TelemetryCounters::default()),
5101        )
5102        .expect("writer");
5103
5104        let receipt = writer
5105            .submit(WriteRequest {
5106                label: "test".to_owned(),
5107                nodes: vec![
5108                    NodeInsert {
5109                        row_id: "row-a".to_owned(),
5110                        logical_id: "node-a".to_owned(),
5111                        kind: "Meeting".to_owned(),
5112                        properties: "{}".to_owned(),
5113                        source_ref: Some("src-1".to_owned()),
5114                        upsert: false,
5115                        chunk_policy: ChunkPolicy::Preserve,
5116                        content_ref: None,
5117                    },
5118                    NodeInsert {
5119                        row_id: "row-b".to_owned(),
5120                        logical_id: "node-b".to_owned(),
5121                        kind: "Task".to_owned(),
5122                        properties: "{}".to_owned(),
5123                        source_ref: Some("src-1".to_owned()),
5124                        upsert: false,
5125                        chunk_policy: ChunkPolicy::Preserve,
5126                        content_ref: None,
5127                    },
5128                ],
5129                node_retires: vec![],
5130                edges: vec![EdgeInsert {
5131                    row_id: "edge-1".to_owned(),
5132                    logical_id: "edge-lg-1".to_owned(),
5133                    source_logical_id: "node-a".to_owned(),
5134                    target_logical_id: "node-b".to_owned(),
5135                    kind: "HAS_TASK".to_owned(),
5136                    properties: "{}".to_owned(),
5137                    source_ref: None,
5138                    upsert: false,
5139                }],
5140                edge_retires: vec![],
5141                chunks: vec![],
5142                runs: vec![],
5143                steps: vec![],
5144                actions: vec![],
5145                optional_backfills: vec![],
5146                vec_inserts: vec![],
5147                operational_writes: vec![],
5148            })
5149            .expect("write");
5150
5151        assert!(
5152            !receipt.provenance_warnings.is_empty(),
5153            "edge insert without source_ref must emit a provenance warning"
5154        );
5155    }
5156
5157    #[test]
5158    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
5159        let db = NamedTempFile::new().expect("temporary db");
5160        let writer = WriterActor::start(
5161            db.path(),
5162            Arc::new(SchemaManager::new()),
5163            ProvenanceMode::Warn,
5164            Arc::new(TelemetryCounters::default()),
5165        )
5166        .expect("writer");
5167
5168        let receipt = writer
5169            .submit(WriteRequest {
5170                label: "test".to_owned(),
5171                nodes: vec![],
5172                node_retires: vec![],
5173                edges: vec![],
5174                edge_retires: vec![],
5175                chunks: vec![],
5176                runs: vec![RunInsert {
5177                    id: "run-1".to_owned(),
5178                    kind: "session".to_owned(),
5179                    status: "completed".to_owned(),
5180                    properties: "{}".to_owned(),
5181                    source_ref: None,
5182                    upsert: false,
5183                    supersedes_id: None,
5184                }],
5185                steps: vec![],
5186                actions: vec![],
5187                optional_backfills: vec![],
5188                vec_inserts: vec![],
5189                operational_writes: vec![],
5190            })
5191            .expect("write");
5192
5193        assert!(
5194            !receipt.provenance_warnings.is_empty(),
5195            "run insert without source_ref must emit a provenance warning"
5196        );
5197    }
5198
5199    // P1c: retire a node AND submit chunks for the same logical_id in one request
5200
5201    #[test]
5202    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5203        let db = NamedTempFile::new().expect("temporary db");
5204        let writer = WriterActor::start(
5205            db.path(),
5206            Arc::new(SchemaManager::new()),
5207            ProvenanceMode::Warn,
5208            Arc::new(TelemetryCounters::default()),
5209        )
5210        .expect("writer");
5211
5212        // First seed the node so it exists
5213        writer
5214            .submit(WriteRequest {
5215                label: "seed".to_owned(),
5216                nodes: vec![NodeInsert {
5217                    row_id: "row-1".to_owned(),
5218                    logical_id: "meeting-1".to_owned(),
5219                    kind: "Meeting".to_owned(),
5220                    properties: "{}".to_owned(),
5221                    source_ref: Some("src-1".to_owned()),
5222                    upsert: false,
5223                    chunk_policy: ChunkPolicy::Preserve,
5224                    content_ref: None,
5225                }],
5226                node_retires: vec![],
5227                edges: vec![],
5228                edge_retires: vec![],
5229                chunks: vec![],
5230                runs: vec![],
5231                steps: vec![],
5232                actions: vec![],
5233                optional_backfills: vec![],
5234                vec_inserts: vec![],
5235                operational_writes: vec![],
5236            })
5237            .expect("seed write");
5238
5239        // Now try to retire it AND add a chunk for it in the same request
5240        let result = writer.submit(WriteRequest {
5241            label: "bad".to_owned(),
5242            nodes: vec![],
5243            node_retires: vec![NodeRetire {
5244                logical_id: "meeting-1".to_owned(),
5245                source_ref: Some("src-2".to_owned()),
5246            }],
5247            edges: vec![],
5248            edge_retires: vec![],
5249            chunks: vec![ChunkInsert {
5250                id: "chunk-bad".to_owned(),
5251                node_logical_id: "meeting-1".to_owned(),
5252                text_content: "some text".to_owned(),
5253                byte_start: None,
5254                byte_end: None,
5255                content_hash: None,
5256            }],
5257            runs: vec![],
5258            steps: vec![],
5259            actions: vec![],
5260            optional_backfills: vec![],
5261            vec_inserts: vec![],
5262            operational_writes: vec![],
5263        });
5264
5265        assert!(
5266            matches!(result, Err(EngineError::InvalidWrite(_))),
5267            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5268        );
5269    }
5270
5271    // --- Item 1: prepare_cached batch insert ---
5272
5273    #[test]
5274    fn writer_batch_insert_multiple_nodes() {
5275        let db = NamedTempFile::new().expect("temporary db");
5276        let writer = WriterActor::start(
5277            db.path(),
5278            Arc::new(SchemaManager::new()),
5279            ProvenanceMode::Warn,
5280            Arc::new(TelemetryCounters::default()),
5281        )
5282        .expect("writer");
5283
5284        let nodes: Vec<NodeInsert> = (0..100)
5285            .map(|i| NodeInsert {
5286                row_id: format!("row-{i}"),
5287                logical_id: format!("lg-{i}"),
5288                kind: "Note".to_owned(),
5289                properties: "{}".to_owned(),
5290                source_ref: Some("batch-src".to_owned()),
5291                upsert: false,
5292                chunk_policy: ChunkPolicy::Preserve,
5293                content_ref: None,
5294            })
5295            .collect();
5296
5297        writer
5298            .submit(WriteRequest {
5299                label: "batch".to_owned(),
5300                nodes,
5301                node_retires: vec![],
5302                edges: vec![],
5303                edge_retires: vec![],
5304                chunks: vec![],
5305                runs: vec![],
5306                steps: vec![],
5307                actions: vec![],
5308                optional_backfills: vec![],
5309                vec_inserts: vec![],
5310                operational_writes: vec![],
5311            })
5312            .expect("batch write");
5313
5314        let conn = rusqlite::Connection::open(db.path()).expect("open");
5315        let count: i64 = conn
5316            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5317            .expect("count nodes");
5318        assert_eq!(
5319            count, 100,
5320            "all 100 nodes must be present after batch insert"
5321        );
5322    }
5323
5324    // --- Item 2: ID validation ---
5325
5326    #[test]
5327    fn prepare_write_rejects_empty_node_row_id() {
5328        let db = NamedTempFile::new().expect("temporary db");
5329        let writer = WriterActor::start(
5330            db.path(),
5331            Arc::new(SchemaManager::new()),
5332            ProvenanceMode::Warn,
5333            Arc::new(TelemetryCounters::default()),
5334        )
5335        .expect("writer");
5336
5337        let result = writer.submit(WriteRequest {
5338            label: "test".to_owned(),
5339            nodes: vec![NodeInsert {
5340                row_id: String::new(),
5341                logical_id: "lg-1".to_owned(),
5342                kind: "Note".to_owned(),
5343                properties: "{}".to_owned(),
5344                source_ref: None,
5345                upsert: false,
5346                chunk_policy: ChunkPolicy::Preserve,
5347                content_ref: None,
5348            }],
5349            node_retires: vec![],
5350            edges: vec![],
5351            edge_retires: vec![],
5352            chunks: vec![],
5353            runs: vec![],
5354            steps: vec![],
5355            actions: vec![],
5356            optional_backfills: vec![],
5357            vec_inserts: vec![],
5358            operational_writes: vec![],
5359        });
5360
5361        assert!(
5362            matches!(result, Err(EngineError::InvalidWrite(_))),
5363            "empty row_id must be rejected"
5364        );
5365    }
5366
5367    #[test]
5368    fn prepare_write_rejects_empty_node_logical_id() {
5369        let db = NamedTempFile::new().expect("temporary db");
5370        let writer = WriterActor::start(
5371            db.path(),
5372            Arc::new(SchemaManager::new()),
5373            ProvenanceMode::Warn,
5374            Arc::new(TelemetryCounters::default()),
5375        )
5376        .expect("writer");
5377
5378        let result = writer.submit(WriteRequest {
5379            label: "test".to_owned(),
5380            nodes: vec![NodeInsert {
5381                row_id: "row-1".to_owned(),
5382                logical_id: String::new(),
5383                kind: "Note".to_owned(),
5384                properties: "{}".to_owned(),
5385                source_ref: None,
5386                upsert: false,
5387                chunk_policy: ChunkPolicy::Preserve,
5388                content_ref: None,
5389            }],
5390            node_retires: vec![],
5391            edges: vec![],
5392            edge_retires: vec![],
5393            chunks: vec![],
5394            runs: vec![],
5395            steps: vec![],
5396            actions: vec![],
5397            optional_backfills: vec![],
5398            vec_inserts: vec![],
5399            operational_writes: vec![],
5400        });
5401
5402        assert!(
5403            matches!(result, Err(EngineError::InvalidWrite(_))),
5404            "empty logical_id must be rejected"
5405        );
5406    }
5407
5408    #[test]
5409    fn prepare_write_rejects_duplicate_row_ids_in_request() {
5410        let db = NamedTempFile::new().expect("temporary db");
5411        let writer = WriterActor::start(
5412            db.path(),
5413            Arc::new(SchemaManager::new()),
5414            ProvenanceMode::Warn,
5415            Arc::new(TelemetryCounters::default()),
5416        )
5417        .expect("writer");
5418
5419        let result = writer.submit(WriteRequest {
5420            label: "test".to_owned(),
5421            nodes: vec![
5422                NodeInsert {
5423                    row_id: "row-1".to_owned(),
5424                    logical_id: "lg-1".to_owned(),
5425                    kind: "Note".to_owned(),
5426                    properties: "{}".to_owned(),
5427                    source_ref: None,
5428                    upsert: false,
5429                    chunk_policy: ChunkPolicy::Preserve,
5430                    content_ref: None,
5431                },
5432                NodeInsert {
5433                    row_id: "row-1".to_owned(), // duplicate
5434                    logical_id: "lg-2".to_owned(),
5435                    kind: "Note".to_owned(),
5436                    properties: "{}".to_owned(),
5437                    source_ref: None,
5438                    upsert: false,
5439                    chunk_policy: ChunkPolicy::Preserve,
5440                    content_ref: None,
5441                },
5442            ],
5443            node_retires: vec![],
5444            edges: vec![],
5445            edge_retires: vec![],
5446            chunks: vec![],
5447            runs: vec![],
5448            steps: vec![],
5449            actions: vec![],
5450            optional_backfills: vec![],
5451            vec_inserts: vec![],
5452            operational_writes: vec![],
5453        });
5454
5455        assert!(
5456            matches!(result, Err(EngineError::InvalidWrite(_))),
5457            "duplicate row_id within request must be rejected"
5458        );
5459    }
5460
5461    #[test]
5462    fn prepare_write_rejects_empty_chunk_id() {
5463        let db = NamedTempFile::new().expect("temporary db");
5464        let writer = WriterActor::start(
5465            db.path(),
5466            Arc::new(SchemaManager::new()),
5467            ProvenanceMode::Warn,
5468            Arc::new(TelemetryCounters::default()),
5469        )
5470        .expect("writer");
5471
5472        let result = writer.submit(WriteRequest {
5473            label: "test".to_owned(),
5474            nodes: vec![NodeInsert {
5475                row_id: "row-1".to_owned(),
5476                logical_id: "lg-1".to_owned(),
5477                kind: "Note".to_owned(),
5478                properties: "{}".to_owned(),
5479                source_ref: None,
5480                upsert: false,
5481                chunk_policy: ChunkPolicy::Preserve,
5482                content_ref: None,
5483            }],
5484            node_retires: vec![],
5485            edges: vec![],
5486            edge_retires: vec![],
5487            chunks: vec![ChunkInsert {
5488                id: String::new(),
5489                node_logical_id: "lg-1".to_owned(),
5490                text_content: "some text".to_owned(),
5491                byte_start: None,
5492                byte_end: None,
5493                content_hash: None,
5494            }],
5495            runs: vec![],
5496            steps: vec![],
5497            actions: vec![],
5498            optional_backfills: vec![],
5499            vec_inserts: vec![],
5500            operational_writes: vec![],
5501        });
5502
5503        assert!(
5504            matches!(result, Err(EngineError::InvalidWrite(_))),
5505            "empty chunk id must be rejected"
5506        );
5507    }
5508
5509    // --- Item 4: provenance warning coverage tests ---
5510
5511    #[test]
5512    fn writer_receipt_warns_on_step_without_source_ref() {
5513        let db = NamedTempFile::new().expect("temporary db");
5514        let writer = WriterActor::start(
5515            db.path(),
5516            Arc::new(SchemaManager::new()),
5517            ProvenanceMode::Warn,
5518            Arc::new(TelemetryCounters::default()),
5519        )
5520        .expect("writer");
5521
5522        // seed a run first so step FK is satisfied
5523        writer
5524            .submit(WriteRequest {
5525                label: "seed-run".to_owned(),
5526                nodes: vec![],
5527                node_retires: vec![],
5528                edges: vec![],
5529                edge_retires: vec![],
5530                chunks: vec![],
5531                runs: vec![RunInsert {
5532                    id: "run-1".to_owned(),
5533                    kind: "session".to_owned(),
5534                    status: "active".to_owned(),
5535                    properties: "{}".to_owned(),
5536                    source_ref: Some("src-1".to_owned()),
5537                    upsert: false,
5538                    supersedes_id: None,
5539                }],
5540                steps: vec![],
5541                actions: vec![],
5542                optional_backfills: vec![],
5543                vec_inserts: vec![],
5544                operational_writes: vec![],
5545            })
5546            .expect("seed run");
5547
5548        let receipt = writer
5549            .submit(WriteRequest {
5550                label: "test".to_owned(),
5551                nodes: vec![],
5552                node_retires: vec![],
5553                edges: vec![],
5554                edge_retires: vec![],
5555                chunks: vec![],
5556                runs: vec![],
5557                steps: vec![StepInsert {
5558                    id: "step-1".to_owned(),
5559                    run_id: "run-1".to_owned(),
5560                    kind: "llm_call".to_owned(),
5561                    status: "completed".to_owned(),
5562                    properties: "{}".to_owned(),
5563                    source_ref: None,
5564                    upsert: false,
5565                    supersedes_id: None,
5566                }],
5567                actions: vec![],
5568                optional_backfills: vec![],
5569                vec_inserts: vec![],
5570                operational_writes: vec![],
5571            })
5572            .expect("write");
5573
5574        assert!(
5575            !receipt.provenance_warnings.is_empty(),
5576            "step insert without source_ref must emit a provenance warning"
5577        );
5578    }
5579
5580    #[test]
5581    fn writer_receipt_warns_on_action_without_source_ref() {
5582        let db = NamedTempFile::new().expect("temporary db");
5583        let writer = WriterActor::start(
5584            db.path(),
5585            Arc::new(SchemaManager::new()),
5586            ProvenanceMode::Warn,
5587            Arc::new(TelemetryCounters::default()),
5588        )
5589        .expect("writer");
5590
5591        // seed run and step so action FK is satisfied
5592        writer
5593            .submit(WriteRequest {
5594                label: "seed".to_owned(),
5595                nodes: vec![],
5596                node_retires: vec![],
5597                edges: vec![],
5598                edge_retires: vec![],
5599                chunks: vec![],
5600                runs: vec![RunInsert {
5601                    id: "run-1".to_owned(),
5602                    kind: "session".to_owned(),
5603                    status: "active".to_owned(),
5604                    properties: "{}".to_owned(),
5605                    source_ref: Some("src-1".to_owned()),
5606                    upsert: false,
5607                    supersedes_id: None,
5608                }],
5609                steps: vec![StepInsert {
5610                    id: "step-1".to_owned(),
5611                    run_id: "run-1".to_owned(),
5612                    kind: "llm_call".to_owned(),
5613                    status: "completed".to_owned(),
5614                    properties: "{}".to_owned(),
5615                    source_ref: Some("src-1".to_owned()),
5616                    upsert: false,
5617                    supersedes_id: None,
5618                }],
5619                actions: vec![],
5620                optional_backfills: vec![],
5621                vec_inserts: vec![],
5622                operational_writes: vec![],
5623            })
5624            .expect("seed");
5625
5626        let receipt = writer
5627            .submit(WriteRequest {
5628                label: "test".to_owned(),
5629                nodes: vec![],
5630                node_retires: vec![],
5631                edges: vec![],
5632                edge_retires: vec![],
5633                chunks: vec![],
5634                runs: vec![],
5635                steps: vec![],
5636                actions: vec![ActionInsert {
5637                    id: "action-1".to_owned(),
5638                    step_id: "step-1".to_owned(),
5639                    kind: "tool_call".to_owned(),
5640                    status: "completed".to_owned(),
5641                    properties: "{}".to_owned(),
5642                    source_ref: None,
5643                    upsert: false,
5644                    supersedes_id: None,
5645                }],
5646                optional_backfills: vec![],
5647                vec_inserts: vec![],
5648                operational_writes: vec![],
5649            })
5650            .expect("write");
5651
5652        assert!(
5653            !receipt.provenance_warnings.is_empty(),
5654            "action insert without source_ref must emit a provenance warning"
5655        );
5656    }
5657
5658    #[test]
5659    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5660        let db = NamedTempFile::new().expect("temporary db");
5661        let writer = WriterActor::start(
5662            db.path(),
5663            Arc::new(SchemaManager::new()),
5664            ProvenanceMode::Warn,
5665            Arc::new(TelemetryCounters::default()),
5666        )
5667        .expect("writer");
5668
5669        let receipt = writer
5670            .submit(WriteRequest {
5671                label: "test".to_owned(),
5672                nodes: vec![NodeInsert {
5673                    row_id: "row-1".to_owned(),
5674                    logical_id: "node-1".to_owned(),
5675                    kind: "Note".to_owned(),
5676                    properties: "{}".to_owned(),
5677                    source_ref: Some("src-1".to_owned()),
5678                    upsert: false,
5679                    chunk_policy: ChunkPolicy::Preserve,
5680                    content_ref: None,
5681                }],
5682                node_retires: vec![],
5683                edges: vec![],
5684                edge_retires: vec![],
5685                chunks: vec![],
5686                runs: vec![RunInsert {
5687                    id: "run-1".to_owned(),
5688                    kind: "session".to_owned(),
5689                    status: "active".to_owned(),
5690                    properties: "{}".to_owned(),
5691                    source_ref: Some("src-1".to_owned()),
5692                    upsert: false,
5693                    supersedes_id: None,
5694                }],
5695                steps: vec![StepInsert {
5696                    id: "step-1".to_owned(),
5697                    run_id: "run-1".to_owned(),
5698                    kind: "llm_call".to_owned(),
5699                    status: "completed".to_owned(),
5700                    properties: "{}".to_owned(),
5701                    source_ref: Some("src-1".to_owned()),
5702                    upsert: false,
5703                    supersedes_id: None,
5704                }],
5705                actions: vec![ActionInsert {
5706                    id: "action-1".to_owned(),
5707                    step_id: "step-1".to_owned(),
5708                    kind: "tool_call".to_owned(),
5709                    status: "completed".to_owned(),
5710                    properties: "{}".to_owned(),
5711                    source_ref: Some("src-1".to_owned()),
5712                    upsert: false,
5713                    supersedes_id: None,
5714                }],
5715                optional_backfills: vec![],
5716                vec_inserts: vec![],
5717                operational_writes: vec![],
5718            })
5719            .expect("write");
5720
5721        assert!(
5722            receipt.provenance_warnings.is_empty(),
5723            "no warnings expected when all types have source_ref; got: {:?}",
5724            receipt.provenance_warnings
5725        );
5726    }
5727
5728    // --- Item 4 Task 2: ProvenanceMode tests ---
5729
5730    #[test]
5731    fn default_provenance_mode_is_warn() {
5732        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5733        let db = NamedTempFile::new().expect("temporary db");
5734        let writer = WriterActor::start(
5735            db.path(),
5736            Arc::new(SchemaManager::new()),
5737            ProvenanceMode::default(),
5738            Arc::new(TelemetryCounters::default()),
5739        )
5740        .expect("writer");
5741
5742        let receipt = writer
5743            .submit(WriteRequest {
5744                label: "test".to_owned(),
5745                nodes: vec![NodeInsert {
5746                    row_id: "row-1".to_owned(),
5747                    logical_id: "node-1".to_owned(),
5748                    kind: "Note".to_owned(),
5749                    properties: "{}".to_owned(),
5750                    source_ref: None,
5751                    upsert: false,
5752                    chunk_policy: ChunkPolicy::Preserve,
5753                    content_ref: None,
5754                }],
5755                node_retires: vec![],
5756                edges: vec![],
5757                edge_retires: vec![],
5758                chunks: vec![],
5759                runs: vec![],
5760                steps: vec![],
5761                actions: vec![],
5762                optional_backfills: vec![],
5763                vec_inserts: vec![],
5764                operational_writes: vec![],
5765            })
5766            .expect("Warn mode must not reject missing source_ref");
5767
5768        assert!(
5769            !receipt.provenance_warnings.is_empty(),
5770            "Warn mode must emit a warning instead of rejecting"
5771        );
5772    }
5773
5774    #[test]
5775    fn require_mode_rejects_node_without_source_ref() {
5776        let db = NamedTempFile::new().expect("temporary db");
5777        let writer = WriterActor::start(
5778            db.path(),
5779            Arc::new(SchemaManager::new()),
5780            ProvenanceMode::Require,
5781            Arc::new(TelemetryCounters::default()),
5782        )
5783        .expect("writer");
5784
5785        let result = writer.submit(WriteRequest {
5786            label: "test".to_owned(),
5787            nodes: vec![NodeInsert {
5788                row_id: "row-1".to_owned(),
5789                logical_id: "node-1".to_owned(),
5790                kind: "Note".to_owned(),
5791                properties: "{}".to_owned(),
5792                source_ref: None,
5793                upsert: false,
5794                chunk_policy: ChunkPolicy::Preserve,
5795                content_ref: None,
5796            }],
5797            node_retires: vec![],
5798            edges: vec![],
5799            edge_retires: vec![],
5800            chunks: vec![],
5801            runs: vec![],
5802            steps: vec![],
5803            actions: vec![],
5804            optional_backfills: vec![],
5805            vec_inserts: vec![],
5806            operational_writes: vec![],
5807        });
5808
5809        assert!(
5810            matches!(result, Err(EngineError::InvalidWrite(_))),
5811            "Require mode must reject node without source_ref"
5812        );
5813    }
5814
5815    #[test]
5816    fn require_mode_accepts_node_with_source_ref() {
5817        let db = NamedTempFile::new().expect("temporary db");
5818        let writer = WriterActor::start(
5819            db.path(),
5820            Arc::new(SchemaManager::new()),
5821            ProvenanceMode::Require,
5822            Arc::new(TelemetryCounters::default()),
5823        )
5824        .expect("writer");
5825
5826        let result = writer.submit(WriteRequest {
5827            label: "test".to_owned(),
5828            nodes: vec![NodeInsert {
5829                row_id: "row-1".to_owned(),
5830                logical_id: "node-1".to_owned(),
5831                kind: "Note".to_owned(),
5832                properties: "{}".to_owned(),
5833                source_ref: Some("src-1".to_owned()),
5834                upsert: false,
5835                chunk_policy: ChunkPolicy::Preserve,
5836                content_ref: None,
5837            }],
5838            node_retires: vec![],
5839            edges: vec![],
5840            edge_retires: vec![],
5841            chunks: vec![],
5842            runs: vec![],
5843            steps: vec![],
5844            actions: vec![],
5845            optional_backfills: vec![],
5846            vec_inserts: vec![],
5847            operational_writes: vec![],
5848        });
5849
5850        assert!(
5851            result.is_ok(),
5852            "Require mode must accept node with source_ref"
5853        );
5854    }
5855
5856    #[test]
5857    fn require_mode_rejects_edge_without_source_ref() {
5858        let db = NamedTempFile::new().expect("temporary db");
5859        let writer = WriterActor::start(
5860            db.path(),
5861            Arc::new(SchemaManager::new()),
5862            ProvenanceMode::Require,
5863            Arc::new(TelemetryCounters::default()),
5864        )
5865        .expect("writer");
5866
5867        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5868        let result = writer.submit(WriteRequest {
5869            label: "test".to_owned(),
5870            nodes: vec![
5871                NodeInsert {
5872                    row_id: "row-a".to_owned(),
5873                    logical_id: "node-a".to_owned(),
5874                    kind: "Note".to_owned(),
5875                    properties: "{}".to_owned(),
5876                    source_ref: Some("src-1".to_owned()),
5877                    upsert: false,
5878                    chunk_policy: ChunkPolicy::Preserve,
5879                    content_ref: None,
5880                },
5881                NodeInsert {
5882                    row_id: "row-b".to_owned(),
5883                    logical_id: "node-b".to_owned(),
5884                    kind: "Note".to_owned(),
5885                    properties: "{}".to_owned(),
5886                    source_ref: Some("src-1".to_owned()),
5887                    upsert: false,
5888                    chunk_policy: ChunkPolicy::Preserve,
5889                    content_ref: None,
5890                },
5891            ],
5892            node_retires: vec![],
5893            edges: vec![EdgeInsert {
5894                row_id: "edge-row-1".to_owned(),
5895                logical_id: "edge-1".to_owned(),
5896                source_logical_id: "node-a".to_owned(),
5897                target_logical_id: "node-b".to_owned(),
5898                kind: "LINKS_TO".to_owned(),
5899                properties: "{}".to_owned(),
5900                source_ref: None,
5901                upsert: false,
5902            }],
5903            edge_retires: vec![],
5904            chunks: vec![],
5905            runs: vec![],
5906            steps: vec![],
5907            actions: vec![],
5908            optional_backfills: vec![],
5909            vec_inserts: vec![],
5910            operational_writes: vec![],
5911        });
5912
5913        assert!(
5914            matches!(result, Err(EngineError::InvalidWrite(_))),
5915            "Require mode must reject edge without source_ref"
5916        );
5917    }
5918
5919    // --- Item 5: FTS projection coverage tests ---
5920
5921    #[test]
5922    fn fts_row_has_correct_kind_from_co_submitted_node() {
5923        let db = NamedTempFile::new().expect("temporary db");
5924        let writer = WriterActor::start(
5925            db.path(),
5926            Arc::new(SchemaManager::new()),
5927            ProvenanceMode::Warn,
5928            Arc::new(TelemetryCounters::default()),
5929        )
5930        .expect("writer");
5931
5932        writer
5933            .submit(WriteRequest {
5934                label: "test".to_owned(),
5935                nodes: vec![NodeInsert {
5936                    row_id: "row-1".to_owned(),
5937                    logical_id: "node-1".to_owned(),
5938                    kind: "Meeting".to_owned(),
5939                    properties: "{}".to_owned(),
5940                    source_ref: Some("src-1".to_owned()),
5941                    upsert: false,
5942                    chunk_policy: ChunkPolicy::Preserve,
5943                    content_ref: None,
5944                }],
5945                node_retires: vec![],
5946                edges: vec![],
5947                edge_retires: vec![],
5948                chunks: vec![ChunkInsert {
5949                    id: "chunk-1".to_owned(),
5950                    node_logical_id: "node-1".to_owned(),
5951                    text_content: "some text".to_owned(),
5952                    byte_start: None,
5953                    byte_end: None,
5954                    content_hash: None,
5955                }],
5956                runs: vec![],
5957                steps: vec![],
5958                actions: vec![],
5959                optional_backfills: vec![],
5960                vec_inserts: vec![],
5961                operational_writes: vec![],
5962            })
5963            .expect("write");
5964
5965        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5966        let kind: String = conn
5967            .query_row(
5968                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5969                [],
5970                |row| row.get(0),
5971            )
5972            .expect("fts row");
5973
5974        assert_eq!(kind, "Meeting");
5975    }
5976
5977    #[test]
5978    fn fts_row_has_correct_text_content() {
5979        let db = NamedTempFile::new().expect("temporary db");
5980        let writer = WriterActor::start(
5981            db.path(),
5982            Arc::new(SchemaManager::new()),
5983            ProvenanceMode::Warn,
5984            Arc::new(TelemetryCounters::default()),
5985        )
5986        .expect("writer");
5987
5988        writer
5989            .submit(WriteRequest {
5990                label: "test".to_owned(),
5991                nodes: vec![NodeInsert {
5992                    row_id: "row-1".to_owned(),
5993                    logical_id: "node-1".to_owned(),
5994                    kind: "Note".to_owned(),
5995                    properties: "{}".to_owned(),
5996                    source_ref: Some("src-1".to_owned()),
5997                    upsert: false,
5998                    chunk_policy: ChunkPolicy::Preserve,
5999                    content_ref: None,
6000                }],
6001                node_retires: vec![],
6002                edges: vec![],
6003                edge_retires: vec![],
6004                chunks: vec![ChunkInsert {
6005                    id: "chunk-1".to_owned(),
6006                    node_logical_id: "node-1".to_owned(),
6007                    text_content: "exactly this text".to_owned(),
6008                    byte_start: None,
6009                    byte_end: None,
6010                    content_hash: None,
6011                }],
6012                runs: vec![],
6013                steps: vec![],
6014                actions: vec![],
6015                optional_backfills: vec![],
6016                vec_inserts: vec![],
6017                operational_writes: vec![],
6018            })
6019            .expect("write");
6020
6021        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6022        let text: String = conn
6023            .query_row(
6024                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6025                [],
6026                |row| row.get(0),
6027            )
6028            .expect("fts row");
6029
6030        assert_eq!(text, "exactly this text");
6031    }
6032
6033    #[test]
6034    fn fts_row_has_correct_kind_from_pre_existing_node() {
6035        let db = NamedTempFile::new().expect("temporary db");
6036        let writer = WriterActor::start(
6037            db.path(),
6038            Arc::new(SchemaManager::new()),
6039            ProvenanceMode::Warn,
6040            Arc::new(TelemetryCounters::default()),
6041        )
6042        .expect("writer");
6043
6044        // Request 1: node only
6045        writer
6046            .submit(WriteRequest {
6047                label: "r1".to_owned(),
6048                nodes: vec![NodeInsert {
6049                    row_id: "row-1".to_owned(),
6050                    logical_id: "node-1".to_owned(),
6051                    kind: "Document".to_owned(),
6052                    properties: "{}".to_owned(),
6053                    source_ref: Some("src-1".to_owned()),
6054                    upsert: false,
6055                    chunk_policy: ChunkPolicy::Preserve,
6056                    content_ref: None,
6057                }],
6058                node_retires: vec![],
6059                edges: vec![],
6060                edge_retires: vec![],
6061                chunks: vec![],
6062                runs: vec![],
6063                steps: vec![],
6064                actions: vec![],
6065                optional_backfills: vec![],
6066                vec_inserts: vec![],
6067                operational_writes: vec![],
6068            })
6069            .expect("r1 write");
6070
6071        // Request 2: chunk for pre-existing node
6072        writer
6073            .submit(WriteRequest {
6074                label: "r2".to_owned(),
6075                nodes: vec![],
6076                node_retires: vec![],
6077                edges: vec![],
6078                edge_retires: vec![],
6079                chunks: vec![ChunkInsert {
6080                    id: "chunk-1".to_owned(),
6081                    node_logical_id: "node-1".to_owned(),
6082                    text_content: "some text".to_owned(),
6083                    byte_start: None,
6084                    byte_end: None,
6085                    content_hash: None,
6086                }],
6087                runs: vec![],
6088                steps: vec![],
6089                actions: vec![],
6090                optional_backfills: vec![],
6091                vec_inserts: vec![],
6092                operational_writes: vec![],
6093            })
6094            .expect("r2 write");
6095
6096        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6097        let kind: String = conn
6098            .query_row(
6099                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6100                [],
6101                |row| row.get(0),
6102            )
6103            .expect("fts row");
6104
6105        assert_eq!(kind, "Document");
6106    }
6107
6108    #[test]
6109    fn fts_derives_rows_for_multiple_chunks_per_node() {
6110        let db = NamedTempFile::new().expect("temporary db");
6111        let writer = WriterActor::start(
6112            db.path(),
6113            Arc::new(SchemaManager::new()),
6114            ProvenanceMode::Warn,
6115            Arc::new(TelemetryCounters::default()),
6116        )
6117        .expect("writer");
6118
6119        writer
6120            .submit(WriteRequest {
6121                label: "test".to_owned(),
6122                nodes: vec![NodeInsert {
6123                    row_id: "row-1".to_owned(),
6124                    logical_id: "node-1".to_owned(),
6125                    kind: "Meeting".to_owned(),
6126                    properties: "{}".to_owned(),
6127                    source_ref: Some("src-1".to_owned()),
6128                    upsert: false,
6129                    chunk_policy: ChunkPolicy::Preserve,
6130                    content_ref: None,
6131                }],
6132                node_retires: vec![],
6133                edges: vec![],
6134                edge_retires: vec![],
6135                chunks: vec![
6136                    ChunkInsert {
6137                        id: "chunk-a".to_owned(),
6138                        node_logical_id: "node-1".to_owned(),
6139                        text_content: "intro".to_owned(),
6140                        byte_start: None,
6141                        byte_end: None,
6142                        content_hash: None,
6143                    },
6144                    ChunkInsert {
6145                        id: "chunk-b".to_owned(),
6146                        node_logical_id: "node-1".to_owned(),
6147                        text_content: "body".to_owned(),
6148                        byte_start: None,
6149                        byte_end: None,
6150                        content_hash: None,
6151                    },
6152                    ChunkInsert {
6153                        id: "chunk-c".to_owned(),
6154                        node_logical_id: "node-1".to_owned(),
6155                        text_content: "conclusion".to_owned(),
6156                        byte_start: None,
6157                        byte_end: None,
6158                        content_hash: None,
6159                    },
6160                ],
6161                runs: vec![],
6162                steps: vec![],
6163                actions: vec![],
6164                optional_backfills: vec![],
6165                vec_inserts: vec![],
6166                operational_writes: vec![],
6167            })
6168            .expect("write");
6169
6170        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6171        let count: i64 = conn
6172            .query_row(
6173                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6174                [],
6175                |row| row.get(0),
6176            )
6177            .expect("fts count");
6178
6179        assert_eq!(count, 3, "three chunks must produce three FTS rows");
6180    }
6181
6182    #[test]
6183    fn fts_resolves_mixed_fast_and_db_paths() {
6184        let db = NamedTempFile::new().expect("temporary db");
6185        let writer = WriterActor::start(
6186            db.path(),
6187            Arc::new(SchemaManager::new()),
6188            ProvenanceMode::Warn,
6189            Arc::new(TelemetryCounters::default()),
6190        )
6191        .expect("writer");
6192
6193        // Seed pre-existing node
6194        writer
6195            .submit(WriteRequest {
6196                label: "seed".to_owned(),
6197                nodes: vec![NodeInsert {
6198                    row_id: "row-existing".to_owned(),
6199                    logical_id: "existing-node".to_owned(),
6200                    kind: "Archive".to_owned(),
6201                    properties: "{}".to_owned(),
6202                    source_ref: Some("src-1".to_owned()),
6203                    upsert: false,
6204                    chunk_policy: ChunkPolicy::Preserve,
6205                    content_ref: None,
6206                }],
6207                node_retires: vec![],
6208                edges: vec![],
6209                edge_retires: vec![],
6210                chunks: vec![],
6211                runs: vec![],
6212                steps: vec![],
6213                actions: vec![],
6214                optional_backfills: vec![],
6215                vec_inserts: vec![],
6216                operational_writes: vec![],
6217            })
6218            .expect("seed");
6219
6220        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
6221        writer
6222            .submit(WriteRequest {
6223                label: "mixed".to_owned(),
6224                nodes: vec![NodeInsert {
6225                    row_id: "row-new".to_owned(),
6226                    logical_id: "new-node".to_owned(),
6227                    kind: "Inbox".to_owned(),
6228                    properties: "{}".to_owned(),
6229                    source_ref: Some("src-2".to_owned()),
6230                    upsert: false,
6231                    chunk_policy: ChunkPolicy::Preserve,
6232                    content_ref: None,
6233                }],
6234                node_retires: vec![],
6235                edges: vec![],
6236                edge_retires: vec![],
6237                chunks: vec![
6238                    ChunkInsert {
6239                        id: "chunk-fast".to_owned(),
6240                        node_logical_id: "new-node".to_owned(),
6241                        text_content: "new content".to_owned(),
6242                        byte_start: None,
6243                        byte_end: None,
6244                        content_hash: None,
6245                    },
6246                    ChunkInsert {
6247                        id: "chunk-db".to_owned(),
6248                        node_logical_id: "existing-node".to_owned(),
6249                        text_content: "archive content".to_owned(),
6250                        byte_start: None,
6251                        byte_end: None,
6252                        content_hash: None,
6253                    },
6254                ],
6255                runs: vec![],
6256                steps: vec![],
6257                actions: vec![],
6258                optional_backfills: vec![],
6259                vec_inserts: vec![],
6260                operational_writes: vec![],
6261            })
6262            .expect("mixed write");
6263
6264        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6265        let fast_kind: String = conn
6266            .query_row(
6267                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6268                [],
6269                |row| row.get(0),
6270            )
6271            .expect("fast path fts row");
6272        let db_kind: String = conn
6273            .query_row(
6274                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6275                [],
6276                |row| row.get(0),
6277            )
6278            .expect("db path fts row");
6279
6280        assert_eq!(fast_kind, "Inbox");
6281        assert_eq!(db_kind, "Archive");
6282    }
6283
6284    #[test]
6285    fn prepare_write_rejects_empty_chunk_text() {
6286        let db = NamedTempFile::new().expect("temporary db");
6287        let writer = WriterActor::start(
6288            db.path(),
6289            Arc::new(SchemaManager::new()),
6290            ProvenanceMode::Warn,
6291            Arc::new(TelemetryCounters::default()),
6292        )
6293        .expect("writer");
6294
6295        let result = writer.submit(WriteRequest {
6296            label: "test".to_owned(),
6297            nodes: vec![NodeInsert {
6298                row_id: "row-1".to_owned(),
6299                logical_id: "node-1".to_owned(),
6300                kind: "Note".to_owned(),
6301                properties: "{}".to_owned(),
6302                source_ref: None,
6303                upsert: false,
6304                chunk_policy: ChunkPolicy::Preserve,
6305                content_ref: None,
6306            }],
6307            node_retires: vec![],
6308            edges: vec![],
6309            edge_retires: vec![],
6310            chunks: vec![ChunkInsert {
6311                id: "chunk-1".to_owned(),
6312                node_logical_id: "node-1".to_owned(),
6313                text_content: String::new(),
6314                byte_start: None,
6315                byte_end: None,
6316                content_hash: None,
6317            }],
6318            runs: vec![],
6319            steps: vec![],
6320            actions: vec![],
6321            optional_backfills: vec![],
6322            vec_inserts: vec![],
6323            operational_writes: vec![],
6324        });
6325
6326        assert!(
6327            matches!(result, Err(EngineError::InvalidWrite(_))),
6328            "empty text_content must be rejected"
6329        );
6330    }
6331
6332    #[test]
6333    fn receipt_reports_zero_backfills_when_none_submitted() {
6334        let db = NamedTempFile::new().expect("temporary db");
6335        let writer = WriterActor::start(
6336            db.path(),
6337            Arc::new(SchemaManager::new()),
6338            ProvenanceMode::Warn,
6339            Arc::new(TelemetryCounters::default()),
6340        )
6341        .expect("writer");
6342
6343        let receipt = writer
6344            .submit(WriteRequest {
6345                label: "test".to_owned(),
6346                nodes: vec![NodeInsert {
6347                    row_id: "row-1".to_owned(),
6348                    logical_id: "node-1".to_owned(),
6349                    kind: "Note".to_owned(),
6350                    properties: "{}".to_owned(),
6351                    source_ref: Some("src-1".to_owned()),
6352                    upsert: false,
6353                    chunk_policy: ChunkPolicy::Preserve,
6354                    content_ref: None,
6355                }],
6356                node_retires: vec![],
6357                edges: vec![],
6358                edge_retires: vec![],
6359                chunks: vec![],
6360                runs: vec![],
6361                steps: vec![],
6362                actions: vec![],
6363                optional_backfills: vec![],
6364                vec_inserts: vec![],
6365                operational_writes: vec![],
6366            })
6367            .expect("write");
6368
6369        assert_eq!(receipt.optional_backfill_count, 0);
6370    }
6371
6372    #[test]
6373    fn receipt_reports_correct_backfill_count() {
6374        let db = NamedTempFile::new().expect("temporary db");
6375        let writer = WriterActor::start(
6376            db.path(),
6377            Arc::new(SchemaManager::new()),
6378            ProvenanceMode::Warn,
6379            Arc::new(TelemetryCounters::default()),
6380        )
6381        .expect("writer");
6382
6383        let receipt = writer
6384            .submit(WriteRequest {
6385                label: "test".to_owned(),
6386                nodes: vec![NodeInsert {
6387                    row_id: "row-1".to_owned(),
6388                    logical_id: "node-1".to_owned(),
6389                    kind: "Note".to_owned(),
6390                    properties: "{}".to_owned(),
6391                    source_ref: Some("src-1".to_owned()),
6392                    upsert: false,
6393                    chunk_policy: ChunkPolicy::Preserve,
6394                    content_ref: None,
6395                }],
6396                node_retires: vec![],
6397                edges: vec![],
6398                edge_retires: vec![],
6399                chunks: vec![],
6400                runs: vec![],
6401                steps: vec![],
6402                actions: vec![],
6403                optional_backfills: vec![
6404                    OptionalProjectionTask {
6405                        target: ProjectionTarget::Fts,
6406                        payload: "p1".to_owned(),
6407                    },
6408                    OptionalProjectionTask {
6409                        target: ProjectionTarget::Vec,
6410                        payload: "p2".to_owned(),
6411                    },
6412                    OptionalProjectionTask {
6413                        target: ProjectionTarget::All,
6414                        payload: "p3".to_owned(),
6415                    },
6416                ],
6417                vec_inserts: vec![],
6418                operational_writes: vec![],
6419            })
6420            .expect("write");
6421
6422        assert_eq!(receipt.optional_backfill_count, 3);
6423    }
6424
6425    #[test]
6426    fn backfill_tasks_are_not_executed_during_write() {
6427        let db = NamedTempFile::new().expect("temporary db");
6428        let writer = WriterActor::start(
6429            db.path(),
6430            Arc::new(SchemaManager::new()),
6431            ProvenanceMode::Warn,
6432            Arc::new(TelemetryCounters::default()),
6433        )
6434        .expect("writer");
6435
6436        // Write a node + chunk. Submit a backfill task targeting FTS.
6437        // The write path must not create any extra FTS rows beyond the required one.
6438        writer
6439            .submit(WriteRequest {
6440                label: "test".to_owned(),
6441                nodes: vec![NodeInsert {
6442                    row_id: "row-1".to_owned(),
6443                    logical_id: "node-1".to_owned(),
6444                    kind: "Note".to_owned(),
6445                    properties: "{}".to_owned(),
6446                    source_ref: Some("src-1".to_owned()),
6447                    upsert: false,
6448                    chunk_policy: ChunkPolicy::Preserve,
6449                    content_ref: None,
6450                }],
6451                node_retires: vec![],
6452                edges: vec![],
6453                edge_retires: vec![],
6454                chunks: vec![ChunkInsert {
6455                    id: "chunk-1".to_owned(),
6456                    node_logical_id: "node-1".to_owned(),
6457                    text_content: "required text".to_owned(),
6458                    byte_start: None,
6459                    byte_end: None,
6460                    content_hash: None,
6461                }],
6462                runs: vec![],
6463                steps: vec![],
6464                actions: vec![],
6465                optional_backfills: vec![OptionalProjectionTask {
6466                    target: ProjectionTarget::Fts,
6467                    payload: "backfill-payload".to_owned(),
6468                }],
6469                vec_inserts: vec![],
6470                operational_writes: vec![],
6471            })
6472            .expect("write");
6473
6474        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6475        let count: i64 = conn
6476            .query_row(
6477                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6478                [],
6479                |row| row.get(0),
6480            )
6481            .expect("fts count");
6482
6483        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6484    }
6485
6486    #[test]
6487    fn fts_row_uses_new_kind_after_node_replace() {
6488        let db = NamedTempFile::new().expect("temporary db");
6489        let writer = WriterActor::start(
6490            db.path(),
6491            Arc::new(SchemaManager::new()),
6492            ProvenanceMode::Warn,
6493            Arc::new(TelemetryCounters::default()),
6494        )
6495        .expect("writer");
6496
6497        // Write original node as "Note"
6498        writer
6499            .submit(WriteRequest {
6500                label: "v1".to_owned(),
6501                nodes: vec![NodeInsert {
6502                    row_id: "row-1".to_owned(),
6503                    logical_id: "node-1".to_owned(),
6504                    kind: "Note".to_owned(),
6505                    properties: "{}".to_owned(),
6506                    source_ref: Some("src-1".to_owned()),
6507                    upsert: false,
6508                    chunk_policy: ChunkPolicy::Preserve,
6509                    content_ref: None,
6510                }],
6511                node_retires: vec![],
6512                edges: vec![],
6513                edge_retires: vec![],
6514                chunks: vec![ChunkInsert {
6515                    id: "chunk-v1".to_owned(),
6516                    node_logical_id: "node-1".to_owned(),
6517                    text_content: "original".to_owned(),
6518                    byte_start: None,
6519                    byte_end: None,
6520                    content_hash: None,
6521                }],
6522                runs: vec![],
6523                steps: vec![],
6524                actions: vec![],
6525                optional_backfills: vec![],
6526                vec_inserts: vec![],
6527                operational_writes: vec![],
6528            })
6529            .expect("v1 write");
6530
6531        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
6532        writer
6533            .submit(WriteRequest {
6534                label: "v2".to_owned(),
6535                nodes: vec![NodeInsert {
6536                    row_id: "row-2".to_owned(),
6537                    logical_id: "node-1".to_owned(),
6538                    kind: "Meeting".to_owned(),
6539                    properties: "{}".to_owned(),
6540                    source_ref: Some("src-2".to_owned()),
6541                    upsert: true,
6542                    chunk_policy: ChunkPolicy::Replace,
6543                    content_ref: None,
6544                }],
6545                node_retires: vec![],
6546                edges: vec![],
6547                edge_retires: vec![],
6548                chunks: vec![ChunkInsert {
6549                    id: "chunk-v2".to_owned(),
6550                    node_logical_id: "node-1".to_owned(),
6551                    text_content: "updated".to_owned(),
6552                    byte_start: None,
6553                    byte_end: None,
6554                    content_hash: None,
6555                }],
6556                runs: vec![],
6557                steps: vec![],
6558                actions: vec![],
6559                optional_backfills: vec![],
6560                vec_inserts: vec![],
6561                operational_writes: vec![],
6562            })
6563            .expect("v2 write");
6564
6565        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6566
6567        // Old FTS row must be gone
6568        let old_count: i64 = conn
6569            .query_row(
6570                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6571                [],
6572                |row| row.get(0),
6573            )
6574            .expect("old fts count");
6575        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6576
6577        // New FTS row must use the new kind
6578        let new_kind: String = conn
6579            .query_row(
6580                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6581                [],
6582                |row| row.get(0),
6583            )
6584            .expect("new fts row");
6585        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6586    }
6587
6588    // --- Item 3: VecInsert tests ---
6589
6590    #[test]
6591    fn vec_insert_empty_chunk_id_is_rejected() {
6592        let db = NamedTempFile::new().expect("temporary db");
6593        let writer = WriterActor::start(
6594            db.path(),
6595            Arc::new(SchemaManager::new()),
6596            ProvenanceMode::Warn,
6597            Arc::new(TelemetryCounters::default()),
6598        )
6599        .expect("writer");
6600        let result = writer.submit(WriteRequest {
6601            label: "vec-test".to_owned(),
6602            nodes: vec![],
6603            node_retires: vec![],
6604            edges: vec![],
6605            edge_retires: vec![],
6606            chunks: vec![],
6607            runs: vec![],
6608            steps: vec![],
6609            actions: vec![],
6610            optional_backfills: vec![],
6611            vec_inserts: vec![VecInsert {
6612                chunk_id: String::new(),
6613                embedding: vec![0.1, 0.2, 0.3],
6614            }],
6615            operational_writes: vec![],
6616        });
6617        assert!(
6618            matches!(result, Err(EngineError::InvalidWrite(_))),
6619            "empty chunk_id in VecInsert must be rejected"
6620        );
6621    }
6622
6623    #[test]
6624    fn vec_insert_empty_embedding_is_rejected() {
6625        let db = NamedTempFile::new().expect("temporary db");
6626        let writer = WriterActor::start(
6627            db.path(),
6628            Arc::new(SchemaManager::new()),
6629            ProvenanceMode::Warn,
6630            Arc::new(TelemetryCounters::default()),
6631        )
6632        .expect("writer");
6633        let result = writer.submit(WriteRequest {
6634            label: "vec-test".to_owned(),
6635            nodes: vec![],
6636            node_retires: vec![],
6637            edges: vec![],
6638            edge_retires: vec![],
6639            chunks: vec![],
6640            runs: vec![],
6641            steps: vec![],
6642            actions: vec![],
6643            optional_backfills: vec![],
6644            vec_inserts: vec![VecInsert {
6645                chunk_id: "chunk-1".to_owned(),
6646                embedding: vec![],
6647            }],
6648            operational_writes: vec![],
6649        });
6650        assert!(
6651            matches!(result, Err(EngineError::InvalidWrite(_))),
6652            "empty embedding in VecInsert must be rejected"
6653        );
6654    }
6655
6656    #[test]
6657    fn vec_insert_noop_without_feature() {
6658        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6659        // (no error) but not write to any vec table.
6660        let db = NamedTempFile::new().expect("temporary db");
6661        let writer = WriterActor::start(
6662            db.path(),
6663            Arc::new(SchemaManager::new()),
6664            ProvenanceMode::Warn,
6665            Arc::new(TelemetryCounters::default()),
6666        )
6667        .expect("writer");
6668        let result = writer.submit(WriteRequest {
6669            label: "vec-noop".to_owned(),
6670            nodes: vec![],
6671            node_retires: vec![],
6672            edges: vec![],
6673            edge_retires: vec![],
6674            chunks: vec![],
6675            runs: vec![],
6676            steps: vec![],
6677            actions: vec![],
6678            optional_backfills: vec![],
6679            vec_inserts: vec![VecInsert {
6680                chunk_id: "chunk-noop".to_owned(),
6681                embedding: vec![1.0, 2.0, 3.0],
6682            }],
6683            operational_writes: vec![],
6684        });
6685        #[cfg(not(feature = "sqlite-vec"))]
6686        result.expect("noop VecInsert without feature must succeed");
6687        // The result variable is used above; silence unused warning for cfg-on path.
6688        #[cfg(feature = "sqlite-vec")]
6689        let _ = result;
6690    }
6691
6692    #[cfg(feature = "sqlite-vec")]
6693    #[test]
6694    fn node_retire_preserves_vec_rows_for_later_restore() {
6695        use crate::sqlite::open_connection_with_vec;
6696
6697        let db = NamedTempFile::new().expect("temporary db");
6698        let schema_manager = Arc::new(SchemaManager::new());
6699
6700        {
6701            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6702            schema_manager.bootstrap(&conn).expect("bootstrap");
6703        }
6704
6705        let writer = WriterActor::start(
6706            db.path(),
6707            Arc::clone(&schema_manager),
6708            ProvenanceMode::Warn,
6709            Arc::new(TelemetryCounters::default()),
6710        )
6711        .expect("writer");
6712
6713        // Insert node + chunk + vec row
6714        writer
6715            .submit(WriteRequest {
6716                label: "setup".to_owned(),
6717                nodes: vec![NodeInsert {
6718                    row_id: "row-retire-vec".to_owned(),
6719                    logical_id: "node-retire-vec".to_owned(),
6720                    kind: "Doc".to_owned(),
6721                    properties: "{}".to_owned(),
6722                    source_ref: Some("src".to_owned()),
6723                    upsert: false,
6724                    chunk_policy: ChunkPolicy::Preserve,
6725                    content_ref: None,
6726                }],
6727                node_retires: vec![],
6728                edges: vec![],
6729                edge_retires: vec![],
6730                chunks: vec![ChunkInsert {
6731                    id: "chunk-retire-vec".to_owned(),
6732                    node_logical_id: "node-retire-vec".to_owned(),
6733                    text_content: "text".to_owned(),
6734                    byte_start: None,
6735                    byte_end: None,
6736                    content_hash: None,
6737                }],
6738                runs: vec![],
6739                steps: vec![],
6740                actions: vec![],
6741                optional_backfills: vec![],
6742                vec_inserts: vec![VecInsert {
6743                    chunk_id: "chunk-retire-vec".to_owned(),
6744                    embedding: vec![0.1, 0.2, 0.3],
6745                }],
6746                operational_writes: vec![],
6747            })
6748            .expect("setup write");
6749
6750        // Retire the node
6751        writer
6752            .submit(WriteRequest {
6753                label: "retire".to_owned(),
6754                nodes: vec![],
6755                node_retires: vec![NodeRetire {
6756                    logical_id: "node-retire-vec".to_owned(),
6757                    source_ref: Some("src".to_owned()),
6758                }],
6759                edges: vec![],
6760                edge_retires: vec![],
6761                chunks: vec![],
6762                runs: vec![],
6763                steps: vec![],
6764                actions: vec![],
6765                optional_backfills: vec![],
6766                vec_inserts: vec![],
6767                operational_writes: vec![],
6768            })
6769            .expect("retire write");
6770
6771        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6772        let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
6773        let count: i64 = conn
6774            .query_row(
6775                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-retire-vec'"),
6776                [],
6777                |row| row.get(0),
6778            )
6779            .expect("count");
6780        assert_eq!(
6781            count, 1,
6782            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6783        );
6784    }
6785
6786    #[cfg(feature = "sqlite-vec")]
6787    #[test]
6788    #[allow(clippy::too_many_lines)]
6789    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6790        use crate::sqlite::open_connection_with_vec;
6791
6792        let db = NamedTempFile::new().expect("temporary db");
6793        let schema_manager = Arc::new(SchemaManager::new());
6794
6795        {
6796            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6797            schema_manager.bootstrap(&conn).expect("bootstrap");
6798        }
6799
6800        let writer = WriterActor::start(
6801            db.path(),
6802            Arc::clone(&schema_manager),
6803            ProvenanceMode::Warn,
6804            Arc::new(TelemetryCounters::default()),
6805        )
6806        .expect("writer");
6807
6808        // Insert node + chunk-A + vec-A
6809        writer
6810            .submit(WriteRequest {
6811                label: "v1".to_owned(),
6812                nodes: vec![NodeInsert {
6813                    row_id: "row-replace-v1".to_owned(),
6814                    logical_id: "node-replace-vec".to_owned(),
6815                    kind: "Doc".to_owned(),
6816                    properties: "{}".to_owned(),
6817                    source_ref: Some("src".to_owned()),
6818                    upsert: false,
6819                    chunk_policy: ChunkPolicy::Preserve,
6820                    content_ref: None,
6821                }],
6822                node_retires: vec![],
6823                edges: vec![],
6824                edge_retires: vec![],
6825                chunks: vec![ChunkInsert {
6826                    id: "chunk-replace-A".to_owned(),
6827                    node_logical_id: "node-replace-vec".to_owned(),
6828                    text_content: "version one".to_owned(),
6829                    byte_start: None,
6830                    byte_end: None,
6831                    content_hash: None,
6832                }],
6833                runs: vec![],
6834                steps: vec![],
6835                actions: vec![],
6836                optional_backfills: vec![],
6837                vec_inserts: vec![VecInsert {
6838                    chunk_id: "chunk-replace-A".to_owned(),
6839                    embedding: vec![0.1, 0.2, 0.3],
6840                }],
6841                operational_writes: vec![],
6842            })
6843            .expect("v1 write");
6844
6845        // Upsert with Replace + chunk-B + vec-B
6846        writer
6847            .submit(WriteRequest {
6848                label: "v2".to_owned(),
6849                nodes: vec![NodeInsert {
6850                    row_id: "row-replace-v2".to_owned(),
6851                    logical_id: "node-replace-vec".to_owned(),
6852                    kind: "Doc".to_owned(),
6853                    properties: "{}".to_owned(),
6854                    source_ref: Some("src".to_owned()),
6855                    upsert: true,
6856                    chunk_policy: ChunkPolicy::Replace,
6857                    content_ref: None,
6858                }],
6859                node_retires: vec![],
6860                edges: vec![],
6861                edge_retires: vec![],
6862                chunks: vec![ChunkInsert {
6863                    id: "chunk-replace-B".to_owned(),
6864                    node_logical_id: "node-replace-vec".to_owned(),
6865                    text_content: "version two".to_owned(),
6866                    byte_start: None,
6867                    byte_end: None,
6868                    content_hash: None,
6869                }],
6870                runs: vec![],
6871                steps: vec![],
6872                actions: vec![],
6873                optional_backfills: vec![],
6874                vec_inserts: vec![VecInsert {
6875                    chunk_id: "chunk-replace-B".to_owned(),
6876                    embedding: vec![0.4, 0.5, 0.6],
6877                }],
6878                operational_writes: vec![],
6879            })
6880            .expect("v2 write");
6881
6882        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6883        let vec_table = fathomdb_schema::vec_kind_table_name("Doc");
6884        let count_a: i64 = conn
6885            .query_row(
6886                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-replace-A'"),
6887                [],
6888                |row| row.get(0),
6889            )
6890            .expect("count A");
6891        let count_b: i64 = conn
6892            .query_row(
6893                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-replace-B'"),
6894                [],
6895                |row| row.get(0),
6896            )
6897            .expect("count B");
6898        assert_eq!(
6899            count_a, 0,
6900            "old vec row (chunk-A) must be deleted on Replace"
6901        );
6902        assert_eq!(
6903            count_b, 1,
6904            "new vec row (chunk-B) must be present after Replace"
6905        );
6906    }
6907
6908    #[cfg(feature = "sqlite-vec")]
6909    #[test]
6910    fn vec_insert_is_persisted_when_feature_enabled() {
6911        use crate::sqlite::open_connection_with_vec;
6912
6913        let db = NamedTempFile::new().expect("temporary db");
6914        let schema_manager = Arc::new(SchemaManager::new());
6915
6916        // Bootstrap the schema (vec table is auto-created on first VecInsert).
6917        {
6918            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6919            schema_manager.bootstrap(&conn).expect("bootstrap");
6920        }
6921
6922        let writer = WriterActor::start(
6923            db.path(),
6924            Arc::clone(&schema_manager),
6925            ProvenanceMode::Warn,
6926            Arc::new(TelemetryCounters::default()),
6927        )
6928        .expect("writer");
6929
6930        // Submit a node + chunk + vec insert so kind can be resolved from the request.
6931        writer
6932            .submit(WriteRequest {
6933                label: "vec-insert".to_owned(),
6934                nodes: vec![NodeInsert {
6935                    row_id: "row-vec".to_owned(),
6936                    logical_id: "node-vec".to_owned(),
6937                    kind: "Document".to_owned(),
6938                    properties: "{}".to_owned(),
6939                    source_ref: Some("test".to_owned()),
6940                    upsert: false,
6941                    chunk_policy: ChunkPolicy::Preserve,
6942                    content_ref: None,
6943                }],
6944                node_retires: vec![],
6945                edges: vec![],
6946                edge_retires: vec![],
6947                chunks: vec![ChunkInsert {
6948                    id: "chunk-vec".to_owned(),
6949                    node_logical_id: "node-vec".to_owned(),
6950                    text_content: "test text".to_owned(),
6951                    byte_start: None,
6952                    byte_end: None,
6953                    content_hash: None,
6954                }],
6955                runs: vec![],
6956                steps: vec![],
6957                actions: vec![],
6958                optional_backfills: vec![],
6959                vec_inserts: vec![VecInsert {
6960                    chunk_id: "chunk-vec".to_owned(),
6961                    embedding: vec![0.1, 0.2, 0.3],
6962                }],
6963                operational_writes: vec![],
6964            })
6965            .expect("vec insert write");
6966
6967        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6968        let vec_table = fathomdb_schema::vec_kind_table_name("Document");
6969        let count: i64 = conn
6970            .query_row(
6971                &format!("SELECT count(*) FROM {vec_table} WHERE chunk_id = 'chunk-vec'"),
6972                [],
6973                |row| row.get(0),
6974            )
6975            .expect("count");
6976        assert_eq!(count, 1, "VecInsert must persist a row in vec_document");
6977    }
6978
6979    // --- WriteRequest size validation tests ---
6980
6981    #[test]
6982    fn write_request_exceeding_node_limit_is_rejected() {
6983        let nodes: Vec<NodeInsert> = (0..10_001)
6984            .map(|i| NodeInsert {
6985                row_id: format!("row-{i}"),
6986                logical_id: format!("lg-{i}"),
6987                kind: "Note".to_owned(),
6988                properties: "{}".to_owned(),
6989                source_ref: None,
6990                upsert: false,
6991                chunk_policy: ChunkPolicy::Preserve,
6992                content_ref: None,
6993            })
6994            .collect();
6995
6996        let request = WriteRequest {
6997            label: "too-many-nodes".to_owned(),
6998            nodes,
6999            node_retires: vec![],
7000            edges: vec![],
7001            edge_retires: vec![],
7002            chunks: vec![],
7003            runs: vec![],
7004            steps: vec![],
7005            actions: vec![],
7006            optional_backfills: vec![],
7007            vec_inserts: vec![],
7008            operational_writes: vec![],
7009        };
7010
7011        let result = prepare_write(request, ProvenanceMode::Warn)
7012            .map(|_| ())
7013            .map_err(|e| format!("{e}"));
7014        assert!(
7015            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
7016            "exceeding node limit must return InvalidWrite: got {result:?}"
7017        );
7018    }
7019
7020    #[test]
7021    fn write_request_exceeding_total_limit_is_rejected() {
7022        // Spread items across fields to exceed 100_000 total
7023        // without exceeding any single per-field limit.
7024        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
7025        let request = WriteRequest {
7026            label: "too-many-total".to_owned(),
7027            nodes: (0..10_000)
7028                .map(|i| NodeInsert {
7029                    row_id: format!("row-{i}"),
7030                    logical_id: format!("lg-{i}"),
7031                    kind: "Note".to_owned(),
7032                    properties: "{}".to_owned(),
7033                    source_ref: None,
7034                    upsert: false,
7035                    chunk_policy: ChunkPolicy::Preserve,
7036                    content_ref: None,
7037                })
7038                .collect(),
7039            node_retires: vec![],
7040            edges: (0..10_000)
7041                .map(|i| EdgeInsert {
7042                    row_id: format!("edge-row-{i}"),
7043                    logical_id: format!("edge-lg-{i}"),
7044                    kind: "link".to_owned(),
7045                    source_logical_id: format!("lg-{i}"),
7046                    target_logical_id: format!("lg-{}", i + 1),
7047                    properties: "{}".to_owned(),
7048                    source_ref: None,
7049                    upsert: false,
7050                })
7051                .collect(),
7052            edge_retires: vec![],
7053            chunks: (0..50_000)
7054                .map(|i| ChunkInsert {
7055                    id: format!("chunk-{i}"),
7056                    node_logical_id: "lg-0".to_owned(),
7057                    text_content: "text".to_owned(),
7058                    byte_start: None,
7059                    byte_end: None,
7060                    content_hash: None,
7061                })
7062                .collect(),
7063            runs: vec![],
7064            steps: vec![],
7065            actions: vec![],
7066            optional_backfills: vec![],
7067            vec_inserts: (0..20_001)
7068                .map(|i| VecInsert {
7069                    chunk_id: format!("vec-chunk-{i}"),
7070                    embedding: vec![0.1],
7071                })
7072                .collect(),
7073            operational_writes: (0..10_000)
7074                .map(|i| OperationalWrite::Append {
7075                    collection: format!("col-{i}"),
7076                    record_key: format!("key-{i}"),
7077                    payload_json: "{}".to_owned(),
7078                    source_ref: None,
7079                })
7080                .collect(),
7081        };
7082
7083        let result = prepare_write(request, ProvenanceMode::Warn)
7084            .map(|_| ())
7085            .map_err(|e| format!("{e}"));
7086        assert!(
7087            matches!(result, Err(ref msg) if msg.contains("too many total items")),
7088            "exceeding total item limit must return InvalidWrite: got {result:?}"
7089        );
7090    }
7091
7092    #[test]
7093    fn write_request_within_limits_succeeds() {
7094        let db = NamedTempFile::new().expect("temporary db");
7095        let writer = WriterActor::start(
7096            db.path(),
7097            Arc::new(SchemaManager::new()),
7098            ProvenanceMode::Warn,
7099            Arc::new(TelemetryCounters::default()),
7100        )
7101        .expect("writer");
7102
7103        let result = writer.submit(WriteRequest {
7104            label: "within-limits".to_owned(),
7105            nodes: vec![NodeInsert {
7106                row_id: "row-1".to_owned(),
7107                logical_id: "lg-1".to_owned(),
7108                kind: "Note".to_owned(),
7109                properties: "{}".to_owned(),
7110                source_ref: None,
7111                upsert: false,
7112                chunk_policy: ChunkPolicy::Preserve,
7113                content_ref: None,
7114            }],
7115            node_retires: vec![],
7116            edges: vec![],
7117            edge_retires: vec![],
7118            chunks: vec![],
7119            runs: vec![],
7120            steps: vec![],
7121            actions: vec![],
7122            optional_backfills: vec![],
7123            vec_inserts: vec![],
7124            operational_writes: vec![],
7125        });
7126
7127        assert!(
7128            result.is_ok(),
7129            "write request within limits must succeed: got {result:?}"
7130        );
7131    }
7132
7133    #[test]
7134    fn property_fts_rows_created_on_node_insert() {
7135        let db = NamedTempFile::new().expect("temporary db");
7136        // Bootstrap and register a property schema before starting the writer.
7137        let schema = Arc::new(SchemaManager::new());
7138        {
7139            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7140            schema.bootstrap(&conn).expect("bootstrap");
7141            conn.execute(
7142                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7143                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
7144                [],
7145            )
7146            .expect("register schema");
7147        }
7148        let writer = WriterActor::start(
7149            db.path(),
7150            Arc::clone(&schema),
7151            ProvenanceMode::Warn,
7152            Arc::new(TelemetryCounters::default()),
7153        )
7154        .expect("writer");
7155
7156        writer
7157            .submit(WriteRequest {
7158                label: "goal-insert".to_owned(),
7159                nodes: vec![NodeInsert {
7160                    row_id: "row-1".to_owned(),
7161                    logical_id: "goal-1".to_owned(),
7162                    kind: "Goal".to_owned(),
7163                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
7164                        .to_owned(),
7165                    source_ref: Some("src-1".to_owned()),
7166                    upsert: false,
7167                    chunk_policy: ChunkPolicy::Preserve,
7168                    content_ref: None,
7169                }],
7170                node_retires: vec![],
7171                edges: vec![],
7172                edge_retires: vec![],
7173                chunks: vec![],
7174                runs: vec![],
7175                steps: vec![],
7176                actions: vec![],
7177                optional_backfills: vec![],
7178                vec_inserts: vec![],
7179                operational_writes: vec![],
7180            })
7181            .expect("write");
7182
7183        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7184        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
7185        let text: String = conn
7186            .query_row(
7187                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7188                [],
7189                |row| row.get(0),
7190            )
7191            .expect("property FTS row must exist");
7192        assert_eq!(text, "Ship v2 Launch the redesign");
7193    }
7194
7195    #[test]
7196    fn property_fts_rows_replaced_on_upsert() {
7197        let db = NamedTempFile::new().expect("temporary db");
7198        let schema = Arc::new(SchemaManager::new());
7199        {
7200            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7201            schema.bootstrap(&conn).expect("bootstrap");
7202            conn.execute(
7203                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7204                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7205                [],
7206            )
7207            .expect("register schema");
7208        }
7209        let writer = WriterActor::start(
7210            db.path(),
7211            Arc::clone(&schema),
7212            ProvenanceMode::Warn,
7213            Arc::new(TelemetryCounters::default()),
7214        )
7215        .expect("writer");
7216
7217        // Initial insert.
7218        writer
7219            .submit(WriteRequest {
7220                label: "insert".to_owned(),
7221                nodes: vec![NodeInsert {
7222                    row_id: "row-1".to_owned(),
7223                    logical_id: "goal-1".to_owned(),
7224                    kind: "Goal".to_owned(),
7225                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7226                    source_ref: Some("src-1".to_owned()),
7227                    upsert: false,
7228                    chunk_policy: ChunkPolicy::Preserve,
7229                    content_ref: None,
7230                }],
7231                node_retires: vec![],
7232                edges: vec![],
7233                edge_retires: vec![],
7234                chunks: vec![],
7235                runs: vec![],
7236                steps: vec![],
7237                actions: vec![],
7238                optional_backfills: vec![],
7239                vec_inserts: vec![],
7240                operational_writes: vec![],
7241            })
7242            .expect("insert");
7243
7244        // Upsert with changed property.
7245        writer
7246            .submit(WriteRequest {
7247                label: "upsert".to_owned(),
7248                nodes: vec![NodeInsert {
7249                    row_id: "row-2".to_owned(),
7250                    logical_id: "goal-1".to_owned(),
7251                    kind: "Goal".to_owned(),
7252                    properties: r#"{"name":"Beta"}"#.to_owned(),
7253                    source_ref: Some("src-2".to_owned()),
7254                    upsert: true,
7255                    chunk_policy: ChunkPolicy::Preserve,
7256                    content_ref: None,
7257                }],
7258                node_retires: vec![],
7259                edges: vec![],
7260                edge_retires: vec![],
7261                chunks: vec![],
7262                runs: vec![],
7263                steps: vec![],
7264                actions: vec![],
7265                optional_backfills: vec![],
7266                vec_inserts: vec![],
7267                operational_writes: vec![],
7268            })
7269            .expect("upsert");
7270
7271        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7272        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
7273        let count: i64 = conn
7274            .query_row(
7275                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7276                [],
7277                |row| row.get(0),
7278            )
7279            .expect("count");
7280        assert_eq!(
7281            count, 1,
7282            "must have exactly one property FTS row after upsert"
7283        );
7284
7285        let text: String = conn
7286            .query_row(
7287                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7288                [],
7289                |row| row.get(0),
7290            )
7291            .expect("text");
7292        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7293    }
7294
7295    #[test]
7296    fn property_fts_rows_deleted_on_retire() {
7297        let db = NamedTempFile::new().expect("temporary db");
7298        let schema = Arc::new(SchemaManager::new());
7299        {
7300            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7301            schema.bootstrap(&conn).expect("bootstrap");
7302            conn.execute(
7303                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7304                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7305                [],
7306            )
7307            .expect("register schema");
7308        }
7309        let writer = WriterActor::start(
7310            db.path(),
7311            Arc::clone(&schema),
7312            ProvenanceMode::Warn,
7313            Arc::new(TelemetryCounters::default()),
7314        )
7315        .expect("writer");
7316
7317        // Insert a node.
7318        writer
7319            .submit(WriteRequest {
7320                label: "insert".to_owned(),
7321                nodes: vec![NodeInsert {
7322                    row_id: "row-1".to_owned(),
7323                    logical_id: "goal-1".to_owned(),
7324                    kind: "Goal".to_owned(),
7325                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7326                    source_ref: Some("src-1".to_owned()),
7327                    upsert: false,
7328                    chunk_policy: ChunkPolicy::Preserve,
7329                    content_ref: None,
7330                }],
7331                node_retires: vec![],
7332                edges: vec![],
7333                edge_retires: vec![],
7334                chunks: vec![],
7335                runs: vec![],
7336                steps: vec![],
7337                actions: vec![],
7338                optional_backfills: vec![],
7339                vec_inserts: vec![],
7340                operational_writes: vec![],
7341            })
7342            .expect("insert");
7343
7344        // Retire the node.
7345        writer
7346            .submit(WriteRequest {
7347                label: "retire".to_owned(),
7348                nodes: vec![],
7349                node_retires: vec![NodeRetire {
7350                    logical_id: "goal-1".to_owned(),
7351                    source_ref: Some("forget-1".to_owned()),
7352                }],
7353                edges: vec![],
7354                edge_retires: vec![],
7355                chunks: vec![],
7356                runs: vec![],
7357                steps: vec![],
7358                actions: vec![],
7359                optional_backfills: vec![],
7360                vec_inserts: vec![],
7361                operational_writes: vec![],
7362            })
7363            .expect("retire");
7364
7365        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7366        let table = fathomdb_schema::fts_kind_table_name("Goal");
7367        let count: i64 = conn
7368            .query_row(
7369                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
7370                [],
7371                |row| row.get(0),
7372            )
7373            .expect("count");
7374        assert_eq!(count, 0, "property FTS row must be deleted on retire");
7375    }
7376
7377    #[test]
7378    fn no_property_fts_row_for_unregistered_kind() {
7379        let db = NamedTempFile::new().expect("temporary db");
7380        let schema = Arc::new(SchemaManager::new());
7381        {
7382            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7383            schema.bootstrap(&conn).expect("bootstrap");
7384            // No schema registered for "Note" kind.
7385        }
7386        let writer = WriterActor::start(
7387            db.path(),
7388            Arc::clone(&schema),
7389            ProvenanceMode::Warn,
7390            Arc::new(TelemetryCounters::default()),
7391        )
7392        .expect("writer");
7393
7394        writer
7395            .submit(WriteRequest {
7396                label: "insert".to_owned(),
7397                nodes: vec![NodeInsert {
7398                    row_id: "row-1".to_owned(),
7399                    logical_id: "note-1".to_owned(),
7400                    kind: "Note".to_owned(),
7401                    properties: r#"{"title":"hello"}"#.to_owned(),
7402                    source_ref: Some("src-1".to_owned()),
7403                    upsert: false,
7404                    chunk_policy: ChunkPolicy::Preserve,
7405                    content_ref: None,
7406                }],
7407                node_retires: vec![],
7408                edges: vec![],
7409                edge_retires: vec![],
7410                chunks: vec![],
7411                runs: vec![],
7412                steps: vec![],
7413                actions: vec![],
7414                optional_backfills: vec![],
7415                vec_inserts: vec![],
7416                operational_writes: vec![],
7417            })
7418            .expect("insert");
7419
7420        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7421        let table = fathomdb_schema::fts_kind_table_name("Note");
7422        let exists: bool = conn
7423            .query_row(
7424                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
7425                rusqlite::params![table],
7426                |row| row.get::<_, i64>(0),
7427            )
7428            .map(|c| c > 0)
7429            .expect("exists check");
7430        assert!(
7431            !exists,
7432            "no per-kind FTS table should be created for unregistered kind"
7433        );
7434    }
7435
7436    // --- A-6 per-kind FTS table tests ---
7437
7438    #[test]
7439    fn property_fts_write_goes_to_per_kind_table() {
7440        // After A-6: writes go to the per-kind FTS table, NOT fts_node_properties.
7441        let db = NamedTempFile::new().expect("temporary db");
7442        let schema = Arc::new(SchemaManager::new());
7443        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
7444        {
7445            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7446            schema.bootstrap(&conn).expect("bootstrap");
7447            conn.execute(
7448                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7449                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7450                [],
7451            )
7452            .expect("register schema");
7453            // Create per-kind table (migration 21 would do this for existing schemas).
7454            conn.execute_batch(&format!(
7455                "CREATE VIRTUAL TABLE IF NOT EXISTS {goal_table} USING fts5(\
7456                    node_logical_id UNINDEXED, text_content, \
7457                    tokenize = 'porter unicode61 remove_diacritics 2'\
7458                )"
7459            ))
7460            .expect("create per-kind table");
7461        }
7462        let writer = WriterActor::start(
7463            db.path(),
7464            Arc::clone(&schema),
7465            ProvenanceMode::Warn,
7466            Arc::new(TelemetryCounters::default()),
7467        )
7468        .expect("writer");
7469
7470        writer
7471            .submit(WriteRequest {
7472                label: "insert".to_owned(),
7473                nodes: vec![NodeInsert {
7474                    row_id: "row-1".to_owned(),
7475                    logical_id: "goal-1".to_owned(),
7476                    kind: "Goal".to_owned(),
7477                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
7478                    source_ref: Some("src-1".to_owned()),
7479                    upsert: false,
7480                    chunk_policy: ChunkPolicy::Preserve,
7481                    content_ref: None,
7482                }],
7483                node_retires: vec![],
7484                edges: vec![],
7485                edge_retires: vec![],
7486                chunks: vec![],
7487                runs: vec![],
7488                steps: vec![],
7489                actions: vec![],
7490                optional_backfills: vec![],
7491                vec_inserts: vec![],
7492                operational_writes: vec![],
7493            })
7494            .expect("write");
7495
7496        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7497        // Per-kind table must have the row.
7498        let per_kind_count: i64 = conn
7499            .query_row(
7500                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7501                [],
7502                |row| row.get(0),
7503            )
7504            .expect("per-kind count");
7505        assert_eq!(per_kind_count, 1, "per-kind table must have the row");
7506    }
7507
7508    #[test]
7509    fn property_fts_retire_removes_from_per_kind_table() {
7510        // After A-6: retire removes from fts_props_goal, NOT fts_node_properties.
7511        let db = NamedTempFile::new().expect("temporary db");
7512        let schema = Arc::new(SchemaManager::new());
7513        {
7514            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7515            schema.bootstrap(&conn).expect("bootstrap");
7516            conn.execute(
7517                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7518                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7519                [],
7520            )
7521            .expect("register schema");
7522            conn.execute_batch(
7523                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7524                    node_logical_id UNINDEXED, text_content, \
7525                    tokenize = 'porter unicode61 remove_diacritics 2'\
7526                )",
7527            )
7528            .expect("create per-kind table");
7529        }
7530        let writer = WriterActor::start(
7531            db.path(),
7532            Arc::clone(&schema),
7533            ProvenanceMode::Warn,
7534            Arc::new(TelemetryCounters::default()),
7535        )
7536        .expect("writer");
7537
7538        // Insert.
7539        writer
7540            .submit(WriteRequest {
7541                label: "insert".to_owned(),
7542                nodes: vec![NodeInsert {
7543                    row_id: "row-1".to_owned(),
7544                    logical_id: "goal-1".to_owned(),
7545                    kind: "Goal".to_owned(),
7546                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7547                    source_ref: Some("src-1".to_owned()),
7548                    upsert: false,
7549                    chunk_policy: ChunkPolicy::Preserve,
7550                    content_ref: None,
7551                }],
7552                node_retires: vec![],
7553                edges: vec![],
7554                edge_retires: vec![],
7555                chunks: vec![],
7556                runs: vec![],
7557                steps: vec![],
7558                actions: vec![],
7559                optional_backfills: vec![],
7560                vec_inserts: vec![],
7561                operational_writes: vec![],
7562            })
7563            .expect("insert");
7564
7565        // Retire.
7566        writer
7567            .submit(WriteRequest {
7568                label: "retire".to_owned(),
7569                nodes: vec![],
7570                node_retires: vec![NodeRetire {
7571                    logical_id: "goal-1".to_owned(),
7572                    source_ref: Some("forget-1".to_owned()),
7573                }],
7574                edges: vec![],
7575                edge_retires: vec![],
7576                chunks: vec![],
7577                runs: vec![],
7578                steps: vec![],
7579                actions: vec![],
7580                optional_backfills: vec![],
7581                vec_inserts: vec![],
7582                operational_writes: vec![],
7583            })
7584            .expect("retire");
7585
7586        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7587        let per_kind_count: i64 = conn
7588            .query_row(
7589                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7590                [],
7591                |row| row.get(0),
7592            )
7593            .expect("per-kind count");
7594        assert_eq!(
7595            per_kind_count, 0,
7596            "per-kind table row must be removed on retire"
7597        );
7598    }
7599
7600    mod extract_json_path_tests {
7601        use super::super::extract_json_path;
7602        use serde_json::json;
7603
7604        #[test]
7605        fn string_value() {
7606            let v = json!({"name": "alice"});
7607            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7608        }
7609
7610        #[test]
7611        fn number_value() {
7612            let v = json!({"age": 42});
7613            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7614        }
7615
7616        #[test]
7617        fn bool_value() {
7618            let v = json!({"active": true});
7619            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7620        }
7621
7622        #[test]
7623        fn null_value() {
7624            let v = json!({"x": null});
7625            assert!(extract_json_path(&v, "$.x").is_empty());
7626        }
7627
7628        #[test]
7629        fn missing_path() {
7630            let v = json!({"name": "a"});
7631            assert!(extract_json_path(&v, "$.missing").is_empty());
7632        }
7633
7634        #[test]
7635        fn nested_path() {
7636            let v = json!({"address": {"city": "NYC"}});
7637            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7638        }
7639
7640        #[test]
7641        fn array_of_strings() {
7642            let v = json!({"tags": ["a", "b", "c"]});
7643            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7644        }
7645
7646        #[test]
7647        fn array_mixed_scalars() {
7648            let v = json!({"vals": ["x", 1, true]});
7649            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7650        }
7651
7652        #[test]
7653        fn array_only_objects_returns_empty() {
7654            let v = json!({"data": [{"k": "v"}]});
7655            assert!(extract_json_path(&v, "$.data").is_empty());
7656        }
7657
7658        #[test]
7659        fn array_mixed_objects_and_scalars() {
7660            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7661            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7662        }
7663
7664        #[test]
7665        fn object_returns_empty() {
7666            let v = json!({"meta": {"k": "v"}});
7667            assert!(extract_json_path(&v, "$.meta").is_empty());
7668        }
7669
7670        #[test]
7671        fn no_prefix_returns_empty() {
7672            let v = json!({"name": "a"});
7673            assert!(extract_json_path(&v, "name").is_empty());
7674        }
7675    }
7676
7677    mod recursive_extraction_tests {
7678        use super::super::{
7679            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7680            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7681        };
7682        use serde_json::json;
7683
7684        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7685            PropertyFtsSchema {
7686                paths,
7687                separator: " ".to_owned(),
7688                exclude_paths: Vec::new(),
7689            }
7690        }
7691
7692        fn schema_with_excludes(
7693            paths: Vec<PropertyPathEntry>,
7694            excludes: Vec<String>,
7695        ) -> PropertyFtsSchema {
7696            PropertyFtsSchema {
7697                paths,
7698                separator: " ".to_owned(),
7699                exclude_paths: excludes,
7700            }
7701        }
7702
7703        #[test]
7704        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7705            let props = json!({"payload": {"b": "two", "a": "one"}});
7706            let (blob, positions, _stats) = extract_property_fts(
7707                &props,
7708                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7709            );
7710            let blob = blob.expect("blob emitted");
7711            let idx_one = blob.find("one").expect("contains 'one'");
7712            let idx_two = blob.find("two").expect("contains 'two'");
7713            assert!(
7714                idx_one < idx_two,
7715                "lex order: 'one' (key a) before 'two' (key b)"
7716            );
7717            assert_eq!(positions.len(), 2);
7718            assert_eq!(positions[0].leaf_path, "$.payload.a");
7719            assert_eq!(positions[1].leaf_path, "$.payload.b");
7720        }
7721
7722        #[test]
7723        fn recursive_extraction_walks_arrays_of_scalars() {
7724            let props = json!({"tags": ["red", "blue"]});
7725            let (_blob, positions, _stats) = extract_property_fts(
7726                &props,
7727                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7728            );
7729            assert_eq!(positions.len(), 2);
7730            assert_eq!(positions[0].leaf_path, "$.tags[0]");
7731            assert_eq!(positions[1].leaf_path, "$.tags[1]");
7732        }
7733
7734        #[test]
7735        fn recursive_extraction_walks_arrays_of_objects() {
7736            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7737            let (_blob, positions, _stats) = extract_property_fts(
7738                &props,
7739                &schema(vec![PropertyPathEntry::recursive("$.items")]),
7740            );
7741            assert_eq!(positions.len(), 2);
7742            assert_eq!(positions[0].leaf_path, "$.items[0].name");
7743            assert_eq!(positions[1].leaf_path, "$.items[1].name");
7744        }
7745
7746        #[test]
7747        fn recursive_extraction_stringifies_numbers_and_bools() {
7748            let props = json!({"root": {"n": 42, "ok": true}});
7749            let (blob, _positions, _stats) = extract_property_fts(
7750                &props,
7751                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7752            );
7753            let blob = blob.expect("blob emitted");
7754            assert!(blob.contains("42"));
7755            assert!(blob.contains("true"));
7756        }
7757
7758        #[test]
7759        fn recursive_extraction_skips_nulls_and_missing() {
7760            let props = json!({"root": {"x": null, "y": "present"}});
7761            let (blob, positions, _stats) = extract_property_fts(
7762                &props,
7763                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7764            );
7765            let blob = blob.expect("blob emitted");
7766            assert!(!blob.contains("null"));
7767            assert_eq!(positions.len(), 1);
7768            assert_eq!(positions[0].leaf_path, "$.root.y");
7769        }
7770
7771        #[test]
7772        fn recursive_extraction_respects_max_depth_guardrail() {
7773            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
7774            let mut inner = json!("leaf-value");
7775            for _ in 0..10 {
7776                inner = json!({ "k": inner });
7777            }
7778            let props = json!({ "root": inner });
7779            let (blob, positions, stats) = extract_property_fts(
7780                &props,
7781                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7782            );
7783            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
7784            // The walk should stop before reaching the leaf (depth 8).
7785            assert!(
7786                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
7787                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
7788            );
7789            // Row is still indexed with whatever was emitted — even if
7790            // that's nothing. The contract is only that the walk clamped.
7791            let _ = positions;
7792            let _ = MAX_RECURSIVE_DEPTH;
7793        }
7794
7795        #[test]
7796        fn recursive_extraction_respects_max_bytes_guardrail() {
7797            // Build an array of 4KB strings; total blob > 64KB.
7798            let leaves: Vec<String> = (0..40)
7799                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7800                .collect();
7801            let props = json!({ "root": leaves });
7802            let (blob, positions, stats) = extract_property_fts(
7803                &props,
7804                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7805            );
7806            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7807            let blob = blob.expect("blob must still be emitted");
7808            assert!(
7809                blob.len() <= MAX_EXTRACTED_BYTES,
7810                "blob must not exceed MAX_EXTRACTED_BYTES"
7811            );
7812            // No partial leaves: every position end must fall inside blob length.
7813            for pos in &positions {
7814                assert!(pos.end_offset <= blob.len());
7815                let slice = &blob[pos.start_offset..pos.end_offset];
7816                // Slice must equal some original leaf value; we only
7817                // verify it's not empty and doesn't straddle a separator.
7818                assert!(!slice.is_empty());
7819                assert!(!slice.contains(LEAF_SEPARATOR));
7820            }
7821            // Blob cannot end in a dangling separator.
7822            assert!(!blob.ends_with(LEAF_SEPARATOR));
7823        }
7824
7825        #[test]
7826        fn recursive_extraction_respects_exclude_paths() {
7827            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7828            let (blob, positions, stats) = extract_property_fts(
7829                &props,
7830                &schema_with_excludes(
7831                    vec![PropertyPathEntry::recursive("$.payload")],
7832                    vec!["$.payload.priv".to_owned()],
7833                ),
7834            );
7835            let blob = blob.expect("blob emitted");
7836            assert!(blob.contains("yes"));
7837            assert!(!blob.contains("no"));
7838            assert_eq!(positions.len(), 1);
7839            assert_eq!(positions[0].leaf_path, "$.payload.pub");
7840            assert!(stats.excluded_subtree > 0);
7841        }
7842
7843        #[test]
7844        fn position_map_entries_match_emitted_leaves_in_order() {
7845            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7846            let (blob, positions, _stats) = extract_property_fts(
7847                &props,
7848                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7849            );
7850            let blob = blob.expect("blob emitted");
7851            assert_eq!(positions.len(), 3);
7852            // Leaf paths in lexicographic order.
7853            assert_eq!(positions[0].leaf_path, "$.root.a");
7854            assert_eq!(positions[1].leaf_path, "$.root.b");
7855            assert_eq!(positions[2].leaf_path, "$.root.c");
7856            // Monotonic, non-overlapping offsets.
7857            let mut prev_end: usize = 0;
7858            for (i, pos) in positions.iter().enumerate() {
7859                assert!(pos.start_offset >= prev_end);
7860                assert!(pos.end_offset > pos.start_offset);
7861                assert!(pos.end_offset <= blob.len());
7862                let slice = &blob[pos.start_offset..pos.end_offset];
7863                match i {
7864                    0 => assert_eq!(slice, "alpha"),
7865                    1 => assert_eq!(slice, "bravo"),
7866                    2 => assert_eq!(slice, "charlie"),
7867                    _ => unreachable!(),
7868                }
7869                prev_end = pos.end_offset;
7870            }
7871        }
7872
7873        #[test]
7874        fn scalar_only_schema_produces_empty_position_map() {
7875            let props = json!({"name": "alpha", "title": "beta"});
7876            let (blob, positions, _stats) = extract_property_fts(
7877                &props,
7878                &schema(vec![
7879                    PropertyPathEntry::scalar("$.name"),
7880                    PropertyPathEntry::scalar("$.title"),
7881                ]),
7882            );
7883            assert_eq!(blob.as_deref(), Some("alpha beta"));
7884            assert!(
7885                positions.is_empty(),
7886                "scalar-only schema must emit no position entries"
7887            );
7888            // Sanity-check the type so the test fails loudly if the
7889            // signature changes.
7890            let _: Vec<PositionEntry> = positions;
7891        }
7892
7893        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
7894            let mut seen = std::collections::HashSet::new();
7895            for pos in positions {
7896                let start = pos.start_offset;
7897                assert!(
7898                    seen.insert(start),
7899                    "duplicate start_offset {start} in positions {positions:?}"
7900                );
7901            }
7902        }
7903
7904        #[test]
7905        fn recursive_extraction_empty_then_nonempty_in_array() {
7906            let props = json!({"payload": {"xs": ["", "x"]}});
7907            let (combined, positions, _stats) = extract_property_fts(
7908                &props,
7909                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7910            );
7911            assert_eq!(combined.as_deref(), Some("x"));
7912            assert_eq!(positions.len(), 1);
7913            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
7914            assert_eq!(positions[0].start_offset, 0);
7915            assert_eq!(positions[0].end_offset, 1);
7916            assert_unique_start_offsets(&positions);
7917        }
7918
7919        #[test]
7920        fn recursive_extraction_two_empties_then_nonempty_in_array() {
7921            let props = json!({"payload": {"xs": ["", "", "x"]}});
7922            let (combined, positions, _stats) = extract_property_fts(
7923                &props,
7924                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7925            );
7926            assert_eq!(combined.as_deref(), Some("x"));
7927            assert_eq!(positions.len(), 1);
7928            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
7929            assert_eq!(positions[0].start_offset, 0);
7930            assert_eq!(positions[0].end_offset, 1);
7931            assert_unique_start_offsets(&positions);
7932        }
7933
7934        #[test]
7935        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
7936            let props = json!({"payload": {"a": "", "b": "x"}});
7937            let (combined, positions, _stats) = extract_property_fts(
7938                &props,
7939                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7940            );
7941            assert_eq!(combined.as_deref(), Some("x"));
7942            assert_eq!(positions.len(), 1);
7943            assert_eq!(positions[0].leaf_path, "$.payload.b");
7944            assert_eq!(positions[0].start_offset, 0);
7945            assert_eq!(positions[0].end_offset, 1);
7946            assert_unique_start_offsets(&positions);
7947        }
7948
7949        #[test]
7950        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
7951            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
7952            let (combined, positions, _stats) = extract_property_fts(
7953                &props,
7954                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7955            );
7956            assert_eq!(combined.as_deref(), Some("x"));
7957            assert_eq!(positions.len(), 1);
7958            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
7959            assert_eq!(positions[0].start_offset, 0);
7960            assert_eq!(positions[0].end_offset, 1);
7961            assert_unique_start_offsets(&positions);
7962        }
7963
7964        #[test]
7965        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
7966            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
7967            let (combined, positions, _stats) = extract_property_fts(
7968                &props,
7969                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7970            );
7971            assert_eq!(combined.as_deref(), Some("x"));
7972            assert_eq!(positions.len(), 1);
7973            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
7974            assert_eq!(positions[0].start_offset, 0);
7975            assert_eq!(positions[0].end_offset, 1);
7976            assert_unique_start_offsets(&positions);
7977        }
7978
7979        #[test]
7980        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
7981            let cases = vec![
7982                json!({"payload": {}}),
7983                json!({"payload": {"a": ""}}),
7984                json!({"payload": {"xs": []}}),
7985                json!({"payload": {"xs": [""]}}),
7986                json!({"payload": {"xs": ["", ""]}}),
7987                json!({"payload": {"xs": ["", "", ""]}}),
7988            ];
7989            for case in cases {
7990                let (combined, positions, _stats) = extract_property_fts(
7991                    &case,
7992                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7993                );
7994                assert!(
7995                    combined.is_none(),
7996                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
7997                );
7998                assert!(
7999                    positions.is_empty(),
8000                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
8001                );
8002            }
8003        }
8004
8005        #[test]
8006        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
8007            let props = json!({"payload": {"xs": ["x", "", "y"]}});
8008            let (combined, positions, _stats) = extract_property_fts(
8009                &props,
8010                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8011            );
8012            let blob = combined.expect("blob emitted");
8013            assert_eq!(positions.len(), 2);
8014            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
8015            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
8016            assert_eq!(positions[0].start_offset, 0);
8017            assert_eq!(positions[0].end_offset, 1);
8018            // The two non-empty leaves are separated by exactly one
8019            // LEAF_SEPARATOR; the empty middle leaf contributes neither
8020            // bytes nor a position.
8021            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
8022            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
8023            assert_eq!(
8024                &blob[positions[0].start_offset..positions[0].end_offset],
8025                "x"
8026            );
8027            assert_eq!(
8028                &blob[positions[1].start_offset..positions[1].end_offset],
8029                "y"
8030            );
8031            assert_unique_start_offsets(&positions);
8032        }
8033
8034        #[test]
8035        fn recursive_extraction_null_leaves_unchanged() {
8036            let props = json!({"payload": {"xs": [null, null]}});
8037            let (combined, positions, _stats) = extract_property_fts(
8038                &props,
8039                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8040            );
8041            assert!(combined.is_none());
8042            assert!(positions.is_empty());
8043        }
8044    }
8045
8046    mod parse_property_schema_json_tests {
8047        use super::super::parse_property_schema_json;
8048
8049        #[test]
8050        fn parse_property_schema_json_preserves_weight() {
8051            let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
8052            let schema = parse_property_schema_json(json, " ");
8053            assert_eq!(schema.paths.len(), 2);
8054            let title = &schema.paths[0];
8055            assert_eq!(title.path, "$.title");
8056            assert_eq!(title.weight, Some(2.0_f32));
8057            let body = &schema.paths[1];
8058            assert_eq!(body.path, "$.body");
8059            assert_eq!(body.weight, None);
8060        }
8061    }
8062
8063    // --- B-3: extract_property_fts_columns tests ---
8064
8065    mod extract_property_fts_columns_tests {
8066        use serde_json::json;
8067
8068        use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
8069
8070        fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
8071            PropertyFtsSchema {
8072                paths,
8073                separator: " ".to_owned(),
8074                exclude_paths: Vec::new(),
8075            }
8076        }
8077
8078        #[test]
8079        fn extract_property_fts_columns_returns_per_spec_text() {
8080            let props = json!({"title": "Hello", "body": "World"});
8081            let mut title_entry = PropertyPathEntry::scalar("$.title");
8082            title_entry.weight = Some(2.0);
8083            let mut body_entry = PropertyPathEntry::scalar("$.body");
8084            body_entry.weight = Some(1.0);
8085            let schema = weighted_schema(vec![title_entry, body_entry]);
8086            let cols = extract_property_fts_columns(&props, &schema);
8087            assert_eq!(cols.len(), 2);
8088            assert_eq!(cols[0].0, "title");
8089            assert_eq!(cols[0].1, "Hello");
8090            assert_eq!(cols[1].0, "body");
8091            assert_eq!(cols[1].1, "World");
8092        }
8093
8094        #[test]
8095        fn extract_property_fts_columns_missing_field_returns_empty_string() {
8096            let props = json!({"title": "Hello"});
8097            let mut title_entry = PropertyPathEntry::scalar("$.title");
8098            title_entry.weight = Some(2.0);
8099            let mut body_entry = PropertyPathEntry::scalar("$.body");
8100            body_entry.weight = Some(1.0);
8101            let schema = weighted_schema(vec![title_entry, body_entry]);
8102            let cols = extract_property_fts_columns(&props, &schema);
8103            assert_eq!(cols.len(), 2);
8104            assert_eq!(cols[0].1, "Hello");
8105            assert_eq!(cols[1].1, "", "missing field yields empty string");
8106        }
8107    }
8108
8109    // --- B-3: writer per-column INSERT for weighted schemas ---
8110
    /// End-to-end check that the writer fans a node's JSON properties out
    /// into the per-column layout of a weighted per-kind FTS table.
    ///
    /// Setup: bootstrap the schema into a temp SQLite DB, register a
    /// weighted FTS property schema for kind `Article`, and create the
    /// per-kind fts5 table with one column per path. Then a single
    /// `NodeInsert` is submitted through `WriterActor`, and the table is
    /// read back with a plain SELECT (not MATCH) to verify each column
    /// received the text for its own path.
    #[test]
    fn writer_inserts_per_column_for_weighted_schema() {
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        let article_table = fathomdb_schema::fts_kind_table_name("Article");
        // Bootstrap and create the weighted per-kind table. The block scope
        // drops this setup connection before the writer opens its own.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema_manager.bootstrap(&conn).expect("bootstrap");

            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);

            // Register schema with weights.
            let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES (?1, ?2, ' ')",
                rusqlite::params!["Article", paths_json],
            )
            .expect("register schema");

            // Create the per-kind FTS table with per-column layout.
            // node_logical_id is UNINDEXED: it is a join key, not search text.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS {article_table} USING fts5(\
                    node_logical_id UNINDEXED, {title_col}, {body_col}, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create weighted per-kind table");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Submit exactly one node of kind Article; every other request
        // channel is empty so only the node/FTS projection path runs.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "article-1".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Fresh read connection: `submit` has completed, so the write is
        // already committed by the time we get here.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let title_col = fts_column_name("$.title", false);
        let body_col = fts_column_name("$.body", false);

        // The per-kind table must have one row for article-1.
        let count: i64 = conn
            .query_row(
                &format!(
                    "SELECT count(*) FROM {article_table} WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind FTS table must have the row");

        // Verify per-column values via direct SELECT (not MATCH).
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM {article_table} \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello World",
            "title column must have correct value"
        );
        assert_eq!(body_val, "Foo Bar", "body column must have correct value");
    }
8213}