// fathomdb_engine/writer.rs
1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16    OperationalValidationMode, extract_secondary_index_entries_for_current,
17    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18    parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection backfill task submitted alongside a write.
///
/// Submitted via [`WriteRequest::optional_backfills`]; the number of accepted
/// tasks is reported in [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is `true`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows. This is the default.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Enforced by `prepare_write` and `prepare_touch_last_accessed` before the
/// request is handed to the writer thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Row identifier for this version. Must be non-empty and unique within
    /// the request (the uniqueness check spans nodes and edges).
    pub row_id: String,
    /// Stable identity shared by all versions of this node. Must be non-empty.
    pub logical_id: String,
    /// Node kind/type tag.
    pub kind: String,
    /// Property payload (JSON string — TODO confirm schema; not validated here).
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
69
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Row identifier for this version. Must be non-empty and unique within
    /// the request (the uniqueness check spans nodes and edges).
    pub row_id: String,
    /// Stable identity shared by all versions of this edge. Must be non-empty.
    pub logical_id: String,
    /// `logical_id` of the source node.
    pub source_logical_id: String,
    /// `logical_id` of the target node.
    pub target_logical_id: String,
    /// Edge kind/type tag.
    pub kind: String,
    /// Property payload (JSON string — TODO confirm schema; not validated here).
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
84
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Logical id of the node to retire.
    pub logical_id: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
91
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Logical id of the edge to retire.
    pub logical_id: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk identifier. Must be non-empty.
    pub id: String,
    /// Logical id of the node this chunk belongs to.
    pub node_logical_id: String,
    /// Chunk text. Must be non-empty; empty chunks are rejected at prepare time.
    pub text_content: String,
    /// Optional byte range of the chunk within its source content. Whether the
    /// range is half-open is not established here — TODO confirm.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
110
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Target chunk id. Must be non-empty.
    pub chunk_id: String,
    /// Embedding vector. Must be non-empty; dimensionality is not validated here.
    pub embedding: Vec<f32>,
}
122
/// A mutation to an operational collection submitted as part of a write request.
///
/// All variants require a non-empty `collection` and `record_key`
/// (checked in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a record; `payload_json` must be non-empty.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Put a record (presumably insert-or-replace semantics — confirm against
    /// the writer-loop apply path); `payload_json` must be non-empty.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete the record at `collection`/`record_key`; carries no payload.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run identifier. Must be non-empty.
    pub id: String,
    /// Run kind/type tag.
    pub kind: String,
    /// Run status label.
    pub status: String,
    /// Property payload (JSON string — TODO confirm schema; not validated here).
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, supersedes an existing run; `supersedes_id` must then be set
    /// (validated in `prepare_write`).
    pub upsert: bool,
    /// Run this one supersedes when `upsert` is true.
    pub supersedes_id: Option<String>,
}
156
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step identifier. Must be non-empty.
    pub id: String,
    /// Id of the parent run.
    pub run_id: String,
    /// Step kind/type tag.
    pub kind: String,
    /// Step status label.
    pub status: String,
    /// Property payload (JSON string — TODO confirm schema; not validated here).
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, supersedes an existing step — presumably requires
    /// `supersedes_id` like [`RunInsert`]; that validation is not visible here.
    pub upsert: bool,
    /// Step this one supersedes when `upsert` is true.
    pub supersedes_id: Option<String>,
}
169
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action identifier. Must be non-empty.
    pub id: String,
    /// Id of the parent step.
    pub step_id: String,
    /// Action kind/type tag.
    pub kind: String,
    /// Action status label.
    pub status: String,
    /// Property payload (JSON string — TODO confirm schema; not validated here).
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, supersedes an existing action — presumably requires
    /// `supersedes_id` like [`RunInsert`]; that validation is not visible here.
    pub upsert: bool,
    /// Action this one supersedes when `upsert` is true.
    pub supersedes_id: Option<String>,
}
182
// --- Write-request size limits (enforced by `validate_request_size`) ---
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
/// Combined cap on node retires + edge retires.
const MAX_RETIRES: usize = 10_000;
/// Combined cap on runs + steps + actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
/// Cap on the sum of all item kinds (including vec inserts) in one request.
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
///
/// Item counts are bounded by the `MAX_*` limits above; see
/// `validate_request_size`.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Label identifying this batch; also present on the resulting [`WriteReceipt`].
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfill tasks recorded alongside the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    /// Mutations to operational collections applied in the same transaction.
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label of the write request this receipt corresponds to.
    pub label: String,
    /// Number of optional projection backfill tasks accepted with the write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-`source_ref` warnings emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
227
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Nodes to touch. Must be non-empty, and every id non-blank
    /// (validated in `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    /// Timestamp to record (presumably a Unix-epoch value — TODO confirm units).
    pub touched_at: i64,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
235
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical ids whose timestamp was updated.
    pub touched_logical_ids: usize,
    /// The timestamp that was written (presumably the request's `touched_at` —
    /// confirm against the writer-loop implementation).
    pub touched_at: i64,
}
242
/// One chunk row destined for the FTS projection. Resolved by
/// `resolve_fts_rows` on the writer thread before BEGIN IMMEDIATE
/// (see [`PreparedWrite::required_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    kind: String,
    text_content: String,
}
250
/// One property-derived row destined for the FTS projection. Resolved by
/// `resolve_property_fts_rows` on the writer thread before BEGIN IMMEDIATE
/// (see [`PreparedWrite::required_property_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Concatenated extracted blob; adjacent leaves are joined with
    /// [`LEAF_SEPARATOR`].
    text_content: String,
    /// Byte ranges mapping spans of `text_content` back to leaf `JSONPath`s.
    positions: Vec<PositionEntry>,
    /// Per-column text extracted for weighted schemas. Empty for non-weighted schemas.
    /// When non-empty, the writer uses per-column INSERT instead of single `text_content`.
    columns: Vec<(String, String)>,
}
261
/// Maximum depth the recursive property-FTS extraction walk descends before
/// clamping. A walk that reaches this depth will not emit leaves below it.
/// Hits are counted in [`ExtractStats::depth_cap_hit`].
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Maximum blob size the recursive property-FTS extraction walk will emit.
/// When the next complete leaf would push the blob past this cap, the walk
/// stops on the preceding leaf boundary — the existing blob is indexed, the
/// truncated leaf is not partially emitted. Truncation is flagged in
/// [`ExtractStats::byte_cap_reached`].
///
/// This budget applies only to the recursive-walk portion of the blob.
/// Scalar-prefix emissions and the scalar↔recursive [`LEAF_SEPARATOR`] token
/// are not counted against it; the cap governs how much nested-subtree
/// content is serialized, not the final blob length.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

/// Hard phrase-break separator inserted between two adjacent recursive
/// leaves in the concatenated blob.
///
/// FTS5 phrase queries match on consecutive token positions, so simply
/// inserting non-token whitespace or control characters does NOT prevent a
/// phrase like `"foo bar"` from matching when `foo` ends one leaf and
/// `bar` starts the next — the tokenizer would discard the control
/// character and the two content tokens would still be adjacent.
///
/// To create a real position gap we embed a sentinel *token*
/// `fathomdbphrasebreaksentinel` between leaves. Under `porter
/// unicode61 remove_diacritics 2` this survives tokenization as its own
/// position, so the content tokens on either side are separated by at
/// least one intervening position and phrase queries spanning the gap
/// cannot match. The sentinel is long, lowercase, and obviously
/// synthetic to minimize the chance of accidental collisions with real
/// content. It remains searchable as a word — callers should avoid
/// passing it to `text_search` directly.
///
/// Validated by the integration test
/// `leaf_separator_is_hard_phrase_break_under_unicode61_porter`.
///
/// Note: because the sentinel is a real token it may appear in FTS5
/// `snippet()` output when the snippet window spans a leaf boundary.
/// Phase 5 presentation-layer snippet post-processing is responsible for
/// stripping it before the snippet is surfaced to callers.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
304
/// Whether a registered property-FTS path extracts a single scalar value
/// or recursively walks the subtree rooted at the path.
///
/// Defaults to [`PropertyPathMode::Scalar`] (the legacy behavior).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Legacy scalar extraction — resolve the path, append the scalar
    /// (flattening arrays of scalars as before Phase 4).
    #[default]
    Scalar,
    /// Recursively walk scalar leaves rooted at the path, emitting one
    /// leaf per scalar with position-map tracking.
    Recursive,
}
317
/// A single registered property-FTS path with its extraction mode.
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct PropertyPathEntry {
    /// JSON path to extract, matched from the node's root.
    pub path: String,
    /// Scalar vs. recursive extraction; see [`PropertyPathMode`].
    pub mode: PropertyPathMode,
    /// Optional BM25 weight multiplier parsed from the rich JSON format.
    pub weight: Option<f32>,
}

// f32 does not implement Eq (due to NaN), but weights are always finite in practice.
// NOTE(review): a NaN `weight` would break Eq's reflexivity contract — consider
// rejecting non-finite weights where the rich JSON format is parsed.
impl Eq for PropertyPathEntry {}
329
330impl PropertyPathEntry {
331    pub(crate) fn scalar(path: impl Into<String>) -> Self {
332        Self {
333            path: path.into(),
334            mode: PropertyPathMode::Scalar,
335            weight: None,
336        }
337    }
338
339    #[cfg(test)]
340    pub(crate) fn recursive(path: impl Into<String>) -> Self {
341        Self {
342            path: path.into(),
343            mode: PropertyPathMode::Recursive,
344            weight: None,
345        }
346    }
347}
348
/// Parsed property-FTS schema definition for a single kind.
///
/// See [`PropertyPathEntry`] and [`PropertyPathMode`] for per-path semantics.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Property paths to extract. Each entry is matched exactly against
    /// the JSON tree from the node's root; see [`PropertyPathMode`].
    pub paths: Vec<PropertyPathEntry>,
    /// Separator inserted between adjacent leaf values in the extracted
    /// blob. Acts as a hard phrase break under FTS5.
    pub separator: String,
    /// JSON paths to skip during recursive extraction. Matched as an
    /// *exact* path equality — the walk visits each object/array before
    /// descending into it, so listing `$.x.y` suppresses the entire
    /// subtree rooted at `$.x.y`. Prefix matching is NOT supported: an
    /// exclude of `$.x` does not implicitly exclude `$.x.y`, and
    /// `$.payload.priv` does not implicitly exclude `$.payload.priv.inner`
    /// via prefix — the walker will still descend into any subtree that
    /// is not itself listed exactly in `exclude_paths`.
    pub exclude_paths: Vec<String>,
}
368
/// One position-map entry: a half-open `[start, end)` byte range within the
/// extracted blob plus the `JSONPath` of the scalar leaf whose value occupies
/// that range.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive start byte offset within the extracted blob.
    pub start_offset: usize,
    /// Exclusive end byte offset within the extracted blob.
    pub end_offset: usize,
    /// `JSONPath` of the scalar leaf occupying this range.
    pub leaf_path: String,
}
378
/// Per-kind extraction stats accumulated by a recursive walk. Surface-level
/// Phase 4 logging only; Phase 5 may expose these to callers.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Number of times the walk reached [`MAX_RECURSIVE_DEPTH`] and stopped
    /// descending.
    pub depth_cap_hit: usize,
    /// `true` once the extracted blob has reached `MAX_EXTRACTED_BYTES` and
    /// the walker has stopped accepting additional leaves. Unlike the
    /// depth counter this is a boolean: the walker's `stopped` guard
    /// blocks further `emit_leaf` calls after the first truncation.
    pub byte_cap_reached: bool,
    /// Number of subtrees skipped — presumably via
    /// [`PropertyFtsSchema::exclude_paths`]; confirm against the walker.
    pub excluded_subtree: usize,
}
391
392impl ExtractStats {
393    fn merge(&mut self, other: ExtractStats) {
394        self.depth_cap_hit += other.depth_cap_hit;
395        self.byte_cap_reached |= other.byte_cap_reached;
396        self.excluded_subtree += other.excluded_subtree;
397    }
398}
399
/// Validated, writer-ready form of a [`WriteRequest`], produced by
/// `prepare_write` on the caller's thread and shipped to the writer thread
/// inside [`WriteMessage::Submit`].
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Collection name → kind, resolved during preparation.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Collection name → filter-field definitions, resolved during preparation.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Non-fatal validation findings surfaced in the receipt's warnings.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    /// Deferred projection backfill tasks carried through from the request.
    optional_backfills: Vec<OptionalProjectionTask>,
}
427
/// Messages accepted by the writer thread's receive loop.
enum WriteMessage {
    /// Apply a prepared write and send the resulting [`WriteReceipt`] back on
    /// `reply`. The payload is boxed — presumably to keep the channel message
    /// small relative to the large `PreparedWrite`.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` timestamps and send a report back on `reply`.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
438
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Channel to the writer thread. Wrapped in `ManuallyDrop` so the `Drop`
    /// impl can close the channel (phase 1) before joining the thread (phase 2).
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// Join handle for the writer thread; taken (set to `None`) during `Drop`.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied to every request at prepare time.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
451
452impl WriterActor {
453    /// # Errors
454    /// Returns [`EngineError`] if the writer thread cannot be spawned.
455    pub fn start(
456        path: impl AsRef<Path>,
457        schema_manager: Arc<SchemaManager>,
458        provenance_mode: ProvenanceMode,
459        telemetry: Arc<TelemetryCounters>,
460    ) -> Result<Self, EngineError> {
461        let database_path = path.as_ref().to_path_buf();
462        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
463
464        let writer_telemetry = Arc::clone(&telemetry);
465        let handle = thread::Builder::new()
466            .name("fathomdb-writer".to_owned())
467            .spawn(move || {
468                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
469            })
470            .map_err(EngineError::Io)?;
471
472        Ok(Self {
473            sender: ManuallyDrop::new(sender),
474            thread_handle: Some(handle),
475            provenance_mode,
476            _telemetry: telemetry,
477        })
478    }
479
480    /// Returns `true` if the writer thread is still running.
481    fn is_thread_alive(&self) -> bool {
482        self.thread_handle
483            .as_ref()
484            .is_some_and(|h| !h.is_finished())
485    }
486
487    /// Returns `WriterRejected` if the writer thread has exited.
488    fn check_thread_alive(&self) -> Result<(), EngineError> {
489        if self.is_thread_alive() {
490            Ok(())
491        } else {
492            Err(EngineError::WriterRejected(
493                "writer thread has exited".to_owned(),
494            ))
495        }
496    }
497
498    /// # Errors
499    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
500    /// down, or the underlying `SQLite` transaction fails.
501    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
502        self.check_thread_alive()?;
503        let prepared = prepare_write(request, self.provenance_mode)?;
504        let (reply_tx, reply_rx) = mpsc::channel();
505        self.sender
506            .send(WriteMessage::Submit {
507                prepared: Box::new(prepared),
508                reply: reply_tx,
509            })
510            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
511
512        recv_with_timeout(&reply_rx)
513    }
514
515    /// # Errors
516    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
517    /// or the underlying `SQLite` transaction fails.
518    pub fn touch_last_accessed(
519        &self,
520        request: LastAccessTouchRequest,
521    ) -> Result<LastAccessTouchReport, EngineError> {
522        self.check_thread_alive()?;
523        prepare_touch_last_accessed(&request, self.provenance_mode)?;
524        let (reply_tx, reply_rx) = mpsc::channel();
525        self.sender
526            .send(WriteMessage::TouchLastAccessed {
527                request,
528                reply: reply_tx,
529            })
530            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
531
532        recv_with_timeout(&reply_rx)
533    }
534}
535
/// Last-resort stderr notice used by [`WriterActor`]'s `Drop` when the writer
/// thread panicked while this thread was already panicking. Compiled only
/// without the `tracing` feature (presumably `trace_warn!` is a no-op in that
/// configuration — confirm against the macro definition).
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
541
impl Drop for WriterActor {
    /// Two-phase shutdown: close the channel, then join the writer thread.
    /// The ordering is load-bearing — do not reorder.
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // If this thread is already unwinding, re-raising would be a
                    // double panic (process abort) — log and swallow instead.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Otherwise surface the writer thread's panic to the caller.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
571
572/// Wait for a reply from the writer thread, with a timeout.
573fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
574    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
575        .map_err(|error| match error {
576            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
577                "write timed out waiting for writer thread reply — the write may still commit"
578                    .to_owned(),
579            ),
580            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
581        })
582        .and_then(|result| result)
583}
584
585fn prepare_touch_last_accessed(
586    request: &LastAccessTouchRequest,
587    mode: ProvenanceMode,
588) -> Result<(), EngineError> {
589    if request.logical_ids.is_empty() {
590        return Err(EngineError::InvalidWrite(
591            "touch_last_accessed requires at least one logical_id".to_owned(),
592        ));
593    }
594    for logical_id in &request.logical_ids {
595        if logical_id.trim().is_empty() {
596            return Err(EngineError::InvalidWrite(
597                "touch_last_accessed requires non-empty logical_ids".to_owned(),
598            ));
599        }
600    }
601    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
602        return Err(EngineError::InvalidWrite(
603            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
604                .to_owned(),
605        ));
606    }
607    Ok(())
608}
609
610fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
611    let missing: Vec<String> = request
612        .nodes
613        .iter()
614        .filter(|n| n.source_ref.is_none())
615        .map(|n| format!("node '{}'", n.logical_id))
616        .chain(
617            request
618                .node_retires
619                .iter()
620                .filter(|r| r.source_ref.is_none())
621                .map(|r| format!("node retire '{}'", r.logical_id)),
622        )
623        .chain(
624            request
625                .edges
626                .iter()
627                .filter(|e| e.source_ref.is_none())
628                .map(|e| format!("edge '{}'", e.logical_id)),
629        )
630        .chain(
631            request
632                .edge_retires
633                .iter()
634                .filter(|r| r.source_ref.is_none())
635                .map(|r| format!("edge retire '{}'", r.logical_id)),
636        )
637        .chain(
638            request
639                .runs
640                .iter()
641                .filter(|r| r.source_ref.is_none())
642                .map(|r| format!("run '{}'", r.id)),
643        )
644        .chain(
645            request
646                .steps
647                .iter()
648                .filter(|s| s.source_ref.is_none())
649                .map(|s| format!("step '{}'", s.id)),
650        )
651        .chain(
652            request
653                .actions
654                .iter()
655                .filter(|a| a.source_ref.is_none())
656                .map(|a| format!("action '{}'", a.id)),
657        )
658        .chain(
659            request
660                .operational_writes
661                .iter()
662                .filter(|write| operational_write_source_ref(write).is_none())
663                .map(|write| {
664                    format!(
665                        "operational {} '{}:{}'",
666                        operational_write_kind(write),
667                        operational_write_collection(write),
668                        operational_write_record_key(write)
669                    )
670                }),
671        )
672        .collect();
673
674    if missing.is_empty() {
675        Ok(())
676    } else {
677        Err(EngineError::InvalidWrite(format!(
678            "ProvenanceMode::Require: missing source_ref on: {}",
679            missing.join(", ")
680        )))
681    }
682}
683
/// Reject a request that exceeds any of the `MAX_*` size limits above.
///
/// Checks run in a fixed order (nodes, edges, chunks, combined retires,
/// combined runtime items, operational writes, grand total) and report the
/// first violated limit. Note: `optional_backfills` is not counted toward the
/// total — presumably intentional since backfills are deferred; confirm.
fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
    if request.nodes.len() > MAX_NODES {
        return Err(EngineError::InvalidWrite(format!(
            "too many nodes: {} exceeds limit of {MAX_NODES}",
            request.nodes.len()
        )));
    }
    if request.edges.len() > MAX_EDGES {
        return Err(EngineError::InvalidWrite(format!(
            "too many edges: {} exceeds limit of {MAX_EDGES}",
            request.edges.len()
        )));
    }
    if request.chunks.len() > MAX_CHUNKS {
        return Err(EngineError::InvalidWrite(format!(
            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
            request.chunks.len()
        )));
    }
    // Node and edge retires share one combined budget.
    let retires = request.node_retires.len() + request.edge_retires.len();
    if retires > MAX_RETIRES {
        return Err(EngineError::InvalidWrite(format!(
            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
        )));
    }
    // Runs, steps, and actions share one combined budget.
    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
    if runtime_items > MAX_RUNTIME_ITEMS {
        return Err(EngineError::InvalidWrite(format!(
            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
        )));
    }
    if request.operational_writes.len() > MAX_OPERATIONAL {
        return Err(EngineError::InvalidWrite(format!(
            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
            request.operational_writes.len()
        )));
    }
    // Grand total across every item kind, including vec inserts.
    let total = request.nodes.len()
        + request.node_retires.len()
        + request.edges.len()
        + request.edge_retires.len()
        + request.chunks.len()
        + request.runs.len()
        + request.steps.len()
        + request.actions.len()
        + request.vec_inserts.len()
        + request.operational_writes.len();
    if total > MAX_TOTAL_ITEMS {
        return Err(EngineError::InvalidWrite(format!(
            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
        )));
    }
    Ok(())
}
738
739#[allow(clippy::too_many_lines)]
740fn prepare_write(
741    request: WriteRequest,
742    mode: ProvenanceMode,
743) -> Result<PreparedWrite, EngineError> {
744    validate_request_size(&request)?;
745
746    // --- ID validation: reject empty IDs ---
747    for node in &request.nodes {
748        if node.row_id.is_empty() {
749            return Err(EngineError::InvalidWrite(
750                "NodeInsert has empty row_id".to_owned(),
751            ));
752        }
753        if node.logical_id.is_empty() {
754            return Err(EngineError::InvalidWrite(
755                "NodeInsert has empty logical_id".to_owned(),
756            ));
757        }
758    }
759    for edge in &request.edges {
760        if edge.row_id.is_empty() {
761            return Err(EngineError::InvalidWrite(
762                "EdgeInsert has empty row_id".to_owned(),
763            ));
764        }
765        if edge.logical_id.is_empty() {
766            return Err(EngineError::InvalidWrite(
767                "EdgeInsert has empty logical_id".to_owned(),
768            ));
769        }
770    }
771    for chunk in &request.chunks {
772        if chunk.id.is_empty() {
773            return Err(EngineError::InvalidWrite(
774                "ChunkInsert has empty id".to_owned(),
775            ));
776        }
777        if chunk.text_content.is_empty() {
778            return Err(EngineError::InvalidWrite(format!(
779                "chunk '{}' has empty text_content; empty chunks are not allowed",
780                chunk.id
781            )));
782        }
783    }
784    for run in &request.runs {
785        if run.id.is_empty() {
786            return Err(EngineError::InvalidWrite(
787                "RunInsert has empty id".to_owned(),
788            ));
789        }
790    }
791    for step in &request.steps {
792        if step.id.is_empty() {
793            return Err(EngineError::InvalidWrite(
794                "StepInsert has empty id".to_owned(),
795            ));
796        }
797    }
798    for action in &request.actions {
799        if action.id.is_empty() {
800            return Err(EngineError::InvalidWrite(
801                "ActionInsert has empty id".to_owned(),
802            ));
803        }
804    }
805    for vi in &request.vec_inserts {
806        if vi.chunk_id.is_empty() {
807            return Err(EngineError::InvalidWrite(
808                "VecInsert has empty chunk_id".to_owned(),
809            ));
810        }
811        if vi.embedding.is_empty() {
812            return Err(EngineError::InvalidWrite(format!(
813                "VecInsert for chunk '{}' has empty embedding",
814                vi.chunk_id
815            )));
816        }
817    }
818    for operational in &request.operational_writes {
819        if operational_write_collection(operational).is_empty() {
820            return Err(EngineError::InvalidWrite(
821                "OperationalWrite has empty collection".to_owned(),
822            ));
823        }
824        if operational_write_record_key(operational).is_empty() {
825            return Err(EngineError::InvalidWrite(format!(
826                "OperationalWrite for collection '{}' has empty record_key",
827                operational_write_collection(operational)
828            )));
829        }
830        match operational {
831            OperationalWrite::Append { payload_json, .. }
832            | OperationalWrite::Put { payload_json, .. } => {
833                if payload_json.is_empty() {
834                    return Err(EngineError::InvalidWrite(format!(
835                        "OperationalWrite {} '{}:{}' has empty payload_json",
836                        operational_write_kind(operational),
837                        operational_write_collection(operational),
838                        operational_write_record_key(operational)
839                    )));
840                }
841            }
842            OperationalWrite::Delete { .. } => {}
843        }
844    }
845
846    // --- ID validation: reject duplicate row_ids within the request ---
847    {
848        let mut seen = std::collections::HashSet::new();
849        for node in &request.nodes {
850            if !seen.insert(node.row_id.as_str()) {
851                return Err(EngineError::InvalidWrite(format!(
852                    "duplicate row_id '{}' within the same WriteRequest",
853                    node.row_id
854                )));
855            }
856        }
857        for edge in &request.edges {
858            if !seen.insert(edge.row_id.as_str()) {
859                return Err(EngineError::InvalidWrite(format!(
860                    "duplicate row_id '{}' within the same WriteRequest",
861                    edge.row_id
862                )));
863            }
864        }
865    }
866
867    // --- ProvenanceMode::Require enforcement ---
868    if mode == ProvenanceMode::Require {
869        check_require_provenance(&request)?;
870    }
871
872    // --- Runtime table upsert validation ---
873    for run in &request.runs {
874        if run.upsert && run.supersedes_id.is_none() {
875            return Err(EngineError::InvalidWrite(format!(
876                "run '{}': upsert=true requires supersedes_id to be set",
877                run.id
878            )));
879        }
880    }
881    for step in &request.steps {
882        if step.upsert && step.supersedes_id.is_none() {
883            return Err(EngineError::InvalidWrite(format!(
884                "step '{}': upsert=true requires supersedes_id to be set",
885                step.id
886            )));
887        }
888    }
889    for action in &request.actions {
890        if action.upsert && action.supersedes_id.is_none() {
891            return Err(EngineError::InvalidWrite(format!(
892                "action '{}': upsert=true requires supersedes_id to be set",
893                action.id
894            )));
895        }
896    }
897
898    let node_kinds = request
899        .nodes
900        .iter()
901        .map(|node| (node.logical_id.clone(), node.kind.clone()))
902        .collect::<HashMap<_, _>>();
903
904    Ok(PreparedWrite {
905        label: request.label,
906        nodes: request.nodes,
907        node_retires: request.node_retires,
908        edges: request.edges,
909        edge_retires: request.edge_retires,
910        chunks: request.chunks,
911        runs: request.runs,
912        steps: request.steps,
913        actions: request.actions,
914        vec_inserts: request.vec_inserts,
915        operational_writes: request.operational_writes,
916        operational_collection_kinds: HashMap::new(),
917        operational_collection_filter_fields: HashMap::new(),
918        operational_validation_warnings: Vec::new(),
919        node_kinds,
920        required_fts_rows: Vec::new(),
921        required_property_fts_rows: Vec::new(),
922        optional_backfills: request.optional_backfills,
923    })
924}
925
/// Body of the dedicated writer thread.
///
/// Opens its own SQLite connection and bootstraps the schema. If either
/// step fails, every message still queued in `receiver` is answered with a
/// `WriterRejected` error via [`reject_all`] and the thread exits.
/// Otherwise messages are serviced until the channel closes (the `for`
/// loop ends once all senders are dropped).
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // Fail every queued request rather than leaving callers hanging.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // NOTE(review): `start` exists only with the `tracing`
                // feature; the `trace_info!` below that reads it presumably
                // expands to a no-op otherwise — confirm the macro definition.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one bad write cannot kill the writer
                // thread and strand every queued caller.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        // Telemetry counts nodes + edges + chunks as the
                        // row count for a committed write.
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // The submitter may have gone away; a dead reply
                    // channel is not an error here.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touch updates metadata only; zero rows written.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
1008
1009fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1010    for message in receiver {
1011        match message {
1012            WriteMessage::Submit { reply, .. } => {
1013                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1014            }
1015            WriteMessage::TouchLastAccessed { reply, .. } => {
1016                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1017            }
1018        }
1019    }
1020}
1021
1022/// Resolve FTS projection rows before the write transaction begins.
1023///
1024/// For each chunk in the prepared write, determines the node `kind` needed
1025/// to populate the FTS index. If the node was co-submitted in the same
1026/// request its kind is taken from `prepared.node_kinds` without a DB query.
1027/// If the node is pre-existing it is looked up in the database. If the node
1028/// cannot be found in either place, an `InvalidWrite` error is returned.
1029fn resolve_fts_rows(
1030    conn: &rusqlite::Connection,
1031    prepared: &mut PreparedWrite,
1032) -> Result<(), EngineError> {
1033    let retiring_ids: std::collections::HashSet<&str> = prepared
1034        .node_retires
1035        .iter()
1036        .map(|r| r.logical_id.as_str())
1037        .collect();
1038    for chunk in &prepared.chunks {
1039        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1040            return Err(EngineError::InvalidWrite(format!(
1041                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1042                 WriteRequest; retire and chunk insertion for the same node must not be combined",
1043                chunk.id, chunk.node_logical_id
1044            )));
1045        }
1046    }
1047    for chunk in &prepared.chunks {
1048        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1049            k.clone()
1050        } else {
1051            match conn.query_row(
1052                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1053                params![chunk.node_logical_id],
1054                |row| row.get::<_, String>(0),
1055            ) {
1056                Ok(kind) => kind,
1057                Err(rusqlite::Error::QueryReturnedNoRows) => {
1058                    return Err(EngineError::InvalidWrite(format!(
1059                        "chunk '{}' references node_logical_id '{}' that is not present in this \
1060                         write request or the database \
1061                         (v1 limitation: chunks and their nodes must be submitted together or the \
1062                         node must already exist)",
1063                        chunk.id, chunk.node_logical_id
1064                    )));
1065                }
1066                Err(e) => return Err(EngineError::Sqlite(e)),
1067            }
1068        };
1069        prepared.required_fts_rows.push(FtsProjectionRow {
1070            chunk_id: chunk.id.clone(),
1071            node_logical_id: chunk.node_logical_id.clone(),
1072            kind,
1073            text_content: chunk.text_content.clone(),
1074        });
1075    }
1076    trace_debug!(
1077        fts_rows = prepared.required_fts_rows.len(),
1078        chunks_processed = prepared.chunks.len(),
1079        "fts row resolution completed"
1080    );
1081    Ok(())
1082}
1083
1084/// Load FTS property schemas from the database and extract property values from
1085/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
1086fn resolve_property_fts_rows(
1087    conn: &rusqlite::Connection,
1088    prepared: &mut PreparedWrite,
1089) -> Result<(), EngineError> {
1090    if prepared.nodes.is_empty() {
1091        return Ok(());
1092    }
1093
1094    let schemas: HashMap<String, PropertyFtsSchema> =
1095        load_fts_property_schemas(conn)?.into_iter().collect();
1096
1097    if schemas.is_empty() {
1098        return Ok(());
1099    }
1100
1101    let mut combined_stats = ExtractStats::default();
1102    for node in &prepared.nodes {
1103        let Some(schema) = schemas.get(&node.kind) else {
1104            continue;
1105        };
1106        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1107        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
1108        let (text_content, positions, stats) = extract_property_fts(&props, schema);
1109        combined_stats.merge(stats);
1110        if let Some(text_content) = text_content {
1111            let columns = if has_weights {
1112                extract_property_fts_columns(&props, schema)
1113            } else {
1114                Vec::new()
1115            };
1116            prepared
1117                .required_property_fts_rows
1118                .push(FtsPropertyProjectionRow {
1119                    node_logical_id: node.logical_id.clone(),
1120                    kind: node.kind.clone(),
1121                    text_content,
1122                    positions,
1123                    columns,
1124                });
1125        }
1126    }
1127    if combined_stats != ExtractStats::default() {
1128        trace_debug!(
1129            depth_cap_hit = combined_stats.depth_cap_hit,
1130            byte_cap_reached = combined_stats.byte_cap_reached,
1131            excluded_subtree = combined_stats.excluded_subtree,
1132            "property fts recursive extraction guardrails engaged"
1133        );
1134    }
1135    trace_debug!(
1136        property_fts_rows = prepared.required_property_fts_rows.len(),
1137        nodes_processed = prepared.nodes.len(),
1138        "property fts row resolution completed"
1139    );
1140    Ok(())
1141}
1142
1143/// Extract scalar values from a JSON object at a `$.field` path.
1144/// Supports simple dot-notation paths like `$.name`, `$.address.city`.
1145/// Returns individual scalars so callers can join them with the configured separator.
1146pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1147    let Some(path) = path.strip_prefix("$.") else {
1148        return Vec::new();
1149    };
1150    let mut current = value;
1151    for segment in path.split('.') {
1152        match current.get(segment) {
1153            Some(v) => current = v,
1154            None => return Vec::new(),
1155        }
1156    }
1157    match current {
1158        serde_json::Value::String(s) => vec![s.clone()],
1159        serde_json::Value::Number(n) => vec![n.to_string()],
1160        serde_json::Value::Bool(b) => vec![b.to_string()],
1161        serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1162        serde_json::Value::Array(arr) => arr
1163            .iter()
1164            .filter_map(|v| match v {
1165                serde_json::Value::String(s) => Some(s.clone()),
1166                serde_json::Value::Number(n) => Some(n.to_string()),
1167                serde_json::Value::Bool(b) => Some(b.to_string()),
1168                _ => None,
1169            })
1170            .collect(),
1171    }
1172}
1173
/// Core property-FTS extraction. Produces the concatenated blob, the
/// position map identifying which byte range in the blob came from which
/// JSON leaf path, and the guardrail stats for the walk.
///
/// Emission rules:
/// - Path entries are processed in their registered order.
/// - Scalar-mode entries append the scalar value(s) directly (arrays of
///   scalars flatten, matching pre-Phase-4 behavior). Scalar emissions do
///   not produce position-map entries — the position map is only populated
///   when a recursive path contributes a leaf.
/// - Recursive-mode entries walk every descendant scalar leaf in stable
///   lexicographic key order. Each leaf emits a position-map entry whose
///   `[start, end)` spans the leaf value's bytes in the blob (NOT the
///   trailing separator).
/// - Between any two emitted values (whether from the same or different
///   path entries) the walk inserts [`LEAF_SEPARATOR`], a hard phrase
///   break under FTS5's `porter unicode61` tokenizer. Scalar-mode emissions
///   between each other use the schema's configured `separator` for
///   backwards compatibility with Phase 0.
pub(crate) fn extract_property_fts(
    props: &serde_json::Value,
    schema: &PropertyFtsSchema,
) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
    // One walker is shared across all recursive entries so their position
    // offsets are relative to a single accumulated blob.
    let mut walker = RecursiveWalker {
        blob: String::new(),
        positions: Vec::new(),
        stats: ExtractStats::default(),
        exclude_paths: schema.exclude_paths.clone(),
        stopped: false,
    };

    // Scalar-mode values are collected separately and joined at the end.
    let mut scalar_parts: Vec<String> = Vec::new();

    for entry in &schema.paths {
        match entry.mode {
            PropertyPathMode::Scalar => {
                scalar_parts.extend(extract_json_path(props, &entry.path));
            }
            PropertyPathMode::Recursive => {
                let root = resolve_path_root(props, &entry.path);
                if let Some(root) = root {
                    walker.walk(&entry.path, root, 0);
                }
            }
        }
    }

    // Flush scalar parts into the blob. Scalar emissions go first so that
    // their byte offsets are predictable relative to any recursive leaves
    // that follow. Scalars use the configured `separator`; the boundary
    // between scalars and recursive leaves uses `LEAF_SEPARATOR` (so phrase
    // queries cannot cross it).
    let scalar_text = if scalar_parts.is_empty() {
        None
    } else {
        Some(scalar_parts.join(&schema.separator))
    };

    // Combine: scalar prefix first, then (only when both sides exist)
    // LEAF_SEPARATOR, then the recursive blob.
    let combined = match (scalar_text, walker.blob.is_empty()) {
        (None, true) => None,
        (None, false) => Some(walker.blob.clone()),
        (Some(s), true) => Some(s),
        (Some(mut s), false) => {
            // Shift recursive positions by the prefix length so they stay
            // correct relative to the combined blob.
            let offset = s.len() + LEAF_SEPARATOR.len();
            for pos in &mut walker.positions {
                pos.start_offset += offset;
                pos.end_offset += offset;
            }
            s.push_str(LEAF_SEPARATOR);
            s.push_str(&walker.blob);
            Some(s)
        }
    };

    (combined, walker.positions, walker.stats)
}
1252
1253/// Extract per-column FTS text for a node's properties, one entry per spec.
1254///
1255/// Returns `Vec<(column_name, text)>` in spec order.
1256pub(crate) fn extract_property_fts_columns(
1257    props: &serde_json::Value,
1258    schema: &PropertyFtsSchema,
1259) -> Vec<(String, String)> {
1260    let mut result = Vec::new();
1261    for entry in &schema.paths {
1262        let is_recursive = matches!(entry.mode, PropertyPathMode::Recursive);
1263        let column_name = fathomdb_schema::fts_column_name(&entry.path, is_recursive);
1264        let text = match entry.mode {
1265            PropertyPathMode::Scalar => {
1266                let parts = extract_json_path(props, &entry.path);
1267                parts.join(&schema.separator)
1268            }
1269            PropertyPathMode::Recursive => {
1270                let mut walker = RecursiveWalker {
1271                    blob: String::new(),
1272                    positions: Vec::new(),
1273                    stats: ExtractStats::default(),
1274                    exclude_paths: schema.exclude_paths.clone(),
1275                    stopped: false,
1276                };
1277                if let Some(root) = resolve_path_root(props, &entry.path) {
1278                    walker.walk(&entry.path, root, 0);
1279                }
1280                walker.blob
1281            }
1282        };
1283        result.push((column_name, text));
1284    }
1285    result
1286}
1287
1288fn resolve_path_root<'a>(
1289    value: &'a serde_json::Value,
1290    path: &str,
1291) -> Option<&'a serde_json::Value> {
1292    let stripped = path.strip_prefix("$.")?;
1293    let mut current = value;
1294    for segment in stripped.split('.') {
1295        current = current.get(segment)?;
1296    }
1297    Some(current)
1298}
1299
/// Stateful depth-first walker that concatenates every scalar leaf of a
/// JSON subtree into `blob`, recording per-leaf byte ranges in `positions`
/// and guardrail counters in `stats`.
struct RecursiveWalker {
    // Accumulated text; emitted leaves are separated by `LEAF_SEPARATOR`.
    blob: String,
    // One `[start, end)` entry per emitted leaf, spanning the leaf value's
    // bytes in `blob` (not the separator).
    positions: Vec<PositionEntry>,
    // Guardrail counters: depth-cap hits, byte-cap stop, excluded subtrees.
    stats: ExtractStats,
    // Exact paths to skip, compared by string equality in `walk`.
    exclude_paths: Vec<String>,
    /// Once the byte cap is hit, the walker refuses to emit any further
    /// leaves. `walk` returns immediately while this is set, so later
    /// subtrees neither append nor update depth/exclude stats.
    stopped: bool,
}
1310
impl RecursiveWalker {
    /// Depth-first walk of `value`, emitting every descendant scalar leaf.
    ///
    /// Objects are visited in sorted key order for stable output; arrays in
    /// index order with `[idx]` path segments. Paths matching an entry in
    /// `exclude_paths` (exact string equality) are skipped and counted.
    /// Containers at `MAX_RECURSIVE_DEPTH` are pruned and counted; `null`
    /// emits nothing. Does nothing once `stopped` is set.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sort keys so the blob (and position map) is deterministic
                // regardless of JSON input key order.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Append one scalar leaf to the blob (preceded by [`LEAF_SEPARATOR`]
    /// when the blob is non-empty) and record its byte range. Empty values
    /// are ignored. If accepting the leaf would push the blob past
    /// `MAX_EXTRACTED_BYTES`, sets `stopped` and emits nothing.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        if value.is_empty() {
            return;
        }
        // Compute the projected blob size if we accept this leaf.
        // If we already emitted at least one leaf, we must account for
        // the separator that precedes this one.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // Position spans the value's bytes only, not the separator.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1392
1393/// Load all registered FTS property schemas from the database, tolerating
1394/// both the legacy JSON shape (array of bare path strings = scalar mode)
1395/// and the Phase 4 shape (objects carrying `path`, `mode`, optional
1396/// `exclude_paths`, or a top-level object carrying `paths` + global
1397/// `exclude_paths`).
1398pub(crate) fn load_fts_property_schemas(
1399    conn: &rusqlite::Connection,
1400) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1401    let mut stmt =
1402        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1403    stmt.query_map([], |row| {
1404        let kind: String = row.get(0)?;
1405        let paths_json: String = row.get(1)?;
1406        let separator: String = row.get(2)?;
1407        let schema = parse_property_schema_json(&paths_json, &separator);
1408        Ok((kind, schema))
1409    })?
1410    .collect::<Result<Vec<_>, _>>()
1411}
1412
1413pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1414    let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1415    let mut paths = Vec::new();
1416    let mut exclude_paths: Vec<String> = Vec::new();
1417
1418    let path_values: Vec<serde_json::Value> = match value {
1419        serde_json::Value::Array(arr) => arr,
1420        serde_json::Value::Object(map) => {
1421            if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1422                exclude_paths = excl
1423                    .iter()
1424                    .filter_map(|v| v.as_str().map(str::to_owned))
1425                    .collect();
1426            }
1427            match map.get("paths") {
1428                Some(serde_json::Value::Array(arr)) => arr.clone(),
1429                _ => Vec::new(),
1430            }
1431        }
1432        _ => Vec::new(),
1433    };
1434
1435    for entry in path_values {
1436        match entry {
1437            serde_json::Value::String(path) => {
1438                paths.push(PropertyPathEntry::scalar(path));
1439            }
1440            serde_json::Value::Object(map) => {
1441                let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1442                    continue;
1443                };
1444                let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1445                    PropertyPathMode::Scalar,
1446                    |m| match m {
1447                        "recursive" => PropertyPathMode::Recursive,
1448                        _ => PropertyPathMode::Scalar,
1449                    },
1450                );
1451                #[allow(clippy::cast_possible_truncation)]
1452                let weight = map
1453                    .get("weight")
1454                    .and_then(serde_json::Value::as_f64)
1455                    .map(|w| w as f32);
1456                paths.push(PropertyPathEntry {
1457                    path: path.to_owned(),
1458                    mode,
1459                    weight,
1460                });
1461                if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1462                    for p in excl {
1463                        if let Some(s) = p.as_str() {
1464                            exclude_paths.push(s.to_owned());
1465                        }
1466                    }
1467                }
1468            }
1469            _ => {}
1470        }
1471    }
1472
1473    PropertyFtsSchema {
1474        paths,
1475        separator: separator.to_owned(),
1476        exclude_paths,
1477    }
1478}
1479
/// Validate the request's operational writes against their registered
/// collections and cache per-collection metadata onto `prepared`.
///
/// For each distinct collection touched, loads (once) its kind, disabled
/// flag, filter-field declarations, and validation contract from
/// `operational_collections`. Rejects writes to unregistered or disabled
/// collections and writes whose variant does not match the collection
/// kind (append_only_log accepts only Append; latest_state accepts only
/// Put/Delete).
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    // Per-collection caches, keyed by collection name, filled lazily on
    // first encounter so repeated writes to one collection query once.
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Kind/variant compatibility matrix: the first arm lists the
        // allowed pairings; the remaining arms produce kind-specific errors.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Outer Some = contract column present, inner Some = a contract is
        // actually defined for this collection.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            // NOTE(review): the Ok payload of this check is discarded; if
            // it carries warnings they may belong in
            // `prepared.operational_validation_warnings`, which is never
            // populated here — confirm the helper's return type and intent.
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1547
1548fn parse_operational_filter_fields(
1549    filter_fields_json: &str,
1550) -> Result<Vec<OperationalFilterField>, EngineError> {
1551    let fields: Vec<OperationalFilterField> =
1552        serde_json::from_str(filter_fields_json).map_err(|error| {
1553            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1554        })?;
1555    let mut seen = std::collections::HashSet::new();
1556    for field in &fields {
1557        if field.name.trim().is_empty() {
1558            return Err(EngineError::InvalidWrite(
1559                "filter_fields_json field names must not be empty".to_owned(),
1560            ));
1561        }
1562        if !seen.insert(field.name.as_str()) {
1563            return Err(EngineError::InvalidWrite(format!(
1564                "filter_fields_json contains duplicate field '{}'",
1565                field.name
1566            )));
1567        }
1568        if field.modes.is_empty() {
1569            return Err(EngineError::InvalidWrite(format!(
1570                "filter_fields_json field '{}' must declare at least one mode",
1571                field.name
1572            )));
1573        }
1574        if field.modes.contains(&OperationalFilterMode::Prefix)
1575            && field.field_type != OperationalFilterFieldType::String
1576        {
1577            return Err(EngineError::InvalidWrite(format!(
1578                "filter field '{}' only supports prefix for string types",
1579                field.name
1580            )));
1581        }
1582    }
1583    Ok(fields)
1584}
1585
/// A single filter value extracted from an operational payload, destined for
/// the `operational_filter_values` table. Exactly one of `string_value` /
/// `integer_value` is populated, chosen by the declared field type (see
/// `extract_operational_filter_values`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Populated for `String`-typed fields; `None` otherwise.
    string_value: Option<String>,
    // Populated for `Integer`/`Timestamp`-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1592
1593fn extract_operational_filter_values(
1594    filter_fields: &[OperationalFilterField],
1595    payload_json: &str,
1596) -> Vec<OperationalFilterValueRow> {
1597    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1598        return Vec::new();
1599    };
1600    let Some(object) = parsed.as_object() else {
1601        return Vec::new();
1602    };
1603
1604    filter_fields
1605        .iter()
1606        .filter_map(|field| {
1607            let value = object.get(&field.name)?;
1608            match field.field_type {
1609                OperationalFilterFieldType::String => {
1610                    value
1611                        .as_str()
1612                        .map(|string_value| OperationalFilterValueRow {
1613                            field_name: field.name.clone(),
1614                            string_value: Some(string_value.to_owned()),
1615                            integer_value: None,
1616                        })
1617                }
1618                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1619                    value
1620                        .as_i64()
1621                        .map(|integer_value| OperationalFilterValueRow {
1622                            field_name: field.name.clone(),
1623                            string_value: None,
1624                            integer_value: Some(integer_value),
1625                        })
1626                }
1627            }
1628        })
1629        .collect()
1630}
1631
/// Resolves all deferred work attached to a prepared write — chunk-based FTS
/// rows, property-based FTS rows, and operational writes — then applies the
/// whole batch with [`apply_write`] (which runs inside one transaction).
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    // Each resolve step mutates `prepared` in place before apply_write
    // consumes the resolved state; any resolve failure aborts before apply.
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1641
1642fn apply_touch_last_accessed(
1643    conn: &mut rusqlite::Connection,
1644    request: &LastAccessTouchRequest,
1645) -> Result<LastAccessTouchReport, EngineError> {
1646    let mut seen = std::collections::HashSet::new();
1647    let logical_ids = request
1648        .logical_ids
1649        .iter()
1650        .filter(|logical_id| seen.insert(logical_id.as_str()))
1651        .cloned()
1652        .collect::<Vec<_>>();
1653    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1654
1655    for logical_id in &logical_ids {
1656        let exists = tx
1657            .query_row(
1658                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1659                params![logical_id],
1660                |row| row.get::<_, i64>(0),
1661            )
1662            .optional()?
1663            .is_some();
1664        if !exists {
1665            return Err(EngineError::InvalidWrite(format!(
1666                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1667            )));
1668        }
1669    }
1670
1671    {
1672        let mut upsert_metadata = tx.prepare_cached(
1673            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1674             VALUES (?1, ?2, ?2) \
1675             ON CONFLICT(logical_id) DO UPDATE SET \
1676                 last_accessed_at = excluded.last_accessed_at, \
1677                 updated_at = excluded.updated_at",
1678        )?;
1679        let mut insert_provenance = tx.prepare_cached(
1680            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1681             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1682        )?;
1683        for logical_id in &logical_ids {
1684            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1685            insert_provenance.execute(params![
1686                new_id(),
1687                logical_id,
1688                request.source_ref.as_deref(),
1689                format!("{{\"touched_at\":{}}}", request.touched_at),
1690            ])?;
1691        }
1692    }
1693
1694    tx.commit()?;
1695    Ok(LastAccessTouchReport {
1696        touched_logical_ids: logical_ids.len(),
1697        touched_at: request.touched_at,
1698    })
1699}
1700
1701fn ensure_operational_collections_writable(
1702    tx: &rusqlite::Transaction<'_>,
1703    prepared: &PreparedWrite,
1704) -> Result<(), EngineError> {
1705    for collection in prepared.operational_collection_kinds.keys() {
1706        let disabled_at: Option<Option<i64>> = tx
1707            .query_row(
1708                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1709                params![collection],
1710                |row| row.get::<_, Option<i64>>(0),
1711            )
1712            .optional()?;
1713        match disabled_at {
1714            Some(Some(_)) => {
1715                return Err(EngineError::InvalidWrite(format!(
1716                    "operational collection '{collection}' is disabled"
1717                )));
1718            }
1719            Some(None) => {}
1720            None => {
1721                return Err(EngineError::InvalidWrite(format!(
1722                    "operational collection '{collection}' is not registered"
1723                )));
1724            }
1725        }
1726    }
1727    Ok(())
1728}
1729
1730fn validate_operational_writes_against_live_contracts(
1731    tx: &rusqlite::Transaction<'_>,
1732    prepared: &PreparedWrite,
1733) -> Result<Vec<String>, EngineError> {
1734    let mut collection_validation_contracts =
1735        HashMap::<String, Option<OperationalValidationContract>>::new();
1736    for collection in prepared.operational_collection_kinds.keys() {
1737        let validation_json: String = tx
1738            .query_row(
1739                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1740                params![collection],
1741                |row| row.get(0),
1742            )
1743            .map_err(EngineError::Sqlite)?;
1744        let validation_contract = parse_operational_validation_contract(&validation_json)
1745            .map_err(EngineError::InvalidWrite)?;
1746        collection_validation_contracts.insert(collection.clone(), validation_contract);
1747    }
1748
1749    let mut warnings = Vec::new();
1750    for write in &prepared.operational_writes {
1751        if let Some(Some(contract)) =
1752            collection_validation_contracts.get(operational_write_collection(write))
1753            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1754        {
1755            warnings.push(warning);
1756        }
1757    }
1758
1759    Ok(warnings)
1760}
1761
1762fn load_live_operational_secondary_indexes(
1763    tx: &rusqlite::Transaction<'_>,
1764    prepared: &PreparedWrite,
1765) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1766    let mut collection_indexes = HashMap::new();
1767    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1768        let secondary_indexes_json: String = tx
1769            .query_row(
1770                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1771                params![collection],
1772                |row| row.get(0),
1773            )
1774            .map_err(EngineError::Sqlite)?;
1775        let indexes =
1776            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1777                .map_err(EngineError::InvalidWrite)?;
1778        collection_indexes.insert(collection.clone(), indexes);
1779    }
1780    Ok(collection_indexes)
1781}
1782
1783fn check_operational_write_against_contract(
1784    write: &OperationalWrite,
1785    contract: &OperationalValidationContract,
1786) -> Result<Option<String>, EngineError> {
1787    if contract.mode == OperationalValidationMode::Disabled {
1788        return Ok(None);
1789    }
1790
1791    let (payload_json, collection, record_key) = match write {
1792        OperationalWrite::Append {
1793            collection,
1794            record_key,
1795            payload_json,
1796            ..
1797        }
1798        | OperationalWrite::Put {
1799            collection,
1800            record_key,
1801            payload_json,
1802            ..
1803        } => (
1804            payload_json.as_str(),
1805            collection.as_str(),
1806            record_key.as_str(),
1807        ),
1808        OperationalWrite::Delete { .. } => return Ok(None),
1809    };
1810
1811    match validate_operational_payload_against_contract(contract, payload_json) {
1812        Ok(()) => Ok(None),
1813        Err(message) => match contract.mode {
1814            OperationalValidationMode::Disabled => Ok(None),
1815            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1816                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1817                kind = operational_write_kind(write)
1818            ))),
1819            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1820                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1821                kind = operational_write_kind(write)
1822            ))),
1823        },
1824    }
1825}
1826
1827#[allow(clippy::too_many_lines)]
1828fn apply_write(
1829    conn: &mut rusqlite::Connection,
1830    prepared: &mut PreparedWrite,
1831) -> Result<WriteReceipt, EngineError> {
1832    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1833
1834    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
1835    // preserve chunks/vec for possible restore, mark superseded, record audit event.
1836    {
1837        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1838        let mut del_prop_positions = tx
1839            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1840        // Double-write cleanup: also remove staging rows for this node so that a
1841        // rebuild-in-progress does not re-insert a retired node at swap time.
1842        let mut del_staging = tx.prepare_cached(
1843            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1844        )?;
1845        let mut sup_node = tx.prepare_cached(
1846            "UPDATE nodes SET superseded_at = unixepoch() \
1847             WHERE logical_id = ?1 AND superseded_at IS NULL",
1848        )?;
1849        let mut ins_event = tx.prepare_cached(
1850            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1851             VALUES (?1, 'node_retire', ?2, ?3)",
1852        )?;
1853        for retire in &prepared.node_retires {
1854            del_fts.execute(params![retire.logical_id])?;
1855            // Delete from the per-kind FTS table: look up the node's kind first.
1856            let kind_opt: Option<String> = tx
1857                .query_row(
1858                    "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1859                    params![retire.logical_id],
1860                    |r| r.get(0),
1861                )
1862                .optional()?;
1863            if let Some(kind) = kind_opt {
1864                let table = fathomdb_schema::fts_kind_table_name(&kind);
1865                // Only delete if the per-kind table exists (may not exist if no schema registered).
1866                let table_exists: bool = tx
1867                    .query_row(
1868                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1869                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1870                        rusqlite::params![table],
1871                        |r| r.get::<_, i64>(0),
1872                    )
1873                    .unwrap_or(0)
1874                    > 0;
1875                if table_exists {
1876                    tx.execute(
1877                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1878                        params![retire.logical_id],
1879                    )?;
1880                }
1881            }
1882            del_prop_positions.execute(params![retire.logical_id])?;
1883            del_staging.execute(params![retire.logical_id])?;
1884            sup_node.execute(params![retire.logical_id])?;
1885            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1886        }
1887    }
1888
1889    // Edge retires: mark superseded, record audit event.
1890    {
1891        let mut sup_edge = tx.prepare_cached(
1892            "UPDATE edges SET superseded_at = unixepoch() \
1893             WHERE logical_id = ?1 AND superseded_at IS NULL",
1894        )?;
1895        let mut ins_event = tx.prepare_cached(
1896            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1897             VALUES (?1, 'edge_retire', ?2, ?3)",
1898        )?;
1899        for retire in &prepared.edge_retires {
1900            sup_edge.execute(params![retire.logical_id])?;
1901            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1902        }
1903    }
1904
1905    // Node inserts (with optional upsert + chunk-policy handling).
1906    {
1907        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1908        let mut del_prop_positions = tx
1909            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1910        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1911        let mut sup_node = tx.prepare_cached(
1912            "UPDATE nodes SET superseded_at = unixepoch() \
1913             WHERE logical_id = ?1 AND superseded_at IS NULL",
1914        )?;
1915        let mut ins_node = tx.prepare_cached(
1916            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1917             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1918        )?;
1919        #[cfg(feature = "sqlite-vec")]
1920        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
1921                            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
1922        #[cfg(feature = "sqlite-vec")]
1923        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
1924            Ok(stmt) => Some(stmt),
1925            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
1926            Err(e) => return Err(e.into()),
1927        };
1928        for node in &prepared.nodes {
1929            if node.upsert {
1930                // Property FTS rows are always replaced on upsert since properties change.
1931                // Delete from the per-kind table (table name is dynamic).
1932                let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1933                // Only delete if the per-kind table exists (may not exist if no schema registered).
1934                let table_exists: bool = tx
1935                    .query_row(
1936                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1937                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1938                        rusqlite::params![table],
1939                        |r| r.get::<_, i64>(0),
1940                    )
1941                    .unwrap_or(0)
1942                    > 0;
1943                if table_exists {
1944                    tx.execute(
1945                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1946                        params![node.logical_id],
1947                    )?;
1948                }
1949                del_prop_positions.execute(params![node.logical_id])?;
1950                if node.chunk_policy == ChunkPolicy::Replace {
1951                    #[cfg(feature = "sqlite-vec")]
1952                    if let Some(ref mut stmt) = del_vec {
1953                        stmt.execute(params![node.logical_id])?;
1954                    }
1955                    del_fts.execute(params![node.logical_id])?;
1956                    del_chunks.execute(params![node.logical_id])?;
1957                }
1958                sup_node.execute(params![node.logical_id])?;
1959            }
1960            ins_node.execute(params![
1961                node.row_id,
1962                node.logical_id,
1963                node.kind,
1964                node.properties,
1965                node.source_ref,
1966                node.content_ref,
1967            ])?;
1968        }
1969    }
1970
1971    // Edge inserts (with optional upsert).
1972    {
1973        let mut sup_edge = tx.prepare_cached(
1974            "UPDATE edges SET superseded_at = unixepoch() \
1975             WHERE logical_id = ?1 AND superseded_at IS NULL",
1976        )?;
1977        let mut ins_edge = tx.prepare_cached(
1978            "INSERT INTO edges \
1979             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1980             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1981        )?;
1982        for edge in &prepared.edges {
1983            if edge.upsert {
1984                sup_edge.execute(params![edge.logical_id])?;
1985            }
1986            ins_edge.execute(params![
1987                edge.row_id,
1988                edge.logical_id,
1989                edge.source_logical_id,
1990                edge.target_logical_id,
1991                edge.kind,
1992                edge.properties,
1993                edge.source_ref,
1994            ])?;
1995        }
1996    }
1997
1998    // Chunk inserts.
1999    {
2000        let mut ins_chunk = tx.prepare_cached(
2001            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
2002             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2003        )?;
2004        for chunk in &prepared.chunks {
2005            ins_chunk.execute(params![
2006                chunk.id,
2007                chunk.node_logical_id,
2008                chunk.text_content,
2009                chunk.byte_start,
2010                chunk.byte_end,
2011                chunk.content_hash,
2012            ])?;
2013        }
2014    }
2015
2016    // Run inserts (with optional upsert).
2017    {
2018        let mut sup_run = tx.prepare_cached(
2019            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2020        )?;
2021        let mut ins_run = tx.prepare_cached(
2022            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
2023             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
2024        )?;
2025        for run in &prepared.runs {
2026            if run.upsert
2027                && let Some(ref prior_id) = run.supersedes_id
2028            {
2029                sup_run.execute(params![prior_id])?;
2030            }
2031            ins_run.execute(params![
2032                run.id,
2033                run.kind,
2034                run.status,
2035                run.properties,
2036                run.source_ref
2037            ])?;
2038        }
2039    }
2040
2041    // Step inserts (with optional upsert).
2042    {
2043        let mut sup_step = tx.prepare_cached(
2044            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2045        )?;
2046        let mut ins_step = tx.prepare_cached(
2047            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
2048             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2049        )?;
2050        for step in &prepared.steps {
2051            if step.upsert
2052                && let Some(ref prior_id) = step.supersedes_id
2053            {
2054                sup_step.execute(params![prior_id])?;
2055            }
2056            ins_step.execute(params![
2057                step.id,
2058                step.run_id,
2059                step.kind,
2060                step.status,
2061                step.properties,
2062                step.source_ref,
2063            ])?;
2064        }
2065    }
2066
2067    // Action inserts (with optional upsert).
2068    {
2069        let mut sup_action = tx.prepare_cached(
2070            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2071        )?;
2072        let mut ins_action = tx.prepare_cached(
2073            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
2074             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2075        )?;
2076        for action in &prepared.actions {
2077            if action.upsert
2078                && let Some(ref prior_id) = action.supersedes_id
2079            {
2080                sup_action.execute(params![prior_id])?;
2081            }
2082            ins_action.execute(params![
2083                action.id,
2084                action.step_id,
2085                action.kind,
2086                action.status,
2087                action.properties,
2088                action.source_ref,
2089            ])?;
2090        }
2091    }
2092
2093    // Operational mutation log writes and latest-state current rows.
2094    {
2095        ensure_operational_collections_writable(&tx, prepared)?;
2096        prepared.operational_validation_warnings =
2097            validate_operational_writes_against_live_contracts(&tx, prepared)?;
2098        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
2099
2100        let mut next_mutation_order: i64 = tx.query_row(
2101            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
2102            [],
2103            |row| row.get(0),
2104        )?;
2105        let mut ins_mutation = tx.prepare_cached(
2106            "INSERT INTO operational_mutations \
2107             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2108             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
2109        )?;
2110        let mut ins_filter_value = tx.prepare_cached(
2111            "INSERT INTO operational_filter_values \
2112             (mutation_id, collection_name, field_name, string_value, integer_value) \
2113             VALUES (?1, ?2, ?3, ?4, ?5)",
2114        )?;
2115        let mut upsert_current = tx.prepare_cached(
2116            "INSERT INTO operational_current \
2117             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
2118             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
2119             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
2120                 payload_json = excluded.payload_json, \
2121                 updated_at = excluded.updated_at, \
2122                 last_mutation_id = excluded.last_mutation_id",
2123        )?;
2124        let mut del_current = tx.prepare_cached(
2125            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
2126        )?;
2127        let mut del_current_secondary_indexes = tx.prepare_cached(
2128            "DELETE FROM operational_secondary_index_entries \
2129             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
2130        )?;
2131        let mut ins_secondary_index = tx.prepare_cached(
2132            "INSERT INTO operational_secondary_index_entries \
2133             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
2134              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
2135             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
2136        )?;
2137        let mut current_row_stmt = tx.prepare_cached(
2138            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
2139             WHERE collection_name = ?1 AND record_key = ?2",
2140        )?;
2141
2142        for write in &prepared.operational_writes {
2143            let collection = operational_write_collection(write);
2144            let record_key = operational_write_record_key(write);
2145            let mutation_id = new_id();
2146            next_mutation_order += 1;
2147            let payload_json = operational_write_payload(write);
2148            ins_mutation.execute(params![
2149                &mutation_id,
2150                collection,
2151                record_key,
2152                operational_write_kind(write),
2153                payload_json,
2154                operational_write_source_ref(write),
2155                next_mutation_order,
2156            ])?;
2157            if let Some(indexes) = collection_secondary_indexes.get(collection) {
2158                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
2159                    ins_secondary_index.execute(params![
2160                        collection,
2161                        entry.index_name,
2162                        "mutation",
2163                        &mutation_id,
2164                        record_key,
2165                        entry.sort_timestamp,
2166                        entry.slot1_text,
2167                        entry.slot1_integer,
2168                        entry.slot2_text,
2169                        entry.slot2_integer,
2170                        entry.slot3_text,
2171                        entry.slot3_integer,
2172                    ])?;
2173                }
2174            }
2175            if let Some(filter_fields) = prepared
2176                .operational_collection_filter_fields
2177                .get(collection)
2178            {
2179                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
2180                    ins_filter_value.execute(params![
2181                        &mutation_id,
2182                        collection,
2183                        filter_value.field_name,
2184                        filter_value.string_value,
2185                        filter_value.integer_value,
2186                    ])?;
2187                }
2188            }
2189
2190            if prepared.operational_collection_kinds.get(collection)
2191                == Some(&OperationalCollectionKind::LatestState)
2192            {
2193                del_current_secondary_indexes.execute(params![collection, record_key])?;
2194                match write {
2195                    OperationalWrite::Put { payload_json, .. } => {
2196                        upsert_current.execute(params![
2197                            collection,
2198                            record_key,
2199                            payload_json,
2200                            &mutation_id,
2201                        ])?;
2202                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
2203                            let (current_payload_json, updated_at, last_mutation_id): (
2204                                String,
2205                                i64,
2206                                String,
2207                            ) = current_row_stmt
2208                                .query_row(params![collection, record_key], |row| {
2209                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
2210                                })?;
2211                            for entry in extract_secondary_index_entries_for_current(
2212                                indexes,
2213                                &current_payload_json,
2214                                updated_at,
2215                            ) {
2216                                ins_secondary_index.execute(params![
2217                                    collection,
2218                                    entry.index_name,
2219                                    "current",
2220                                    last_mutation_id.as_str(),
2221                                    record_key,
2222                                    entry.sort_timestamp,
2223                                    entry.slot1_text,
2224                                    entry.slot1_integer,
2225                                    entry.slot2_text,
2226                                    entry.slot2_integer,
2227                                    entry.slot3_text,
2228                                    entry.slot3_integer,
2229                                ])?;
2230                            }
2231                        }
2232                    }
2233                    OperationalWrite::Delete { .. } => {
2234                        del_current.execute(params![collection, record_key])?;
2235                    }
2236                    OperationalWrite::Append { .. } => {}
2237                }
2238            }
2239        }
2240    }
2241
2242    // FTS row inserts.
2243    {
2244        let mut ins_fts = tx.prepare_cached(
2245            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
2246             VALUES (?1, ?2, ?3, ?4)",
2247        )?;
2248        for fts_row in &prepared.required_fts_rows {
2249            ins_fts.execute(params![
2250                fts_row.chunk_id,
2251                fts_row.node_logical_id,
2252                fts_row.kind,
2253                fts_row.text_content,
2254            ])?;
2255        }
2256    }
2257
2258    // Property FTS row inserts.
2259    if !prepared.required_property_fts_rows.is_empty() {
2260        let mut ins_positions = tx.prepare_cached(
2261            "INSERT INTO fts_node_property_positions \
2262             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
2263             VALUES (?1, ?2, ?3, ?4, ?5)",
2264        )?;
2265        // Delete any stale position rows for these nodes; writer already
2266        // deleted the per-kind FTS rows in the upsert block above.
2267        let mut del_positions = tx
2268            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
2269        for row in &prepared.required_property_fts_rows {
2270            del_positions.execute(params![row.node_logical_id])?;
2271            // Insert into per-kind FTS table (table name is dynamic).
2272            let table = fathomdb_schema::fts_kind_table_name(&row.kind);
2273            if row.columns.is_empty() {
2274                // Non-weighted schema: single text_content column.
2275                // Ensure the per-kind table exists (handles new-kind first write).
2276                tx.execute_batch(&format!(
2277                    "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
2278                        node_logical_id UNINDEXED, text_content, \
2279                        tokenize = '{DEFAULT_FTS_TOKENIZER}'\
2280                    )"
2281                ))?;
2282                tx.prepare(&format!(
2283                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
2284                ))?
2285                .execute(params![row.node_logical_id, row.text_content])?;
2286            } else {
2287                // Weighted schema: per-column INSERT.
2288                let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
2289                let placeholders: Vec<String> = (2..=row.columns.len() + 1)
2290                    .map(|i| format!("?{i}"))
2291                    .collect();
2292                let sql = format!(
2293                    "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
2294                    cols = col_names.join(", "),
2295                    placeholders = placeholders.join(", "),
2296                );
2297                let mut stmt = tx.prepare(&sql)?;
2298                stmt.execute(rusqlite::params_from_iter(
2299                    std::iter::once(row.node_logical_id.as_str())
2300                        .chain(row.columns.iter().map(|(_, v)| v.as_str())),
2301                ))?;
2302            }
2303            for pos in &row.positions {
2304                ins_positions.execute(params![
2305                    row.node_logical_id,
2306                    row.kind,
2307                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
2308                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
2309                    pos.leaf_path,
2310                ])?;
2311            }
2312        }
2313    }
2314
2315    // Double-write to staging: for nodes whose kind has an active rebuild
2316    // (state IN ('PENDING','BUILDING','SWAPPING')), upsert the precomputed
2317    // text_content into fts_property_rebuild_staging.  Both this upsert and
2318    // the per-kind FTS insert above happen inside the same transaction.
2319    if !prepared.required_property_fts_rows.is_empty() {
2320        let mut ins_staging_simple = tx.prepare_cached(
2321            "INSERT INTO fts_property_rebuild_staging \
2322             (kind, node_logical_id, text_content) \
2323             VALUES (?1, ?2, ?3) \
2324             ON CONFLICT(kind, node_logical_id) DO UPDATE \
2325             SET text_content = excluded.text_content",
2326        )?;
2327        let mut ins_staging_columns = tx.prepare_cached(
2328            "INSERT INTO fts_property_rebuild_staging \
2329             (kind, node_logical_id, text_content, columns_json) \
2330             VALUES (?1, ?2, ?3, ?4) \
2331             ON CONFLICT(kind, node_logical_id) DO UPDATE \
2332             SET text_content = excluded.text_content, \
2333                 columns_json = excluded.columns_json",
2334        )?;
2335        let mut check_rebuild = tx.prepare_cached(
2336            "SELECT 1 FROM fts_property_rebuild_state \
2337             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
2338        )?;
2339        for row in &prepared.required_property_fts_rows {
2340            let in_rebuild: bool = check_rebuild
2341                .query_row(params![row.kind], |_| Ok(true))
2342                .optional()?
2343                .unwrap_or(false);
2344            if in_rebuild {
2345                if row.columns.is_empty() {
2346                    ins_staging_simple.execute(params![
2347                        row.kind,
2348                        row.node_logical_id,
2349                        row.text_content
2350                    ])?;
2351                } else {
2352                    let json_map: serde_json::Map<String, serde_json::Value> = row
2353                        .columns
2354                        .iter()
2355                        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
2356                        .collect();
2357                    let columns_json =
2358                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
2359                    ins_staging_columns.execute(params![
2360                        row.kind,
2361                        row.node_logical_id,
2362                        row.text_content,
2363                        columns_json
2364                    ])?;
2365                }
2366            }
2367        }
2368    }
2369
2370    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
2371    #[cfg(feature = "sqlite-vec")]
2372    {
2373        match tx
2374            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
2375        {
2376            Ok(mut ins_vec) => {
2377                for vi in &prepared.vec_inserts {
2378                    let bytes: Vec<u8> =
2379                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
2380                    ins_vec.execute(params![vi.chunk_id, bytes])?;
2381                }
2382            }
2383            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
2384                // vec profile absent: vec inserts are silently skipped.
2385            }
2386            Err(e) => return Err(e.into()),
2387        }
2388    }
2389
2390    tx.commit()?;
2391
2392    let provenance_warnings: Vec<String> = prepared
2393        .nodes
2394        .iter()
2395        .filter(|node| node.source_ref.is_none())
2396        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
2397        .chain(
2398            prepared
2399                .node_retires
2400                .iter()
2401                .filter(|r| r.source_ref.is_none())
2402                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
2403        )
2404        .chain(
2405            prepared
2406                .edges
2407                .iter()
2408                .filter(|e| e.source_ref.is_none())
2409                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
2410        )
2411        .chain(
2412            prepared
2413                .edge_retires
2414                .iter()
2415                .filter(|r| r.source_ref.is_none())
2416                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
2417        )
2418        .chain(
2419            prepared
2420                .runs
2421                .iter()
2422                .filter(|r| r.source_ref.is_none())
2423                .map(|r| format!("run '{}' has no source_ref", r.id)),
2424        )
2425        .chain(
2426            prepared
2427                .steps
2428                .iter()
2429                .filter(|s| s.source_ref.is_none())
2430                .map(|s| format!("step '{}' has no source_ref", s.id)),
2431        )
2432        .chain(
2433            prepared
2434                .actions
2435                .iter()
2436                .filter(|a| a.source_ref.is_none())
2437                .map(|a| format!("action '{}' has no source_ref", a.id)),
2438        )
2439        .chain(
2440            prepared
2441                .operational_writes
2442                .iter()
2443                .filter(|write| operational_write_source_ref(write).is_none())
2444                .map(|write| {
2445                    format!(
2446                        "operational {} '{}:{}' has no source_ref",
2447                        operational_write_kind(write),
2448                        operational_write_collection(write),
2449                        operational_write_record_key(write)
2450                    )
2451                }),
2452        )
2453        .collect();
2454
2455    let mut warnings = provenance_warnings.clone();
2456    warnings.extend(prepared.operational_validation_warnings.clone());
2457
2458    Ok(WriteReceipt {
2459        label: prepared.label.clone(),
2460        optional_backfill_count: prepared.optional_backfills.len(),
2461        warnings,
2462        provenance_warnings,
2463    })
2464}
2465
2466fn operational_write_collection(write: &OperationalWrite) -> &str {
2467    match write {
2468        OperationalWrite::Append { collection, .. }
2469        | OperationalWrite::Put { collection, .. }
2470        | OperationalWrite::Delete { collection, .. } => collection,
2471    }
2472}
2473
2474fn operational_write_record_key(write: &OperationalWrite) -> &str {
2475    match write {
2476        OperationalWrite::Append { record_key, .. }
2477        | OperationalWrite::Put { record_key, .. }
2478        | OperationalWrite::Delete { record_key, .. } => record_key,
2479    }
2480}
2481
2482fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2483    match write {
2484        OperationalWrite::Append { .. } => "append",
2485        OperationalWrite::Put { .. } => "put",
2486        OperationalWrite::Delete { .. } => "delete",
2487    }
2488}
2489
2490fn operational_write_payload(write: &OperationalWrite) -> &str {
2491    match write {
2492        OperationalWrite::Append { payload_json, .. }
2493        | OperationalWrite::Put { payload_json, .. } => payload_json,
2494        OperationalWrite::Delete { .. } => "null",
2495    }
2496}
2497
2498fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2499    match write {
2500        OperationalWrite::Append { source_ref, .. }
2501        | OperationalWrite::Put { source_ref, .. }
2502        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2503    }
2504}
2505
2506#[cfg(test)]
2507#[allow(clippy::expect_used)]
2508mod tests {
2509    use std::sync::Arc;
2510
2511    use fathomdb_schema::SchemaManager;
2512    use tempfile::NamedTempFile;
2513
2514    use super::{apply_write, prepare_write, resolve_operational_writes};
2515    use crate::{
2516        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2517        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2518        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2519        projection::ProjectionTarget,
2520    };
2521
2522    #[test]
2523    fn writer_executes_runtime_table_rows() {
2524        let db = NamedTempFile::new().expect("temporary db");
2525        let writer = WriterActor::start(
2526            db.path(),
2527            Arc::new(SchemaManager::new()),
2528            ProvenanceMode::Warn,
2529            Arc::new(TelemetryCounters::default()),
2530        )
2531        .expect("writer");
2532
2533        let receipt = writer
2534            .submit(WriteRequest {
2535                label: "runtime".to_owned(),
2536                nodes: vec![],
2537                node_retires: vec![],
2538                edges: vec![],
2539                edge_retires: vec![],
2540                chunks: vec![],
2541                runs: vec![RunInsert {
2542                    id: "run-1".to_owned(),
2543                    kind: "session".to_owned(),
2544                    status: "completed".to_owned(),
2545                    properties: "{}".to_owned(),
2546                    source_ref: Some("src-1".to_owned()),
2547                    upsert: false,
2548                    supersedes_id: None,
2549                }],
2550                steps: vec![StepInsert {
2551                    id: "step-1".to_owned(),
2552                    run_id: "run-1".to_owned(),
2553                    kind: "llm".to_owned(),
2554                    status: "completed".to_owned(),
2555                    properties: "{}".to_owned(),
2556                    source_ref: Some("src-1".to_owned()),
2557                    upsert: false,
2558                    supersedes_id: None,
2559                }],
2560                actions: vec![ActionInsert {
2561                    id: "action-1".to_owned(),
2562                    step_id: "step-1".to_owned(),
2563                    kind: "emit".to_owned(),
2564                    status: "completed".to_owned(),
2565                    properties: "{}".to_owned(),
2566                    source_ref: Some("src-1".to_owned()),
2567                    upsert: false,
2568                    supersedes_id: None,
2569                }],
2570                optional_backfills: vec![],
2571                vec_inserts: vec![],
2572                operational_writes: vec![],
2573            })
2574            .expect("write receipt");
2575
2576        assert_eq!(receipt.label, "runtime");
2577    }
2578
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        // End-to-end: a single request mixing a graph node insert with an
        // operational Put must land rows in `nodes`, `operational_mutations`,
        // and `operational_current` atomically.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Register the target collection up front; writes to collections
        // missing from the registry are rejected (covered by a sibling test).
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection; the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Reopen the database to inspect what the writer committed.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // Graph side: the node insert must be visible.
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        // Operational side: exactly one mutation-journal entry for the Put.
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // The latest-state row carries the Put payload verbatim.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2658
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        // A validation contract with mode "disabled" must be ignored: a
        // payload violating the contract still lands in operational_current.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the collection with a contract requiring `status` in
        // {ok, failed} — but with enforcement switched off.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Close the seeding connection; the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // The payload has no `status` field and an unknown key — invalid
        // under the contract, but the disabled mode lets it through.
        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The invalid payload must have been stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2712
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        // In "report_only" mode an invalid payload is persisted, but the
        // receipt must carry a validation warning (distinct from the
        // provenance warnings, which stay empty here).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract: `status` is required and must be "ok" or "failed".
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Close the seeding connection; the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the contract's enum — invalid, but allowed
        // through in report_only mode.
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // source_ref was supplied, so no provenance warning; exactly one
        // validation warning naming the collection and the enum failure.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // The invalid payload must still have been stored verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2777
2778    #[test]
2779    fn writer_rejects_operational_write_for_missing_collection() {
2780        let db = NamedTempFile::new().expect("temporary db");
2781        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2782        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2783        drop(conn);
2784        let writer = WriterActor::start(
2785            db.path(),
2786            Arc::new(SchemaManager::new()),
2787            ProvenanceMode::Warn,
2788            Arc::new(TelemetryCounters::default()),
2789        )
2790        .expect("writer");
2791
2792        let result = writer.submit(WriteRequest {
2793            label: "missing-operational-collection".to_owned(),
2794            nodes: vec![],
2795            node_retires: vec![],
2796            edges: vec![],
2797            edge_retires: vec![],
2798            chunks: vec![],
2799            runs: vec![],
2800            steps: vec![],
2801            actions: vec![],
2802            optional_backfills: vec![],
2803            vec_inserts: vec![],
2804            operational_writes: vec![OperationalWrite::Put {
2805                collection: "connector_health".to_owned(),
2806                record_key: "gmail".to_owned(),
2807                payload_json: r#"{"status":"ok"}"#.to_owned(),
2808                source_ref: Some("src-1".to_owned()),
2809            }],
2810        });
2811
2812        assert!(
2813            matches!(result, Err(EngineError::InvalidWrite(_))),
2814            "missing operational collection must return InvalidWrite"
2815        );
2816    }
2817
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        // An Append against an append_only_log collection must journal a
        // mutation row but leave operational_current untouched (no
        // latest-state row for log-style collections).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed an append-only collection.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection; the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // The journal must record the append with its payload verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // No latest-state row may be created for an append.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2881
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        // In "enforce" mode an invalid payload must be rejected with
        // InvalidWrite, and the rejection must leave no trace: neither a
        // mutation-journal row nor any extracted filter values.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed with both a filter-field definition (so filter extraction
        // would run on success) and an enforcing validation contract
        // requiring `status` in {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Close the seeding connection; the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" violates the enum — the whole submit must fail.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // No side effects: the rejected write left both tables empty.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2946
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        // Put then Delete on the same record: the latest-state row must be
        // gone afterwards, while the mutation journal keeps both operations
        // in order.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Close the seeding connection; the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First request: establish a current row for gmail.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second request: delete the same record.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // The journal must hold both mutations, ordered put -> delete.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The latest-state row must have been removed by the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3035
3036    #[test]
3037    fn writer_delete_bypasses_validation_contract() {
3038        let db = NamedTempFile::new().expect("temporary db");
3039        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3040        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3041        conn.execute(
3042            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3043             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3044            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3045        )
3046        .expect("seed collection");
3047        drop(conn);
3048        let writer = WriterActor::start(
3049            db.path(),
3050            Arc::new(SchemaManager::new()),
3051            ProvenanceMode::Warn,
3052            Arc::new(TelemetryCounters::default()),
3053        )
3054        .expect("writer");
3055
3056        writer
3057            .submit(WriteRequest {
3058                label: "valid-put".to_owned(),
3059                nodes: vec![],
3060                node_retires: vec![],
3061                edges: vec![],
3062                edge_retires: vec![],
3063                chunks: vec![],
3064                runs: vec![],
3065                steps: vec![],
3066                actions: vec![],
3067                optional_backfills: vec![],
3068                vec_inserts: vec![],
3069                operational_writes: vec![OperationalWrite::Put {
3070                    collection: "connector_health".to_owned(),
3071                    record_key: "gmail".to_owned(),
3072                    payload_json: r#"{"status":"ok"}"#.to_owned(),
3073                    source_ref: Some("src-1".to_owned()),
3074                }],
3075            })
3076            .expect("put receipt");
3077        writer
3078            .submit(WriteRequest {
3079                label: "delete-after-put".to_owned(),
3080                nodes: vec![],
3081                node_retires: vec![],
3082                edges: vec![],
3083                edge_retires: vec![],
3084                chunks: vec![],
3085                runs: vec![],
3086                steps: vec![],
3087                actions: vec![],
3088                optional_backfills: vec![],
3089                vec_inserts: vec![],
3090                operational_writes: vec![OperationalWrite::Delete {
3091                    collection: "connector_health".to_owned(),
3092                    record_key: "gmail".to_owned(),
3093                    source_ref: Some("src-2".to_owned()),
3094                }],
3095            })
3096            .expect("delete receipt");
3097
3098        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3099        let current_count: i64 = conn
3100            .query_row(
3101                "SELECT count(*) FROM operational_current \
3102                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3103                [],
3104                |row| row.get(0),
3105            )
3106            .expect("current count");
3107        assert_eq!(current_count, 0);
3108    }
3109
3110    #[test]
3111    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
3112        let db = NamedTempFile::new().expect("temporary db");
3113        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3114        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3115        conn.execute(
3116            "INSERT INTO operational_collections \
3117             (name, kind, schema_json, retention_json, secondary_indexes_json) \
3118             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3119            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
3120        )
3121        .expect("seed collection");
3122        drop(conn);
3123        let writer = WriterActor::start(
3124            db.path(),
3125            Arc::new(SchemaManager::new()),
3126            ProvenanceMode::Warn,
3127            Arc::new(TelemetryCounters::default()),
3128        )
3129        .expect("writer");
3130
3131        writer
3132            .submit(WriteRequest {
3133                label: "secondary-index-put".to_owned(),
3134                nodes: vec![],
3135                node_retires: vec![],
3136                edges: vec![],
3137                edge_retires: vec![],
3138                chunks: vec![],
3139                runs: vec![],
3140                steps: vec![],
3141                actions: vec![],
3142                optional_backfills: vec![],
3143                vec_inserts: vec![],
3144                operational_writes: vec![OperationalWrite::Put {
3145                    collection: "connector_health".to_owned(),
3146                    record_key: "gmail".to_owned(),
3147                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
3148                        .to_owned(),
3149                    source_ref: Some("src-1".to_owned()),
3150                }],
3151            })
3152            .expect("put receipt");
3153
3154        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3155        let current_entry_count: i64 = conn
3156            .query_row(
3157                "SELECT count(*) FROM operational_secondary_index_entries \
3158                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3159                [],
3160                |row| row.get(0),
3161            )
3162            .expect("current secondary index count");
3163        assert_eq!(current_entry_count, 2);
3164        drop(conn);
3165
3166        writer
3167            .submit(WriteRequest {
3168                label: "secondary-index-delete".to_owned(),
3169                nodes: vec![],
3170                node_retires: vec![],
3171                edges: vec![],
3172                edge_retires: vec![],
3173                chunks: vec![],
3174                runs: vec![],
3175                steps: vec![],
3176                actions: vec![],
3177                optional_backfills: vec![],
3178                vec_inserts: vec![],
3179                operational_writes: vec![OperationalWrite::Delete {
3180                    collection: "connector_health".to_owned(),
3181                    record_key: "gmail".to_owned(),
3182                    source_ref: Some("src-2".to_owned()),
3183                }],
3184            })
3185            .expect("delete receipt");
3186
3187        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3188        let current_entry_count: i64 = conn
3189            .query_row(
3190                "SELECT count(*) FROM operational_secondary_index_entries \
3191                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3192                [],
3193                |row| row.get(0),
3194            )
3195            .expect("current secondary index count");
3196        assert_eq!(current_entry_count, 0);
3197    }
3198
3199    #[test]
3200    fn writer_latest_state_operational_writes_persist_mutation_order() {
3201        let db = NamedTempFile::new().expect("temporary db");
3202        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3203        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3204        conn.execute(
3205            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3206             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3207            [],
3208        )
3209        .expect("seed collection");
3210        drop(conn);
3211
3212        let writer = WriterActor::start(
3213            db.path(),
3214            Arc::new(SchemaManager::new()),
3215            ProvenanceMode::Warn,
3216            Arc::new(TelemetryCounters::default()),
3217        )
3218        .expect("writer");
3219
3220        writer
3221            .submit(WriteRequest {
3222                label: "ordered-operational-batch".to_owned(),
3223                nodes: vec![],
3224                node_retires: vec![],
3225                edges: vec![],
3226                edge_retires: vec![],
3227                chunks: vec![],
3228                runs: vec![],
3229                steps: vec![],
3230                actions: vec![],
3231                optional_backfills: vec![],
3232                vec_inserts: vec![],
3233                operational_writes: vec![
3234                    OperationalWrite::Put {
3235                        collection: "connector_health".to_owned(),
3236                        record_key: "gmail".to_owned(),
3237                        payload_json: r#"{"status":"old"}"#.to_owned(),
3238                        source_ref: Some("src-1".to_owned()),
3239                    },
3240                    OperationalWrite::Delete {
3241                        collection: "connector_health".to_owned(),
3242                        record_key: "gmail".to_owned(),
3243                        source_ref: Some("src-2".to_owned()),
3244                    },
3245                    OperationalWrite::Put {
3246                        collection: "connector_health".to_owned(),
3247                        record_key: "gmail".to_owned(),
3248                        payload_json: r#"{"status":"new"}"#.to_owned(),
3249                        source_ref: Some("src-3".to_owned()),
3250                    },
3251                ],
3252            })
3253            .expect("write receipt");
3254
3255        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3256        let rows: Vec<(String, i64)> = {
3257            let mut stmt = conn
3258                .prepare(
3259                    "SELECT op_kind, mutation_order FROM operational_mutations \
3260                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3261                     ORDER BY mutation_order ASC",
3262                )
3263                .expect("stmt");
3264            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
3265                .expect("rows")
3266                .collect::<Result<_, _>>()
3267                .expect("collect")
3268        };
3269        assert_eq!(
3270            rows,
3271            vec![
3272                ("put".to_owned(), 1),
3273                ("delete".to_owned(), 2),
3274                ("put".to_owned(), 3),
3275            ]
3276        );
3277        let payload: String = conn
3278            .query_row(
3279                "SELECT payload_json FROM operational_current \
3280                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3281                [],
3282                |row| row.get(0),
3283            )
3284            .expect("current payload");
3285        assert_eq!(payload, r#"{"status":"new"}"#);
3286    }
3287
3288    #[test]
3289    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
3290        let db = NamedTempFile::new().expect("temporary db");
3291        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
3292        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3293        conn.execute(
3294            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3295             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3296            [],
3297        )
3298        .expect("seed collection");
3299
3300        let request = WriteRequest {
3301            label: "disabled-race".to_owned(),
3302            nodes: vec![],
3303            node_retires: vec![],
3304            edges: vec![],
3305            edge_retires: vec![],
3306            chunks: vec![],
3307            runs: vec![],
3308            steps: vec![],
3309            actions: vec![],
3310            optional_backfills: vec![],
3311            vec_inserts: vec![],
3312            operational_writes: vec![OperationalWrite::Put {
3313                collection: "connector_health".to_owned(),
3314                record_key: "gmail".to_owned(),
3315                payload_json: r#"{"status":"ok"}"#.to_owned(),
3316                source_ref: Some("src-1".to_owned()),
3317            }],
3318        };
3319        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
3320        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
3321
3322        conn.execute(
3323            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
3324            [],
3325        )
3326        .expect("disable collection after preflight");
3327
3328        let error =
3329            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
3330        assert!(matches!(error, EngineError::InvalidWrite(_)));
3331        assert!(error.to_string().contains("is disabled"));
3332
3333        let mutation_count: i64 = conn
3334            .query_row(
3335                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3336                [],
3337                |row| row.get(0),
3338            )
3339            .expect("mutation count");
3340        assert_eq!(mutation_count, 0);
3341
3342        let current_count: i64 = conn
3343            .query_row(
3344                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3345                [],
3346                |row| row.get(0),
3347            )
3348            .expect("current count");
3349        assert_eq!(current_count, 0);
3350    }
3351
3352    #[test]
3353    fn writer_enforce_validation_rejects_invalid_put_atomically() {
3354        let db = NamedTempFile::new().expect("temporary db");
3355        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3356        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3357        conn.execute(
3358            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3359             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3360            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3361        )
3362        .expect("seed collection");
3363        drop(conn);
3364        let writer = WriterActor::start(
3365            db.path(),
3366            Arc::new(SchemaManager::new()),
3367            ProvenanceMode::Warn,
3368            Arc::new(TelemetryCounters::default()),
3369        )
3370        .expect("writer");
3371
3372        let error = writer
3373            .submit(WriteRequest {
3374                label: "invalid-put".to_owned(),
3375                nodes: vec![NodeInsert {
3376                    row_id: "row-1".to_owned(),
3377                    logical_id: "lg-1".to_owned(),
3378                    kind: "Meeting".to_owned(),
3379                    properties: "{}".to_owned(),
3380                    source_ref: Some("src-1".to_owned()),
3381                    upsert: false,
3382                    chunk_policy: ChunkPolicy::Preserve,
3383                    content_ref: None,
3384                }],
3385                node_retires: vec![],
3386                edges: vec![],
3387                edge_retires: vec![],
3388                chunks: vec![],
3389                runs: vec![],
3390                steps: vec![],
3391                actions: vec![],
3392                optional_backfills: vec![],
3393                vec_inserts: vec![],
3394                operational_writes: vec![OperationalWrite::Put {
3395                    collection: "connector_health".to_owned(),
3396                    record_key: "gmail".to_owned(),
3397                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
3398                    source_ref: Some("src-1".to_owned()),
3399                }],
3400            })
3401            .expect_err("invalid put must reject");
3402        assert!(matches!(error, EngineError::InvalidWrite(_)));
3403        assert!(error.to_string().contains("must be one of"));
3404
3405        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3406        let node_count: i64 = conn
3407            .query_row(
3408                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3409                [],
3410                |row| row.get(0),
3411            )
3412            .expect("node count");
3413        assert_eq!(node_count, 0);
3414        let mutation_count: i64 = conn
3415            .query_row(
3416                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3417                [],
3418                |row| row.get(0),
3419            )
3420            .expect("mutation count");
3421        assert_eq!(mutation_count, 0);
3422        let current_count: i64 = conn
3423            .query_row(
3424                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3425                [],
3426                |row| row.get(0),
3427            )
3428            .expect("current count");
3429        assert_eq!(current_count, 0);
3430    }
3431
3432    #[test]
3433    fn writer_rejects_append_against_latest_state_collection() {
3434        let db = NamedTempFile::new().expect("temporary db");
3435        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3436        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3437        conn.execute(
3438            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3439             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3440            [],
3441        )
3442        .expect("seed collection");
3443        drop(conn);
3444        let writer = WriterActor::start(
3445            db.path(),
3446            Arc::new(SchemaManager::new()),
3447            ProvenanceMode::Warn,
3448            Arc::new(TelemetryCounters::default()),
3449        )
3450        .expect("writer");
3451
3452        let result = writer.submit(WriteRequest {
3453            label: "bad-append".to_owned(),
3454            nodes: vec![],
3455            node_retires: vec![],
3456            edges: vec![],
3457            edge_retires: vec![],
3458            chunks: vec![],
3459            runs: vec![],
3460            steps: vec![],
3461            actions: vec![],
3462            optional_backfills: vec![],
3463            vec_inserts: vec![],
3464            operational_writes: vec![OperationalWrite::Append {
3465                collection: "connector_health".to_owned(),
3466                record_key: "gmail".to_owned(),
3467                payload_json: r#"{"status":"ok"}"#.to_owned(),
3468                source_ref: Some("src-1".to_owned()),
3469            }],
3470        });
3471
3472        assert!(
3473            matches!(result, Err(EngineError::InvalidWrite(_))),
3474            "latest_state collection must reject Append"
3475        );
3476    }
3477
3478    #[test]
3479    fn writer_upsert_supersedes_prior_active_node() {
3480        let db = NamedTempFile::new().expect("temporary db");
3481        let writer = WriterActor::start(
3482            db.path(),
3483            Arc::new(SchemaManager::new()),
3484            ProvenanceMode::Warn,
3485            Arc::new(TelemetryCounters::default()),
3486        )
3487        .expect("writer");
3488
3489        writer
3490            .submit(WriteRequest {
3491                label: "v1".to_owned(),
3492                nodes: vec![NodeInsert {
3493                    row_id: "row-1".to_owned(),
3494                    logical_id: "lg-1".to_owned(),
3495                    kind: "Meeting".to_owned(),
3496                    properties: r#"{"version":1}"#.to_owned(),
3497                    source_ref: Some("src-1".to_owned()),
3498                    upsert: false,
3499                    chunk_policy: ChunkPolicy::Preserve,
3500                    content_ref: None,
3501                }],
3502                node_retires: vec![],
3503                edges: vec![],
3504                edge_retires: vec![],
3505                chunks: vec![],
3506                runs: vec![],
3507                steps: vec![],
3508                actions: vec![],
3509                optional_backfills: vec![],
3510                vec_inserts: vec![],
3511                operational_writes: vec![],
3512            })
3513            .expect("v1 write");
3514
3515        writer
3516            .submit(WriteRequest {
3517                label: "v2".to_owned(),
3518                nodes: vec![NodeInsert {
3519                    row_id: "row-2".to_owned(),
3520                    logical_id: "lg-1".to_owned(),
3521                    kind: "Meeting".to_owned(),
3522                    properties: r#"{"version":2}"#.to_owned(),
3523                    source_ref: Some("src-2".to_owned()),
3524                    upsert: true,
3525                    chunk_policy: ChunkPolicy::Preserve,
3526                    content_ref: None,
3527                }],
3528                node_retires: vec![],
3529                edges: vec![],
3530                edge_retires: vec![],
3531                chunks: vec![],
3532                runs: vec![],
3533                steps: vec![],
3534                actions: vec![],
3535                optional_backfills: vec![],
3536                vec_inserts: vec![],
3537                operational_writes: vec![],
3538            })
3539            .expect("v2 upsert write");
3540
3541        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3542        let (active_row_id, props): (String, String) = conn
3543            .query_row(
3544                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3545                [],
3546                |row| Ok((row.get(0)?, row.get(1)?)),
3547            )
3548            .expect("active row");
3549        assert_eq!(active_row_id, "row-2");
3550        assert!(props.contains("\"version\":2"));
3551
3552        let superseded: i64 = conn
3553            .query_row(
3554                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3555                [],
3556                |row| row.get(0),
3557            )
3558            .expect("superseded count");
3559        assert_eq!(superseded, 1);
3560    }
3561
3562    #[test]
3563    fn writer_inserts_edge_between_two_nodes() {
3564        let db = NamedTempFile::new().expect("temporary db");
3565        let writer = WriterActor::start(
3566            db.path(),
3567            Arc::new(SchemaManager::new()),
3568            ProvenanceMode::Warn,
3569            Arc::new(TelemetryCounters::default()),
3570        )
3571        .expect("writer");
3572
3573        writer
3574            .submit(WriteRequest {
3575                label: "nodes-and-edge".to_owned(),
3576                nodes: vec![
3577                    NodeInsert {
3578                        row_id: "row-meeting".to_owned(),
3579                        logical_id: "meeting-1".to_owned(),
3580                        kind: "Meeting".to_owned(),
3581                        properties: "{}".to_owned(),
3582                        source_ref: Some("src-1".to_owned()),
3583                        upsert: false,
3584                        chunk_policy: ChunkPolicy::Preserve,
3585                        content_ref: None,
3586                    },
3587                    NodeInsert {
3588                        row_id: "row-task".to_owned(),
3589                        logical_id: "task-1".to_owned(),
3590                        kind: "Task".to_owned(),
3591                        properties: "{}".to_owned(),
3592                        source_ref: Some("src-1".to_owned()),
3593                        upsert: false,
3594                        chunk_policy: ChunkPolicy::Preserve,
3595                        content_ref: None,
3596                    },
3597                ],
3598                node_retires: vec![],
3599                edges: vec![EdgeInsert {
3600                    row_id: "edge-1".to_owned(),
3601                    logical_id: "edge-lg-1".to_owned(),
3602                    source_logical_id: "meeting-1".to_owned(),
3603                    target_logical_id: "task-1".to_owned(),
3604                    kind: "HAS_TASK".to_owned(),
3605                    properties: "{}".to_owned(),
3606                    source_ref: Some("src-1".to_owned()),
3607                    upsert: false,
3608                }],
3609                edge_retires: vec![],
3610                chunks: vec![],
3611                runs: vec![],
3612                steps: vec![],
3613                actions: vec![],
3614                optional_backfills: vec![],
3615                vec_inserts: vec![],
3616                operational_writes: vec![],
3617            })
3618            .expect("write receipt");
3619
3620        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3621        let (src, tgt, kind): (String, String, String) = conn
3622            .query_row(
3623                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3624                [],
3625                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3626            )
3627            .expect("edge row");
3628        assert_eq!(src, "meeting-1");
3629        assert_eq!(tgt, "task-1");
3630        assert_eq!(kind, "HAS_TASK");
3631    }
3632
3633    #[test]
3634    #[allow(clippy::too_many_lines)]
3635    fn writer_upsert_supersedes_prior_active_edge() {
3636        let db = NamedTempFile::new().expect("temporary db");
3637        let writer = WriterActor::start(
3638            db.path(),
3639            Arc::new(SchemaManager::new()),
3640            ProvenanceMode::Warn,
3641            Arc::new(TelemetryCounters::default()),
3642        )
3643        .expect("writer");
3644
3645        // Write two nodes
3646        writer
3647            .submit(WriteRequest {
3648                label: "nodes".to_owned(),
3649                nodes: vec![
3650                    NodeInsert {
3651                        row_id: "row-a".to_owned(),
3652                        logical_id: "node-a".to_owned(),
3653                        kind: "Meeting".to_owned(),
3654                        properties: "{}".to_owned(),
3655                        source_ref: Some("src-1".to_owned()),
3656                        upsert: false,
3657                        chunk_policy: ChunkPolicy::Preserve,
3658                        content_ref: None,
3659                    },
3660                    NodeInsert {
3661                        row_id: "row-b".to_owned(),
3662                        logical_id: "node-b".to_owned(),
3663                        kind: "Task".to_owned(),
3664                        properties: "{}".to_owned(),
3665                        source_ref: Some("src-1".to_owned()),
3666                        upsert: false,
3667                        chunk_policy: ChunkPolicy::Preserve,
3668                        content_ref: None,
3669                    },
3670                ],
3671                node_retires: vec![],
3672                edges: vec![],
3673                edge_retires: vec![],
3674                chunks: vec![],
3675                runs: vec![],
3676                steps: vec![],
3677                actions: vec![],
3678                optional_backfills: vec![],
3679                vec_inserts: vec![],
3680                operational_writes: vec![],
3681            })
3682            .expect("nodes write");
3683
3684        // Write v1 edge
3685        writer
3686            .submit(WriteRequest {
3687                label: "edge-v1".to_owned(),
3688                nodes: vec![],
3689                node_retires: vec![],
3690                edges: vec![EdgeInsert {
3691                    row_id: "edge-row-1".to_owned(),
3692                    logical_id: "edge-lg-1".to_owned(),
3693                    source_logical_id: "node-a".to_owned(),
3694                    target_logical_id: "node-b".to_owned(),
3695                    kind: "HAS_TASK".to_owned(),
3696                    properties: r#"{"weight":1}"#.to_owned(),
3697                    source_ref: Some("src-1".to_owned()),
3698                    upsert: false,
3699                }],
3700                edge_retires: vec![],
3701                chunks: vec![],
3702                runs: vec![],
3703                steps: vec![],
3704                actions: vec![],
3705                optional_backfills: vec![],
3706                vec_inserts: vec![],
3707                operational_writes: vec![],
3708            })
3709            .expect("edge v1 write");
3710
3711        // Upsert v2 edge
3712        writer
3713            .submit(WriteRequest {
3714                label: "edge-v2".to_owned(),
3715                nodes: vec![],
3716                node_retires: vec![],
3717                edges: vec![EdgeInsert {
3718                    row_id: "edge-row-2".to_owned(),
3719                    logical_id: "edge-lg-1".to_owned(),
3720                    source_logical_id: "node-a".to_owned(),
3721                    target_logical_id: "node-b".to_owned(),
3722                    kind: "HAS_TASK".to_owned(),
3723                    properties: r#"{"weight":2}"#.to_owned(),
3724                    source_ref: Some("src-2".to_owned()),
3725                    upsert: true,
3726                }],
3727                edge_retires: vec![],
3728                chunks: vec![],
3729                runs: vec![],
3730                steps: vec![],
3731                actions: vec![],
3732                optional_backfills: vec![],
3733                vec_inserts: vec![],
3734                operational_writes: vec![],
3735            })
3736            .expect("edge v2 upsert");
3737
3738        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3739        let (active_row_id, props): (String, String) = conn
3740            .query_row(
3741                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3742                [],
3743                |row| Ok((row.get(0)?, row.get(1)?)),
3744            )
3745            .expect("active edge");
3746        assert_eq!(active_row_id, "edge-row-2");
3747        assert!(props.contains("\"weight\":2"));
3748
3749        let superseded: i64 = conn
3750            .query_row(
3751                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3752                [],
3753                |row| row.get(0),
3754            )
3755            .expect("superseded count");
3756        assert_eq!(superseded, 1);
3757    }
3758
3759    #[test]
3760    fn writer_fts_rows_are_written_to_database() {
3761        let db = NamedTempFile::new().expect("temporary db");
3762        let writer = WriterActor::start(
3763            db.path(),
3764            Arc::new(SchemaManager::new()),
3765            ProvenanceMode::Warn,
3766            Arc::new(TelemetryCounters::default()),
3767        )
3768        .expect("writer");
3769
3770        writer
3771            .submit(WriteRequest {
3772                label: "seed".to_owned(),
3773                nodes: vec![NodeInsert {
3774                    row_id: "row-1".to_owned(),
3775                    logical_id: "logical-1".to_owned(),
3776                    kind: "Meeting".to_owned(),
3777                    properties: "{}".to_owned(),
3778                    source_ref: Some("src-1".to_owned()),
3779                    upsert: false,
3780                    chunk_policy: ChunkPolicy::Preserve,
3781                    content_ref: None,
3782                }],
3783                node_retires: vec![],
3784                edges: vec![],
3785                edge_retires: vec![],
3786                chunks: vec![ChunkInsert {
3787                    id: "chunk-1".to_owned(),
3788                    node_logical_id: "logical-1".to_owned(),
3789                    text_content: "budget discussion".to_owned(),
3790                    byte_start: None,
3791                    byte_end: None,
3792                    content_hash: None,
3793                }],
3794                runs: vec![],
3795                steps: vec![],
3796                actions: vec![],
3797                optional_backfills: vec![],
3798                vec_inserts: vec![],
3799                operational_writes: vec![],
3800            })
3801            .expect("write receipt");
3802
3803        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3804        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3805            conn.query_row(
3806                "SELECT chunk_id, node_logical_id, kind, text_content \
3807                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3808                [],
3809                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3810            )
3811            .expect("fts row");
3812        assert_eq!(chunk_id, "chunk-1");
3813        assert_eq!(node_logical_id, "logical-1");
3814        assert_eq!(kind, "Meeting");
3815        assert_eq!(text_content, "budget discussion");
3816    }
3817
3818    #[test]
3819    fn writer_receipt_warns_on_nodes_without_source_ref() {
3820        let db = NamedTempFile::new().expect("temporary db");
3821        let writer = WriterActor::start(
3822            db.path(),
3823            Arc::new(SchemaManager::new()),
3824            ProvenanceMode::Warn,
3825            Arc::new(TelemetryCounters::default()),
3826        )
3827        .expect("writer");
3828
3829        let receipt = writer
3830            .submit(WriteRequest {
3831                label: "no-source".to_owned(),
3832                nodes: vec![NodeInsert {
3833                    row_id: "row-1".to_owned(),
3834                    logical_id: "logical-1".to_owned(),
3835                    kind: "Meeting".to_owned(),
3836                    properties: "{}".to_owned(),
3837                    source_ref: None,
3838                    upsert: false,
3839                    chunk_policy: ChunkPolicy::Preserve,
3840                    content_ref: None,
3841                }],
3842                node_retires: vec![],
3843                edges: vec![],
3844                edge_retires: vec![],
3845                chunks: vec![],
3846                runs: vec![],
3847                steps: vec![],
3848                actions: vec![],
3849                optional_backfills: vec![],
3850                vec_inserts: vec![],
3851                operational_writes: vec![],
3852            })
3853            .expect("write receipt");
3854
3855        assert_eq!(receipt.provenance_warnings.len(), 1);
3856        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3857    }
3858
3859    #[test]
3860    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3861        let db = NamedTempFile::new().expect("temporary db");
3862        let writer = WriterActor::start(
3863            db.path(),
3864            Arc::new(SchemaManager::new()),
3865            ProvenanceMode::Warn,
3866            Arc::new(TelemetryCounters::default()),
3867        )
3868        .expect("writer");
3869
3870        let receipt = writer
3871            .submit(WriteRequest {
3872                label: "with-source".to_owned(),
3873                nodes: vec![NodeInsert {
3874                    row_id: "row-1".to_owned(),
3875                    logical_id: "logical-1".to_owned(),
3876                    kind: "Meeting".to_owned(),
3877                    properties: "{}".to_owned(),
3878                    source_ref: Some("src-1".to_owned()),
3879                    upsert: false,
3880                    chunk_policy: ChunkPolicy::Preserve,
3881                    content_ref: None,
3882                }],
3883                node_retires: vec![],
3884                edges: vec![],
3885                edge_retires: vec![],
3886                chunks: vec![],
3887                runs: vec![],
3888                steps: vec![],
3889                actions: vec![],
3890                optional_backfills: vec![],
3891                vec_inserts: vec![],
3892                operational_writes: vec![],
3893            })
3894            .expect("write receipt");
3895
3896        assert!(receipt.provenance_warnings.is_empty());
3897    }
3898
3899    #[test]
3900    fn writer_accepts_chunk_for_pre_existing_node() {
3901        let db = NamedTempFile::new().expect("temporary db");
3902        let writer = WriterActor::start(
3903            db.path(),
3904            Arc::new(SchemaManager::new()),
3905            ProvenanceMode::Warn,
3906            Arc::new(TelemetryCounters::default()),
3907        )
3908        .expect("writer");
3909
3910        // Request 1: submit node only
3911        writer
3912            .submit(WriteRequest {
3913                label: "r1".to_owned(),
3914                nodes: vec![NodeInsert {
3915                    row_id: "row-1".to_owned(),
3916                    logical_id: "logical-1".to_owned(),
3917                    kind: "Meeting".to_owned(),
3918                    properties: "{}".to_owned(),
3919                    source_ref: Some("src-1".to_owned()),
3920                    upsert: false,
3921                    chunk_policy: ChunkPolicy::Preserve,
3922                    content_ref: None,
3923                }],
3924                node_retires: vec![],
3925                edges: vec![],
3926                edge_retires: vec![],
3927                chunks: vec![],
3928                runs: vec![],
3929                steps: vec![],
3930                actions: vec![],
3931                optional_backfills: vec![],
3932                vec_inserts: vec![],
3933                operational_writes: vec![],
3934            })
3935            .expect("r1 write");
3936
3937        // Request 2: submit chunk for pre-existing node
3938        writer
3939            .submit(WriteRequest {
3940                label: "r2".to_owned(),
3941                nodes: vec![],
3942                node_retires: vec![],
3943                edges: vec![],
3944                edge_retires: vec![],
3945                chunks: vec![ChunkInsert {
3946                    id: "chunk-1".to_owned(),
3947                    node_logical_id: "logical-1".to_owned(),
3948                    text_content: "budget discussion".to_owned(),
3949                    byte_start: None,
3950                    byte_end: None,
3951                    content_hash: None,
3952                }],
3953                runs: vec![],
3954                steps: vec![],
3955                actions: vec![],
3956                optional_backfills: vec![],
3957                vec_inserts: vec![],
3958                operational_writes: vec![],
3959            })
3960            .expect("r2 write — chunk for pre-existing node");
3961
3962        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3963        let count: i64 = conn
3964            .query_row(
3965                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3966                [],
3967                |row| row.get(0),
3968            )
3969            .expect("fts count");
3970        assert_eq!(
3971            count, 1,
3972            "FTS row must exist for chunk attached to pre-existing node"
3973        );
3974    }
3975
3976    #[test]
3977    fn writer_rejects_chunk_for_completely_unknown_node() {
3978        let db = NamedTempFile::new().expect("temporary db");
3979        let writer = WriterActor::start(
3980            db.path(),
3981            Arc::new(SchemaManager::new()),
3982            ProvenanceMode::Warn,
3983            Arc::new(TelemetryCounters::default()),
3984        )
3985        .expect("writer");
3986
3987        let result = writer.submit(WriteRequest {
3988            label: "bad".to_owned(),
3989            nodes: vec![],
3990            node_retires: vec![],
3991            edges: vec![],
3992            edge_retires: vec![],
3993            chunks: vec![ChunkInsert {
3994                id: "chunk-1".to_owned(),
3995                node_logical_id: "nonexistent".to_owned(),
3996                text_content: "some text".to_owned(),
3997                byte_start: None,
3998                byte_end: None,
3999                content_hash: None,
4000            }],
4001            runs: vec![],
4002            steps: vec![],
4003            actions: vec![],
4004            optional_backfills: vec![],
4005            vec_inserts: vec![],
4006            operational_writes: vec![],
4007        });
4008
4009        assert!(
4010            matches!(result, Err(EngineError::InvalidWrite(_))),
4011            "completely unknown node must return InvalidWrite"
4012        );
4013    }
4014
4015    #[test]
4016    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
4017        let db = NamedTempFile::new().expect("temporary db");
4018        let writer = WriterActor::start(
4019            db.path(),
4020            Arc::new(SchemaManager::new()),
4021            ProvenanceMode::Warn,
4022            Arc::new(TelemetryCounters::default()),
4023        )
4024        .expect("writer");
4025
4026        let receipt = writer
4027            .submit(WriteRequest {
4028                label: "seed".to_owned(),
4029                nodes: vec![NodeInsert {
4030                    row_id: "row-1".to_owned(),
4031                    logical_id: "logical-1".to_owned(),
4032                    kind: "Meeting".to_owned(),
4033                    properties: "{}".to_owned(),
4034                    source_ref: None,
4035                    upsert: false,
4036                    chunk_policy: ChunkPolicy::Preserve,
4037                    content_ref: None,
4038                }],
4039                node_retires: vec![],
4040                edges: vec![],
4041                edge_retires: vec![],
4042                chunks: vec![ChunkInsert {
4043                    id: "chunk-1".to_owned(),
4044                    node_logical_id: "logical-1".to_owned(),
4045                    text_content: "budget discussion".to_owned(),
4046                    byte_start: None,
4047                    byte_end: None,
4048                    content_hash: None,
4049                }],
4050                runs: vec![],
4051                steps: vec![],
4052                actions: vec![],
4053                optional_backfills: vec![],
4054                vec_inserts: vec![],
4055                operational_writes: vec![],
4056            })
4057            .expect("write receipt");
4058
4059        assert_eq!(receipt.label, "seed");
4060    }
4061
4062    #[test]
4063    fn writer_node_retire_supersedes_active_node() {
4064        let db = NamedTempFile::new().expect("temporary db");
4065        let writer = WriterActor::start(
4066            db.path(),
4067            Arc::new(SchemaManager::new()),
4068            ProvenanceMode::Warn,
4069            Arc::new(TelemetryCounters::default()),
4070        )
4071        .expect("writer");
4072
4073        writer
4074            .submit(WriteRequest {
4075                label: "seed".to_owned(),
4076                nodes: vec![NodeInsert {
4077                    row_id: "row-1".to_owned(),
4078                    logical_id: "meeting-1".to_owned(),
4079                    kind: "Meeting".to_owned(),
4080                    properties: "{}".to_owned(),
4081                    source_ref: Some("src-1".to_owned()),
4082                    upsert: false,
4083                    chunk_policy: ChunkPolicy::Preserve,
4084                    content_ref: None,
4085                }],
4086                node_retires: vec![],
4087                edges: vec![],
4088                edge_retires: vec![],
4089                chunks: vec![],
4090                runs: vec![],
4091                steps: vec![],
4092                actions: vec![],
4093                optional_backfills: vec![],
4094                vec_inserts: vec![],
4095                operational_writes: vec![],
4096            })
4097            .expect("seed write");
4098
4099        writer
4100            .submit(WriteRequest {
4101                label: "retire".to_owned(),
4102                nodes: vec![],
4103                node_retires: vec![NodeRetire {
4104                    logical_id: "meeting-1".to_owned(),
4105                    source_ref: Some("src-2".to_owned()),
4106                }],
4107                edges: vec![],
4108                edge_retires: vec![],
4109                chunks: vec![],
4110                runs: vec![],
4111                steps: vec![],
4112                actions: vec![],
4113                optional_backfills: vec![],
4114                vec_inserts: vec![],
4115                operational_writes: vec![],
4116            })
4117            .expect("retire write");
4118
4119        let conn = rusqlite::Connection::open(db.path()).expect("open");
4120        let active: i64 = conn
4121            .query_row(
4122                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
4123                [],
4124                |r| r.get(0),
4125            )
4126            .expect("count active");
4127        let historical: i64 = conn
4128            .query_row(
4129                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
4130                [],
4131                |r| r.get(0),
4132            )
4133            .expect("count historical");
4134
4135        assert_eq!(active, 0, "active count must be 0 after retire");
4136        assert_eq!(historical, 1, "historical count must be 1 after retire");
4137    }
4138
4139    #[test]
4140    fn writer_node_retire_preserves_chunks_and_clears_fts() {
4141        let db = NamedTempFile::new().expect("temporary db");
4142        let writer = WriterActor::start(
4143            db.path(),
4144            Arc::new(SchemaManager::new()),
4145            ProvenanceMode::Warn,
4146            Arc::new(TelemetryCounters::default()),
4147        )
4148        .expect("writer");
4149
4150        writer
4151            .submit(WriteRequest {
4152                label: "seed".to_owned(),
4153                nodes: vec![NodeInsert {
4154                    row_id: "row-1".to_owned(),
4155                    logical_id: "meeting-1".to_owned(),
4156                    kind: "Meeting".to_owned(),
4157                    properties: "{}".to_owned(),
4158                    source_ref: Some("src-1".to_owned()),
4159                    upsert: false,
4160                    chunk_policy: ChunkPolicy::Preserve,
4161                    content_ref: None,
4162                }],
4163                node_retires: vec![],
4164                edges: vec![],
4165                edge_retires: vec![],
4166                chunks: vec![ChunkInsert {
4167                    id: "chunk-1".to_owned(),
4168                    node_logical_id: "meeting-1".to_owned(),
4169                    text_content: "budget discussion".to_owned(),
4170                    byte_start: None,
4171                    byte_end: None,
4172                    content_hash: None,
4173                }],
4174                runs: vec![],
4175                steps: vec![],
4176                actions: vec![],
4177                optional_backfills: vec![],
4178                vec_inserts: vec![],
4179                operational_writes: vec![],
4180            })
4181            .expect("seed write");
4182
4183        writer
4184            .submit(WriteRequest {
4185                label: "retire".to_owned(),
4186                nodes: vec![],
4187                node_retires: vec![NodeRetire {
4188                    logical_id: "meeting-1".to_owned(),
4189                    source_ref: Some("src-2".to_owned()),
4190                }],
4191                edges: vec![],
4192                edge_retires: vec![],
4193                chunks: vec![],
4194                runs: vec![],
4195                steps: vec![],
4196                actions: vec![],
4197                optional_backfills: vec![],
4198                vec_inserts: vec![],
4199                operational_writes: vec![],
4200            })
4201            .expect("retire write");
4202
4203        let conn = rusqlite::Connection::open(db.path()).expect("open");
4204        let chunk_count: i64 = conn
4205            .query_row(
4206                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4207                [],
4208                |r| r.get(0),
4209            )
4210            .expect("chunk count");
4211        let fts_count: i64 = conn
4212            .query_row(
4213                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4214                [],
4215                |r| r.get(0),
4216            )
4217            .expect("fts count");
4218
4219        assert_eq!(
4220            chunk_count, 1,
4221            "chunks must remain after node retire so restore can re-establish content"
4222        );
4223        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4224    }
4225
4226    #[test]
4227    fn writer_edge_retire_supersedes_active_edge() {
4228        let db = NamedTempFile::new().expect("temporary db");
4229        let writer = WriterActor::start(
4230            db.path(),
4231            Arc::new(SchemaManager::new()),
4232            ProvenanceMode::Warn,
4233            Arc::new(TelemetryCounters::default()),
4234        )
4235        .expect("writer");
4236
4237        writer
4238            .submit(WriteRequest {
4239                label: "seed".to_owned(),
4240                nodes: vec![
4241                    NodeInsert {
4242                        row_id: "row-a".to_owned(),
4243                        logical_id: "node-a".to_owned(),
4244                        kind: "Meeting".to_owned(),
4245                        properties: "{}".to_owned(),
4246                        source_ref: Some("src-1".to_owned()),
4247                        upsert: false,
4248                        chunk_policy: ChunkPolicy::Preserve,
4249                        content_ref: None,
4250                    },
4251                    NodeInsert {
4252                        row_id: "row-b".to_owned(),
4253                        logical_id: "node-b".to_owned(),
4254                        kind: "Task".to_owned(),
4255                        properties: "{}".to_owned(),
4256                        source_ref: Some("src-1".to_owned()),
4257                        upsert: false,
4258                        chunk_policy: ChunkPolicy::Preserve,
4259                        content_ref: None,
4260                    },
4261                ],
4262                node_retires: vec![],
4263                edges: vec![EdgeInsert {
4264                    row_id: "edge-1".to_owned(),
4265                    logical_id: "edge-lg-1".to_owned(),
4266                    source_logical_id: "node-a".to_owned(),
4267                    target_logical_id: "node-b".to_owned(),
4268                    kind: "HAS_TASK".to_owned(),
4269                    properties: "{}".to_owned(),
4270                    source_ref: Some("src-1".to_owned()),
4271                    upsert: false,
4272                }],
4273                edge_retires: vec![],
4274                chunks: vec![],
4275                runs: vec![],
4276                steps: vec![],
4277                actions: vec![],
4278                optional_backfills: vec![],
4279                vec_inserts: vec![],
4280                operational_writes: vec![],
4281            })
4282            .expect("seed write");
4283
4284        writer
4285            .submit(WriteRequest {
4286                label: "retire-edge".to_owned(),
4287                nodes: vec![],
4288                node_retires: vec![],
4289                edges: vec![],
4290                edge_retires: vec![EdgeRetire {
4291                    logical_id: "edge-lg-1".to_owned(),
4292                    source_ref: Some("src-2".to_owned()),
4293                }],
4294                chunks: vec![],
4295                runs: vec![],
4296                steps: vec![],
4297                actions: vec![],
4298                optional_backfills: vec![],
4299                vec_inserts: vec![],
4300                operational_writes: vec![],
4301            })
4302            .expect("retire edge write");
4303
4304        let conn = rusqlite::Connection::open(db.path()).expect("open");
4305        let active: i64 = conn
4306            .query_row(
4307                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4308                [],
4309                |r| r.get(0),
4310            )
4311            .expect("active edge count");
4312
4313        assert_eq!(active, 0, "active edge count must be 0 after retire");
4314    }
4315
4316    #[test]
4317    fn writer_retire_without_source_ref_emits_provenance_warning() {
4318        let db = NamedTempFile::new().expect("temporary db");
4319        let writer = WriterActor::start(
4320            db.path(),
4321            Arc::new(SchemaManager::new()),
4322            ProvenanceMode::Warn,
4323            Arc::new(TelemetryCounters::default()),
4324        )
4325        .expect("writer");
4326
4327        writer
4328            .submit(WriteRequest {
4329                label: "seed".to_owned(),
4330                nodes: vec![NodeInsert {
4331                    row_id: "row-1".to_owned(),
4332                    logical_id: "meeting-1".to_owned(),
4333                    kind: "Meeting".to_owned(),
4334                    properties: "{}".to_owned(),
4335                    source_ref: Some("src-1".to_owned()),
4336                    upsert: false,
4337                    chunk_policy: ChunkPolicy::Preserve,
4338                    content_ref: None,
4339                }],
4340                node_retires: vec![],
4341                edges: vec![],
4342                edge_retires: vec![],
4343                chunks: vec![],
4344                runs: vec![],
4345                steps: vec![],
4346                actions: vec![],
4347                optional_backfills: vec![],
4348                vec_inserts: vec![],
4349                operational_writes: vec![],
4350            })
4351            .expect("seed write");
4352
4353        let receipt = writer
4354            .submit(WriteRequest {
4355                label: "retire-no-src".to_owned(),
4356                nodes: vec![],
4357                node_retires: vec![NodeRetire {
4358                    logical_id: "meeting-1".to_owned(),
4359                    source_ref: None,
4360                }],
4361                edges: vec![],
4362                edge_retires: vec![],
4363                chunks: vec![],
4364                runs: vec![],
4365                steps: vec![],
4366                actions: vec![],
4367                optional_backfills: vec![],
4368                vec_inserts: vec![],
4369                operational_writes: vec![],
4370            })
4371            .expect("retire write");
4372
4373        assert!(
4374            !receipt.provenance_warnings.is_empty(),
4375            "retire without source_ref must emit a provenance warning"
4376        );
4377    }
4378
4379    #[test]
4380    #[allow(clippy::too_many_lines)]
4381    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4382        let db = NamedTempFile::new().expect("temporary db");
4383        let writer = WriterActor::start(
4384            db.path(),
4385            Arc::new(SchemaManager::new()),
4386            ProvenanceMode::Warn,
4387            Arc::new(TelemetryCounters::default()),
4388        )
4389        .expect("writer");
4390
4391        writer
4392            .submit(WriteRequest {
4393                label: "v1".to_owned(),
4394                nodes: vec![NodeInsert {
4395                    row_id: "row-1".to_owned(),
4396                    logical_id: "meeting-1".to_owned(),
4397                    kind: "Meeting".to_owned(),
4398                    properties: "{}".to_owned(),
4399                    source_ref: Some("src-1".to_owned()),
4400                    upsert: false,
4401                    chunk_policy: ChunkPolicy::Preserve,
4402                    content_ref: None,
4403                }],
4404                node_retires: vec![],
4405                edges: vec![],
4406                edge_retires: vec![],
4407                chunks: vec![ChunkInsert {
4408                    id: "chunk-old".to_owned(),
4409                    node_logical_id: "meeting-1".to_owned(),
4410                    text_content: "old text".to_owned(),
4411                    byte_start: None,
4412                    byte_end: None,
4413                    content_hash: None,
4414                }],
4415                runs: vec![],
4416                steps: vec![],
4417                actions: vec![],
4418                optional_backfills: vec![],
4419                vec_inserts: vec![],
4420                operational_writes: vec![],
4421            })
4422            .expect("v1 write");
4423
4424        writer
4425            .submit(WriteRequest {
4426                label: "v2".to_owned(),
4427                nodes: vec![NodeInsert {
4428                    row_id: "row-2".to_owned(),
4429                    logical_id: "meeting-1".to_owned(),
4430                    kind: "Meeting".to_owned(),
4431                    properties: "{}".to_owned(),
4432                    source_ref: Some("src-2".to_owned()),
4433                    upsert: true,
4434                    chunk_policy: ChunkPolicy::Replace,
4435                    content_ref: None,
4436                }],
4437                node_retires: vec![],
4438                edges: vec![],
4439                edge_retires: vec![],
4440                chunks: vec![ChunkInsert {
4441                    id: "chunk-new".to_owned(),
4442                    node_logical_id: "meeting-1".to_owned(),
4443                    text_content: "new text".to_owned(),
4444                    byte_start: None,
4445                    byte_end: None,
4446                    content_hash: None,
4447                }],
4448                runs: vec![],
4449                steps: vec![],
4450                actions: vec![],
4451                optional_backfills: vec![],
4452                vec_inserts: vec![],
4453                operational_writes: vec![],
4454            })
4455            .expect("v2 write");
4456
4457        let conn = rusqlite::Connection::open(db.path()).expect("open");
4458        let old_chunk: i64 = conn
4459            .query_row(
4460                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4461                [],
4462                |r| r.get(0),
4463            )
4464            .expect("old chunk count");
4465        let new_chunk: i64 = conn
4466            .query_row(
4467                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4468                [],
4469                |r| r.get(0),
4470            )
4471            .expect("new chunk count");
4472        let fts_old: i64 = conn
4473            .query_row(
4474                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4475                [],
4476                |r| r.get(0),
4477            )
4478            .expect("old fts count");
4479
4480        assert_eq!(
4481            old_chunk, 0,
4482            "old chunk must be deleted by ChunkPolicy::Replace"
4483        );
4484        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4485        assert_eq!(
4486            fts_old, 0,
4487            "old FTS row must be deleted by ChunkPolicy::Replace"
4488        );
4489    }
4490
4491    #[test]
4492    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4493        let db = NamedTempFile::new().expect("temporary db");
4494        let writer = WriterActor::start(
4495            db.path(),
4496            Arc::new(SchemaManager::new()),
4497            ProvenanceMode::Warn,
4498            Arc::new(TelemetryCounters::default()),
4499        )
4500        .expect("writer");
4501
4502        writer
4503            .submit(WriteRequest {
4504                label: "v1".to_owned(),
4505                nodes: vec![NodeInsert {
4506                    row_id: "row-1".to_owned(),
4507                    logical_id: "meeting-1".to_owned(),
4508                    kind: "Meeting".to_owned(),
4509                    properties: "{}".to_owned(),
4510                    source_ref: Some("src-1".to_owned()),
4511                    upsert: false,
4512                    chunk_policy: ChunkPolicy::Preserve,
4513                    content_ref: None,
4514                }],
4515                node_retires: vec![],
4516                edges: vec![],
4517                edge_retires: vec![],
4518                chunks: vec![ChunkInsert {
4519                    id: "chunk-old".to_owned(),
4520                    node_logical_id: "meeting-1".to_owned(),
4521                    text_content: "old text".to_owned(),
4522                    byte_start: None,
4523                    byte_end: None,
4524                    content_hash: None,
4525                }],
4526                runs: vec![],
4527                steps: vec![],
4528                actions: vec![],
4529                optional_backfills: vec![],
4530                vec_inserts: vec![],
4531                operational_writes: vec![],
4532            })
4533            .expect("v1 write");
4534
4535        writer
4536            .submit(WriteRequest {
4537                label: "v2-props-only".to_owned(),
4538                nodes: vec![NodeInsert {
4539                    row_id: "row-2".to_owned(),
4540                    logical_id: "meeting-1".to_owned(),
4541                    kind: "Meeting".to_owned(),
4542                    properties: r#"{"status":"updated"}"#.to_owned(),
4543                    source_ref: Some("src-2".to_owned()),
4544                    upsert: true,
4545                    chunk_policy: ChunkPolicy::Preserve,
4546                    content_ref: None,
4547                }],
4548                node_retires: vec![],
4549                edges: vec![],
4550                edge_retires: vec![],
4551                chunks: vec![],
4552                runs: vec![],
4553                steps: vec![],
4554                actions: vec![],
4555                optional_backfills: vec![],
4556                vec_inserts: vec![],
4557                operational_writes: vec![],
4558            })
4559            .expect("v2 preserve write");
4560
4561        let conn = rusqlite::Connection::open(db.path()).expect("open");
4562        let old_chunk: i64 = conn
4563            .query_row(
4564                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4565                [],
4566                |r| r.get(0),
4567            )
4568            .expect("old chunk count");
4569
4570        assert_eq!(
4571            old_chunk, 1,
4572            "old chunk must be preserved by ChunkPolicy::Preserve"
4573        );
4574    }
4575
4576    #[test]
4577    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4578        let db = NamedTempFile::new().expect("temporary db");
4579        let writer = WriterActor::start(
4580            db.path(),
4581            Arc::new(SchemaManager::new()),
4582            ProvenanceMode::Warn,
4583            Arc::new(TelemetryCounters::default()),
4584        )
4585        .expect("writer");
4586
4587        writer
4588            .submit(WriteRequest {
4589                label: "v1".to_owned(),
4590                nodes: vec![NodeInsert {
4591                    row_id: "row-1".to_owned(),
4592                    logical_id: "meeting-1".to_owned(),
4593                    kind: "Meeting".to_owned(),
4594                    properties: "{}".to_owned(),
4595                    source_ref: Some("src-1".to_owned()),
4596                    upsert: false,
4597                    chunk_policy: ChunkPolicy::Preserve,
4598                    content_ref: None,
4599                }],
4600                node_retires: vec![],
4601                edges: vec![],
4602                edge_retires: vec![],
4603                chunks: vec![ChunkInsert {
4604                    id: "chunk-existing".to_owned(),
4605                    node_logical_id: "meeting-1".to_owned(),
4606                    text_content: "existing text".to_owned(),
4607                    byte_start: None,
4608                    byte_end: None,
4609                    content_hash: None,
4610                }],
4611                runs: vec![],
4612                steps: vec![],
4613                actions: vec![],
4614                optional_backfills: vec![],
4615                vec_inserts: vec![],
4616                operational_writes: vec![],
4617            })
4618            .expect("v1 write");
4619
4620        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4621        writer
4622            .submit(WriteRequest {
4623                label: "insert-no-upsert".to_owned(),
4624                nodes: vec![NodeInsert {
4625                    row_id: "row-2".to_owned(),
4626                    logical_id: "meeting-2".to_owned(),
4627                    kind: "Meeting".to_owned(),
4628                    properties: "{}".to_owned(),
4629                    source_ref: Some("src-2".to_owned()),
4630                    upsert: false,
4631                    chunk_policy: ChunkPolicy::Replace,
4632                    content_ref: None,
4633                }],
4634                node_retires: vec![],
4635                edges: vec![],
4636                edge_retires: vec![],
4637                chunks: vec![],
4638                runs: vec![],
4639                steps: vec![],
4640                actions: vec![],
4641                optional_backfills: vec![],
4642                vec_inserts: vec![],
4643                operational_writes: vec![],
4644            })
4645            .expect("insert no-upsert write");
4646
4647        let conn = rusqlite::Connection::open(db.path()).expect("open");
4648        let existing_chunk: i64 = conn
4649            .query_row(
4650                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4651                [],
4652                |r| r.get(0),
4653            )
4654            .expect("chunk count");
4655
4656        assert_eq!(
4657            existing_chunk, 1,
4658            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4659        );
4660    }
4661
4662    #[test]
4663    fn writer_run_upsert_supersedes_prior_active_run() {
4664        let db = NamedTempFile::new().expect("temporary db");
4665        let writer = WriterActor::start(
4666            db.path(),
4667            Arc::new(SchemaManager::new()),
4668            ProvenanceMode::Warn,
4669            Arc::new(TelemetryCounters::default()),
4670        )
4671        .expect("writer");
4672
4673        writer
4674            .submit(WriteRequest {
4675                label: "v1".to_owned(),
4676                nodes: vec![],
4677                node_retires: vec![],
4678                edges: vec![],
4679                edge_retires: vec![],
4680                chunks: vec![],
4681                runs: vec![RunInsert {
4682                    id: "run-v1".to_owned(),
4683                    kind: "session".to_owned(),
4684                    status: "completed".to_owned(),
4685                    properties: "{}".to_owned(),
4686                    source_ref: Some("src-1".to_owned()),
4687                    upsert: false,
4688                    supersedes_id: None,
4689                }],
4690                steps: vec![],
4691                actions: vec![],
4692                optional_backfills: vec![],
4693                vec_inserts: vec![],
4694                operational_writes: vec![],
4695            })
4696            .expect("v1 run write");
4697
4698        writer
4699            .submit(WriteRequest {
4700                label: "v2".to_owned(),
4701                nodes: vec![],
4702                node_retires: vec![],
4703                edges: vec![],
4704                edge_retires: vec![],
4705                chunks: vec![],
4706                runs: vec![RunInsert {
4707                    id: "run-v2".to_owned(),
4708                    kind: "session".to_owned(),
4709                    status: "completed".to_owned(),
4710                    properties: "{}".to_owned(),
4711                    source_ref: Some("src-2".to_owned()),
4712                    upsert: true,
4713                    supersedes_id: Some("run-v1".to_owned()),
4714                }],
4715                steps: vec![],
4716                actions: vec![],
4717                optional_backfills: vec![],
4718                vec_inserts: vec![],
4719                operational_writes: vec![],
4720            })
4721            .expect("v2 run write");
4722
4723        let conn = rusqlite::Connection::open(db.path()).expect("open");
4724        let v1_historical: i64 = conn
4725            .query_row(
4726                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4727                [],
4728                |r| r.get(0),
4729            )
4730            .expect("v1 historical count");
4731        let v2_active: i64 = conn
4732            .query_row(
4733                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4734                [],
4735                |r| r.get(0),
4736            )
4737            .expect("v2 active count");
4738
4739        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4740        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4741    }
4742
4743    #[test]
4744    fn writer_step_upsert_supersedes_prior_active_step() {
4745        let db = NamedTempFile::new().expect("temporary db");
4746        let writer = WriterActor::start(
4747            db.path(),
4748            Arc::new(SchemaManager::new()),
4749            ProvenanceMode::Warn,
4750            Arc::new(TelemetryCounters::default()),
4751        )
4752        .expect("writer");
4753
4754        writer
4755            .submit(WriteRequest {
4756                label: "v1".to_owned(),
4757                nodes: vec![],
4758                node_retires: vec![],
4759                edges: vec![],
4760                edge_retires: vec![],
4761                chunks: vec![],
4762                runs: vec![RunInsert {
4763                    id: "run-1".to_owned(),
4764                    kind: "session".to_owned(),
4765                    status: "completed".to_owned(),
4766                    properties: "{}".to_owned(),
4767                    source_ref: Some("src-1".to_owned()),
4768                    upsert: false,
4769                    supersedes_id: None,
4770                }],
4771                steps: vec![StepInsert {
4772                    id: "step-v1".to_owned(),
4773                    run_id: "run-1".to_owned(),
4774                    kind: "llm".to_owned(),
4775                    status: "completed".to_owned(),
4776                    properties: "{}".to_owned(),
4777                    source_ref: Some("src-1".to_owned()),
4778                    upsert: false,
4779                    supersedes_id: None,
4780                }],
4781                actions: vec![],
4782                optional_backfills: vec![],
4783                vec_inserts: vec![],
4784                operational_writes: vec![],
4785            })
4786            .expect("v1 step write");
4787
4788        writer
4789            .submit(WriteRequest {
4790                label: "v2".to_owned(),
4791                nodes: vec![],
4792                node_retires: vec![],
4793                edges: vec![],
4794                edge_retires: vec![],
4795                chunks: vec![],
4796                runs: vec![],
4797                steps: vec![StepInsert {
4798                    id: "step-v2".to_owned(),
4799                    run_id: "run-1".to_owned(),
4800                    kind: "llm".to_owned(),
4801                    status: "completed".to_owned(),
4802                    properties: "{}".to_owned(),
4803                    source_ref: Some("src-2".to_owned()),
4804                    upsert: true,
4805                    supersedes_id: Some("step-v1".to_owned()),
4806                }],
4807                actions: vec![],
4808                optional_backfills: vec![],
4809                vec_inserts: vec![],
4810                operational_writes: vec![],
4811            })
4812            .expect("v2 step write");
4813
4814        let conn = rusqlite::Connection::open(db.path()).expect("open");
4815        let v1_historical: i64 = conn
4816            .query_row(
4817                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4818                [],
4819                |r| r.get(0),
4820            )
4821            .expect("v1 historical count");
4822        let v2_active: i64 = conn
4823            .query_row(
4824                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4825                [],
4826                |r| r.get(0),
4827            )
4828            .expect("v2 active count");
4829
4830        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4831        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4832    }
4833
4834    #[test]
4835    fn writer_action_upsert_supersedes_prior_active_action() {
4836        let db = NamedTempFile::new().expect("temporary db");
4837        let writer = WriterActor::start(
4838            db.path(),
4839            Arc::new(SchemaManager::new()),
4840            ProvenanceMode::Warn,
4841            Arc::new(TelemetryCounters::default()),
4842        )
4843        .expect("writer");
4844
4845        writer
4846            .submit(WriteRequest {
4847                label: "v1".to_owned(),
4848                nodes: vec![],
4849                node_retires: vec![],
4850                edges: vec![],
4851                edge_retires: vec![],
4852                chunks: vec![],
4853                runs: vec![RunInsert {
4854                    id: "run-1".to_owned(),
4855                    kind: "session".to_owned(),
4856                    status: "completed".to_owned(),
4857                    properties: "{}".to_owned(),
4858                    source_ref: Some("src-1".to_owned()),
4859                    upsert: false,
4860                    supersedes_id: None,
4861                }],
4862                steps: vec![StepInsert {
4863                    id: "step-1".to_owned(),
4864                    run_id: "run-1".to_owned(),
4865                    kind: "llm".to_owned(),
4866                    status: "completed".to_owned(),
4867                    properties: "{}".to_owned(),
4868                    source_ref: Some("src-1".to_owned()),
4869                    upsert: false,
4870                    supersedes_id: None,
4871                }],
4872                actions: vec![ActionInsert {
4873                    id: "action-v1".to_owned(),
4874                    step_id: "step-1".to_owned(),
4875                    kind: "emit".to_owned(),
4876                    status: "completed".to_owned(),
4877                    properties: "{}".to_owned(),
4878                    source_ref: Some("src-1".to_owned()),
4879                    upsert: false,
4880                    supersedes_id: None,
4881                }],
4882                optional_backfills: vec![],
4883                vec_inserts: vec![],
4884                operational_writes: vec![],
4885            })
4886            .expect("v1 action write");
4887
4888        writer
4889            .submit(WriteRequest {
4890                label: "v2".to_owned(),
4891                nodes: vec![],
4892                node_retires: vec![],
4893                edges: vec![],
4894                edge_retires: vec![],
4895                chunks: vec![],
4896                runs: vec![],
4897                steps: vec![],
4898                actions: vec![ActionInsert {
4899                    id: "action-v2".to_owned(),
4900                    step_id: "step-1".to_owned(),
4901                    kind: "emit".to_owned(),
4902                    status: "completed".to_owned(),
4903                    properties: "{}".to_owned(),
4904                    source_ref: Some("src-2".to_owned()),
4905                    upsert: true,
4906                    supersedes_id: Some("action-v1".to_owned()),
4907                }],
4908                optional_backfills: vec![],
4909                vec_inserts: vec![],
4910                operational_writes: vec![],
4911            })
4912            .expect("v2 action write");
4913
4914        let conn = rusqlite::Connection::open(db.path()).expect("open");
4915        let v1_historical: i64 = conn
4916            .query_row(
4917                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4918                [],
4919                |r| r.get(0),
4920            )
4921            .expect("v1 historical count");
4922        let v2_active: i64 = conn
4923            .query_row(
4924                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4925                [],
4926                |r| r.get(0),
4927            )
4928            .expect("v2 active count");
4929
4930        assert_eq!(
4931            v1_historical, 1,
4932            "action-v1 must be historical after upsert"
4933        );
4934        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4935    }
4936
4937    // P0: runtime upsert without supersedes_id must be rejected
4938
4939    #[test]
4940    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4941        let db = NamedTempFile::new().expect("temporary db");
4942        let writer = WriterActor::start(
4943            db.path(),
4944            Arc::new(SchemaManager::new()),
4945            ProvenanceMode::Warn,
4946            Arc::new(TelemetryCounters::default()),
4947        )
4948        .expect("writer");
4949
4950        let result = writer.submit(WriteRequest {
4951            label: "bad".to_owned(),
4952            nodes: vec![],
4953            node_retires: vec![],
4954            edges: vec![],
4955            edge_retires: vec![],
4956            chunks: vec![],
4957            runs: vec![RunInsert {
4958                id: "run-1".to_owned(),
4959                kind: "session".to_owned(),
4960                status: "completed".to_owned(),
4961                properties: "{}".to_owned(),
4962                source_ref: None,
4963                upsert: true,
4964                supersedes_id: None,
4965            }],
4966            steps: vec![],
4967            actions: vec![],
4968            optional_backfills: vec![],
4969            vec_inserts: vec![],
4970            operational_writes: vec![],
4971        });
4972
4973        assert!(
4974            matches!(result, Err(EngineError::InvalidWrite(_))),
4975            "run upsert=true without supersedes_id must return InvalidWrite"
4976        );
4977    }
4978
4979    #[test]
4980    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4981        let db = NamedTempFile::new().expect("temporary db");
4982        let writer = WriterActor::start(
4983            db.path(),
4984            Arc::new(SchemaManager::new()),
4985            ProvenanceMode::Warn,
4986            Arc::new(TelemetryCounters::default()),
4987        )
4988        .expect("writer");
4989
4990        let result = writer.submit(WriteRequest {
4991            label: "bad".to_owned(),
4992            nodes: vec![],
4993            node_retires: vec![],
4994            edges: vec![],
4995            edge_retires: vec![],
4996            chunks: vec![],
4997            runs: vec![],
4998            steps: vec![StepInsert {
4999                id: "step-1".to_owned(),
5000                run_id: "run-1".to_owned(),
5001                kind: "llm".to_owned(),
5002                status: "completed".to_owned(),
5003                properties: "{}".to_owned(),
5004                source_ref: None,
5005                upsert: true,
5006                supersedes_id: None,
5007            }],
5008            actions: vec![],
5009            optional_backfills: vec![],
5010            vec_inserts: vec![],
5011            operational_writes: vec![],
5012        });
5013
5014        assert!(
5015            matches!(result, Err(EngineError::InvalidWrite(_))),
5016            "step upsert=true without supersedes_id must return InvalidWrite"
5017        );
5018    }
5019
5020    #[test]
5021    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
5022        let db = NamedTempFile::new().expect("temporary db");
5023        let writer = WriterActor::start(
5024            db.path(),
5025            Arc::new(SchemaManager::new()),
5026            ProvenanceMode::Warn,
5027            Arc::new(TelemetryCounters::default()),
5028        )
5029        .expect("writer");
5030
5031        let result = writer.submit(WriteRequest {
5032            label: "bad".to_owned(),
5033            nodes: vec![],
5034            node_retires: vec![],
5035            edges: vec![],
5036            edge_retires: vec![],
5037            chunks: vec![],
5038            runs: vec![],
5039            steps: vec![],
5040            actions: vec![ActionInsert {
5041                id: "action-1".to_owned(),
5042                step_id: "step-1".to_owned(),
5043                kind: "emit".to_owned(),
5044                status: "completed".to_owned(),
5045                properties: "{}".to_owned(),
5046                source_ref: None,
5047                upsert: true,
5048                supersedes_id: None,
5049            }],
5050            optional_backfills: vec![],
5051            vec_inserts: vec![],
5052            operational_writes: vec![],
5053        });
5054
5055        assert!(
5056            matches!(result, Err(EngineError::InvalidWrite(_))),
5057            "action upsert=true without supersedes_id must return InvalidWrite"
5058        );
5059    }
5060
5061    // P1a/b: provenance warnings for edge inserts and runtime table inserts
5062
5063    #[test]
5064    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
5065        let db = NamedTempFile::new().expect("temporary db");
5066        let writer = WriterActor::start(
5067            db.path(),
5068            Arc::new(SchemaManager::new()),
5069            ProvenanceMode::Warn,
5070            Arc::new(TelemetryCounters::default()),
5071        )
5072        .expect("writer");
5073
5074        let receipt = writer
5075            .submit(WriteRequest {
5076                label: "test".to_owned(),
5077                nodes: vec![
5078                    NodeInsert {
5079                        row_id: "row-a".to_owned(),
5080                        logical_id: "node-a".to_owned(),
5081                        kind: "Meeting".to_owned(),
5082                        properties: "{}".to_owned(),
5083                        source_ref: Some("src-1".to_owned()),
5084                        upsert: false,
5085                        chunk_policy: ChunkPolicy::Preserve,
5086                        content_ref: None,
5087                    },
5088                    NodeInsert {
5089                        row_id: "row-b".to_owned(),
5090                        logical_id: "node-b".to_owned(),
5091                        kind: "Task".to_owned(),
5092                        properties: "{}".to_owned(),
5093                        source_ref: Some("src-1".to_owned()),
5094                        upsert: false,
5095                        chunk_policy: ChunkPolicy::Preserve,
5096                        content_ref: None,
5097                    },
5098                ],
5099                node_retires: vec![],
5100                edges: vec![EdgeInsert {
5101                    row_id: "edge-1".to_owned(),
5102                    logical_id: "edge-lg-1".to_owned(),
5103                    source_logical_id: "node-a".to_owned(),
5104                    target_logical_id: "node-b".to_owned(),
5105                    kind: "HAS_TASK".to_owned(),
5106                    properties: "{}".to_owned(),
5107                    source_ref: None,
5108                    upsert: false,
5109                }],
5110                edge_retires: vec![],
5111                chunks: vec![],
5112                runs: vec![],
5113                steps: vec![],
5114                actions: vec![],
5115                optional_backfills: vec![],
5116                vec_inserts: vec![],
5117                operational_writes: vec![],
5118            })
5119            .expect("write");
5120
5121        assert!(
5122            !receipt.provenance_warnings.is_empty(),
5123            "edge insert without source_ref must emit a provenance warning"
5124        );
5125    }
5126
5127    #[test]
5128    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
5129        let db = NamedTempFile::new().expect("temporary db");
5130        let writer = WriterActor::start(
5131            db.path(),
5132            Arc::new(SchemaManager::new()),
5133            ProvenanceMode::Warn,
5134            Arc::new(TelemetryCounters::default()),
5135        )
5136        .expect("writer");
5137
5138        let receipt = writer
5139            .submit(WriteRequest {
5140                label: "test".to_owned(),
5141                nodes: vec![],
5142                node_retires: vec![],
5143                edges: vec![],
5144                edge_retires: vec![],
5145                chunks: vec![],
5146                runs: vec![RunInsert {
5147                    id: "run-1".to_owned(),
5148                    kind: "session".to_owned(),
5149                    status: "completed".to_owned(),
5150                    properties: "{}".to_owned(),
5151                    source_ref: None,
5152                    upsert: false,
5153                    supersedes_id: None,
5154                }],
5155                steps: vec![],
5156                actions: vec![],
5157                optional_backfills: vec![],
5158                vec_inserts: vec![],
5159                operational_writes: vec![],
5160            })
5161            .expect("write");
5162
5163        assert!(
5164            !receipt.provenance_warnings.is_empty(),
5165            "run insert without source_ref must emit a provenance warning"
5166        );
5167    }
5168
5169    // P1c: retire a node AND submit chunks for the same logical_id in one request
5170
5171    #[test]
5172    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5173        let db = NamedTempFile::new().expect("temporary db");
5174        let writer = WriterActor::start(
5175            db.path(),
5176            Arc::new(SchemaManager::new()),
5177            ProvenanceMode::Warn,
5178            Arc::new(TelemetryCounters::default()),
5179        )
5180        .expect("writer");
5181
5182        // First seed the node so it exists
5183        writer
5184            .submit(WriteRequest {
5185                label: "seed".to_owned(),
5186                nodes: vec![NodeInsert {
5187                    row_id: "row-1".to_owned(),
5188                    logical_id: "meeting-1".to_owned(),
5189                    kind: "Meeting".to_owned(),
5190                    properties: "{}".to_owned(),
5191                    source_ref: Some("src-1".to_owned()),
5192                    upsert: false,
5193                    chunk_policy: ChunkPolicy::Preserve,
5194                    content_ref: None,
5195                }],
5196                node_retires: vec![],
5197                edges: vec![],
5198                edge_retires: vec![],
5199                chunks: vec![],
5200                runs: vec![],
5201                steps: vec![],
5202                actions: vec![],
5203                optional_backfills: vec![],
5204                vec_inserts: vec![],
5205                operational_writes: vec![],
5206            })
5207            .expect("seed write");
5208
5209        // Now try to retire it AND add a chunk for it in the same request
5210        let result = writer.submit(WriteRequest {
5211            label: "bad".to_owned(),
5212            nodes: vec![],
5213            node_retires: vec![NodeRetire {
5214                logical_id: "meeting-1".to_owned(),
5215                source_ref: Some("src-2".to_owned()),
5216            }],
5217            edges: vec![],
5218            edge_retires: vec![],
5219            chunks: vec![ChunkInsert {
5220                id: "chunk-bad".to_owned(),
5221                node_logical_id: "meeting-1".to_owned(),
5222                text_content: "some text".to_owned(),
5223                byte_start: None,
5224                byte_end: None,
5225                content_hash: None,
5226            }],
5227            runs: vec![],
5228            steps: vec![],
5229            actions: vec![],
5230            optional_backfills: vec![],
5231            vec_inserts: vec![],
5232            operational_writes: vec![],
5233        });
5234
5235        assert!(
5236            matches!(result, Err(EngineError::InvalidWrite(_))),
5237            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5238        );
5239    }
5240
5241    // --- Item 1: prepare_cached batch insert ---
5242
5243    #[test]
5244    fn writer_batch_insert_multiple_nodes() {
5245        let db = NamedTempFile::new().expect("temporary db");
5246        let writer = WriterActor::start(
5247            db.path(),
5248            Arc::new(SchemaManager::new()),
5249            ProvenanceMode::Warn,
5250            Arc::new(TelemetryCounters::default()),
5251        )
5252        .expect("writer");
5253
5254        let nodes: Vec<NodeInsert> = (0..100)
5255            .map(|i| NodeInsert {
5256                row_id: format!("row-{i}"),
5257                logical_id: format!("lg-{i}"),
5258                kind: "Note".to_owned(),
5259                properties: "{}".to_owned(),
5260                source_ref: Some("batch-src".to_owned()),
5261                upsert: false,
5262                chunk_policy: ChunkPolicy::Preserve,
5263                content_ref: None,
5264            })
5265            .collect();
5266
5267        writer
5268            .submit(WriteRequest {
5269                label: "batch".to_owned(),
5270                nodes,
5271                node_retires: vec![],
5272                edges: vec![],
5273                edge_retires: vec![],
5274                chunks: vec![],
5275                runs: vec![],
5276                steps: vec![],
5277                actions: vec![],
5278                optional_backfills: vec![],
5279                vec_inserts: vec![],
5280                operational_writes: vec![],
5281            })
5282            .expect("batch write");
5283
5284        let conn = rusqlite::Connection::open(db.path()).expect("open");
5285        let count: i64 = conn
5286            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5287            .expect("count nodes");
5288        assert_eq!(
5289            count, 100,
5290            "all 100 nodes must be present after batch insert"
5291        );
5292    }
5293
5294    // --- Item 2: ID validation ---
5295
5296    #[test]
5297    fn prepare_write_rejects_empty_node_row_id() {
5298        let db = NamedTempFile::new().expect("temporary db");
5299        let writer = WriterActor::start(
5300            db.path(),
5301            Arc::new(SchemaManager::new()),
5302            ProvenanceMode::Warn,
5303            Arc::new(TelemetryCounters::default()),
5304        )
5305        .expect("writer");
5306
5307        let result = writer.submit(WriteRequest {
5308            label: "test".to_owned(),
5309            nodes: vec![NodeInsert {
5310                row_id: String::new(),
5311                logical_id: "lg-1".to_owned(),
5312                kind: "Note".to_owned(),
5313                properties: "{}".to_owned(),
5314                source_ref: None,
5315                upsert: false,
5316                chunk_policy: ChunkPolicy::Preserve,
5317                content_ref: None,
5318            }],
5319            node_retires: vec![],
5320            edges: vec![],
5321            edge_retires: vec![],
5322            chunks: vec![],
5323            runs: vec![],
5324            steps: vec![],
5325            actions: vec![],
5326            optional_backfills: vec![],
5327            vec_inserts: vec![],
5328            operational_writes: vec![],
5329        });
5330
5331        assert!(
5332            matches!(result, Err(EngineError::InvalidWrite(_))),
5333            "empty row_id must be rejected"
5334        );
5335    }
5336
5337    #[test]
5338    fn prepare_write_rejects_empty_node_logical_id() {
5339        let db = NamedTempFile::new().expect("temporary db");
5340        let writer = WriterActor::start(
5341            db.path(),
5342            Arc::new(SchemaManager::new()),
5343            ProvenanceMode::Warn,
5344            Arc::new(TelemetryCounters::default()),
5345        )
5346        .expect("writer");
5347
5348        let result = writer.submit(WriteRequest {
5349            label: "test".to_owned(),
5350            nodes: vec![NodeInsert {
5351                row_id: "row-1".to_owned(),
5352                logical_id: String::new(),
5353                kind: "Note".to_owned(),
5354                properties: "{}".to_owned(),
5355                source_ref: None,
5356                upsert: false,
5357                chunk_policy: ChunkPolicy::Preserve,
5358                content_ref: None,
5359            }],
5360            node_retires: vec![],
5361            edges: vec![],
5362            edge_retires: vec![],
5363            chunks: vec![],
5364            runs: vec![],
5365            steps: vec![],
5366            actions: vec![],
5367            optional_backfills: vec![],
5368            vec_inserts: vec![],
5369            operational_writes: vec![],
5370        });
5371
5372        assert!(
5373            matches!(result, Err(EngineError::InvalidWrite(_))),
5374            "empty logical_id must be rejected"
5375        );
5376    }
5377
5378    #[test]
5379    fn prepare_write_rejects_duplicate_row_ids_in_request() {
5380        let db = NamedTempFile::new().expect("temporary db");
5381        let writer = WriterActor::start(
5382            db.path(),
5383            Arc::new(SchemaManager::new()),
5384            ProvenanceMode::Warn,
5385            Arc::new(TelemetryCounters::default()),
5386        )
5387        .expect("writer");
5388
5389        let result = writer.submit(WriteRequest {
5390            label: "test".to_owned(),
5391            nodes: vec![
5392                NodeInsert {
5393                    row_id: "row-1".to_owned(),
5394                    logical_id: "lg-1".to_owned(),
5395                    kind: "Note".to_owned(),
5396                    properties: "{}".to_owned(),
5397                    source_ref: None,
5398                    upsert: false,
5399                    chunk_policy: ChunkPolicy::Preserve,
5400                    content_ref: None,
5401                },
5402                NodeInsert {
5403                    row_id: "row-1".to_owned(), // duplicate
5404                    logical_id: "lg-2".to_owned(),
5405                    kind: "Note".to_owned(),
5406                    properties: "{}".to_owned(),
5407                    source_ref: None,
5408                    upsert: false,
5409                    chunk_policy: ChunkPolicy::Preserve,
5410                    content_ref: None,
5411                },
5412            ],
5413            node_retires: vec![],
5414            edges: vec![],
5415            edge_retires: vec![],
5416            chunks: vec![],
5417            runs: vec![],
5418            steps: vec![],
5419            actions: vec![],
5420            optional_backfills: vec![],
5421            vec_inserts: vec![],
5422            operational_writes: vec![],
5423        });
5424
5425        assert!(
5426            matches!(result, Err(EngineError::InvalidWrite(_))),
5427            "duplicate row_id within request must be rejected"
5428        );
5429    }
5430
5431    #[test]
5432    fn prepare_write_rejects_empty_chunk_id() {
5433        let db = NamedTempFile::new().expect("temporary db");
5434        let writer = WriterActor::start(
5435            db.path(),
5436            Arc::new(SchemaManager::new()),
5437            ProvenanceMode::Warn,
5438            Arc::new(TelemetryCounters::default()),
5439        )
5440        .expect("writer");
5441
5442        let result = writer.submit(WriteRequest {
5443            label: "test".to_owned(),
5444            nodes: vec![NodeInsert {
5445                row_id: "row-1".to_owned(),
5446                logical_id: "lg-1".to_owned(),
5447                kind: "Note".to_owned(),
5448                properties: "{}".to_owned(),
5449                source_ref: None,
5450                upsert: false,
5451                chunk_policy: ChunkPolicy::Preserve,
5452                content_ref: None,
5453            }],
5454            node_retires: vec![],
5455            edges: vec![],
5456            edge_retires: vec![],
5457            chunks: vec![ChunkInsert {
5458                id: String::new(),
5459                node_logical_id: "lg-1".to_owned(),
5460                text_content: "some text".to_owned(),
5461                byte_start: None,
5462                byte_end: None,
5463                content_hash: None,
5464            }],
5465            runs: vec![],
5466            steps: vec![],
5467            actions: vec![],
5468            optional_backfills: vec![],
5469            vec_inserts: vec![],
5470            operational_writes: vec![],
5471        });
5472
5473        assert!(
5474            matches!(result, Err(EngineError::InvalidWrite(_))),
5475            "empty chunk id must be rejected"
5476        );
5477    }
5478
5479    // --- Item 4: provenance warning coverage tests ---
5480
5481    #[test]
5482    fn writer_receipt_warns_on_step_without_source_ref() {
5483        let db = NamedTempFile::new().expect("temporary db");
5484        let writer = WriterActor::start(
5485            db.path(),
5486            Arc::new(SchemaManager::new()),
5487            ProvenanceMode::Warn,
5488            Arc::new(TelemetryCounters::default()),
5489        )
5490        .expect("writer");
5491
5492        // seed a run first so step FK is satisfied
5493        writer
5494            .submit(WriteRequest {
5495                label: "seed-run".to_owned(),
5496                nodes: vec![],
5497                node_retires: vec![],
5498                edges: vec![],
5499                edge_retires: vec![],
5500                chunks: vec![],
5501                runs: vec![RunInsert {
5502                    id: "run-1".to_owned(),
5503                    kind: "session".to_owned(),
5504                    status: "active".to_owned(),
5505                    properties: "{}".to_owned(),
5506                    source_ref: Some("src-1".to_owned()),
5507                    upsert: false,
5508                    supersedes_id: None,
5509                }],
5510                steps: vec![],
5511                actions: vec![],
5512                optional_backfills: vec![],
5513                vec_inserts: vec![],
5514                operational_writes: vec![],
5515            })
5516            .expect("seed run");
5517
5518        let receipt = writer
5519            .submit(WriteRequest {
5520                label: "test".to_owned(),
5521                nodes: vec![],
5522                node_retires: vec![],
5523                edges: vec![],
5524                edge_retires: vec![],
5525                chunks: vec![],
5526                runs: vec![],
5527                steps: vec![StepInsert {
5528                    id: "step-1".to_owned(),
5529                    run_id: "run-1".to_owned(),
5530                    kind: "llm_call".to_owned(),
5531                    status: "completed".to_owned(),
5532                    properties: "{}".to_owned(),
5533                    source_ref: None,
5534                    upsert: false,
5535                    supersedes_id: None,
5536                }],
5537                actions: vec![],
5538                optional_backfills: vec![],
5539                vec_inserts: vec![],
5540                operational_writes: vec![],
5541            })
5542            .expect("write");
5543
5544        assert!(
5545            !receipt.provenance_warnings.is_empty(),
5546            "step insert without source_ref must emit a provenance warning"
5547        );
5548    }
5549
5550    #[test]
5551    fn writer_receipt_warns_on_action_without_source_ref() {
5552        let db = NamedTempFile::new().expect("temporary db");
5553        let writer = WriterActor::start(
5554            db.path(),
5555            Arc::new(SchemaManager::new()),
5556            ProvenanceMode::Warn,
5557            Arc::new(TelemetryCounters::default()),
5558        )
5559        .expect("writer");
5560
5561        // seed run and step so action FK is satisfied
5562        writer
5563            .submit(WriteRequest {
5564                label: "seed".to_owned(),
5565                nodes: vec![],
5566                node_retires: vec![],
5567                edges: vec![],
5568                edge_retires: vec![],
5569                chunks: vec![],
5570                runs: vec![RunInsert {
5571                    id: "run-1".to_owned(),
5572                    kind: "session".to_owned(),
5573                    status: "active".to_owned(),
5574                    properties: "{}".to_owned(),
5575                    source_ref: Some("src-1".to_owned()),
5576                    upsert: false,
5577                    supersedes_id: None,
5578                }],
5579                steps: vec![StepInsert {
5580                    id: "step-1".to_owned(),
5581                    run_id: "run-1".to_owned(),
5582                    kind: "llm_call".to_owned(),
5583                    status: "completed".to_owned(),
5584                    properties: "{}".to_owned(),
5585                    source_ref: Some("src-1".to_owned()),
5586                    upsert: false,
5587                    supersedes_id: None,
5588                }],
5589                actions: vec![],
5590                optional_backfills: vec![],
5591                vec_inserts: vec![],
5592                operational_writes: vec![],
5593            })
5594            .expect("seed");
5595
5596        let receipt = writer
5597            .submit(WriteRequest {
5598                label: "test".to_owned(),
5599                nodes: vec![],
5600                node_retires: vec![],
5601                edges: vec![],
5602                edge_retires: vec![],
5603                chunks: vec![],
5604                runs: vec![],
5605                steps: vec![],
5606                actions: vec![ActionInsert {
5607                    id: "action-1".to_owned(),
5608                    step_id: "step-1".to_owned(),
5609                    kind: "tool_call".to_owned(),
5610                    status: "completed".to_owned(),
5611                    properties: "{}".to_owned(),
5612                    source_ref: None,
5613                    upsert: false,
5614                    supersedes_id: None,
5615                }],
5616                optional_backfills: vec![],
5617                vec_inserts: vec![],
5618                operational_writes: vec![],
5619            })
5620            .expect("write");
5621
5622        assert!(
5623            !receipt.provenance_warnings.is_empty(),
5624            "action insert without source_ref must emit a provenance warning"
5625        );
5626    }
5627
5628    #[test]
5629    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5630        let db = NamedTempFile::new().expect("temporary db");
5631        let writer = WriterActor::start(
5632            db.path(),
5633            Arc::new(SchemaManager::new()),
5634            ProvenanceMode::Warn,
5635            Arc::new(TelemetryCounters::default()),
5636        )
5637        .expect("writer");
5638
5639        let receipt = writer
5640            .submit(WriteRequest {
5641                label: "test".to_owned(),
5642                nodes: vec![NodeInsert {
5643                    row_id: "row-1".to_owned(),
5644                    logical_id: "node-1".to_owned(),
5645                    kind: "Note".to_owned(),
5646                    properties: "{}".to_owned(),
5647                    source_ref: Some("src-1".to_owned()),
5648                    upsert: false,
5649                    chunk_policy: ChunkPolicy::Preserve,
5650                    content_ref: None,
5651                }],
5652                node_retires: vec![],
5653                edges: vec![],
5654                edge_retires: vec![],
5655                chunks: vec![],
5656                runs: vec![RunInsert {
5657                    id: "run-1".to_owned(),
5658                    kind: "session".to_owned(),
5659                    status: "active".to_owned(),
5660                    properties: "{}".to_owned(),
5661                    source_ref: Some("src-1".to_owned()),
5662                    upsert: false,
5663                    supersedes_id: None,
5664                }],
5665                steps: vec![StepInsert {
5666                    id: "step-1".to_owned(),
5667                    run_id: "run-1".to_owned(),
5668                    kind: "llm_call".to_owned(),
5669                    status: "completed".to_owned(),
5670                    properties: "{}".to_owned(),
5671                    source_ref: Some("src-1".to_owned()),
5672                    upsert: false,
5673                    supersedes_id: None,
5674                }],
5675                actions: vec![ActionInsert {
5676                    id: "action-1".to_owned(),
5677                    step_id: "step-1".to_owned(),
5678                    kind: "tool_call".to_owned(),
5679                    status: "completed".to_owned(),
5680                    properties: "{}".to_owned(),
5681                    source_ref: Some("src-1".to_owned()),
5682                    upsert: false,
5683                    supersedes_id: None,
5684                }],
5685                optional_backfills: vec![],
5686                vec_inserts: vec![],
5687                operational_writes: vec![],
5688            })
5689            .expect("write");
5690
5691        assert!(
5692            receipt.provenance_warnings.is_empty(),
5693            "no warnings expected when all types have source_ref; got: {:?}",
5694            receipt.provenance_warnings
5695        );
5696    }
5697
5698    // --- Item 4 Task 2: ProvenanceMode tests ---
5699
5700    #[test]
5701    fn default_provenance_mode_is_warn() {
5702        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5703        let db = NamedTempFile::new().expect("temporary db");
5704        let writer = WriterActor::start(
5705            db.path(),
5706            Arc::new(SchemaManager::new()),
5707            ProvenanceMode::default(),
5708            Arc::new(TelemetryCounters::default()),
5709        )
5710        .expect("writer");
5711
5712        let receipt = writer
5713            .submit(WriteRequest {
5714                label: "test".to_owned(),
5715                nodes: vec![NodeInsert {
5716                    row_id: "row-1".to_owned(),
5717                    logical_id: "node-1".to_owned(),
5718                    kind: "Note".to_owned(),
5719                    properties: "{}".to_owned(),
5720                    source_ref: None,
5721                    upsert: false,
5722                    chunk_policy: ChunkPolicy::Preserve,
5723                    content_ref: None,
5724                }],
5725                node_retires: vec![],
5726                edges: vec![],
5727                edge_retires: vec![],
5728                chunks: vec![],
5729                runs: vec![],
5730                steps: vec![],
5731                actions: vec![],
5732                optional_backfills: vec![],
5733                vec_inserts: vec![],
5734                operational_writes: vec![],
5735            })
5736            .expect("Warn mode must not reject missing source_ref");
5737
5738        assert!(
5739            !receipt.provenance_warnings.is_empty(),
5740            "Warn mode must emit a warning instead of rejecting"
5741        );
5742    }
5743
5744    #[test]
5745    fn require_mode_rejects_node_without_source_ref() {
5746        let db = NamedTempFile::new().expect("temporary db");
5747        let writer = WriterActor::start(
5748            db.path(),
5749            Arc::new(SchemaManager::new()),
5750            ProvenanceMode::Require,
5751            Arc::new(TelemetryCounters::default()),
5752        )
5753        .expect("writer");
5754
5755        let result = writer.submit(WriteRequest {
5756            label: "test".to_owned(),
5757            nodes: vec![NodeInsert {
5758                row_id: "row-1".to_owned(),
5759                logical_id: "node-1".to_owned(),
5760                kind: "Note".to_owned(),
5761                properties: "{}".to_owned(),
5762                source_ref: None,
5763                upsert: false,
5764                chunk_policy: ChunkPolicy::Preserve,
5765                content_ref: None,
5766            }],
5767            node_retires: vec![],
5768            edges: vec![],
5769            edge_retires: vec![],
5770            chunks: vec![],
5771            runs: vec![],
5772            steps: vec![],
5773            actions: vec![],
5774            optional_backfills: vec![],
5775            vec_inserts: vec![],
5776            operational_writes: vec![],
5777        });
5778
5779        assert!(
5780            matches!(result, Err(EngineError::InvalidWrite(_))),
5781            "Require mode must reject node without source_ref"
5782        );
5783    }
5784
5785    #[test]
5786    fn require_mode_accepts_node_with_source_ref() {
5787        let db = NamedTempFile::new().expect("temporary db");
5788        let writer = WriterActor::start(
5789            db.path(),
5790            Arc::new(SchemaManager::new()),
5791            ProvenanceMode::Require,
5792            Arc::new(TelemetryCounters::default()),
5793        )
5794        .expect("writer");
5795
5796        let result = writer.submit(WriteRequest {
5797            label: "test".to_owned(),
5798            nodes: vec![NodeInsert {
5799                row_id: "row-1".to_owned(),
5800                logical_id: "node-1".to_owned(),
5801                kind: "Note".to_owned(),
5802                properties: "{}".to_owned(),
5803                source_ref: Some("src-1".to_owned()),
5804                upsert: false,
5805                chunk_policy: ChunkPolicy::Preserve,
5806                content_ref: None,
5807            }],
5808            node_retires: vec![],
5809            edges: vec![],
5810            edge_retires: vec![],
5811            chunks: vec![],
5812            runs: vec![],
5813            steps: vec![],
5814            actions: vec![],
5815            optional_backfills: vec![],
5816            vec_inserts: vec![],
5817            operational_writes: vec![],
5818        });
5819
5820        assert!(
5821            result.is_ok(),
5822            "Require mode must accept node with source_ref"
5823        );
5824    }
5825
5826    #[test]
5827    fn require_mode_rejects_edge_without_source_ref() {
5828        let db = NamedTempFile::new().expect("temporary db");
5829        let writer = WriterActor::start(
5830            db.path(),
5831            Arc::new(SchemaManager::new()),
5832            ProvenanceMode::Require,
5833            Arc::new(TelemetryCounters::default()),
5834        )
5835        .expect("writer");
5836
5837        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5838        let result = writer.submit(WriteRequest {
5839            label: "test".to_owned(),
5840            nodes: vec![
5841                NodeInsert {
5842                    row_id: "row-a".to_owned(),
5843                    logical_id: "node-a".to_owned(),
5844                    kind: "Note".to_owned(),
5845                    properties: "{}".to_owned(),
5846                    source_ref: Some("src-1".to_owned()),
5847                    upsert: false,
5848                    chunk_policy: ChunkPolicy::Preserve,
5849                    content_ref: None,
5850                },
5851                NodeInsert {
5852                    row_id: "row-b".to_owned(),
5853                    logical_id: "node-b".to_owned(),
5854                    kind: "Note".to_owned(),
5855                    properties: "{}".to_owned(),
5856                    source_ref: Some("src-1".to_owned()),
5857                    upsert: false,
5858                    chunk_policy: ChunkPolicy::Preserve,
5859                    content_ref: None,
5860                },
5861            ],
5862            node_retires: vec![],
5863            edges: vec![EdgeInsert {
5864                row_id: "edge-row-1".to_owned(),
5865                logical_id: "edge-1".to_owned(),
5866                source_logical_id: "node-a".to_owned(),
5867                target_logical_id: "node-b".to_owned(),
5868                kind: "LINKS_TO".to_owned(),
5869                properties: "{}".to_owned(),
5870                source_ref: None,
5871                upsert: false,
5872            }],
5873            edge_retires: vec![],
5874            chunks: vec![],
5875            runs: vec![],
5876            steps: vec![],
5877            actions: vec![],
5878            optional_backfills: vec![],
5879            vec_inserts: vec![],
5880            operational_writes: vec![],
5881        });
5882
5883        assert!(
5884            matches!(result, Err(EngineError::InvalidWrite(_))),
5885            "Require mode must reject edge without source_ref"
5886        );
5887    }
5888
5889    // --- Item 5: FTS projection coverage tests ---
5890
5891    #[test]
5892    fn fts_row_has_correct_kind_from_co_submitted_node() {
5893        let db = NamedTempFile::new().expect("temporary db");
5894        let writer = WriterActor::start(
5895            db.path(),
5896            Arc::new(SchemaManager::new()),
5897            ProvenanceMode::Warn,
5898            Arc::new(TelemetryCounters::default()),
5899        )
5900        .expect("writer");
5901
5902        writer
5903            .submit(WriteRequest {
5904                label: "test".to_owned(),
5905                nodes: vec![NodeInsert {
5906                    row_id: "row-1".to_owned(),
5907                    logical_id: "node-1".to_owned(),
5908                    kind: "Meeting".to_owned(),
5909                    properties: "{}".to_owned(),
5910                    source_ref: Some("src-1".to_owned()),
5911                    upsert: false,
5912                    chunk_policy: ChunkPolicy::Preserve,
5913                    content_ref: None,
5914                }],
5915                node_retires: vec![],
5916                edges: vec![],
5917                edge_retires: vec![],
5918                chunks: vec![ChunkInsert {
5919                    id: "chunk-1".to_owned(),
5920                    node_logical_id: "node-1".to_owned(),
5921                    text_content: "some text".to_owned(),
5922                    byte_start: None,
5923                    byte_end: None,
5924                    content_hash: None,
5925                }],
5926                runs: vec![],
5927                steps: vec![],
5928                actions: vec![],
5929                optional_backfills: vec![],
5930                vec_inserts: vec![],
5931                operational_writes: vec![],
5932            })
5933            .expect("write");
5934
5935        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5936        let kind: String = conn
5937            .query_row(
5938                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5939                [],
5940                |row| row.get(0),
5941            )
5942            .expect("fts row");
5943
5944        assert_eq!(kind, "Meeting");
5945    }
5946
5947    #[test]
5948    fn fts_row_has_correct_text_content() {
5949        let db = NamedTempFile::new().expect("temporary db");
5950        let writer = WriterActor::start(
5951            db.path(),
5952            Arc::new(SchemaManager::new()),
5953            ProvenanceMode::Warn,
5954            Arc::new(TelemetryCounters::default()),
5955        )
5956        .expect("writer");
5957
5958        writer
5959            .submit(WriteRequest {
5960                label: "test".to_owned(),
5961                nodes: vec![NodeInsert {
5962                    row_id: "row-1".to_owned(),
5963                    logical_id: "node-1".to_owned(),
5964                    kind: "Note".to_owned(),
5965                    properties: "{}".to_owned(),
5966                    source_ref: Some("src-1".to_owned()),
5967                    upsert: false,
5968                    chunk_policy: ChunkPolicy::Preserve,
5969                    content_ref: None,
5970                }],
5971                node_retires: vec![],
5972                edges: vec![],
5973                edge_retires: vec![],
5974                chunks: vec![ChunkInsert {
5975                    id: "chunk-1".to_owned(),
5976                    node_logical_id: "node-1".to_owned(),
5977                    text_content: "exactly this text".to_owned(),
5978                    byte_start: None,
5979                    byte_end: None,
5980                    content_hash: None,
5981                }],
5982                runs: vec![],
5983                steps: vec![],
5984                actions: vec![],
5985                optional_backfills: vec![],
5986                vec_inserts: vec![],
5987                operational_writes: vec![],
5988            })
5989            .expect("write");
5990
5991        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5992        let text: String = conn
5993            .query_row(
5994                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5995                [],
5996                |row| row.get(0),
5997            )
5998            .expect("fts row");
5999
6000        assert_eq!(text, "exactly this text");
6001    }
6002
6003    #[test]
6004    fn fts_row_has_correct_kind_from_pre_existing_node() {
6005        let db = NamedTempFile::new().expect("temporary db");
6006        let writer = WriterActor::start(
6007            db.path(),
6008            Arc::new(SchemaManager::new()),
6009            ProvenanceMode::Warn,
6010            Arc::new(TelemetryCounters::default()),
6011        )
6012        .expect("writer");
6013
6014        // Request 1: node only
6015        writer
6016            .submit(WriteRequest {
6017                label: "r1".to_owned(),
6018                nodes: vec![NodeInsert {
6019                    row_id: "row-1".to_owned(),
6020                    logical_id: "node-1".to_owned(),
6021                    kind: "Document".to_owned(),
6022                    properties: "{}".to_owned(),
6023                    source_ref: Some("src-1".to_owned()),
6024                    upsert: false,
6025                    chunk_policy: ChunkPolicy::Preserve,
6026                    content_ref: None,
6027                }],
6028                node_retires: vec![],
6029                edges: vec![],
6030                edge_retires: vec![],
6031                chunks: vec![],
6032                runs: vec![],
6033                steps: vec![],
6034                actions: vec![],
6035                optional_backfills: vec![],
6036                vec_inserts: vec![],
6037                operational_writes: vec![],
6038            })
6039            .expect("r1 write");
6040
6041        // Request 2: chunk for pre-existing node
6042        writer
6043            .submit(WriteRequest {
6044                label: "r2".to_owned(),
6045                nodes: vec![],
6046                node_retires: vec![],
6047                edges: vec![],
6048                edge_retires: vec![],
6049                chunks: vec![ChunkInsert {
6050                    id: "chunk-1".to_owned(),
6051                    node_logical_id: "node-1".to_owned(),
6052                    text_content: "some text".to_owned(),
6053                    byte_start: None,
6054                    byte_end: None,
6055                    content_hash: None,
6056                }],
6057                runs: vec![],
6058                steps: vec![],
6059                actions: vec![],
6060                optional_backfills: vec![],
6061                vec_inserts: vec![],
6062                operational_writes: vec![],
6063            })
6064            .expect("r2 write");
6065
6066        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6067        let kind: String = conn
6068            .query_row(
6069                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6070                [],
6071                |row| row.get(0),
6072            )
6073            .expect("fts row");
6074
6075        assert_eq!(kind, "Document");
6076    }
6077
6078    #[test]
6079    fn fts_derives_rows_for_multiple_chunks_per_node() {
6080        let db = NamedTempFile::new().expect("temporary db");
6081        let writer = WriterActor::start(
6082            db.path(),
6083            Arc::new(SchemaManager::new()),
6084            ProvenanceMode::Warn,
6085            Arc::new(TelemetryCounters::default()),
6086        )
6087        .expect("writer");
6088
6089        writer
6090            .submit(WriteRequest {
6091                label: "test".to_owned(),
6092                nodes: vec![NodeInsert {
6093                    row_id: "row-1".to_owned(),
6094                    logical_id: "node-1".to_owned(),
6095                    kind: "Meeting".to_owned(),
6096                    properties: "{}".to_owned(),
6097                    source_ref: Some("src-1".to_owned()),
6098                    upsert: false,
6099                    chunk_policy: ChunkPolicy::Preserve,
6100                    content_ref: None,
6101                }],
6102                node_retires: vec![],
6103                edges: vec![],
6104                edge_retires: vec![],
6105                chunks: vec![
6106                    ChunkInsert {
6107                        id: "chunk-a".to_owned(),
6108                        node_logical_id: "node-1".to_owned(),
6109                        text_content: "intro".to_owned(),
6110                        byte_start: None,
6111                        byte_end: None,
6112                        content_hash: None,
6113                    },
6114                    ChunkInsert {
6115                        id: "chunk-b".to_owned(),
6116                        node_logical_id: "node-1".to_owned(),
6117                        text_content: "body".to_owned(),
6118                        byte_start: None,
6119                        byte_end: None,
6120                        content_hash: None,
6121                    },
6122                    ChunkInsert {
6123                        id: "chunk-c".to_owned(),
6124                        node_logical_id: "node-1".to_owned(),
6125                        text_content: "conclusion".to_owned(),
6126                        byte_start: None,
6127                        byte_end: None,
6128                        content_hash: None,
6129                    },
6130                ],
6131                runs: vec![],
6132                steps: vec![],
6133                actions: vec![],
6134                optional_backfills: vec![],
6135                vec_inserts: vec![],
6136                operational_writes: vec![],
6137            })
6138            .expect("write");
6139
6140        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6141        let count: i64 = conn
6142            .query_row(
6143                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6144                [],
6145                |row| row.get(0),
6146            )
6147            .expect("fts count");
6148
6149        assert_eq!(count, 3, "three chunks must produce three FTS rows");
6150    }
6151
6152    #[test]
6153    fn fts_resolves_mixed_fast_and_db_paths() {
6154        let db = NamedTempFile::new().expect("temporary db");
6155        let writer = WriterActor::start(
6156            db.path(),
6157            Arc::new(SchemaManager::new()),
6158            ProvenanceMode::Warn,
6159            Arc::new(TelemetryCounters::default()),
6160        )
6161        .expect("writer");
6162
6163        // Seed pre-existing node
6164        writer
6165            .submit(WriteRequest {
6166                label: "seed".to_owned(),
6167                nodes: vec![NodeInsert {
6168                    row_id: "row-existing".to_owned(),
6169                    logical_id: "existing-node".to_owned(),
6170                    kind: "Archive".to_owned(),
6171                    properties: "{}".to_owned(),
6172                    source_ref: Some("src-1".to_owned()),
6173                    upsert: false,
6174                    chunk_policy: ChunkPolicy::Preserve,
6175                    content_ref: None,
6176                }],
6177                node_retires: vec![],
6178                edges: vec![],
6179                edge_retires: vec![],
6180                chunks: vec![],
6181                runs: vec![],
6182                steps: vec![],
6183                actions: vec![],
6184                optional_backfills: vec![],
6185                vec_inserts: vec![],
6186                operational_writes: vec![],
6187            })
6188            .expect("seed");
6189
6190        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
6191        writer
6192            .submit(WriteRequest {
6193                label: "mixed".to_owned(),
6194                nodes: vec![NodeInsert {
6195                    row_id: "row-new".to_owned(),
6196                    logical_id: "new-node".to_owned(),
6197                    kind: "Inbox".to_owned(),
6198                    properties: "{}".to_owned(),
6199                    source_ref: Some("src-2".to_owned()),
6200                    upsert: false,
6201                    chunk_policy: ChunkPolicy::Preserve,
6202                    content_ref: None,
6203                }],
6204                node_retires: vec![],
6205                edges: vec![],
6206                edge_retires: vec![],
6207                chunks: vec![
6208                    ChunkInsert {
6209                        id: "chunk-fast".to_owned(),
6210                        node_logical_id: "new-node".to_owned(),
6211                        text_content: "new content".to_owned(),
6212                        byte_start: None,
6213                        byte_end: None,
6214                        content_hash: None,
6215                    },
6216                    ChunkInsert {
6217                        id: "chunk-db".to_owned(),
6218                        node_logical_id: "existing-node".to_owned(),
6219                        text_content: "archive content".to_owned(),
6220                        byte_start: None,
6221                        byte_end: None,
6222                        content_hash: None,
6223                    },
6224                ],
6225                runs: vec![],
6226                steps: vec![],
6227                actions: vec![],
6228                optional_backfills: vec![],
6229                vec_inserts: vec![],
6230                operational_writes: vec![],
6231            })
6232            .expect("mixed write");
6233
6234        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6235        let fast_kind: String = conn
6236            .query_row(
6237                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6238                [],
6239                |row| row.get(0),
6240            )
6241            .expect("fast path fts row");
6242        let db_kind: String = conn
6243            .query_row(
6244                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6245                [],
6246                |row| row.get(0),
6247            )
6248            .expect("db path fts row");
6249
6250        assert_eq!(fast_kind, "Inbox");
6251        assert_eq!(db_kind, "Archive");
6252    }
6253
6254    #[test]
6255    fn prepare_write_rejects_empty_chunk_text() {
6256        let db = NamedTempFile::new().expect("temporary db");
6257        let writer = WriterActor::start(
6258            db.path(),
6259            Arc::new(SchemaManager::new()),
6260            ProvenanceMode::Warn,
6261            Arc::new(TelemetryCounters::default()),
6262        )
6263        .expect("writer");
6264
6265        let result = writer.submit(WriteRequest {
6266            label: "test".to_owned(),
6267            nodes: vec![NodeInsert {
6268                row_id: "row-1".to_owned(),
6269                logical_id: "node-1".to_owned(),
6270                kind: "Note".to_owned(),
6271                properties: "{}".to_owned(),
6272                source_ref: None,
6273                upsert: false,
6274                chunk_policy: ChunkPolicy::Preserve,
6275                content_ref: None,
6276            }],
6277            node_retires: vec![],
6278            edges: vec![],
6279            edge_retires: vec![],
6280            chunks: vec![ChunkInsert {
6281                id: "chunk-1".to_owned(),
6282                node_logical_id: "node-1".to_owned(),
6283                text_content: String::new(),
6284                byte_start: None,
6285                byte_end: None,
6286                content_hash: None,
6287            }],
6288            runs: vec![],
6289            steps: vec![],
6290            actions: vec![],
6291            optional_backfills: vec![],
6292            vec_inserts: vec![],
6293            operational_writes: vec![],
6294        });
6295
6296        assert!(
6297            matches!(result, Err(EngineError::InvalidWrite(_))),
6298            "empty text_content must be rejected"
6299        );
6300    }
6301
6302    #[test]
6303    fn receipt_reports_zero_backfills_when_none_submitted() {
6304        let db = NamedTempFile::new().expect("temporary db");
6305        let writer = WriterActor::start(
6306            db.path(),
6307            Arc::new(SchemaManager::new()),
6308            ProvenanceMode::Warn,
6309            Arc::new(TelemetryCounters::default()),
6310        )
6311        .expect("writer");
6312
6313        let receipt = writer
6314            .submit(WriteRequest {
6315                label: "test".to_owned(),
6316                nodes: vec![NodeInsert {
6317                    row_id: "row-1".to_owned(),
6318                    logical_id: "node-1".to_owned(),
6319                    kind: "Note".to_owned(),
6320                    properties: "{}".to_owned(),
6321                    source_ref: Some("src-1".to_owned()),
6322                    upsert: false,
6323                    chunk_policy: ChunkPolicy::Preserve,
6324                    content_ref: None,
6325                }],
6326                node_retires: vec![],
6327                edges: vec![],
6328                edge_retires: vec![],
6329                chunks: vec![],
6330                runs: vec![],
6331                steps: vec![],
6332                actions: vec![],
6333                optional_backfills: vec![],
6334                vec_inserts: vec![],
6335                operational_writes: vec![],
6336            })
6337            .expect("write");
6338
6339        assert_eq!(receipt.optional_backfill_count, 0);
6340    }
6341
6342    #[test]
6343    fn receipt_reports_correct_backfill_count() {
6344        let db = NamedTempFile::new().expect("temporary db");
6345        let writer = WriterActor::start(
6346            db.path(),
6347            Arc::new(SchemaManager::new()),
6348            ProvenanceMode::Warn,
6349            Arc::new(TelemetryCounters::default()),
6350        )
6351        .expect("writer");
6352
6353        let receipt = writer
6354            .submit(WriteRequest {
6355                label: "test".to_owned(),
6356                nodes: vec![NodeInsert {
6357                    row_id: "row-1".to_owned(),
6358                    logical_id: "node-1".to_owned(),
6359                    kind: "Note".to_owned(),
6360                    properties: "{}".to_owned(),
6361                    source_ref: Some("src-1".to_owned()),
6362                    upsert: false,
6363                    chunk_policy: ChunkPolicy::Preserve,
6364                    content_ref: None,
6365                }],
6366                node_retires: vec![],
6367                edges: vec![],
6368                edge_retires: vec![],
6369                chunks: vec![],
6370                runs: vec![],
6371                steps: vec![],
6372                actions: vec![],
6373                optional_backfills: vec![
6374                    OptionalProjectionTask {
6375                        target: ProjectionTarget::Fts,
6376                        payload: "p1".to_owned(),
6377                    },
6378                    OptionalProjectionTask {
6379                        target: ProjectionTarget::Vec,
6380                        payload: "p2".to_owned(),
6381                    },
6382                    OptionalProjectionTask {
6383                        target: ProjectionTarget::All,
6384                        payload: "p3".to_owned(),
6385                    },
6386                ],
6387                vec_inserts: vec![],
6388                operational_writes: vec![],
6389            })
6390            .expect("write");
6391
6392        assert_eq!(receipt.optional_backfill_count, 3);
6393    }
6394
6395    #[test]
6396    fn backfill_tasks_are_not_executed_during_write() {
6397        let db = NamedTempFile::new().expect("temporary db");
6398        let writer = WriterActor::start(
6399            db.path(),
6400            Arc::new(SchemaManager::new()),
6401            ProvenanceMode::Warn,
6402            Arc::new(TelemetryCounters::default()),
6403        )
6404        .expect("writer");
6405
6406        // Write a node + chunk. Submit a backfill task targeting FTS.
6407        // The write path must not create any extra FTS rows beyond the required one.
6408        writer
6409            .submit(WriteRequest {
6410                label: "test".to_owned(),
6411                nodes: vec![NodeInsert {
6412                    row_id: "row-1".to_owned(),
6413                    logical_id: "node-1".to_owned(),
6414                    kind: "Note".to_owned(),
6415                    properties: "{}".to_owned(),
6416                    source_ref: Some("src-1".to_owned()),
6417                    upsert: false,
6418                    chunk_policy: ChunkPolicy::Preserve,
6419                    content_ref: None,
6420                }],
6421                node_retires: vec![],
6422                edges: vec![],
6423                edge_retires: vec![],
6424                chunks: vec![ChunkInsert {
6425                    id: "chunk-1".to_owned(),
6426                    node_logical_id: "node-1".to_owned(),
6427                    text_content: "required text".to_owned(),
6428                    byte_start: None,
6429                    byte_end: None,
6430                    content_hash: None,
6431                }],
6432                runs: vec![],
6433                steps: vec![],
6434                actions: vec![],
6435                optional_backfills: vec![OptionalProjectionTask {
6436                    target: ProjectionTarget::Fts,
6437                    payload: "backfill-payload".to_owned(),
6438                }],
6439                vec_inserts: vec![],
6440                operational_writes: vec![],
6441            })
6442            .expect("write");
6443
6444        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6445        let count: i64 = conn
6446            .query_row(
6447                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6448                [],
6449                |row| row.get(0),
6450            )
6451            .expect("fts count");
6452
6453        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6454    }
6455
6456    #[test]
6457    fn fts_row_uses_new_kind_after_node_replace() {
6458        let db = NamedTempFile::new().expect("temporary db");
6459        let writer = WriterActor::start(
6460            db.path(),
6461            Arc::new(SchemaManager::new()),
6462            ProvenanceMode::Warn,
6463            Arc::new(TelemetryCounters::default()),
6464        )
6465        .expect("writer");
6466
6467        // Write original node as "Note"
6468        writer
6469            .submit(WriteRequest {
6470                label: "v1".to_owned(),
6471                nodes: vec![NodeInsert {
6472                    row_id: "row-1".to_owned(),
6473                    logical_id: "node-1".to_owned(),
6474                    kind: "Note".to_owned(),
6475                    properties: "{}".to_owned(),
6476                    source_ref: Some("src-1".to_owned()),
6477                    upsert: false,
6478                    chunk_policy: ChunkPolicy::Preserve,
6479                    content_ref: None,
6480                }],
6481                node_retires: vec![],
6482                edges: vec![],
6483                edge_retires: vec![],
6484                chunks: vec![ChunkInsert {
6485                    id: "chunk-v1".to_owned(),
6486                    node_logical_id: "node-1".to_owned(),
6487                    text_content: "original".to_owned(),
6488                    byte_start: None,
6489                    byte_end: None,
6490                    content_hash: None,
6491                }],
6492                runs: vec![],
6493                steps: vec![],
6494                actions: vec![],
6495                optional_backfills: vec![],
6496                vec_inserts: vec![],
6497                operational_writes: vec![],
6498            })
6499            .expect("v1 write");
6500
6501        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
6502        writer
6503            .submit(WriteRequest {
6504                label: "v2".to_owned(),
6505                nodes: vec![NodeInsert {
6506                    row_id: "row-2".to_owned(),
6507                    logical_id: "node-1".to_owned(),
6508                    kind: "Meeting".to_owned(),
6509                    properties: "{}".to_owned(),
6510                    source_ref: Some("src-2".to_owned()),
6511                    upsert: true,
6512                    chunk_policy: ChunkPolicy::Replace,
6513                    content_ref: None,
6514                }],
6515                node_retires: vec![],
6516                edges: vec![],
6517                edge_retires: vec![],
6518                chunks: vec![ChunkInsert {
6519                    id: "chunk-v2".to_owned(),
6520                    node_logical_id: "node-1".to_owned(),
6521                    text_content: "updated".to_owned(),
6522                    byte_start: None,
6523                    byte_end: None,
6524                    content_hash: None,
6525                }],
6526                runs: vec![],
6527                steps: vec![],
6528                actions: vec![],
6529                optional_backfills: vec![],
6530                vec_inserts: vec![],
6531                operational_writes: vec![],
6532            })
6533            .expect("v2 write");
6534
6535        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6536
6537        // Old FTS row must be gone
6538        let old_count: i64 = conn
6539            .query_row(
6540                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6541                [],
6542                |row| row.get(0),
6543            )
6544            .expect("old fts count");
6545        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6546
6547        // New FTS row must use the new kind
6548        let new_kind: String = conn
6549            .query_row(
6550                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6551                [],
6552                |row| row.get(0),
6553            )
6554            .expect("new fts row");
6555        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6556    }
6557
6558    // --- Item 3: VecInsert tests ---
6559
6560    #[test]
6561    fn vec_insert_empty_chunk_id_is_rejected() {
6562        let db = NamedTempFile::new().expect("temporary db");
6563        let writer = WriterActor::start(
6564            db.path(),
6565            Arc::new(SchemaManager::new()),
6566            ProvenanceMode::Warn,
6567            Arc::new(TelemetryCounters::default()),
6568        )
6569        .expect("writer");
6570        let result = writer.submit(WriteRequest {
6571            label: "vec-test".to_owned(),
6572            nodes: vec![],
6573            node_retires: vec![],
6574            edges: vec![],
6575            edge_retires: vec![],
6576            chunks: vec![],
6577            runs: vec![],
6578            steps: vec![],
6579            actions: vec![],
6580            optional_backfills: vec![],
6581            vec_inserts: vec![VecInsert {
6582                chunk_id: String::new(),
6583                embedding: vec![0.1, 0.2, 0.3],
6584            }],
6585            operational_writes: vec![],
6586        });
6587        assert!(
6588            matches!(result, Err(EngineError::InvalidWrite(_))),
6589            "empty chunk_id in VecInsert must be rejected"
6590        );
6591    }
6592
6593    #[test]
6594    fn vec_insert_empty_embedding_is_rejected() {
6595        let db = NamedTempFile::new().expect("temporary db");
6596        let writer = WriterActor::start(
6597            db.path(),
6598            Arc::new(SchemaManager::new()),
6599            ProvenanceMode::Warn,
6600            Arc::new(TelemetryCounters::default()),
6601        )
6602        .expect("writer");
6603        let result = writer.submit(WriteRequest {
6604            label: "vec-test".to_owned(),
6605            nodes: vec![],
6606            node_retires: vec![],
6607            edges: vec![],
6608            edge_retires: vec![],
6609            chunks: vec![],
6610            runs: vec![],
6611            steps: vec![],
6612            actions: vec![],
6613            optional_backfills: vec![],
6614            vec_inserts: vec![VecInsert {
6615                chunk_id: "chunk-1".to_owned(),
6616                embedding: vec![],
6617            }],
6618            operational_writes: vec![],
6619        });
6620        assert!(
6621            matches!(result, Err(EngineError::InvalidWrite(_))),
6622            "empty embedding in VecInsert must be rejected"
6623        );
6624    }
6625
6626    #[test]
6627    fn vec_insert_noop_without_feature() {
6628        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6629        // (no error) but not write to any vec table.
6630        let db = NamedTempFile::new().expect("temporary db");
6631        let writer = WriterActor::start(
6632            db.path(),
6633            Arc::new(SchemaManager::new()),
6634            ProvenanceMode::Warn,
6635            Arc::new(TelemetryCounters::default()),
6636        )
6637        .expect("writer");
6638        let result = writer.submit(WriteRequest {
6639            label: "vec-noop".to_owned(),
6640            nodes: vec![],
6641            node_retires: vec![],
6642            edges: vec![],
6643            edge_retires: vec![],
6644            chunks: vec![],
6645            runs: vec![],
6646            steps: vec![],
6647            actions: vec![],
6648            optional_backfills: vec![],
6649            vec_inserts: vec![VecInsert {
6650                chunk_id: "chunk-noop".to_owned(),
6651                embedding: vec![1.0, 2.0, 3.0],
6652            }],
6653            operational_writes: vec![],
6654        });
6655        #[cfg(not(feature = "sqlite-vec"))]
6656        result.expect("noop VecInsert without feature must succeed");
6657        // The result variable is used above; silence unused warning for cfg-on path.
6658        #[cfg(feature = "sqlite-vec")]
6659        let _ = result;
6660    }
6661
6662    #[cfg(feature = "sqlite-vec")]
6663    #[test]
6664    fn node_retire_preserves_vec_rows_for_later_restore() {
6665        use crate::sqlite::open_connection_with_vec;
6666
6667        let db = NamedTempFile::new().expect("temporary db");
6668        let schema_manager = Arc::new(SchemaManager::new());
6669
6670        {
6671            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6672            schema_manager.bootstrap(&conn).expect("bootstrap");
6673            schema_manager
6674                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6675                .expect("ensure profile");
6676        }
6677
6678        let writer = WriterActor::start(
6679            db.path(),
6680            Arc::clone(&schema_manager),
6681            ProvenanceMode::Warn,
6682            Arc::new(TelemetryCounters::default()),
6683        )
6684        .expect("writer");
6685
6686        // Insert node + chunk + vec row
6687        writer
6688            .submit(WriteRequest {
6689                label: "setup".to_owned(),
6690                nodes: vec![NodeInsert {
6691                    row_id: "row-retire-vec".to_owned(),
6692                    logical_id: "node-retire-vec".to_owned(),
6693                    kind: "Doc".to_owned(),
6694                    properties: "{}".to_owned(),
6695                    source_ref: Some("src".to_owned()),
6696                    upsert: false,
6697                    chunk_policy: ChunkPolicy::Preserve,
6698                    content_ref: None,
6699                }],
6700                node_retires: vec![],
6701                edges: vec![],
6702                edge_retires: vec![],
6703                chunks: vec![ChunkInsert {
6704                    id: "chunk-retire-vec".to_owned(),
6705                    node_logical_id: "node-retire-vec".to_owned(),
6706                    text_content: "text".to_owned(),
6707                    byte_start: None,
6708                    byte_end: None,
6709                    content_hash: None,
6710                }],
6711                runs: vec![],
6712                steps: vec![],
6713                actions: vec![],
6714                optional_backfills: vec![],
6715                vec_inserts: vec![VecInsert {
6716                    chunk_id: "chunk-retire-vec".to_owned(),
6717                    embedding: vec![0.1, 0.2, 0.3],
6718                }],
6719                operational_writes: vec![],
6720            })
6721            .expect("setup write");
6722
6723        // Retire the node
6724        writer
6725            .submit(WriteRequest {
6726                label: "retire".to_owned(),
6727                nodes: vec![],
6728                node_retires: vec![NodeRetire {
6729                    logical_id: "node-retire-vec".to_owned(),
6730                    source_ref: Some("src".to_owned()),
6731                }],
6732                edges: vec![],
6733                edge_retires: vec![],
6734                chunks: vec![],
6735                runs: vec![],
6736                steps: vec![],
6737                actions: vec![],
6738                optional_backfills: vec![],
6739                vec_inserts: vec![],
6740                operational_writes: vec![],
6741            })
6742            .expect("retire write");
6743
6744        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6745        let count: i64 = conn
6746            .query_row(
6747                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
6748                [],
6749                |row| row.get(0),
6750            )
6751            .expect("count");
6752        assert_eq!(
6753            count, 1,
6754            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6755        );
6756    }
6757
6758    #[cfg(feature = "sqlite-vec")]
6759    #[test]
6760    #[allow(clippy::too_many_lines)]
6761    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6762        use crate::sqlite::open_connection_with_vec;
6763
6764        let db = NamedTempFile::new().expect("temporary db");
6765        let schema_manager = Arc::new(SchemaManager::new());
6766
6767        {
6768            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6769            schema_manager.bootstrap(&conn).expect("bootstrap");
6770            schema_manager
6771                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6772                .expect("ensure profile");
6773        }
6774
6775        let writer = WriterActor::start(
6776            db.path(),
6777            Arc::clone(&schema_manager),
6778            ProvenanceMode::Warn,
6779            Arc::new(TelemetryCounters::default()),
6780        )
6781        .expect("writer");
6782
6783        // Insert node + chunk-A + vec-A
6784        writer
6785            .submit(WriteRequest {
6786                label: "v1".to_owned(),
6787                nodes: vec![NodeInsert {
6788                    row_id: "row-replace-v1".to_owned(),
6789                    logical_id: "node-replace-vec".to_owned(),
6790                    kind: "Doc".to_owned(),
6791                    properties: "{}".to_owned(),
6792                    source_ref: Some("src".to_owned()),
6793                    upsert: false,
6794                    chunk_policy: ChunkPolicy::Preserve,
6795                    content_ref: None,
6796                }],
6797                node_retires: vec![],
6798                edges: vec![],
6799                edge_retires: vec![],
6800                chunks: vec![ChunkInsert {
6801                    id: "chunk-replace-A".to_owned(),
6802                    node_logical_id: "node-replace-vec".to_owned(),
6803                    text_content: "version one".to_owned(),
6804                    byte_start: None,
6805                    byte_end: None,
6806                    content_hash: None,
6807                }],
6808                runs: vec![],
6809                steps: vec![],
6810                actions: vec![],
6811                optional_backfills: vec![],
6812                vec_inserts: vec![VecInsert {
6813                    chunk_id: "chunk-replace-A".to_owned(),
6814                    embedding: vec![0.1, 0.2, 0.3],
6815                }],
6816                operational_writes: vec![],
6817            })
6818            .expect("v1 write");
6819
6820        // Upsert with Replace + chunk-B + vec-B
6821        writer
6822            .submit(WriteRequest {
6823                label: "v2".to_owned(),
6824                nodes: vec![NodeInsert {
6825                    row_id: "row-replace-v2".to_owned(),
6826                    logical_id: "node-replace-vec".to_owned(),
6827                    kind: "Doc".to_owned(),
6828                    properties: "{}".to_owned(),
6829                    source_ref: Some("src".to_owned()),
6830                    upsert: true,
6831                    chunk_policy: ChunkPolicy::Replace,
6832                    content_ref: None,
6833                }],
6834                node_retires: vec![],
6835                edges: vec![],
6836                edge_retires: vec![],
6837                chunks: vec![ChunkInsert {
6838                    id: "chunk-replace-B".to_owned(),
6839                    node_logical_id: "node-replace-vec".to_owned(),
6840                    text_content: "version two".to_owned(),
6841                    byte_start: None,
6842                    byte_end: None,
6843                    content_hash: None,
6844                }],
6845                runs: vec![],
6846                steps: vec![],
6847                actions: vec![],
6848                optional_backfills: vec![],
6849                vec_inserts: vec![VecInsert {
6850                    chunk_id: "chunk-replace-B".to_owned(),
6851                    embedding: vec![0.4, 0.5, 0.6],
6852                }],
6853                operational_writes: vec![],
6854            })
6855            .expect("v2 write");
6856
6857        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6858        let count_a: i64 = conn
6859            .query_row(
6860                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6861                [],
6862                |row| row.get(0),
6863            )
6864            .expect("count A");
6865        let count_b: i64 = conn
6866            .query_row(
6867                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6868                [],
6869                |row| row.get(0),
6870            )
6871            .expect("count B");
6872        assert_eq!(
6873            count_a, 0,
6874            "old vec row (chunk-A) must be deleted on Replace"
6875        );
6876        assert_eq!(
6877            count_b, 1,
6878            "new vec row (chunk-B) must be present after Replace"
6879        );
6880    }
6881
6882    #[cfg(feature = "sqlite-vec")]
6883    #[test]
6884    fn vec_insert_is_persisted_when_feature_enabled() {
6885        use crate::sqlite::open_connection_with_vec;
6886
6887        let db = NamedTempFile::new().expect("temporary db");
6888        let schema_manager = Arc::new(SchemaManager::new());
6889
6890        // Open a vec-capable connection and bootstrap + ensure profile
6891        {
6892            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6893            schema_manager.bootstrap(&conn).expect("bootstrap");
6894            schema_manager
6895                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6896                .expect("ensure profile");
6897        }
6898
6899        let writer = WriterActor::start(
6900            db.path(),
6901            Arc::clone(&schema_manager),
6902            ProvenanceMode::Warn,
6903            Arc::new(TelemetryCounters::default()),
6904        )
6905        .expect("writer");
6906
6907        writer
6908            .submit(WriteRequest {
6909                label: "vec-insert".to_owned(),
6910                nodes: vec![],
6911                node_retires: vec![],
6912                edges: vec![],
6913                edge_retires: vec![],
6914                chunks: vec![],
6915                runs: vec![],
6916                steps: vec![],
6917                actions: vec![],
6918                optional_backfills: vec![],
6919                vec_inserts: vec![VecInsert {
6920                    chunk_id: "chunk-vec".to_owned(),
6921                    embedding: vec![0.1, 0.2, 0.3],
6922                }],
6923                operational_writes: vec![],
6924            })
6925            .expect("vec insert write");
6926
6927        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6928        let count: i64 = conn
6929            .query_row(
6930                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6931                [],
6932                |row| row.get(0),
6933            )
6934            .expect("count");
6935        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6936    }
6937
6938    // --- WriteRequest size validation tests ---
6939
6940    #[test]
6941    fn write_request_exceeding_node_limit_is_rejected() {
6942        let nodes: Vec<NodeInsert> = (0..10_001)
6943            .map(|i| NodeInsert {
6944                row_id: format!("row-{i}"),
6945                logical_id: format!("lg-{i}"),
6946                kind: "Note".to_owned(),
6947                properties: "{}".to_owned(),
6948                source_ref: None,
6949                upsert: false,
6950                chunk_policy: ChunkPolicy::Preserve,
6951                content_ref: None,
6952            })
6953            .collect();
6954
6955        let request = WriteRequest {
6956            label: "too-many-nodes".to_owned(),
6957            nodes,
6958            node_retires: vec![],
6959            edges: vec![],
6960            edge_retires: vec![],
6961            chunks: vec![],
6962            runs: vec![],
6963            steps: vec![],
6964            actions: vec![],
6965            optional_backfills: vec![],
6966            vec_inserts: vec![],
6967            operational_writes: vec![],
6968        };
6969
6970        let result = prepare_write(request, ProvenanceMode::Warn)
6971            .map(|_| ())
6972            .map_err(|e| format!("{e}"));
6973        assert!(
6974            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6975            "exceeding node limit must return InvalidWrite: got {result:?}"
6976        );
6977    }
6978
6979    #[test]
6980    fn write_request_exceeding_total_limit_is_rejected() {
6981        // Spread items across fields to exceed 100_000 total
6982        // without exceeding any single per-field limit.
6983        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
6984        let request = WriteRequest {
6985            label: "too-many-total".to_owned(),
6986            nodes: (0..10_000)
6987                .map(|i| NodeInsert {
6988                    row_id: format!("row-{i}"),
6989                    logical_id: format!("lg-{i}"),
6990                    kind: "Note".to_owned(),
6991                    properties: "{}".to_owned(),
6992                    source_ref: None,
6993                    upsert: false,
6994                    chunk_policy: ChunkPolicy::Preserve,
6995                    content_ref: None,
6996                })
6997                .collect(),
6998            node_retires: vec![],
6999            edges: (0..10_000)
7000                .map(|i| EdgeInsert {
7001                    row_id: format!("edge-row-{i}"),
7002                    logical_id: format!("edge-lg-{i}"),
7003                    kind: "link".to_owned(),
7004                    source_logical_id: format!("lg-{i}"),
7005                    target_logical_id: format!("lg-{}", i + 1),
7006                    properties: "{}".to_owned(),
7007                    source_ref: None,
7008                    upsert: false,
7009                })
7010                .collect(),
7011            edge_retires: vec![],
7012            chunks: (0..50_000)
7013                .map(|i| ChunkInsert {
7014                    id: format!("chunk-{i}"),
7015                    node_logical_id: "lg-0".to_owned(),
7016                    text_content: "text".to_owned(),
7017                    byte_start: None,
7018                    byte_end: None,
7019                    content_hash: None,
7020                })
7021                .collect(),
7022            runs: vec![],
7023            steps: vec![],
7024            actions: vec![],
7025            optional_backfills: vec![],
7026            vec_inserts: (0..20_001)
7027                .map(|i| VecInsert {
7028                    chunk_id: format!("vec-chunk-{i}"),
7029                    embedding: vec![0.1],
7030                })
7031                .collect(),
7032            operational_writes: (0..10_000)
7033                .map(|i| OperationalWrite::Append {
7034                    collection: format!("col-{i}"),
7035                    record_key: format!("key-{i}"),
7036                    payload_json: "{}".to_owned(),
7037                    source_ref: None,
7038                })
7039                .collect(),
7040        };
7041
7042        let result = prepare_write(request, ProvenanceMode::Warn)
7043            .map(|_| ())
7044            .map_err(|e| format!("{e}"));
7045        assert!(
7046            matches!(result, Err(ref msg) if msg.contains("too many total items")),
7047            "exceeding total item limit must return InvalidWrite: got {result:?}"
7048        );
7049    }
7050
7051    #[test]
7052    fn write_request_within_limits_succeeds() {
7053        let db = NamedTempFile::new().expect("temporary db");
7054        let writer = WriterActor::start(
7055            db.path(),
7056            Arc::new(SchemaManager::new()),
7057            ProvenanceMode::Warn,
7058            Arc::new(TelemetryCounters::default()),
7059        )
7060        .expect("writer");
7061
7062        let result = writer.submit(WriteRequest {
7063            label: "within-limits".to_owned(),
7064            nodes: vec![NodeInsert {
7065                row_id: "row-1".to_owned(),
7066                logical_id: "lg-1".to_owned(),
7067                kind: "Note".to_owned(),
7068                properties: "{}".to_owned(),
7069                source_ref: None,
7070                upsert: false,
7071                chunk_policy: ChunkPolicy::Preserve,
7072                content_ref: None,
7073            }],
7074            node_retires: vec![],
7075            edges: vec![],
7076            edge_retires: vec![],
7077            chunks: vec![],
7078            runs: vec![],
7079            steps: vec![],
7080            actions: vec![],
7081            optional_backfills: vec![],
7082            vec_inserts: vec![],
7083            operational_writes: vec![],
7084        });
7085
7086        assert!(
7087            result.is_ok(),
7088            "write request within limits must succeed: got {result:?}"
7089        );
7090    }
7091
7092    #[test]
7093    fn property_fts_rows_created_on_node_insert() {
7094        let db = NamedTempFile::new().expect("temporary db");
7095        // Bootstrap and register a property schema before starting the writer.
7096        let schema = Arc::new(SchemaManager::new());
7097        {
7098            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7099            schema.bootstrap(&conn).expect("bootstrap");
7100            conn.execute(
7101                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7102                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
7103                [],
7104            )
7105            .expect("register schema");
7106        }
7107        let writer = WriterActor::start(
7108            db.path(),
7109            Arc::clone(&schema),
7110            ProvenanceMode::Warn,
7111            Arc::new(TelemetryCounters::default()),
7112        )
7113        .expect("writer");
7114
7115        writer
7116            .submit(WriteRequest {
7117                label: "goal-insert".to_owned(),
7118                nodes: vec![NodeInsert {
7119                    row_id: "row-1".to_owned(),
7120                    logical_id: "goal-1".to_owned(),
7121                    kind: "Goal".to_owned(),
7122                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
7123                        .to_owned(),
7124                    source_ref: Some("src-1".to_owned()),
7125                    upsert: false,
7126                    chunk_policy: ChunkPolicy::Preserve,
7127                    content_ref: None,
7128                }],
7129                node_retires: vec![],
7130                edges: vec![],
7131                edge_retires: vec![],
7132                chunks: vec![],
7133                runs: vec![],
7134                steps: vec![],
7135                actions: vec![],
7136                optional_backfills: vec![],
7137                vec_inserts: vec![],
7138                operational_writes: vec![],
7139            })
7140            .expect("write");
7141
7142        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7143        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
7144        let text: String = conn
7145            .query_row(
7146                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7147                [],
7148                |row| row.get(0),
7149            )
7150            .expect("property FTS row must exist");
7151        assert_eq!(text, "Ship v2 Launch the redesign");
7152    }
7153
7154    #[test]
7155    fn property_fts_rows_replaced_on_upsert() {
7156        let db = NamedTempFile::new().expect("temporary db");
7157        let schema = Arc::new(SchemaManager::new());
7158        {
7159            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7160            schema.bootstrap(&conn).expect("bootstrap");
7161            conn.execute(
7162                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7163                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7164                [],
7165            )
7166            .expect("register schema");
7167        }
7168        let writer = WriterActor::start(
7169            db.path(),
7170            Arc::clone(&schema),
7171            ProvenanceMode::Warn,
7172            Arc::new(TelemetryCounters::default()),
7173        )
7174        .expect("writer");
7175
7176        // Initial insert.
7177        writer
7178            .submit(WriteRequest {
7179                label: "insert".to_owned(),
7180                nodes: vec![NodeInsert {
7181                    row_id: "row-1".to_owned(),
7182                    logical_id: "goal-1".to_owned(),
7183                    kind: "Goal".to_owned(),
7184                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7185                    source_ref: Some("src-1".to_owned()),
7186                    upsert: false,
7187                    chunk_policy: ChunkPolicy::Preserve,
7188                    content_ref: None,
7189                }],
7190                node_retires: vec![],
7191                edges: vec![],
7192                edge_retires: vec![],
7193                chunks: vec![],
7194                runs: vec![],
7195                steps: vec![],
7196                actions: vec![],
7197                optional_backfills: vec![],
7198                vec_inserts: vec![],
7199                operational_writes: vec![],
7200            })
7201            .expect("insert");
7202
7203        // Upsert with changed property.
7204        writer
7205            .submit(WriteRequest {
7206                label: "upsert".to_owned(),
7207                nodes: vec![NodeInsert {
7208                    row_id: "row-2".to_owned(),
7209                    logical_id: "goal-1".to_owned(),
7210                    kind: "Goal".to_owned(),
7211                    properties: r#"{"name":"Beta"}"#.to_owned(),
7212                    source_ref: Some("src-2".to_owned()),
7213                    upsert: true,
7214                    chunk_policy: ChunkPolicy::Preserve,
7215                    content_ref: None,
7216                }],
7217                node_retires: vec![],
7218                edges: vec![],
7219                edge_retires: vec![],
7220                chunks: vec![],
7221                runs: vec![],
7222                steps: vec![],
7223                actions: vec![],
7224                optional_backfills: vec![],
7225                vec_inserts: vec![],
7226                operational_writes: vec![],
7227            })
7228            .expect("upsert");
7229
7230        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7231        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
7232        let count: i64 = conn
7233            .query_row(
7234                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7235                [],
7236                |row| row.get(0),
7237            )
7238            .expect("count");
7239        assert_eq!(
7240            count, 1,
7241            "must have exactly one property FTS row after upsert"
7242        );
7243
7244        let text: String = conn
7245            .query_row(
7246                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7247                [],
7248                |row| row.get(0),
7249            )
7250            .expect("text");
7251        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7252    }
7253
7254    #[test]
7255    fn property_fts_rows_deleted_on_retire() {
7256        let db = NamedTempFile::new().expect("temporary db");
7257        let schema = Arc::new(SchemaManager::new());
7258        {
7259            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7260            schema.bootstrap(&conn).expect("bootstrap");
7261            conn.execute(
7262                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7263                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7264                [],
7265            )
7266            .expect("register schema");
7267        }
7268        let writer = WriterActor::start(
7269            db.path(),
7270            Arc::clone(&schema),
7271            ProvenanceMode::Warn,
7272            Arc::new(TelemetryCounters::default()),
7273        )
7274        .expect("writer");
7275
7276        // Insert a node.
7277        writer
7278            .submit(WriteRequest {
7279                label: "insert".to_owned(),
7280                nodes: vec![NodeInsert {
7281                    row_id: "row-1".to_owned(),
7282                    logical_id: "goal-1".to_owned(),
7283                    kind: "Goal".to_owned(),
7284                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7285                    source_ref: Some("src-1".to_owned()),
7286                    upsert: false,
7287                    chunk_policy: ChunkPolicy::Preserve,
7288                    content_ref: None,
7289                }],
7290                node_retires: vec![],
7291                edges: vec![],
7292                edge_retires: vec![],
7293                chunks: vec![],
7294                runs: vec![],
7295                steps: vec![],
7296                actions: vec![],
7297                optional_backfills: vec![],
7298                vec_inserts: vec![],
7299                operational_writes: vec![],
7300            })
7301            .expect("insert");
7302
7303        // Retire the node.
7304        writer
7305            .submit(WriteRequest {
7306                label: "retire".to_owned(),
7307                nodes: vec![],
7308                node_retires: vec![NodeRetire {
7309                    logical_id: "goal-1".to_owned(),
7310                    source_ref: Some("forget-1".to_owned()),
7311                }],
7312                edges: vec![],
7313                edge_retires: vec![],
7314                chunks: vec![],
7315                runs: vec![],
7316                steps: vec![],
7317                actions: vec![],
7318                optional_backfills: vec![],
7319                vec_inserts: vec![],
7320                operational_writes: vec![],
7321            })
7322            .expect("retire");
7323
7324        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7325        let table = fathomdb_schema::fts_kind_table_name("Goal");
7326        let count: i64 = conn
7327            .query_row(
7328                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
7329                [],
7330                |row| row.get(0),
7331            )
7332            .expect("count");
7333        assert_eq!(count, 0, "property FTS row must be deleted on retire");
7334    }
7335
7336    #[test]
7337    fn no_property_fts_row_for_unregistered_kind() {
7338        let db = NamedTempFile::new().expect("temporary db");
7339        let schema = Arc::new(SchemaManager::new());
7340        {
7341            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7342            schema.bootstrap(&conn).expect("bootstrap");
7343            // No schema registered for "Note" kind.
7344        }
7345        let writer = WriterActor::start(
7346            db.path(),
7347            Arc::clone(&schema),
7348            ProvenanceMode::Warn,
7349            Arc::new(TelemetryCounters::default()),
7350        )
7351        .expect("writer");
7352
7353        writer
7354            .submit(WriteRequest {
7355                label: "insert".to_owned(),
7356                nodes: vec![NodeInsert {
7357                    row_id: "row-1".to_owned(),
7358                    logical_id: "note-1".to_owned(),
7359                    kind: "Note".to_owned(),
7360                    properties: r#"{"title":"hello"}"#.to_owned(),
7361                    source_ref: Some("src-1".to_owned()),
7362                    upsert: false,
7363                    chunk_policy: ChunkPolicy::Preserve,
7364                    content_ref: None,
7365                }],
7366                node_retires: vec![],
7367                edges: vec![],
7368                edge_retires: vec![],
7369                chunks: vec![],
7370                runs: vec![],
7371                steps: vec![],
7372                actions: vec![],
7373                optional_backfills: vec![],
7374                vec_inserts: vec![],
7375                operational_writes: vec![],
7376            })
7377            .expect("insert");
7378
7379        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7380        let table = fathomdb_schema::fts_kind_table_name("Note");
7381        let exists: bool = conn
7382            .query_row(
7383                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
7384                rusqlite::params![table],
7385                |row| row.get::<_, i64>(0),
7386            )
7387            .map(|c| c > 0)
7388            .expect("exists check");
7389        assert!(
7390            !exists,
7391            "no per-kind FTS table should be created for unregistered kind"
7392        );
7393    }
7394
7395    // --- A-6 per-kind FTS table tests ---
7396
7397    #[test]
7398    fn property_fts_write_goes_to_per_kind_table() {
7399        // After A-6: writes go to fts_props_goal, NOT fts_node_properties.
7400        let db = NamedTempFile::new().expect("temporary db");
7401        let schema = Arc::new(SchemaManager::new());
7402        {
7403            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7404            schema.bootstrap(&conn).expect("bootstrap");
7405            conn.execute(
7406                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7407                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7408                [],
7409            )
7410            .expect("register schema");
7411            // Create per-kind table (migration 21 would do this for existing schemas).
7412            conn.execute_batch(
7413                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7414                    node_logical_id UNINDEXED, text_content, \
7415                    tokenize = 'porter unicode61 remove_diacritics 2'\
7416                )",
7417            )
7418            .expect("create per-kind table");
7419        }
7420        let writer = WriterActor::start(
7421            db.path(),
7422            Arc::clone(&schema),
7423            ProvenanceMode::Warn,
7424            Arc::new(TelemetryCounters::default()),
7425        )
7426        .expect("writer");
7427
7428        writer
7429            .submit(WriteRequest {
7430                label: "insert".to_owned(),
7431                nodes: vec![NodeInsert {
7432                    row_id: "row-1".to_owned(),
7433                    logical_id: "goal-1".to_owned(),
7434                    kind: "Goal".to_owned(),
7435                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
7436                    source_ref: Some("src-1".to_owned()),
7437                    upsert: false,
7438                    chunk_policy: ChunkPolicy::Preserve,
7439                    content_ref: None,
7440                }],
7441                node_retires: vec![],
7442                edges: vec![],
7443                edge_retires: vec![],
7444                chunks: vec![],
7445                runs: vec![],
7446                steps: vec![],
7447                actions: vec![],
7448                optional_backfills: vec![],
7449                vec_inserts: vec![],
7450                operational_writes: vec![],
7451            })
7452            .expect("write");
7453
7454        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7455        // Per-kind table must have the row.
7456        let per_kind_count: i64 = conn
7457            .query_row(
7458                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7459                [],
7460                |row| row.get(0),
7461            )
7462            .expect("per-kind count");
7463        assert_eq!(
7464            per_kind_count, 1,
7465            "per-kind table fts_props_goal must have the row"
7466        );
7467    }
7468
7469    #[test]
7470    fn property_fts_retire_removes_from_per_kind_table() {
7471        // After A-6: retire removes from fts_props_goal, NOT fts_node_properties.
7472        let db = NamedTempFile::new().expect("temporary db");
7473        let schema = Arc::new(SchemaManager::new());
7474        {
7475            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7476            schema.bootstrap(&conn).expect("bootstrap");
7477            conn.execute(
7478                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7479                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7480                [],
7481            )
7482            .expect("register schema");
7483            conn.execute_batch(
7484                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
7485                    node_logical_id UNINDEXED, text_content, \
7486                    tokenize = 'porter unicode61 remove_diacritics 2'\
7487                )",
7488            )
7489            .expect("create per-kind table");
7490        }
7491        let writer = WriterActor::start(
7492            db.path(),
7493            Arc::clone(&schema),
7494            ProvenanceMode::Warn,
7495            Arc::new(TelemetryCounters::default()),
7496        )
7497        .expect("writer");
7498
7499        // Insert.
7500        writer
7501            .submit(WriteRequest {
7502                label: "insert".to_owned(),
7503                nodes: vec![NodeInsert {
7504                    row_id: "row-1".to_owned(),
7505                    logical_id: "goal-1".to_owned(),
7506                    kind: "Goal".to_owned(),
7507                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7508                    source_ref: Some("src-1".to_owned()),
7509                    upsert: false,
7510                    chunk_policy: ChunkPolicy::Preserve,
7511                    content_ref: None,
7512                }],
7513                node_retires: vec![],
7514                edges: vec![],
7515                edge_retires: vec![],
7516                chunks: vec![],
7517                runs: vec![],
7518                steps: vec![],
7519                actions: vec![],
7520                optional_backfills: vec![],
7521                vec_inserts: vec![],
7522                operational_writes: vec![],
7523            })
7524            .expect("insert");
7525
7526        // Retire.
7527        writer
7528            .submit(WriteRequest {
7529                label: "retire".to_owned(),
7530                nodes: vec![],
7531                node_retires: vec![NodeRetire {
7532                    logical_id: "goal-1".to_owned(),
7533                    source_ref: Some("forget-1".to_owned()),
7534                }],
7535                edges: vec![],
7536                edge_retires: vec![],
7537                chunks: vec![],
7538                runs: vec![],
7539                steps: vec![],
7540                actions: vec![],
7541                optional_backfills: vec![],
7542                vec_inserts: vec![],
7543                operational_writes: vec![],
7544            })
7545            .expect("retire");
7546
7547        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7548        let per_kind_count: i64 = conn
7549            .query_row(
7550                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
7551                [],
7552                |row| row.get(0),
7553            )
7554            .expect("per-kind count");
7555        assert_eq!(
7556            per_kind_count, 0,
7557            "per-kind table row must be removed on retire"
7558        );
7559    }
7560
7561    mod extract_json_path_tests {
7562        use super::super::extract_json_path;
7563        use serde_json::json;
7564
7565        #[test]
7566        fn string_value() {
7567            let v = json!({"name": "alice"});
7568            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7569        }
7570
7571        #[test]
7572        fn number_value() {
7573            let v = json!({"age": 42});
7574            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7575        }
7576
7577        #[test]
7578        fn bool_value() {
7579            let v = json!({"active": true});
7580            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7581        }
7582
7583        #[test]
7584        fn null_value() {
7585            let v = json!({"x": null});
7586            assert!(extract_json_path(&v, "$.x").is_empty());
7587        }
7588
7589        #[test]
7590        fn missing_path() {
7591            let v = json!({"name": "a"});
7592            assert!(extract_json_path(&v, "$.missing").is_empty());
7593        }
7594
7595        #[test]
7596        fn nested_path() {
7597            let v = json!({"address": {"city": "NYC"}});
7598            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7599        }
7600
7601        #[test]
7602        fn array_of_strings() {
7603            let v = json!({"tags": ["a", "b", "c"]});
7604            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7605        }
7606
7607        #[test]
7608        fn array_mixed_scalars() {
7609            let v = json!({"vals": ["x", 1, true]});
7610            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7611        }
7612
7613        #[test]
7614        fn array_only_objects_returns_empty() {
7615            let v = json!({"data": [{"k": "v"}]});
7616            assert!(extract_json_path(&v, "$.data").is_empty());
7617        }
7618
7619        #[test]
7620        fn array_mixed_objects_and_scalars() {
7621            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7622            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7623        }
7624
7625        #[test]
7626        fn object_returns_empty() {
7627            let v = json!({"meta": {"k": "v"}});
7628            assert!(extract_json_path(&v, "$.meta").is_empty());
7629        }
7630
7631        #[test]
7632        fn no_prefix_returns_empty() {
7633            let v = json!({"name": "a"});
7634            assert!(extract_json_path(&v, "name").is_empty());
7635        }
7636    }
7637
7638    mod recursive_extraction_tests {
7639        use super::super::{
7640            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7641            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7642        };
7643        use serde_json::json;
7644
7645        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7646            PropertyFtsSchema {
7647                paths,
7648                separator: " ".to_owned(),
7649                exclude_paths: Vec::new(),
7650            }
7651        }
7652
7653        fn schema_with_excludes(
7654            paths: Vec<PropertyPathEntry>,
7655            excludes: Vec<String>,
7656        ) -> PropertyFtsSchema {
7657            PropertyFtsSchema {
7658                paths,
7659                separator: " ".to_owned(),
7660                exclude_paths: excludes,
7661            }
7662        }
7663
7664        #[test]
7665        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7666            let props = json!({"payload": {"b": "two", "a": "one"}});
7667            let (blob, positions, _stats) = extract_property_fts(
7668                &props,
7669                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7670            );
7671            let blob = blob.expect("blob emitted");
7672            let idx_one = blob.find("one").expect("contains 'one'");
7673            let idx_two = blob.find("two").expect("contains 'two'");
7674            assert!(
7675                idx_one < idx_two,
7676                "lex order: 'one' (key a) before 'two' (key b)"
7677            );
7678            assert_eq!(positions.len(), 2);
7679            assert_eq!(positions[0].leaf_path, "$.payload.a");
7680            assert_eq!(positions[1].leaf_path, "$.payload.b");
7681        }
7682
7683        #[test]
7684        fn recursive_extraction_walks_arrays_of_scalars() {
7685            let props = json!({"tags": ["red", "blue"]});
7686            let (_blob, positions, _stats) = extract_property_fts(
7687                &props,
7688                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7689            );
7690            assert_eq!(positions.len(), 2);
7691            assert_eq!(positions[0].leaf_path, "$.tags[0]");
7692            assert_eq!(positions[1].leaf_path, "$.tags[1]");
7693        }
7694
7695        #[test]
7696        fn recursive_extraction_walks_arrays_of_objects() {
7697            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7698            let (_blob, positions, _stats) = extract_property_fts(
7699                &props,
7700                &schema(vec![PropertyPathEntry::recursive("$.items")]),
7701            );
7702            assert_eq!(positions.len(), 2);
7703            assert_eq!(positions[0].leaf_path, "$.items[0].name");
7704            assert_eq!(positions[1].leaf_path, "$.items[1].name");
7705        }
7706
7707        #[test]
7708        fn recursive_extraction_stringifies_numbers_and_bools() {
7709            let props = json!({"root": {"n": 42, "ok": true}});
7710            let (blob, _positions, _stats) = extract_property_fts(
7711                &props,
7712                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7713            );
7714            let blob = blob.expect("blob emitted");
7715            assert!(blob.contains("42"));
7716            assert!(blob.contains("true"));
7717        }
7718
7719        #[test]
7720        fn recursive_extraction_skips_nulls_and_missing() {
7721            let props = json!({"root": {"x": null, "y": "present"}});
7722            let (blob, positions, _stats) = extract_property_fts(
7723                &props,
7724                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7725            );
7726            let blob = blob.expect("blob emitted");
7727            assert!(!blob.contains("null"));
7728            assert_eq!(positions.len(), 1);
7729            assert_eq!(positions[0].leaf_path, "$.root.y");
7730        }
7731
7732        #[test]
7733        fn recursive_extraction_respects_max_depth_guardrail() {
7734            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
7735            let mut inner = json!("leaf-value");
7736            for _ in 0..10 {
7737                inner = json!({ "k": inner });
7738            }
7739            let props = json!({ "root": inner });
7740            let (blob, positions, stats) = extract_property_fts(
7741                &props,
7742                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7743            );
7744            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
7745            // The walk should stop before reaching the leaf (depth 8).
7746            assert!(
7747                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
7748                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
7749            );
7750            // Row is still indexed with whatever was emitted — even if
7751            // that's nothing. The contract is only that the walk clamped.
7752            let _ = positions;
7753            let _ = MAX_RECURSIVE_DEPTH;
7754        }
7755
7756        #[test]
7757        fn recursive_extraction_respects_max_bytes_guardrail() {
7758            // Build an array of 4KB strings; total blob > 64KB.
7759            let leaves: Vec<String> = (0..40)
7760                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7761                .collect();
7762            let props = json!({ "root": leaves });
7763            let (blob, positions, stats) = extract_property_fts(
7764                &props,
7765                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7766            );
7767            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7768            let blob = blob.expect("blob must still be emitted");
7769            assert!(
7770                blob.len() <= MAX_EXTRACTED_BYTES,
7771                "blob must not exceed MAX_EXTRACTED_BYTES"
7772            );
7773            // No partial leaves: every position end must fall inside blob length.
7774            for pos in &positions {
7775                assert!(pos.end_offset <= blob.len());
7776                let slice = &blob[pos.start_offset..pos.end_offset];
7777                // Slice must equal some original leaf value; we only
7778                // verify it's not empty and doesn't straddle a separator.
7779                assert!(!slice.is_empty());
7780                assert!(!slice.contains(LEAF_SEPARATOR));
7781            }
7782            // Blob cannot end in a dangling separator.
7783            assert!(!blob.ends_with(LEAF_SEPARATOR));
7784        }
7785
7786        #[test]
7787        fn recursive_extraction_respects_exclude_paths() {
7788            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7789            let (blob, positions, stats) = extract_property_fts(
7790                &props,
7791                &schema_with_excludes(
7792                    vec![PropertyPathEntry::recursive("$.payload")],
7793                    vec!["$.payload.priv".to_owned()],
7794                ),
7795            );
7796            let blob = blob.expect("blob emitted");
7797            assert!(blob.contains("yes"));
7798            assert!(!blob.contains("no"));
7799            assert_eq!(positions.len(), 1);
7800            assert_eq!(positions[0].leaf_path, "$.payload.pub");
7801            assert!(stats.excluded_subtree > 0);
7802        }
7803
7804        #[test]
7805        fn position_map_entries_match_emitted_leaves_in_order() {
7806            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7807            let (blob, positions, _stats) = extract_property_fts(
7808                &props,
7809                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7810            );
7811            let blob = blob.expect("blob emitted");
7812            assert_eq!(positions.len(), 3);
7813            // Leaf paths in lexicographic order.
7814            assert_eq!(positions[0].leaf_path, "$.root.a");
7815            assert_eq!(positions[1].leaf_path, "$.root.b");
7816            assert_eq!(positions[2].leaf_path, "$.root.c");
7817            // Monotonic, non-overlapping offsets.
7818            let mut prev_end: usize = 0;
7819            for (i, pos) in positions.iter().enumerate() {
7820                assert!(pos.start_offset >= prev_end);
7821                assert!(pos.end_offset > pos.start_offset);
7822                assert!(pos.end_offset <= blob.len());
7823                let slice = &blob[pos.start_offset..pos.end_offset];
7824                match i {
7825                    0 => assert_eq!(slice, "alpha"),
7826                    1 => assert_eq!(slice, "bravo"),
7827                    2 => assert_eq!(slice, "charlie"),
7828                    _ => unreachable!(),
7829                }
7830                prev_end = pos.end_offset;
7831            }
7832        }
7833
7834        #[test]
7835        fn scalar_only_schema_produces_empty_position_map() {
7836            let props = json!({"name": "alpha", "title": "beta"});
7837            let (blob, positions, _stats) = extract_property_fts(
7838                &props,
7839                &schema(vec![
7840                    PropertyPathEntry::scalar("$.name"),
7841                    PropertyPathEntry::scalar("$.title"),
7842                ]),
7843            );
7844            assert_eq!(blob.as_deref(), Some("alpha beta"));
7845            assert!(
7846                positions.is_empty(),
7847                "scalar-only schema must emit no position entries"
7848            );
7849            // Sanity-check the type so the test fails loudly if the
7850            // signature changes.
7851            let _: Vec<PositionEntry> = positions;
7852        }
7853
7854        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
7855            let mut seen = std::collections::HashSet::new();
7856            for pos in positions {
7857                let start = pos.start_offset;
7858                assert!(
7859                    seen.insert(start),
7860                    "duplicate start_offset {start} in positions {positions:?}"
7861                );
7862            }
7863        }
7864
7865        #[test]
7866        fn recursive_extraction_empty_then_nonempty_in_array() {
7867            let props = json!({"payload": {"xs": ["", "x"]}});
7868            let (combined, positions, _stats) = extract_property_fts(
7869                &props,
7870                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7871            );
7872            assert_eq!(combined.as_deref(), Some("x"));
7873            assert_eq!(positions.len(), 1);
7874            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
7875            assert_eq!(positions[0].start_offset, 0);
7876            assert_eq!(positions[0].end_offset, 1);
7877            assert_unique_start_offsets(&positions);
7878        }
7879
7880        #[test]
7881        fn recursive_extraction_two_empties_then_nonempty_in_array() {
7882            let props = json!({"payload": {"xs": ["", "", "x"]}});
7883            let (combined, positions, _stats) = extract_property_fts(
7884                &props,
7885                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7886            );
7887            assert_eq!(combined.as_deref(), Some("x"));
7888            assert_eq!(positions.len(), 1);
7889            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
7890            assert_eq!(positions[0].start_offset, 0);
7891            assert_eq!(positions[0].end_offset, 1);
7892            assert_unique_start_offsets(&positions);
7893        }
7894
7895        #[test]
7896        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
7897            let props = json!({"payload": {"a": "", "b": "x"}});
7898            let (combined, positions, _stats) = extract_property_fts(
7899                &props,
7900                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7901            );
7902            assert_eq!(combined.as_deref(), Some("x"));
7903            assert_eq!(positions.len(), 1);
7904            assert_eq!(positions[0].leaf_path, "$.payload.b");
7905            assert_eq!(positions[0].start_offset, 0);
7906            assert_eq!(positions[0].end_offset, 1);
7907            assert_unique_start_offsets(&positions);
7908        }
7909
7910        #[test]
7911        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
7912            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
7913            let (combined, positions, _stats) = extract_property_fts(
7914                &props,
7915                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7916            );
7917            assert_eq!(combined.as_deref(), Some("x"));
7918            assert_eq!(positions.len(), 1);
7919            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
7920            assert_eq!(positions[0].start_offset, 0);
7921            assert_eq!(positions[0].end_offset, 1);
7922            assert_unique_start_offsets(&positions);
7923        }
7924
7925        #[test]
7926        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
7927            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
7928            let (combined, positions, _stats) = extract_property_fts(
7929                &props,
7930                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7931            );
7932            assert_eq!(combined.as_deref(), Some("x"));
7933            assert_eq!(positions.len(), 1);
7934            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
7935            assert_eq!(positions[0].start_offset, 0);
7936            assert_eq!(positions[0].end_offset, 1);
7937            assert_unique_start_offsets(&positions);
7938        }
7939
7940        #[test]
7941        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
7942            let cases = vec![
7943                json!({"payload": {}}),
7944                json!({"payload": {"a": ""}}),
7945                json!({"payload": {"xs": []}}),
7946                json!({"payload": {"xs": [""]}}),
7947                json!({"payload": {"xs": ["", ""]}}),
7948                json!({"payload": {"xs": ["", "", ""]}}),
7949            ];
7950            for case in cases {
7951                let (combined, positions, _stats) = extract_property_fts(
7952                    &case,
7953                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7954                );
7955                assert!(
7956                    combined.is_none(),
7957                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
7958                );
7959                assert!(
7960                    positions.is_empty(),
7961                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
7962                );
7963            }
7964        }
7965
7966        #[test]
7967        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
7968            let props = json!({"payload": {"xs": ["x", "", "y"]}});
7969            let (combined, positions, _stats) = extract_property_fts(
7970                &props,
7971                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7972            );
7973            let blob = combined.expect("blob emitted");
7974            assert_eq!(positions.len(), 2);
7975            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
7976            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
7977            assert_eq!(positions[0].start_offset, 0);
7978            assert_eq!(positions[0].end_offset, 1);
7979            // The two non-empty leaves are separated by exactly one
7980            // LEAF_SEPARATOR; the empty middle leaf contributes neither
7981            // bytes nor a position.
7982            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
7983            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
7984            assert_eq!(
7985                &blob[positions[0].start_offset..positions[0].end_offset],
7986                "x"
7987            );
7988            assert_eq!(
7989                &blob[positions[1].start_offset..positions[1].end_offset],
7990                "y"
7991            );
7992            assert_unique_start_offsets(&positions);
7993        }
7994
7995        #[test]
7996        fn recursive_extraction_null_leaves_unchanged() {
7997            let props = json!({"payload": {"xs": [null, null]}});
7998            let (combined, positions, _stats) = extract_property_fts(
7999                &props,
8000                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8001            );
8002            assert!(combined.is_none());
8003            assert!(positions.is_empty());
8004        }
8005    }
8006
8007    mod parse_property_schema_json_tests {
8008        use super::super::parse_property_schema_json;
8009
8010        #[test]
8011        fn parse_property_schema_json_preserves_weight() {
8012            let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
8013            let schema = parse_property_schema_json(json, " ");
8014            assert_eq!(schema.paths.len(), 2);
8015            let title = &schema.paths[0];
8016            assert_eq!(title.path, "$.title");
8017            assert_eq!(title.weight, Some(2.0_f32));
8018            let body = &schema.paths[1];
8019            assert_eq!(body.path, "$.body");
8020            assert_eq!(body.weight, None);
8021        }
8022    }
8023
8024    // --- B-3: extract_property_fts_columns tests ---
8025
8026    mod extract_property_fts_columns_tests {
8027        use serde_json::json;
8028
8029        use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
8030
8031        fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
8032            PropertyFtsSchema {
8033                paths,
8034                separator: " ".to_owned(),
8035                exclude_paths: Vec::new(),
8036            }
8037        }
8038
8039        #[test]
8040        fn extract_property_fts_columns_returns_per_spec_text() {
8041            let props = json!({"title": "Hello", "body": "World"});
8042            let mut title_entry = PropertyPathEntry::scalar("$.title");
8043            title_entry.weight = Some(2.0);
8044            let mut body_entry = PropertyPathEntry::scalar("$.body");
8045            body_entry.weight = Some(1.0);
8046            let schema = weighted_schema(vec![title_entry, body_entry]);
8047            let cols = extract_property_fts_columns(&props, &schema);
8048            assert_eq!(cols.len(), 2);
8049            assert_eq!(cols[0].0, "title");
8050            assert_eq!(cols[0].1, "Hello");
8051            assert_eq!(cols[1].0, "body");
8052            assert_eq!(cols[1].1, "World");
8053        }
8054
8055        #[test]
8056        fn extract_property_fts_columns_missing_field_returns_empty_string() {
8057            let props = json!({"title": "Hello"});
8058            let mut title_entry = PropertyPathEntry::scalar("$.title");
8059            title_entry.weight = Some(2.0);
8060            let mut body_entry = PropertyPathEntry::scalar("$.body");
8061            body_entry.weight = Some(1.0);
8062            let schema = weighted_schema(vec![title_entry, body_entry]);
8063            let cols = extract_property_fts_columns(&props, &schema);
8064            assert_eq!(cols.len(), 2);
8065            assert_eq!(cols[0].1, "Hello");
8066            assert_eq!(cols[1].1, "", "missing field yields empty string");
8067        }
8068    }
8069
8070    // --- B-3: writer per-column INSERT for weighted schemas ---
8071
    // End-to-end check of the B-3 weighted-schema write path: when a
    // kind's FTS schema carries per-path weights and the per-kind fts5
    // table has one column per path, the writer must INSERT each path's
    // extracted text into its own column (not one combined blob).
    #[test]
    fn writer_inserts_per_column_for_weighted_schema() {
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap and create the weighted per-kind table.
        // Scoped block so this setup connection is dropped (closed)
        // before the writer opens its own connection to the same file.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema_manager.bootstrap(&conn).expect("bootstrap");

            // Column names must match what the writer derives from the
            // schema paths, so use the same helper it uses.
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);

            // Register schema with weights.
            let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES (?1, ?2, ' ')",
                rusqlite::params!["Article", paths_json],
            )
            .expect("register schema");

            // Create the per-kind FTS table with per-column layout.
            // node_logical_id is UNINDEXED: it is a join key, not
            // searchable text.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_article USING fts5(\
                    node_logical_id UNINDEXED, {title_col}, {body_col}, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create weighted per-kind table");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Single-node insert whose properties cover both schema paths.
        // NOTE(review): the immediate read-back below assumes submit()
        // does not return before the write is committed — TODO confirm.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "article-1".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Fresh connection for verification, independent of the writer's.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let title_col = fts_column_name("$.title", false);
        let body_col = fts_column_name("$.body", false);

        // The per-kind table must have one row for article-1.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_article WHERE node_logical_id = 'article-1'",
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind FTS table must have the row");

        // Verify per-column values via direct SELECT (not MATCH).
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM fts_props_article \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello World",
            "title column must have correct value"
        );
        assert_eq!(body_val, "Foo Bar", "body column must have correct value");
    }
8171}