//! fathomdb_engine — writer.rs
//!
//! Single-threaded writer actor that serializes all graph mutations through
//! one `SQLite` connection.
1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16    OperationalValidationMode, extract_secondary_index_entries_for_current,
17    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18    parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection backfill task submitted alongside a write.
///
/// Accepted tasks are counted in [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is `true`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Also enforced for [`LastAccessTouchRequest::source_ref`] in
/// `prepare_touch_last_accessed`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Unique row identifier for this version; must be non-empty and unique
    /// within the request (validated by `prepare_write`).
    pub row_id: String,
    /// Stable identity across versions; must be non-empty.
    pub logical_id: String,
    pub kind: String,
    /// Node properties; treated as a JSON tree by property-FTS extraction.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
69
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Unique row identifier for this version; must be non-empty and unique
    /// within the request (validated by `prepare_write`).
    pub row_id: String,
    /// Stable identity across versions; must be non-empty.
    pub logical_id: String,
    /// Logical id of the edge's source node.
    pub source_logical_id: String,
    /// Logical id of the edge's target node.
    pub target_logical_id: String,
    pub kind: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
84
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}

/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Must be non-empty (validated by `prepare_write`).
    pub id: String,
    pub node_logical_id: String,
    /// Chunk text; empty chunks are rejected by `prepare_write`.
    pub text_content: String,
    // NOTE(review): presumably a byte range into the external content the
    // chunk was derived from — confirm against the chunking code.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
110
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
// No `Eq`: the f32 embedding only supports `PartialEq`.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Must be non-empty (validated by `prepare_write`).
    pub chunk_id: String,
    /// Must be non-empty (validated by `prepare_write`).
    pub embedding: Vec<f32>,
}
122
/// A mutation to an operational collection submitted as part of a write request.
///
/// `collection` and `record_key` must be non-empty for every variant;
/// `payload_json` must be non-empty for `Append` and `Put` (all validated by
/// `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Add a payload under `record_key` in `collection`.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Store a payload under `record_key` in `collection`.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Remove the record under `record_key` in `collection`.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Must be non-empty (validated by `prepare_write`).
    pub id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Required when `upsert` is true (validated by `prepare_write`).
    pub supersedes_id: Option<String>,
}

/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Must be non-empty (validated by `prepare_write`).
    pub id: String,
    /// Parent run this step belongs to.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Required when `upsert` is true (validated by `prepare_write`).
    pub supersedes_id: Option<String>,
}

/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Must be non-empty (validated by `prepare_write`).
    pub id: String,
    /// Parent step this action belongs to.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    pub source_ref: Option<String>,
    pub upsert: bool,
    /// Required when `upsert` is true (validated by `prepare_write`).
    pub supersedes_id: Option<String>,
}
182
// --- Write-request size limits ---
// Enforced by `validate_request_size` before a request reaches the writer
// thread, so a single pathological batch cannot stall the writer.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Shared cap across node and edge retires combined.
const MAX_RETIRES: usize = 10_000;
// Shared cap across runs, steps, and actions combined.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
///
/// Per-category sizes are bounded by the `MAX_*` limits enforced in
/// `validate_request_size`.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label for this batch.
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    // NOTE(review): presumably echoes `WriteRequest::label` — confirm in the
    // writer loop.
    pub label: String,
    /// Number of optional projection backfill tasks accepted with the write.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-provenance warnings emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
227
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Must be non-empty and contain no blank entries (validated by
    /// `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    // NOTE(review): unit (epoch seconds vs millis) is not visible here —
    // confirm against the schema.
    pub touched_at: i64,
    /// Required when [`ProvenanceMode::Require`] is active.
    pub source_ref: Option<String>,
}

/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical ids whose timestamp was updated.
    pub touched_logical_ids: usize,
    pub touched_at: i64,
}
242
/// One row destined for the chunk-FTS projection; resolved by the writer
/// thread before BEGIN IMMEDIATE (see [`PreparedWrite::required_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    kind: String,
    text_content: String,
}

/// One row destined for the property-FTS projection, carrying the position
/// map produced by recursive extraction (see
/// [`PreparedWrite::required_property_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    text_content: String,
    positions: Vec<PositionEntry>,
}
258
/// Maximum depth the recursive property-FTS extraction walk descends before
/// clamping. A walk that reaches this depth will not emit leaves below it.
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Maximum blob size the recursive property-FTS extraction walk will emit.
/// When the next complete leaf would push the blob past this cap, the walk
/// stops on the preceding leaf boundary — the existing blob is indexed, the
/// truncated leaf is not partially emitted.
///
/// This budget applies only to the recursive-walk portion of the blob.
/// Scalar-prefix emissions and the scalar↔recursive [`LEAF_SEPARATOR`] token
/// are not counted against it; the cap governs how much nested-subtree
/// content is serialized, not the final blob length.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

/// Hard phrase-break separator inserted between two adjacent recursive
/// leaves in the concatenated blob.
///
/// FTS5 phrase queries match on consecutive token positions, so simply
/// inserting non-token whitespace or control characters does NOT prevent a
/// phrase like `"foo bar"` from matching when `foo` ends one leaf and
/// `bar` starts the next — the tokenizer would discard the control
/// character and the two content tokens would still be adjacent.
///
/// To create a real position gap we embed a sentinel *token*
/// `fathomdbphrasebreaksentinel` between leaves. Under `porter
/// unicode61 remove_diacritics 2` this survives tokenization as its own
/// position, so the content tokens on either side are separated by at
/// least one intervening position and phrase queries spanning the gap
/// cannot match. The sentinel is long, lowercase, and obviously
/// synthetic to minimize the chance of accidental collisions with real
/// content. It remains searchable as a word — callers should avoid
/// passing it to `text_search` directly.
///
/// Validated by the integration test
/// `leaf_separator_is_hard_phrase_break_under_unicode61_porter`.
///
/// Note: because the sentinel is a real token it may appear in FTS5
/// `snippet()` output when the snippet window spans a leaf boundary.
/// Phase 5 presentation-layer snippet post-processing is responsible for
/// stripping it before the snippet is surfaced to callers.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
301
/// Whether a registered property-FTS path extracts a single scalar value
/// or recursively walks the subtree rooted at the path.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Legacy scalar extraction — resolve the path, append the scalar
    /// (flattening arrays of scalars as before Phase 4).
    #[default]
    Scalar,
    /// Recursively walk scalar leaves rooted at the path, emitting one
    /// leaf per scalar with position-map tracking.
    Recursive,
}
314
/// A single registered property-FTS path with its extraction mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyPathEntry {
    /// Property path, matched exactly against the JSON tree from the
    /// node's root (see [`PropertyFtsSchema::paths`]).
    pub path: String,
    /// How the path is extracted; see [`PropertyPathMode`].
    pub mode: PropertyPathMode,
}
321
322impl PropertyPathEntry {
323    pub(crate) fn scalar(path: impl Into<String>) -> Self {
324        Self {
325            path: path.into(),
326            mode: PropertyPathMode::Scalar,
327        }
328    }
329
330    #[cfg(test)]
331    pub(crate) fn recursive(path: impl Into<String>) -> Self {
332        Self {
333            path: path.into(),
334            mode: PropertyPathMode::Recursive,
335        }
336    }
337}
338
/// Parsed property-FTS schema definition for a single kind.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Property paths to extract. Each entry is matched exactly against
    /// the JSON tree from the node's root; see [`PropertyPathMode`].
    pub paths: Vec<PropertyPathEntry>,
    /// Separator inserted between adjacent leaf values in the extracted
    /// blob. Acts as a hard phrase break under FTS5.
    pub separator: String,
    /// JSON paths to skip during recursive extraction. Matched as an
    /// *exact* path equality — the walk visits each object/array before
    /// descending into it, so listing `$.x.y` suppresses the entire
    /// subtree rooted at `$.x.y`. Prefix matching is NOT supported: an
    /// exclude of `$.x` does not implicitly exclude `$.x.y`, and
    /// `$.payload.priv` does not implicitly exclude `$.payload.priv.inner`
    /// via prefix — the walker will still descend into any subtree that
    /// is not itself listed exactly in `exclude_paths`.
    pub exclude_paths: Vec<String>,
}
358
/// One position-map entry: a half-open `[start, end)` byte range within the
/// extracted blob plus the `JSONPath` of the scalar leaf whose value occupies
/// that range.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive start byte offset within the extracted blob.
    pub start_offset: usize,
    /// Exclusive end byte offset within the extracted blob.
    pub end_offset: usize,
    /// `JSONPath` of the scalar leaf occupying the range.
    pub leaf_path: String,
}
368
/// Per-kind extraction stats accumulated by a recursive walk. Surface-level
/// Phase 4 logging only; Phase 5 may expose these to callers.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Count of times a walk reached [`MAX_RECURSIVE_DEPTH`]; summed across
    /// walks by `merge`.
    pub depth_cap_hit: usize,
    /// `true` once the extracted blob has reached `MAX_EXTRACTED_BYTES` and
    /// the walker has stopped accepting additional leaves. Unlike the
    /// depth counter this is a boolean: the walker's `stopped` guard
    /// blocks further `emit_leaf` calls after the first truncation.
    pub byte_cap_reached: bool,
    /// Count of subtrees skipped because they matched `exclude_paths`;
    /// summed across walks by `merge`.
    pub excluded_subtree: usize,
}
381
382impl ExtractStats {
383    fn merge(&mut self, other: ExtractStats) {
384        self.depth_cap_hit += other.depth_cap_hit;
385        self.byte_cap_reached |= other.byte_cap_reached;
386        self.excluded_subtree += other.excluded_subtree;
387    }
388}
389
/// A validated [`WriteRequest`] in the form handed to the writer thread.
///
/// Built by `prepare_write` on the caller's thread; the FTS projection rows
/// are resolved later, on the writer thread, before BEGIN IMMEDIATE.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
417
/// Messages sent from [`WriterActor`] public methods to the writer thread.
/// Each variant carries a `reply` channel over which the result is returned.
enum WriteMessage {
    /// A prepared write batch; boxed to keep the message small on the channel.
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// A batch update of `last_accessed_at` timestamps.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
428
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    // ManuallyDrop so `Drop` can close the channel *before* joining the
    // thread (see the Drop impl); dropping in default field order would
    // still work, but the explicit drop documents the required ordering.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    // `None` only after Drop has taken the handle for joining.
    thread_handle: Option<thread::JoinHandle<()>>,
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
441
442impl WriterActor {
443    /// # Errors
444    /// Returns [`EngineError`] if the writer thread cannot be spawned.
445    pub fn start(
446        path: impl AsRef<Path>,
447        schema_manager: Arc<SchemaManager>,
448        provenance_mode: ProvenanceMode,
449        telemetry: Arc<TelemetryCounters>,
450    ) -> Result<Self, EngineError> {
451        let database_path = path.as_ref().to_path_buf();
452        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
453
454        let writer_telemetry = Arc::clone(&telemetry);
455        let handle = thread::Builder::new()
456            .name("fathomdb-writer".to_owned())
457            .spawn(move || {
458                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
459            })
460            .map_err(EngineError::Io)?;
461
462        Ok(Self {
463            sender: ManuallyDrop::new(sender),
464            thread_handle: Some(handle),
465            provenance_mode,
466            _telemetry: telemetry,
467        })
468    }
469
470    /// Returns `true` if the writer thread is still running.
471    fn is_thread_alive(&self) -> bool {
472        self.thread_handle
473            .as_ref()
474            .is_some_and(|h| !h.is_finished())
475    }
476
477    /// Returns `WriterRejected` if the writer thread has exited.
478    fn check_thread_alive(&self) -> Result<(), EngineError> {
479        if self.is_thread_alive() {
480            Ok(())
481        } else {
482            Err(EngineError::WriterRejected(
483                "writer thread has exited".to_owned(),
484            ))
485        }
486    }
487
488    /// # Errors
489    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
490    /// down, or the underlying `SQLite` transaction fails.
491    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
492        self.check_thread_alive()?;
493        let prepared = prepare_write(request, self.provenance_mode)?;
494        let (reply_tx, reply_rx) = mpsc::channel();
495        self.sender
496            .send(WriteMessage::Submit {
497                prepared: Box::new(prepared),
498                reply: reply_tx,
499            })
500            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
501
502        recv_with_timeout(&reply_rx)
503    }
504
505    /// # Errors
506    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
507    /// or the underlying `SQLite` transaction fails.
508    pub fn touch_last_accessed(
509        &self,
510        request: LastAccessTouchRequest,
511    ) -> Result<LastAccessTouchReport, EngineError> {
512        self.check_thread_alive()?;
513        prepare_touch_last_accessed(&request, self.provenance_mode)?;
514        let (reply_tx, reply_rx) = mpsc::channel();
515        self.sender
516            .send(WriteMessage::TouchLastAccessed {
517                request,
518                reply: reply_tx,
519            })
520            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
521
522        recv_with_timeout(&reply_rx)
523    }
524}
525
/// Fallback shutdown-panic notice used when the `tracing` feature is absent,
/// so the suppressed writer-thread panic is still visible on stderr.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    let notice = "fathomdb-writer panicked during shutdown (suppressed: already panicking)";
    eprintln!("{notice}");
}
531
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // If we are already unwinding, re-raising the writer's panic
                    // would abort the process — log and suppress instead.
                    if std::thread::panicking() {
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Not already panicking: propagate the writer thread's
                        // panic payload to the caller.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
561
562/// Wait for a reply from the writer thread, with a timeout.
563fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
564    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
565        .map_err(|error| match error {
566            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
567                "write timed out waiting for writer thread reply — the write may still commit"
568                    .to_owned(),
569            ),
570            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
571        })
572        .and_then(|result| result)
573}
574
575fn prepare_touch_last_accessed(
576    request: &LastAccessTouchRequest,
577    mode: ProvenanceMode,
578) -> Result<(), EngineError> {
579    if request.logical_ids.is_empty() {
580        return Err(EngineError::InvalidWrite(
581            "touch_last_accessed requires at least one logical_id".to_owned(),
582        ));
583    }
584    for logical_id in &request.logical_ids {
585        if logical_id.trim().is_empty() {
586            return Err(EngineError::InvalidWrite(
587                "touch_last_accessed requires non-empty logical_ids".to_owned(),
588            ));
589        }
590    }
591    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
592        return Err(EngineError::InvalidWrite(
593            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
594                .to_owned(),
595        ));
596    }
597    Ok(())
598}
599
600fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
601    let missing: Vec<String> = request
602        .nodes
603        .iter()
604        .filter(|n| n.source_ref.is_none())
605        .map(|n| format!("node '{}'", n.logical_id))
606        .chain(
607            request
608                .node_retires
609                .iter()
610                .filter(|r| r.source_ref.is_none())
611                .map(|r| format!("node retire '{}'", r.logical_id)),
612        )
613        .chain(
614            request
615                .edges
616                .iter()
617                .filter(|e| e.source_ref.is_none())
618                .map(|e| format!("edge '{}'", e.logical_id)),
619        )
620        .chain(
621            request
622                .edge_retires
623                .iter()
624                .filter(|r| r.source_ref.is_none())
625                .map(|r| format!("edge retire '{}'", r.logical_id)),
626        )
627        .chain(
628            request
629                .runs
630                .iter()
631                .filter(|r| r.source_ref.is_none())
632                .map(|r| format!("run '{}'", r.id)),
633        )
634        .chain(
635            request
636                .steps
637                .iter()
638                .filter(|s| s.source_ref.is_none())
639                .map(|s| format!("step '{}'", s.id)),
640        )
641        .chain(
642            request
643                .actions
644                .iter()
645                .filter(|a| a.source_ref.is_none())
646                .map(|a| format!("action '{}'", a.id)),
647        )
648        .chain(
649            request
650                .operational_writes
651                .iter()
652                .filter(|write| operational_write_source_ref(write).is_none())
653                .map(|write| {
654                    format!(
655                        "operational {} '{}:{}'",
656                        operational_write_kind(write),
657                        operational_write_collection(write),
658                        operational_write_record_key(write)
659                    )
660                }),
661        )
662        .collect();
663
664    if missing.is_empty() {
665        Ok(())
666    } else {
667        Err(EngineError::InvalidWrite(format!(
668            "ProvenanceMode::Require: missing source_ref on: {}",
669            missing.join(", ")
670        )))
671    }
672}
673
674fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
675    if request.nodes.len() > MAX_NODES {
676        return Err(EngineError::InvalidWrite(format!(
677            "too many nodes: {} exceeds limit of {MAX_NODES}",
678            request.nodes.len()
679        )));
680    }
681    if request.edges.len() > MAX_EDGES {
682        return Err(EngineError::InvalidWrite(format!(
683            "too many edges: {} exceeds limit of {MAX_EDGES}",
684            request.edges.len()
685        )));
686    }
687    if request.chunks.len() > MAX_CHUNKS {
688        return Err(EngineError::InvalidWrite(format!(
689            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
690            request.chunks.len()
691        )));
692    }
693    let retires = request.node_retires.len() + request.edge_retires.len();
694    if retires > MAX_RETIRES {
695        return Err(EngineError::InvalidWrite(format!(
696            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
697        )));
698    }
699    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
700    if runtime_items > MAX_RUNTIME_ITEMS {
701        return Err(EngineError::InvalidWrite(format!(
702            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
703        )));
704    }
705    if request.operational_writes.len() > MAX_OPERATIONAL {
706        return Err(EngineError::InvalidWrite(format!(
707            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
708            request.operational_writes.len()
709        )));
710    }
711    let total = request.nodes.len()
712        + request.node_retires.len()
713        + request.edges.len()
714        + request.edge_retires.len()
715        + request.chunks.len()
716        + request.runs.len()
717        + request.steps.len()
718        + request.actions.len()
719        + request.vec_inserts.len()
720        + request.operational_writes.len();
721    if total > MAX_TOTAL_ITEMS {
722        return Err(EngineError::InvalidWrite(format!(
723            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
724        )));
725    }
726    Ok(())
727}
728
729#[allow(clippy::too_many_lines)]
730fn prepare_write(
731    request: WriteRequest,
732    mode: ProvenanceMode,
733) -> Result<PreparedWrite, EngineError> {
734    validate_request_size(&request)?;
735
736    // --- ID validation: reject empty IDs ---
737    for node in &request.nodes {
738        if node.row_id.is_empty() {
739            return Err(EngineError::InvalidWrite(
740                "NodeInsert has empty row_id".to_owned(),
741            ));
742        }
743        if node.logical_id.is_empty() {
744            return Err(EngineError::InvalidWrite(
745                "NodeInsert has empty logical_id".to_owned(),
746            ));
747        }
748    }
749    for edge in &request.edges {
750        if edge.row_id.is_empty() {
751            return Err(EngineError::InvalidWrite(
752                "EdgeInsert has empty row_id".to_owned(),
753            ));
754        }
755        if edge.logical_id.is_empty() {
756            return Err(EngineError::InvalidWrite(
757                "EdgeInsert has empty logical_id".to_owned(),
758            ));
759        }
760    }
761    for chunk in &request.chunks {
762        if chunk.id.is_empty() {
763            return Err(EngineError::InvalidWrite(
764                "ChunkInsert has empty id".to_owned(),
765            ));
766        }
767        if chunk.text_content.is_empty() {
768            return Err(EngineError::InvalidWrite(format!(
769                "chunk '{}' has empty text_content; empty chunks are not allowed",
770                chunk.id
771            )));
772        }
773    }
774    for run in &request.runs {
775        if run.id.is_empty() {
776            return Err(EngineError::InvalidWrite(
777                "RunInsert has empty id".to_owned(),
778            ));
779        }
780    }
781    for step in &request.steps {
782        if step.id.is_empty() {
783            return Err(EngineError::InvalidWrite(
784                "StepInsert has empty id".to_owned(),
785            ));
786        }
787    }
788    for action in &request.actions {
789        if action.id.is_empty() {
790            return Err(EngineError::InvalidWrite(
791                "ActionInsert has empty id".to_owned(),
792            ));
793        }
794    }
795    for vi in &request.vec_inserts {
796        if vi.chunk_id.is_empty() {
797            return Err(EngineError::InvalidWrite(
798                "VecInsert has empty chunk_id".to_owned(),
799            ));
800        }
801        if vi.embedding.is_empty() {
802            return Err(EngineError::InvalidWrite(format!(
803                "VecInsert for chunk '{}' has empty embedding",
804                vi.chunk_id
805            )));
806        }
807    }
808    for operational in &request.operational_writes {
809        if operational_write_collection(operational).is_empty() {
810            return Err(EngineError::InvalidWrite(
811                "OperationalWrite has empty collection".to_owned(),
812            ));
813        }
814        if operational_write_record_key(operational).is_empty() {
815            return Err(EngineError::InvalidWrite(format!(
816                "OperationalWrite for collection '{}' has empty record_key",
817                operational_write_collection(operational)
818            )));
819        }
820        match operational {
821            OperationalWrite::Append { payload_json, .. }
822            | OperationalWrite::Put { payload_json, .. } => {
823                if payload_json.is_empty() {
824                    return Err(EngineError::InvalidWrite(format!(
825                        "OperationalWrite {} '{}:{}' has empty payload_json",
826                        operational_write_kind(operational),
827                        operational_write_collection(operational),
828                        operational_write_record_key(operational)
829                    )));
830                }
831            }
832            OperationalWrite::Delete { .. } => {}
833        }
834    }
835
836    // --- ID validation: reject duplicate row_ids within the request ---
837    {
838        let mut seen = std::collections::HashSet::new();
839        for node in &request.nodes {
840            if !seen.insert(node.row_id.as_str()) {
841                return Err(EngineError::InvalidWrite(format!(
842                    "duplicate row_id '{}' within the same WriteRequest",
843                    node.row_id
844                )));
845            }
846        }
847        for edge in &request.edges {
848            if !seen.insert(edge.row_id.as_str()) {
849                return Err(EngineError::InvalidWrite(format!(
850                    "duplicate row_id '{}' within the same WriteRequest",
851                    edge.row_id
852                )));
853            }
854        }
855    }
856
857    // --- ProvenanceMode::Require enforcement ---
858    if mode == ProvenanceMode::Require {
859        check_require_provenance(&request)?;
860    }
861
862    // --- Runtime table upsert validation ---
863    for run in &request.runs {
864        if run.upsert && run.supersedes_id.is_none() {
865            return Err(EngineError::InvalidWrite(format!(
866                "run '{}': upsert=true requires supersedes_id to be set",
867                run.id
868            )));
869        }
870    }
871    for step in &request.steps {
872        if step.upsert && step.supersedes_id.is_none() {
873            return Err(EngineError::InvalidWrite(format!(
874                "step '{}': upsert=true requires supersedes_id to be set",
875                step.id
876            )));
877        }
878    }
879    for action in &request.actions {
880        if action.upsert && action.supersedes_id.is_none() {
881            return Err(EngineError::InvalidWrite(format!(
882                "action '{}': upsert=true requires supersedes_id to be set",
883                action.id
884            )));
885        }
886    }
887
888    let node_kinds = request
889        .nodes
890        .iter()
891        .map(|node| (node.logical_id.clone(), node.kind.clone()))
892        .collect::<HashMap<_, _>>();
893
894    Ok(PreparedWrite {
895        label: request.label,
896        nodes: request.nodes,
897        node_retires: request.node_retires,
898        edges: request.edges,
899        edge_retires: request.edge_retires,
900        chunks: request.chunks,
901        runs: request.runs,
902        steps: request.steps,
903        actions: request.actions,
904        vec_inserts: request.vec_inserts,
905        operational_writes: request.operational_writes,
906        operational_collection_kinds: HashMap::new(),
907        operational_collection_filter_fields: HashMap::new(),
908        operational_validation_warnings: Vec::new(),
909        node_kinds,
910        required_fts_rows: Vec::new(),
911        required_property_fts_rows: Vec::new(),
912        optional_backfills: request.optional_backfills,
913    })
914}
915
/// Body of the dedicated writer thread.
///
/// Owns the single write connection to the database: opens it, runs the
/// schema bootstrap, then serially services messages from `receiver` until
/// every sender is dropped and the channel closes.
///
/// If opening the connection or bootstrapping the schema fails, all queued
/// and future messages are answered with `EngineError::WriterRejected` via
/// [`reject_all`] so callers are never left blocked on a dead thread.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // Timer only exists when the `tracing` feature is enabled; it
                // is only read inside the trace_info! below, which presumably
                // compiles away otherwise.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one bad write cannot kill the writer thread
                // (and with it, every future write on this engine).
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        // Row count covers canonical rows only (nodes, edges,
                        // chunks); runtime and operational rows are not
                        // included in this counter.
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // The submitter may have hung up; a dropped reply channel
                    // is not an error for the writer.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches only update access metadata; record the write
                    // event without inflating the canonical row counter.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
998
999fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1000    for message in receiver {
1001        match message {
1002            WriteMessage::Submit { reply, .. } => {
1003                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1004            }
1005            WriteMessage::TouchLastAccessed { reply, .. } => {
1006                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1007            }
1008        }
1009    }
1010}
1011
1012/// Resolve FTS projection rows before the write transaction begins.
1013///
1014/// For each chunk in the prepared write, determines the node `kind` needed
1015/// to populate the FTS index. If the node was co-submitted in the same
1016/// request its kind is taken from `prepared.node_kinds` without a DB query.
1017/// If the node is pre-existing it is looked up in the database. If the node
1018/// cannot be found in either place, an `InvalidWrite` error is returned.
1019fn resolve_fts_rows(
1020    conn: &rusqlite::Connection,
1021    prepared: &mut PreparedWrite,
1022) -> Result<(), EngineError> {
1023    let retiring_ids: std::collections::HashSet<&str> = prepared
1024        .node_retires
1025        .iter()
1026        .map(|r| r.logical_id.as_str())
1027        .collect();
1028    for chunk in &prepared.chunks {
1029        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1030            return Err(EngineError::InvalidWrite(format!(
1031                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1032                 WriteRequest; retire and chunk insertion for the same node must not be combined",
1033                chunk.id, chunk.node_logical_id
1034            )));
1035        }
1036    }
1037    for chunk in &prepared.chunks {
1038        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1039            k.clone()
1040        } else {
1041            match conn.query_row(
1042                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1043                params![chunk.node_logical_id],
1044                |row| row.get::<_, String>(0),
1045            ) {
1046                Ok(kind) => kind,
1047                Err(rusqlite::Error::QueryReturnedNoRows) => {
1048                    return Err(EngineError::InvalidWrite(format!(
1049                        "chunk '{}' references node_logical_id '{}' that is not present in this \
1050                         write request or the database \
1051                         (v1 limitation: chunks and their nodes must be submitted together or the \
1052                         node must already exist)",
1053                        chunk.id, chunk.node_logical_id
1054                    )));
1055                }
1056                Err(e) => return Err(EngineError::Sqlite(e)),
1057            }
1058        };
1059        prepared.required_fts_rows.push(FtsProjectionRow {
1060            chunk_id: chunk.id.clone(),
1061            node_logical_id: chunk.node_logical_id.clone(),
1062            kind,
1063            text_content: chunk.text_content.clone(),
1064        });
1065    }
1066    trace_debug!(
1067        fts_rows = prepared.required_fts_rows.len(),
1068        chunks_processed = prepared.chunks.len(),
1069        "fts row resolution completed"
1070    );
1071    Ok(())
1072}
1073
1074/// Load FTS property schemas from the database and extract property values from
1075/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
1076fn resolve_property_fts_rows(
1077    conn: &rusqlite::Connection,
1078    prepared: &mut PreparedWrite,
1079) -> Result<(), EngineError> {
1080    if prepared.nodes.is_empty() {
1081        return Ok(());
1082    }
1083
1084    let schemas: HashMap<String, PropertyFtsSchema> =
1085        load_fts_property_schemas(conn)?.into_iter().collect();
1086
1087    if schemas.is_empty() {
1088        return Ok(());
1089    }
1090
1091    let mut combined_stats = ExtractStats::default();
1092    for node in &prepared.nodes {
1093        let Some(schema) = schemas.get(&node.kind) else {
1094            continue;
1095        };
1096        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1097        let (text_content, positions, stats) = extract_property_fts(&props, schema);
1098        combined_stats.merge(stats);
1099        if let Some(text_content) = text_content {
1100            prepared
1101                .required_property_fts_rows
1102                .push(FtsPropertyProjectionRow {
1103                    node_logical_id: node.logical_id.clone(),
1104                    kind: node.kind.clone(),
1105                    text_content,
1106                    positions,
1107                });
1108        }
1109    }
1110    if combined_stats != ExtractStats::default() {
1111        trace_debug!(
1112            depth_cap_hit = combined_stats.depth_cap_hit,
1113            byte_cap_reached = combined_stats.byte_cap_reached,
1114            excluded_subtree = combined_stats.excluded_subtree,
1115            "property fts recursive extraction guardrails engaged"
1116        );
1117    }
1118    trace_debug!(
1119        property_fts_rows = prepared.required_property_fts_rows.len(),
1120        nodes_processed = prepared.nodes.len(),
1121        "property fts row resolution completed"
1122    );
1123    Ok(())
1124}
1125
1126/// Extract scalar values from a JSON object at a `$.field` path.
1127/// Supports simple dot-notation paths like `$.name`, `$.address.city`.
1128/// Returns individual scalars so callers can join them with the configured separator.
1129pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1130    let Some(path) = path.strip_prefix("$.") else {
1131        return Vec::new();
1132    };
1133    let mut current = value;
1134    for segment in path.split('.') {
1135        match current.get(segment) {
1136            Some(v) => current = v,
1137            None => return Vec::new(),
1138        }
1139    }
1140    match current {
1141        serde_json::Value::String(s) => vec![s.clone()],
1142        serde_json::Value::Number(n) => vec![n.to_string()],
1143        serde_json::Value::Bool(b) => vec![b.to_string()],
1144        serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1145        serde_json::Value::Array(arr) => arr
1146            .iter()
1147            .filter_map(|v| match v {
1148                serde_json::Value::String(s) => Some(s.clone()),
1149                serde_json::Value::Number(n) => Some(n.to_string()),
1150                serde_json::Value::Bool(b) => Some(b.to_string()),
1151                _ => None,
1152            })
1153            .collect(),
1154    }
1155}
1156
1157/// Core property-FTS extraction. Produces the concatenated blob, the
1158/// position map identifying which byte range in the blob came from which
1159/// JSON leaf path, and the guardrail stats for the walk.
1160///
1161/// Emission rules:
1162/// - Path entries are processed in their registered order.
1163/// - Scalar-mode entries append the scalar value(s) directly (arrays of
1164///   scalars flatten, matching pre-Phase-4 behavior). Scalar emissions do
1165///   not produce position-map entries — the position map is only populated
1166///   when a recursive path contributes a leaf.
1167/// - Recursive-mode entries walk every descendant scalar leaf in stable
1168///   lexicographic key order. Each leaf emits a position-map entry whose
1169///   `[start, end)` spans the leaf value's bytes in the blob (NOT the
1170///   trailing separator).
1171/// - Between any two emitted values (whether from the same or different
1172///   path entries) the walk inserts [`LEAF_SEPARATOR`], a hard phrase
1173///   break under FTS5's `porter unicode61` tokenizer. Scalar-mode emissions
1174///   between each other use the schema's configured `separator` for
1175///   backwards compatibility with Phase 0.
1176pub(crate) fn extract_property_fts(
1177    props: &serde_json::Value,
1178    schema: &PropertyFtsSchema,
1179) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
1180    let mut walker = RecursiveWalker {
1181        blob: String::new(),
1182        positions: Vec::new(),
1183        stats: ExtractStats::default(),
1184        exclude_paths: schema.exclude_paths.clone(),
1185        stopped: false,
1186    };
1187
1188    let mut scalar_parts: Vec<String> = Vec::new();
1189
1190    for entry in &schema.paths {
1191        match entry.mode {
1192            PropertyPathMode::Scalar => {
1193                scalar_parts.extend(extract_json_path(props, &entry.path));
1194            }
1195            PropertyPathMode::Recursive => {
1196                let root = resolve_path_root(props, &entry.path);
1197                if let Some(root) = root {
1198                    walker.walk(&entry.path, root, 0);
1199                }
1200            }
1201        }
1202    }
1203
1204    // Flush scalar parts into the blob. Scalar emissions go first so that
1205    // their byte offsets are predictable relative to any recursive leaves
1206    // that follow. Scalars use the configured `separator`; the boundary
1207    // between scalars and recursive leaves uses `LEAF_SEPARATOR` (so phrase
1208    // queries cannot cross it).
1209    let scalar_text = if scalar_parts.is_empty() {
1210        None
1211    } else {
1212        Some(scalar_parts.join(&schema.separator))
1213    };
1214
1215    let combined = match (scalar_text, walker.blob.is_empty()) {
1216        (None, true) => None,
1217        (None, false) => Some(walker.blob.clone()),
1218        (Some(s), true) => Some(s),
1219        (Some(mut s), false) => {
1220            // Shift recursive positions by the prefix length so they stay
1221            // correct relative to the combined blob.
1222            let offset = s.len() + LEAF_SEPARATOR.len();
1223            for pos in &mut walker.positions {
1224                pos.start_offset += offset;
1225                pos.end_offset += offset;
1226            }
1227            s.push_str(LEAF_SEPARATOR);
1228            s.push_str(&walker.blob);
1229            Some(s)
1230        }
1231    };
1232
1233    (combined, walker.positions, walker.stats)
1234}
1235
1236fn resolve_path_root<'a>(
1237    value: &'a serde_json::Value,
1238    path: &str,
1239) -> Option<&'a serde_json::Value> {
1240    let stripped = path.strip_prefix("$.")?;
1241    let mut current = value;
1242    for segment in stripped.split('.') {
1243        current = current.get(segment)?;
1244    }
1245    Some(current)
1246}
1247
/// Stateful depth-first walker that flattens recursive-mode property
/// subtrees into a single text blob while recording per-leaf byte ranges.
struct RecursiveWalker {
    // Accumulated text of all emitted leaves, joined with LEAF_SEPARATOR.
    blob: String,
    // One [start, end) byte-range entry per emitted leaf, in emission order.
    positions: Vec<PositionEntry>,
    // Guardrail counters: depth-cap hits, byte-cap reached, excluded subtrees.
    stats: ExtractStats,
    // Exact leaf/subtree paths whose contents are skipped during the walk.
    exclude_paths: Vec<String>,
    /// Once the byte cap is hit, the walker refuses to emit any further
    /// leaves. Subsequent walks still account for depth/exclude stats but
    /// do not append.
    stopped: bool,
}
1258
impl RecursiveWalker {
    /// Depth-first walk of `value` rooted at `current_path`, emitting every
    /// scalar leaf in stable order: object keys sorted lexicographically,
    /// array items in index order. Honors `exclude_paths` (exact match
    /// prunes the subtree), the depth cap, and the byte cap (`stopped`).
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        if self.stopped {
            return;
        }
        // Exclusion counts once per pruned subtree root.
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            // Nulls contribute nothing and are not counted anywhere.
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                // Depth cap is checked before descending, so a too-deep
                // container counts once and its children are never visited.
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sort keys for deterministic emission order across runs.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    // Array elements use `path[idx]` notation in leaf paths.
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Append one scalar leaf value to the blob, preceded by
    /// `LEAF_SEPARATOR` when the blob already has content, and record its
    /// `[start, end)` byte range. If accepting the leaf would push the blob
    /// past `MAX_EXTRACTED_BYTES`, sets the byte-cap stat and `stopped`
    /// instead of emitting.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        // Empty scalars are skipped entirely: no separator, no position entry.
        if value.is_empty() {
            return;
        }
        // Compute the projected blob size if we accept this leaf.
        // If we already emitted at least one leaf, we must account for
        // the separator that precedes this one.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        // The recorded range covers the value bytes only, not the separator.
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1340
1341/// Load all registered FTS property schemas from the database, tolerating
1342/// both the legacy JSON shape (array of bare path strings = scalar mode)
1343/// and the Phase 4 shape (objects carrying `path`, `mode`, optional
1344/// `exclude_paths`, or a top-level object carrying `paths` + global
1345/// `exclude_paths`).
1346pub(crate) fn load_fts_property_schemas(
1347    conn: &rusqlite::Connection,
1348) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1349    let mut stmt =
1350        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1351    stmt.query_map([], |row| {
1352        let kind: String = row.get(0)?;
1353        let paths_json: String = row.get(1)?;
1354        let separator: String = row.get(2)?;
1355        let schema = parse_property_schema_json(&paths_json, &separator);
1356        Ok((kind, schema))
1357    })?
1358    .collect::<Result<Vec<_>, _>>()
1359}
1360
1361pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1362    let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1363    let mut paths = Vec::new();
1364    let mut exclude_paths: Vec<String> = Vec::new();
1365
1366    let path_values: Vec<serde_json::Value> = match value {
1367        serde_json::Value::Array(arr) => arr,
1368        serde_json::Value::Object(map) => {
1369            if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1370                exclude_paths = excl
1371                    .iter()
1372                    .filter_map(|v| v.as_str().map(str::to_owned))
1373                    .collect();
1374            }
1375            match map.get("paths") {
1376                Some(serde_json::Value::Array(arr)) => arr.clone(),
1377                _ => Vec::new(),
1378            }
1379        }
1380        _ => Vec::new(),
1381    };
1382
1383    for entry in path_values {
1384        match entry {
1385            serde_json::Value::String(path) => {
1386                paths.push(PropertyPathEntry::scalar(path));
1387            }
1388            serde_json::Value::Object(map) => {
1389                let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1390                    continue;
1391                };
1392                let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1393                    PropertyPathMode::Scalar,
1394                    |m| match m {
1395                        "recursive" => PropertyPathMode::Recursive,
1396                        _ => PropertyPathMode::Scalar,
1397                    },
1398                );
1399                paths.push(PropertyPathEntry {
1400                    path: path.to_owned(),
1401                    mode,
1402                });
1403                if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1404                    for p in excl {
1405                        if let Some(s) = p.as_str() {
1406                            exclude_paths.push(s.to_owned());
1407                        }
1408                    }
1409                }
1410            }
1411            _ => {}
1412        }
1413    }
1414
1415    PropertyFtsSchema {
1416        paths,
1417        separator: separator.to_owned(),
1418        exclude_paths,
1419    }
1420}
1421
/// Resolve and validate operational-collection metadata for every
/// operational write in the request, before the transaction begins.
///
/// For each distinct collection (looked up at most once per request):
/// confirms it is registered and not disabled, parses its kind, filter
/// fields, and validation contract, then checks each write's variant
/// against the collection kind (append-only logs accept only `Append`;
/// latest-state collections accept only `Put`/`Delete`) and against the
/// validation contract when one is configured. On success the resolved
/// kinds and filter fields are stored on `prepared` for `apply_write`.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // First time we see this collection in the request: fetch and parse
        // its registration row once, then reuse the cached values.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // The entry was inserted above, so this lookup cannot fail; the
        // error path is defensive.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // The contract map stores Option<contract>; only run the check when
        // a contract is actually configured for the collection.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            // NOTE(review): the check's Ok value is discarded here, and
            // `prepared.operational_validation_warnings` is never populated
            // in this function — confirm warnings are intentionally dropped.
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1489
1490fn parse_operational_filter_fields(
1491    filter_fields_json: &str,
1492) -> Result<Vec<OperationalFilterField>, EngineError> {
1493    let fields: Vec<OperationalFilterField> =
1494        serde_json::from_str(filter_fields_json).map_err(|error| {
1495            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1496        })?;
1497    let mut seen = std::collections::HashSet::new();
1498    for field in &fields {
1499        if field.name.trim().is_empty() {
1500            return Err(EngineError::InvalidWrite(
1501                "filter_fields_json field names must not be empty".to_owned(),
1502            ));
1503        }
1504        if !seen.insert(field.name.as_str()) {
1505            return Err(EngineError::InvalidWrite(format!(
1506                "filter_fields_json contains duplicate field '{}'",
1507                field.name
1508            )));
1509        }
1510        if field.modes.is_empty() {
1511            return Err(EngineError::InvalidWrite(format!(
1512                "filter_fields_json field '{}' must declare at least one mode",
1513                field.name
1514            )));
1515        }
1516        if field.modes.contains(&OperationalFilterMode::Prefix)
1517            && field.field_type != OperationalFilterFieldType::String
1518        {
1519            return Err(EngineError::InvalidWrite(format!(
1520                "filter field '{}' only supports prefix for string types",
1521                field.name
1522            )));
1523        }
1524    }
1525    Ok(fields)
1526}
1527
/// One extracted filter value for an operational payload field. Exactly one
/// of `string_value`/`integer_value` is populated, chosen by the field's
/// declared type (see `extract_operational_filter_values`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the registered filter field this value belongs to.
    field_name: String,
    // Set for `String`-typed fields; None otherwise.
    string_value: Option<String>,
    // Set for `Integer`/`Timestamp`-typed fields; None otherwise.
    integer_value: Option<i64>,
}
1534
1535fn extract_operational_filter_values(
1536    filter_fields: &[OperationalFilterField],
1537    payload_json: &str,
1538) -> Vec<OperationalFilterValueRow> {
1539    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1540        return Vec::new();
1541    };
1542    let Some(object) = parsed.as_object() else {
1543        return Vec::new();
1544    };
1545
1546    filter_fields
1547        .iter()
1548        .filter_map(|field| {
1549            let value = object.get(&field.name)?;
1550            match field.field_type {
1551                OperationalFilterFieldType::String => {
1552                    value
1553                        .as_str()
1554                        .map(|string_value| OperationalFilterValueRow {
1555                            field_name: field.name.clone(),
1556                            string_value: Some(string_value.to_owned()),
1557                            integer_value: None,
1558                        })
1559                }
1560                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1561                    value
1562                        .as_i64()
1563                        .map(|integer_value| OperationalFilterValueRow {
1564                            field_name: field.name.clone(),
1565                            string_value: None,
1566                            integer_value: Some(integer_value),
1567                        })
1568                }
1569            }
1570        })
1571        .collect()
1572}
1573
/// Full pipeline for one prepared write: resolve chunk-FTS rows,
/// property-FTS rows, and operational-collection metadata (each step can
/// reject the request with `InvalidWrite` before anything is written),
/// then hand off to `apply_write` to perform the write itself.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1583
1584fn apply_touch_last_accessed(
1585    conn: &mut rusqlite::Connection,
1586    request: &LastAccessTouchRequest,
1587) -> Result<LastAccessTouchReport, EngineError> {
1588    let mut seen = std::collections::HashSet::new();
1589    let logical_ids = request
1590        .logical_ids
1591        .iter()
1592        .filter(|logical_id| seen.insert(logical_id.as_str()))
1593        .cloned()
1594        .collect::<Vec<_>>();
1595    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1596
1597    for logical_id in &logical_ids {
1598        let exists = tx
1599            .query_row(
1600                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1601                params![logical_id],
1602                |row| row.get::<_, i64>(0),
1603            )
1604            .optional()?
1605            .is_some();
1606        if !exists {
1607            return Err(EngineError::InvalidWrite(format!(
1608                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1609            )));
1610        }
1611    }
1612
1613    {
1614        let mut upsert_metadata = tx.prepare_cached(
1615            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1616             VALUES (?1, ?2, ?2) \
1617             ON CONFLICT(logical_id) DO UPDATE SET \
1618                 last_accessed_at = excluded.last_accessed_at, \
1619                 updated_at = excluded.updated_at",
1620        )?;
1621        let mut insert_provenance = tx.prepare_cached(
1622            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1623             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1624        )?;
1625        for logical_id in &logical_ids {
1626            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1627            insert_provenance.execute(params![
1628                new_id(),
1629                logical_id,
1630                request.source_ref.as_deref(),
1631                format!("{{\"touched_at\":{}}}", request.touched_at),
1632            ])?;
1633        }
1634    }
1635
1636    tx.commit()?;
1637    Ok(LastAccessTouchReport {
1638        touched_logical_ids: logical_ids.len(),
1639        touched_at: request.touched_at,
1640    })
1641}
1642
1643fn ensure_operational_collections_writable(
1644    tx: &rusqlite::Transaction<'_>,
1645    prepared: &PreparedWrite,
1646) -> Result<(), EngineError> {
1647    for collection in prepared.operational_collection_kinds.keys() {
1648        let disabled_at: Option<Option<i64>> = tx
1649            .query_row(
1650                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1651                params![collection],
1652                |row| row.get::<_, Option<i64>>(0),
1653            )
1654            .optional()?;
1655        match disabled_at {
1656            Some(Some(_)) => {
1657                return Err(EngineError::InvalidWrite(format!(
1658                    "operational collection '{collection}' is disabled"
1659                )));
1660            }
1661            Some(None) => {}
1662            None => {
1663                return Err(EngineError::InvalidWrite(format!(
1664                    "operational collection '{collection}' is not registered"
1665                )));
1666            }
1667        }
1668    }
1669    Ok(())
1670}
1671
1672fn validate_operational_writes_against_live_contracts(
1673    tx: &rusqlite::Transaction<'_>,
1674    prepared: &PreparedWrite,
1675) -> Result<Vec<String>, EngineError> {
1676    let mut collection_validation_contracts =
1677        HashMap::<String, Option<OperationalValidationContract>>::new();
1678    for collection in prepared.operational_collection_kinds.keys() {
1679        let validation_json: String = tx
1680            .query_row(
1681                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1682                params![collection],
1683                |row| row.get(0),
1684            )
1685            .map_err(EngineError::Sqlite)?;
1686        let validation_contract = parse_operational_validation_contract(&validation_json)
1687            .map_err(EngineError::InvalidWrite)?;
1688        collection_validation_contracts.insert(collection.clone(), validation_contract);
1689    }
1690
1691    let mut warnings = Vec::new();
1692    for write in &prepared.operational_writes {
1693        if let Some(Some(contract)) =
1694            collection_validation_contracts.get(operational_write_collection(write))
1695            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1696        {
1697            warnings.push(warning);
1698        }
1699    }
1700
1701    Ok(warnings)
1702}
1703
1704fn load_live_operational_secondary_indexes(
1705    tx: &rusqlite::Transaction<'_>,
1706    prepared: &PreparedWrite,
1707) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1708    let mut collection_indexes = HashMap::new();
1709    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1710        let secondary_indexes_json: String = tx
1711            .query_row(
1712                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1713                params![collection],
1714                |row| row.get(0),
1715            )
1716            .map_err(EngineError::Sqlite)?;
1717        let indexes =
1718            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1719                .map_err(EngineError::InvalidWrite)?;
1720        collection_indexes.insert(collection.clone(), indexes);
1721    }
1722    Ok(collection_indexes)
1723}
1724
1725fn check_operational_write_against_contract(
1726    write: &OperationalWrite,
1727    contract: &OperationalValidationContract,
1728) -> Result<Option<String>, EngineError> {
1729    if contract.mode == OperationalValidationMode::Disabled {
1730        return Ok(None);
1731    }
1732
1733    let (payload_json, collection, record_key) = match write {
1734        OperationalWrite::Append {
1735            collection,
1736            record_key,
1737            payload_json,
1738            ..
1739        }
1740        | OperationalWrite::Put {
1741            collection,
1742            record_key,
1743            payload_json,
1744            ..
1745        } => (
1746            payload_json.as_str(),
1747            collection.as_str(),
1748            record_key.as_str(),
1749        ),
1750        OperationalWrite::Delete { .. } => return Ok(None),
1751    };
1752
1753    match validate_operational_payload_against_contract(contract, payload_json) {
1754        Ok(()) => Ok(None),
1755        Err(message) => match contract.mode {
1756            OperationalValidationMode::Disabled => Ok(None),
1757            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1758                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1759                kind = operational_write_kind(write)
1760            ))),
1761            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1762                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1763                kind = operational_write_kind(write)
1764            ))),
1765        },
1766    }
1767}
1768
/// Applies a fully-resolved [`PreparedWrite`] inside a single `IMMEDIATE`
/// transaction: retires, canonical inserts, runtime-table inserts,
/// operational mutations, FTS rows, and (feature-gated) vector rows all
/// commit atomically.
///
/// Returns a [`WriteReceipt`] whose `provenance_warnings` list every
/// canonical insert/retire missing a `source_ref`, and whose `warnings`
/// additionally include operational validation warnings gathered while
/// the transaction was open.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
    // preserve chunks/vec for possible restore, mark superseded, record audit event.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            del_prop_positions.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: mark superseded, record audit event.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts (with optional upsert + chunk-policy handling).
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        // Vec deletion statement is prepared lazily: None when the vec
        // virtual table is absent from this database profile.
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
                            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                // Property FTS rows are always replaced on upsert since properties change.
                del_prop_fts.execute(params![node.logical_id])?;
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    // Vec rows reference chunk ids, so they must go before
                    // the chunks they point at.
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts (with optional upsert).
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts.
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run inserts (with optional upsert).
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            // Runs supersede by explicit prior id, unlike nodes/edges which
            // supersede by their own logical id.
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts (with optional upsert).
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts (with optional upsert).
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational mutation log writes and latest-state current rows.
    {
        // Collections must be registered and enabled before any rows land.
        ensure_operational_collections_writable(&tx, prepared)?;
        // Stash validation warnings on `prepared` so the receipt can surface
        // them after commit.
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Single monotonically increasing mutation order shared across all
        // collections, seeded from the current maximum.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
                 payload_json = excluded.payload_json, \
                 updated_at = excluded.updated_at, \
                 last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            // Every write (append/put/delete) lands in the mutation log.
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Per-mutation secondary index entries (subject_kind 'mutation').
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Denormalized filter values extracted from the payload.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // Latest-state collections additionally maintain a current row
            // and 'current'-subject index entries. Appends leave the current
            // row untouched (its index entries are still cleared here).
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Re-read the row just upserted so index entries
                            // use the persisted updated_at (set via
                            // unixepoch() in SQL) and mutation id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // FTS row inserts.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property FTS row inserts.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        // Delete any stale position rows for these nodes; writer already
        // deleted the `fts_node_properties` rows above.
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
            for pos in &row.positions {
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    // Saturate rather than fail on offsets beyond i64::MAX.
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    // Embeddings are stored as packed little-endian float bytes.
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
                // vec profile absent: vec inserts are silently skipped.
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Post-commit: collect one warning per canonical insert/retire that
    // carried no source_ref. These are informational in the receipt; see
    // ProvenanceMode for the rejecting variant (enforced before apply,
    // not here).
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // `warnings` is the superset: provenance warnings plus operational
    // validation warnings captured during the transaction.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2280
2281fn operational_write_collection(write: &OperationalWrite) -> &str {
2282    match write {
2283        OperationalWrite::Append { collection, .. }
2284        | OperationalWrite::Put { collection, .. }
2285        | OperationalWrite::Delete { collection, .. } => collection,
2286    }
2287}
2288
2289fn operational_write_record_key(write: &OperationalWrite) -> &str {
2290    match write {
2291        OperationalWrite::Append { record_key, .. }
2292        | OperationalWrite::Put { record_key, .. }
2293        | OperationalWrite::Delete { record_key, .. } => record_key,
2294    }
2295}
2296
/// Returns the canonical `op_kind` string stored in `operational_mutations`
/// and used in warning/error messages for this write variant.
fn operational_write_kind(write: &OperationalWrite) -> &'static str {
    match write {
        OperationalWrite::Append { .. } => "append",
        OperationalWrite::Put { .. } => "put",
        OperationalWrite::Delete { .. } => "delete",
    }
}
2304
2305fn operational_write_payload(write: &OperationalWrite) -> &str {
2306    match write {
2307        OperationalWrite::Append { payload_json, .. }
2308        | OperationalWrite::Put { payload_json, .. } => payload_json,
2309        OperationalWrite::Delete { .. } => "null",
2310    }
2311}
2312
2313fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2314    match write {
2315        OperationalWrite::Append { source_ref, .. }
2316        | OperationalWrite::Put { source_ref, .. }
2317        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2318    }
2319}
2320
2321#[cfg(test)]
2322#[allow(clippy::expect_used)]
2323mod tests {
2324    use std::sync::Arc;
2325
2326    use fathomdb_schema::SchemaManager;
2327    use tempfile::NamedTempFile;
2328
2329    use super::{apply_write, prepare_write, resolve_operational_writes};
2330    use crate::{
2331        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2332        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2333        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2334        projection::ProjectionTarget,
2335    };
2336
2337    #[test]
2338    fn writer_executes_runtime_table_rows() {
2339        let db = NamedTempFile::new().expect("temporary db");
2340        let writer = WriterActor::start(
2341            db.path(),
2342            Arc::new(SchemaManager::new()),
2343            ProvenanceMode::Warn,
2344            Arc::new(TelemetryCounters::default()),
2345        )
2346        .expect("writer");
2347
2348        let receipt = writer
2349            .submit(WriteRequest {
2350                label: "runtime".to_owned(),
2351                nodes: vec![],
2352                node_retires: vec![],
2353                edges: vec![],
2354                edge_retires: vec![],
2355                chunks: vec![],
2356                runs: vec![RunInsert {
2357                    id: "run-1".to_owned(),
2358                    kind: "session".to_owned(),
2359                    status: "completed".to_owned(),
2360                    properties: "{}".to_owned(),
2361                    source_ref: Some("src-1".to_owned()),
2362                    upsert: false,
2363                    supersedes_id: None,
2364                }],
2365                steps: vec![StepInsert {
2366                    id: "step-1".to_owned(),
2367                    run_id: "run-1".to_owned(),
2368                    kind: "llm".to_owned(),
2369                    status: "completed".to_owned(),
2370                    properties: "{}".to_owned(),
2371                    source_ref: Some("src-1".to_owned()),
2372                    upsert: false,
2373                    supersedes_id: None,
2374                }],
2375                actions: vec![ActionInsert {
2376                    id: "action-1".to_owned(),
2377                    step_id: "step-1".to_owned(),
2378                    kind: "emit".to_owned(),
2379                    status: "completed".to_owned(),
2380                    properties: "{}".to_owned(),
2381                    source_ref: Some("src-1".to_owned()),
2382                    upsert: false,
2383                    supersedes_id: None,
2384                }],
2385                optional_backfills: vec![],
2386                vec_inserts: vec![],
2387                operational_writes: vec![],
2388            })
2389            .expect("write receipt");
2390
2391        assert_eq!(receipt.label, "runtime");
2392    }
2393
2394    #[test]
2395    fn writer_put_operational_write_updates_current_and_mutations() {
2396        let db = NamedTempFile::new().expect("temporary db");
2397        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2398        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2399        conn.execute(
2400            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2401             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2402            [],
2403        )
2404        .expect("seed collection");
2405        drop(conn);
2406        let writer = WriterActor::start(
2407            db.path(),
2408            Arc::new(SchemaManager::new()),
2409            ProvenanceMode::Warn,
2410            Arc::new(TelemetryCounters::default()),
2411        )
2412        .expect("writer");
2413
2414        writer
2415            .submit(WriteRequest {
2416                label: "node-and-operational".to_owned(),
2417                nodes: vec![NodeInsert {
2418                    row_id: "row-1".to_owned(),
2419                    logical_id: "lg-1".to_owned(),
2420                    kind: "Meeting".to_owned(),
2421                    properties: "{}".to_owned(),
2422                    source_ref: Some("src-1".to_owned()),
2423                    upsert: false,
2424                    chunk_policy: ChunkPolicy::Preserve,
2425                    content_ref: None,
2426                }],
2427                node_retires: vec![],
2428                edges: vec![],
2429                edge_retires: vec![],
2430                chunks: vec![],
2431                runs: vec![],
2432                steps: vec![],
2433                actions: vec![],
2434                optional_backfills: vec![],
2435                vec_inserts: vec![],
2436                operational_writes: vec![OperationalWrite::Put {
2437                    collection: "connector_health".to_owned(),
2438                    record_key: "gmail".to_owned(),
2439                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2440                    source_ref: Some("src-1".to_owned()),
2441                }],
2442            })
2443            .expect("write receipt");
2444
2445        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2446        let node_count: i64 = conn
2447            .query_row(
2448                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2449                [],
2450                |row| row.get(0),
2451            )
2452            .expect("node count");
2453        assert_eq!(node_count, 1);
2454        let mutation_count: i64 = conn
2455            .query_row(
2456                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
2457                 AND record_key = 'gmail'",
2458                [],
2459                |row| row.get(0),
2460            )
2461            .expect("mutation count");
2462        assert_eq!(mutation_count, 1);
2463        let payload: String = conn
2464            .query_row(
2465                "SELECT payload_json FROM operational_current \
2466                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2467                [],
2468                |row| row.get(0),
2469            )
2470            .expect("current payload");
2471        assert_eq!(payload, r#"{"status":"ok"}"#);
2472    }
2473
2474    #[test]
2475    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
2476        let db = NamedTempFile::new().expect("temporary db");
2477        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2478        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2479        conn.execute(
2480            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2481             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2482            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2483        )
2484        .expect("seed collection");
2485        drop(conn);
2486        let writer = WriterActor::start(
2487            db.path(),
2488            Arc::new(SchemaManager::new()),
2489            ProvenanceMode::Warn,
2490            Arc::new(TelemetryCounters::default()),
2491        )
2492        .expect("writer");
2493
2494        writer
2495            .submit(WriteRequest {
2496                label: "disabled-validation".to_owned(),
2497                nodes: vec![],
2498                node_retires: vec![],
2499                edges: vec![],
2500                edge_retires: vec![],
2501                chunks: vec![],
2502                runs: vec![],
2503                steps: vec![],
2504                actions: vec![],
2505                optional_backfills: vec![],
2506                vec_inserts: vec![],
2507                operational_writes: vec![OperationalWrite::Put {
2508                    collection: "connector_health".to_owned(),
2509                    record_key: "gmail".to_owned(),
2510                    payload_json: r#"{"bogus":true}"#.to_owned(),
2511                    source_ref: Some("src-1".to_owned()),
2512                }],
2513            })
2514            .expect("write receipt");
2515
2516        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2517        let payload: String = conn
2518            .query_row(
2519                "SELECT payload_json FROM operational_current \
2520                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2521                [],
2522                |row| row.get(0),
2523            )
2524            .expect("current payload");
2525        assert_eq!(payload, r#"{"bogus":true}"#);
2526    }
2527
2528    #[test]
2529    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
2530        let db = NamedTempFile::new().expect("temporary db");
2531        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2532        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2533        conn.execute(
2534            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2535             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2536            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2537        )
2538        .expect("seed collection");
2539        drop(conn);
2540        let writer = WriterActor::start(
2541            db.path(),
2542            Arc::new(SchemaManager::new()),
2543            ProvenanceMode::Warn,
2544            Arc::new(TelemetryCounters::default()),
2545        )
2546        .expect("writer");
2547
2548        let receipt = writer
2549            .submit(WriteRequest {
2550                label: "report-only-validation".to_owned(),
2551                nodes: vec![],
2552                node_retires: vec![],
2553                edges: vec![],
2554                edge_retires: vec![],
2555                chunks: vec![],
2556                runs: vec![],
2557                steps: vec![],
2558                actions: vec![],
2559                optional_backfills: vec![],
2560                vec_inserts: vec![],
2561                operational_writes: vec![OperationalWrite::Put {
2562                    collection: "connector_health".to_owned(),
2563                    record_key: "gmail".to_owned(),
2564                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2565                    source_ref: Some("src-1".to_owned()),
2566                }],
2567            })
2568            .expect("report_only write should succeed");
2569
2570        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
2571        assert_eq!(receipt.warnings.len(), 1);
2572        assert!(
2573            receipt.warnings[0].contains("connector_health"),
2574            "warning should identify collection"
2575        );
2576        assert!(
2577            receipt.warnings[0].contains("must be one of"),
2578            "warning should explain validation failure"
2579        );
2580
2581        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2582        let payload: String = conn
2583            .query_row(
2584                "SELECT payload_json FROM operational_current \
2585                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2586                [],
2587                |row| row.get(0),
2588            )
2589            .expect("current payload");
2590        assert_eq!(payload, r#"{"status":"bogus"}"#);
2591    }
2592
2593    #[test]
2594    fn writer_rejects_operational_write_for_missing_collection() {
2595        let db = NamedTempFile::new().expect("temporary db");
2596        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2597        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2598        drop(conn);
2599        let writer = WriterActor::start(
2600            db.path(),
2601            Arc::new(SchemaManager::new()),
2602            ProvenanceMode::Warn,
2603            Arc::new(TelemetryCounters::default()),
2604        )
2605        .expect("writer");
2606
2607        let result = writer.submit(WriteRequest {
2608            label: "missing-operational-collection".to_owned(),
2609            nodes: vec![],
2610            node_retires: vec![],
2611            edges: vec![],
2612            edge_retires: vec![],
2613            chunks: vec![],
2614            runs: vec![],
2615            steps: vec![],
2616            actions: vec![],
2617            optional_backfills: vec![],
2618            vec_inserts: vec![],
2619            operational_writes: vec![OperationalWrite::Put {
2620                collection: "connector_health".to_owned(),
2621                record_key: "gmail".to_owned(),
2622                payload_json: r#"{"status":"ok"}"#.to_owned(),
2623                source_ref: Some("src-1".to_owned()),
2624            }],
2625        });
2626
2627        assert!(
2628            matches!(result, Err(EngineError::InvalidWrite(_))),
2629            "missing operational collection must return InvalidWrite"
2630        );
2631    }
2632
2633    #[test]
2634    fn writer_append_operational_write_records_history_without_current_row() {
2635        let db = NamedTempFile::new().expect("temporary db");
2636        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2637        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2638        conn.execute(
2639            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2640             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2641            [],
2642        )
2643        .expect("seed collection");
2644        drop(conn);
2645        let writer = WriterActor::start(
2646            db.path(),
2647            Arc::new(SchemaManager::new()),
2648            ProvenanceMode::Warn,
2649            Arc::new(TelemetryCounters::default()),
2650        )
2651        .expect("writer");
2652
2653        writer
2654            .submit(WriteRequest {
2655                label: "append-operational".to_owned(),
2656                nodes: vec![],
2657                node_retires: vec![],
2658                edges: vec![],
2659                edge_retires: vec![],
2660                chunks: vec![],
2661                runs: vec![],
2662                steps: vec![],
2663                actions: vec![],
2664                optional_backfills: vec![],
2665                vec_inserts: vec![],
2666                operational_writes: vec![OperationalWrite::Append {
2667                    collection: "audit_log".to_owned(),
2668                    record_key: "evt-1".to_owned(),
2669                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2670                    source_ref: Some("src-1".to_owned()),
2671                }],
2672            })
2673            .expect("write receipt");
2674
2675        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2676        let mutation: (String, String) = conn
2677            .query_row(
2678                "SELECT op_kind, payload_json FROM operational_mutations \
2679                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2680                [],
2681                |row| Ok((row.get(0)?, row.get(1)?)),
2682            )
2683            .expect("mutation row");
2684        assert_eq!(mutation.0, "append");
2685        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2686        let current_count: i64 = conn
2687            .query_row(
2688                "SELECT count(*) FROM operational_current \
2689                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2690                [],
2691                |row| row.get(0),
2692            )
2693            .expect("current count");
2694        assert_eq!(current_count, 0);
2695    }
2696
2697    #[test]
2698    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
2699        let db = NamedTempFile::new().expect("temporary db");
2700        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2701        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2702        conn.execute(
2703            "INSERT INTO operational_collections \
2704             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
2705             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
2706                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
2707            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2708        )
2709        .expect("seed collection");
2710        drop(conn);
2711        let writer = WriterActor::start(
2712            db.path(),
2713            Arc::new(SchemaManager::new()),
2714            ProvenanceMode::Warn,
2715            Arc::new(TelemetryCounters::default()),
2716        )
2717        .expect("writer");
2718
2719        let error = writer
2720            .submit(WriteRequest {
2721                label: "invalid-append".to_owned(),
2722                nodes: vec![],
2723                node_retires: vec![],
2724                edges: vec![],
2725                edge_retires: vec![],
2726                chunks: vec![],
2727                runs: vec![],
2728                steps: vec![],
2729                actions: vec![],
2730                optional_backfills: vec![],
2731                vec_inserts: vec![],
2732                operational_writes: vec![OperationalWrite::Append {
2733                    collection: "audit_log".to_owned(),
2734                    record_key: "evt-1".to_owned(),
2735                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2736                    source_ref: Some("src-1".to_owned()),
2737                }],
2738            })
2739            .expect_err("invalid append must reject");
2740        assert!(matches!(error, EngineError::InvalidWrite(_)));
2741        assert!(error.to_string().contains("must be one of"));
2742
2743        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2744        let mutation_count: i64 = conn
2745            .query_row(
2746                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2747                [],
2748                |row| row.get(0),
2749            )
2750            .expect("mutation count");
2751        assert_eq!(mutation_count, 0);
2752        let filter_count: i64 = conn
2753            .query_row(
2754                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
2755                [],
2756                |row| row.get(0),
2757            )
2758            .expect("filter count");
2759        assert_eq!(filter_count, 0);
2760    }
2761
2762    #[test]
2763    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2764        let db = NamedTempFile::new().expect("temporary db");
2765        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2766        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2767        conn.execute(
2768            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2769             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2770            [],
2771        )
2772        .expect("seed collection");
2773        drop(conn);
2774        let writer = WriterActor::start(
2775            db.path(),
2776            Arc::new(SchemaManager::new()),
2777            ProvenanceMode::Warn,
2778            Arc::new(TelemetryCounters::default()),
2779        )
2780        .expect("writer");
2781
2782        writer
2783            .submit(WriteRequest {
2784                label: "put-operational".to_owned(),
2785                nodes: vec![],
2786                node_retires: vec![],
2787                edges: vec![],
2788                edge_retires: vec![],
2789                chunks: vec![],
2790                runs: vec![],
2791                steps: vec![],
2792                actions: vec![],
2793                optional_backfills: vec![],
2794                vec_inserts: vec![],
2795                operational_writes: vec![OperationalWrite::Put {
2796                    collection: "connector_health".to_owned(),
2797                    record_key: "gmail".to_owned(),
2798                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2799                    source_ref: Some("src-1".to_owned()),
2800                }],
2801            })
2802            .expect("put receipt");
2803
2804        writer
2805            .submit(WriteRequest {
2806                label: "delete-operational".to_owned(),
2807                nodes: vec![],
2808                node_retires: vec![],
2809                edges: vec![],
2810                edge_retires: vec![],
2811                chunks: vec![],
2812                runs: vec![],
2813                steps: vec![],
2814                actions: vec![],
2815                optional_backfills: vec![],
2816                vec_inserts: vec![],
2817                operational_writes: vec![OperationalWrite::Delete {
2818                    collection: "connector_health".to_owned(),
2819                    record_key: "gmail".to_owned(),
2820                    source_ref: Some("src-2".to_owned()),
2821                }],
2822            })
2823            .expect("delete receipt");
2824
2825        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2826        let mutation_kinds: Vec<String> = {
2827            let mut stmt = conn
2828                .prepare(
2829                    "SELECT op_kind FROM operational_mutations \
2830                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
2831                     ORDER BY mutation_order ASC",
2832                )
2833                .expect("stmt");
2834            stmt.query_map([], |row| row.get(0))
2835                .expect("rows")
2836                .collect::<Result<_, _>>()
2837                .expect("collect")
2838        };
2839        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
2840        let current_count: i64 = conn
2841            .query_row(
2842                "SELECT count(*) FROM operational_current \
2843                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2844                [],
2845                |row| row.get(0),
2846            )
2847            .expect("current count");
2848        assert_eq!(current_count, 0);
2849    }
2850
2851    #[test]
2852    fn writer_delete_bypasses_validation_contract() {
2853        let db = NamedTempFile::new().expect("temporary db");
2854        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2855        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2856        conn.execute(
2857            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2858             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2859            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2860        )
2861        .expect("seed collection");
2862        drop(conn);
2863        let writer = WriterActor::start(
2864            db.path(),
2865            Arc::new(SchemaManager::new()),
2866            ProvenanceMode::Warn,
2867            Arc::new(TelemetryCounters::default()),
2868        )
2869        .expect("writer");
2870
2871        writer
2872            .submit(WriteRequest {
2873                label: "valid-put".to_owned(),
2874                nodes: vec![],
2875                node_retires: vec![],
2876                edges: vec![],
2877                edge_retires: vec![],
2878                chunks: vec![],
2879                runs: vec![],
2880                steps: vec![],
2881                actions: vec![],
2882                optional_backfills: vec![],
2883                vec_inserts: vec![],
2884                operational_writes: vec![OperationalWrite::Put {
2885                    collection: "connector_health".to_owned(),
2886                    record_key: "gmail".to_owned(),
2887                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2888                    source_ref: Some("src-1".to_owned()),
2889                }],
2890            })
2891            .expect("put receipt");
2892        writer
2893            .submit(WriteRequest {
2894                label: "delete-after-put".to_owned(),
2895                nodes: vec![],
2896                node_retires: vec![],
2897                edges: vec![],
2898                edge_retires: vec![],
2899                chunks: vec![],
2900                runs: vec![],
2901                steps: vec![],
2902                actions: vec![],
2903                optional_backfills: vec![],
2904                vec_inserts: vec![],
2905                operational_writes: vec![OperationalWrite::Delete {
2906                    collection: "connector_health".to_owned(),
2907                    record_key: "gmail".to_owned(),
2908                    source_ref: Some("src-2".to_owned()),
2909                }],
2910            })
2911            .expect("delete receipt");
2912
2913        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2914        let current_count: i64 = conn
2915            .query_row(
2916                "SELECT count(*) FROM operational_current \
2917                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2918                [],
2919                |row| row.get(0),
2920            )
2921            .expect("current count");
2922        assert_eq!(current_count, 0);
2923    }
2924
2925    #[test]
2926    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
2927        let db = NamedTempFile::new().expect("temporary db");
2928        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2929        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2930        conn.execute(
2931            "INSERT INTO operational_collections \
2932             (name, kind, schema_json, retention_json, secondary_indexes_json) \
2933             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2934            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
2935        )
2936        .expect("seed collection");
2937        drop(conn);
2938        let writer = WriterActor::start(
2939            db.path(),
2940            Arc::new(SchemaManager::new()),
2941            ProvenanceMode::Warn,
2942            Arc::new(TelemetryCounters::default()),
2943        )
2944        .expect("writer");
2945
2946        writer
2947            .submit(WriteRequest {
2948                label: "secondary-index-put".to_owned(),
2949                nodes: vec![],
2950                node_retires: vec![],
2951                edges: vec![],
2952                edge_retires: vec![],
2953                chunks: vec![],
2954                runs: vec![],
2955                steps: vec![],
2956                actions: vec![],
2957                optional_backfills: vec![],
2958                vec_inserts: vec![],
2959                operational_writes: vec![OperationalWrite::Put {
2960                    collection: "connector_health".to_owned(),
2961                    record_key: "gmail".to_owned(),
2962                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
2963                        .to_owned(),
2964                    source_ref: Some("src-1".to_owned()),
2965                }],
2966            })
2967            .expect("put receipt");
2968
2969        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2970        let current_entry_count: i64 = conn
2971            .query_row(
2972                "SELECT count(*) FROM operational_secondary_index_entries \
2973                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
2974                [],
2975                |row| row.get(0),
2976            )
2977            .expect("current secondary index count");
2978        assert_eq!(current_entry_count, 2);
2979        drop(conn);
2980
2981        writer
2982            .submit(WriteRequest {
2983                label: "secondary-index-delete".to_owned(),
2984                nodes: vec![],
2985                node_retires: vec![],
2986                edges: vec![],
2987                edge_retires: vec![],
2988                chunks: vec![],
2989                runs: vec![],
2990                steps: vec![],
2991                actions: vec![],
2992                optional_backfills: vec![],
2993                vec_inserts: vec![],
2994                operational_writes: vec![OperationalWrite::Delete {
2995                    collection: "connector_health".to_owned(),
2996                    record_key: "gmail".to_owned(),
2997                    source_ref: Some("src-2".to_owned()),
2998                }],
2999            })
3000            .expect("delete receipt");
3001
3002        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3003        let current_entry_count: i64 = conn
3004            .query_row(
3005                "SELECT count(*) FROM operational_secondary_index_entries \
3006                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3007                [],
3008                |row| row.get(0),
3009            )
3010            .expect("current secondary index count");
3011        assert_eq!(current_entry_count, 0);
3012    }
3013
3014    #[test]
3015    fn writer_latest_state_operational_writes_persist_mutation_order() {
3016        let db = NamedTempFile::new().expect("temporary db");
3017        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3018        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3019        conn.execute(
3020            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3021             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3022            [],
3023        )
3024        .expect("seed collection");
3025        drop(conn);
3026
3027        let writer = WriterActor::start(
3028            db.path(),
3029            Arc::new(SchemaManager::new()),
3030            ProvenanceMode::Warn,
3031            Arc::new(TelemetryCounters::default()),
3032        )
3033        .expect("writer");
3034
3035        writer
3036            .submit(WriteRequest {
3037                label: "ordered-operational-batch".to_owned(),
3038                nodes: vec![],
3039                node_retires: vec![],
3040                edges: vec![],
3041                edge_retires: vec![],
3042                chunks: vec![],
3043                runs: vec![],
3044                steps: vec![],
3045                actions: vec![],
3046                optional_backfills: vec![],
3047                vec_inserts: vec![],
3048                operational_writes: vec![
3049                    OperationalWrite::Put {
3050                        collection: "connector_health".to_owned(),
3051                        record_key: "gmail".to_owned(),
3052                        payload_json: r#"{"status":"old"}"#.to_owned(),
3053                        source_ref: Some("src-1".to_owned()),
3054                    },
3055                    OperationalWrite::Delete {
3056                        collection: "connector_health".to_owned(),
3057                        record_key: "gmail".to_owned(),
3058                        source_ref: Some("src-2".to_owned()),
3059                    },
3060                    OperationalWrite::Put {
3061                        collection: "connector_health".to_owned(),
3062                        record_key: "gmail".to_owned(),
3063                        payload_json: r#"{"status":"new"}"#.to_owned(),
3064                        source_ref: Some("src-3".to_owned()),
3065                    },
3066                ],
3067            })
3068            .expect("write receipt");
3069
3070        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3071        let rows: Vec<(String, i64)> = {
3072            let mut stmt = conn
3073                .prepare(
3074                    "SELECT op_kind, mutation_order FROM operational_mutations \
3075                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3076                     ORDER BY mutation_order ASC",
3077                )
3078                .expect("stmt");
3079            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
3080                .expect("rows")
3081                .collect::<Result<_, _>>()
3082                .expect("collect")
3083        };
3084        assert_eq!(
3085            rows,
3086            vec![
3087                ("put".to_owned(), 1),
3088                ("delete".to_owned(), 2),
3089                ("put".to_owned(), 3),
3090            ]
3091        );
3092        let payload: String = conn
3093            .query_row(
3094                "SELECT payload_json FROM operational_current \
3095                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3096                [],
3097                |row| row.get(0),
3098            )
3099            .expect("current payload");
3100        assert_eq!(payload, r#"{"status":"new"}"#);
3101    }
3102
3103    #[test]
3104    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
3105        let db = NamedTempFile::new().expect("temporary db");
3106        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
3107        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3108        conn.execute(
3109            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3110             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3111            [],
3112        )
3113        .expect("seed collection");
3114
3115        let request = WriteRequest {
3116            label: "disabled-race".to_owned(),
3117            nodes: vec![],
3118            node_retires: vec![],
3119            edges: vec![],
3120            edge_retires: vec![],
3121            chunks: vec![],
3122            runs: vec![],
3123            steps: vec![],
3124            actions: vec![],
3125            optional_backfills: vec![],
3126            vec_inserts: vec![],
3127            operational_writes: vec![OperationalWrite::Put {
3128                collection: "connector_health".to_owned(),
3129                record_key: "gmail".to_owned(),
3130                payload_json: r#"{"status":"ok"}"#.to_owned(),
3131                source_ref: Some("src-1".to_owned()),
3132            }],
3133        };
3134        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
3135        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
3136
3137        conn.execute(
3138            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
3139            [],
3140        )
3141        .expect("disable collection after preflight");
3142
3143        let error =
3144            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
3145        assert!(matches!(error, EngineError::InvalidWrite(_)));
3146        assert!(error.to_string().contains("is disabled"));
3147
3148        let mutation_count: i64 = conn
3149            .query_row(
3150                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3151                [],
3152                |row| row.get(0),
3153            )
3154            .expect("mutation count");
3155        assert_eq!(mutation_count, 0);
3156
3157        let current_count: i64 = conn
3158            .query_row(
3159                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3160                [],
3161                |row| row.get(0),
3162            )
3163            .expect("current count");
3164        assert_eq!(current_count, 0);
3165    }
3166
3167    #[test]
3168    fn writer_enforce_validation_rejects_invalid_put_atomically() {
3169        let db = NamedTempFile::new().expect("temporary db");
3170        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3171        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3172        conn.execute(
3173            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3174             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3175            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3176        )
3177        .expect("seed collection");
3178        drop(conn);
3179        let writer = WriterActor::start(
3180            db.path(),
3181            Arc::new(SchemaManager::new()),
3182            ProvenanceMode::Warn,
3183            Arc::new(TelemetryCounters::default()),
3184        )
3185        .expect("writer");
3186
3187        let error = writer
3188            .submit(WriteRequest {
3189                label: "invalid-put".to_owned(),
3190                nodes: vec![NodeInsert {
3191                    row_id: "row-1".to_owned(),
3192                    logical_id: "lg-1".to_owned(),
3193                    kind: "Meeting".to_owned(),
3194                    properties: "{}".to_owned(),
3195                    source_ref: Some("src-1".to_owned()),
3196                    upsert: false,
3197                    chunk_policy: ChunkPolicy::Preserve,
3198                    content_ref: None,
3199                }],
3200                node_retires: vec![],
3201                edges: vec![],
3202                edge_retires: vec![],
3203                chunks: vec![],
3204                runs: vec![],
3205                steps: vec![],
3206                actions: vec![],
3207                optional_backfills: vec![],
3208                vec_inserts: vec![],
3209                operational_writes: vec![OperationalWrite::Put {
3210                    collection: "connector_health".to_owned(),
3211                    record_key: "gmail".to_owned(),
3212                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
3213                    source_ref: Some("src-1".to_owned()),
3214                }],
3215            })
3216            .expect_err("invalid put must reject");
3217        assert!(matches!(error, EngineError::InvalidWrite(_)));
3218        assert!(error.to_string().contains("must be one of"));
3219
3220        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3221        let node_count: i64 = conn
3222            .query_row(
3223                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3224                [],
3225                |row| row.get(0),
3226            )
3227            .expect("node count");
3228        assert_eq!(node_count, 0);
3229        let mutation_count: i64 = conn
3230            .query_row(
3231                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3232                [],
3233                |row| row.get(0),
3234            )
3235            .expect("mutation count");
3236        assert_eq!(mutation_count, 0);
3237        let current_count: i64 = conn
3238            .query_row(
3239                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3240                [],
3241                |row| row.get(0),
3242            )
3243            .expect("current count");
3244        assert_eq!(current_count, 0);
3245    }
3246
3247    #[test]
3248    fn writer_rejects_append_against_latest_state_collection() {
3249        let db = NamedTempFile::new().expect("temporary db");
3250        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3251        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3252        conn.execute(
3253            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3254             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3255            [],
3256        )
3257        .expect("seed collection");
3258        drop(conn);
3259        let writer = WriterActor::start(
3260            db.path(),
3261            Arc::new(SchemaManager::new()),
3262            ProvenanceMode::Warn,
3263            Arc::new(TelemetryCounters::default()),
3264        )
3265        .expect("writer");
3266
3267        let result = writer.submit(WriteRequest {
3268            label: "bad-append".to_owned(),
3269            nodes: vec![],
3270            node_retires: vec![],
3271            edges: vec![],
3272            edge_retires: vec![],
3273            chunks: vec![],
3274            runs: vec![],
3275            steps: vec![],
3276            actions: vec![],
3277            optional_backfills: vec![],
3278            vec_inserts: vec![],
3279            operational_writes: vec![OperationalWrite::Append {
3280                collection: "connector_health".to_owned(),
3281                record_key: "gmail".to_owned(),
3282                payload_json: r#"{"status":"ok"}"#.to_owned(),
3283                source_ref: Some("src-1".to_owned()),
3284            }],
3285        });
3286
3287        assert!(
3288            matches!(result, Err(EngineError::InvalidWrite(_))),
3289            "latest_state collection must reject Append"
3290        );
3291    }
3292
3293    #[test]
3294    fn writer_upsert_supersedes_prior_active_node() {
3295        let db = NamedTempFile::new().expect("temporary db");
3296        let writer = WriterActor::start(
3297            db.path(),
3298            Arc::new(SchemaManager::new()),
3299            ProvenanceMode::Warn,
3300            Arc::new(TelemetryCounters::default()),
3301        )
3302        .expect("writer");
3303
3304        writer
3305            .submit(WriteRequest {
3306                label: "v1".to_owned(),
3307                nodes: vec![NodeInsert {
3308                    row_id: "row-1".to_owned(),
3309                    logical_id: "lg-1".to_owned(),
3310                    kind: "Meeting".to_owned(),
3311                    properties: r#"{"version":1}"#.to_owned(),
3312                    source_ref: Some("src-1".to_owned()),
3313                    upsert: false,
3314                    chunk_policy: ChunkPolicy::Preserve,
3315                    content_ref: None,
3316                }],
3317                node_retires: vec![],
3318                edges: vec![],
3319                edge_retires: vec![],
3320                chunks: vec![],
3321                runs: vec![],
3322                steps: vec![],
3323                actions: vec![],
3324                optional_backfills: vec![],
3325                vec_inserts: vec![],
3326                operational_writes: vec![],
3327            })
3328            .expect("v1 write");
3329
3330        writer
3331            .submit(WriteRequest {
3332                label: "v2".to_owned(),
3333                nodes: vec![NodeInsert {
3334                    row_id: "row-2".to_owned(),
3335                    logical_id: "lg-1".to_owned(),
3336                    kind: "Meeting".to_owned(),
3337                    properties: r#"{"version":2}"#.to_owned(),
3338                    source_ref: Some("src-2".to_owned()),
3339                    upsert: true,
3340                    chunk_policy: ChunkPolicy::Preserve,
3341                    content_ref: None,
3342                }],
3343                node_retires: vec![],
3344                edges: vec![],
3345                edge_retires: vec![],
3346                chunks: vec![],
3347                runs: vec![],
3348                steps: vec![],
3349                actions: vec![],
3350                optional_backfills: vec![],
3351                vec_inserts: vec![],
3352                operational_writes: vec![],
3353            })
3354            .expect("v2 upsert write");
3355
3356        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3357        let (active_row_id, props): (String, String) = conn
3358            .query_row(
3359                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3360                [],
3361                |row| Ok((row.get(0)?, row.get(1)?)),
3362            )
3363            .expect("active row");
3364        assert_eq!(active_row_id, "row-2");
3365        assert!(props.contains("\"version\":2"));
3366
3367        let superseded: i64 = conn
3368            .query_row(
3369                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3370                [],
3371                |row| row.get(0),
3372            )
3373            .expect("superseded count");
3374        assert_eq!(superseded, 1);
3375    }
3376
3377    #[test]
3378    fn writer_inserts_edge_between_two_nodes() {
3379        let db = NamedTempFile::new().expect("temporary db");
3380        let writer = WriterActor::start(
3381            db.path(),
3382            Arc::new(SchemaManager::new()),
3383            ProvenanceMode::Warn,
3384            Arc::new(TelemetryCounters::default()),
3385        )
3386        .expect("writer");
3387
3388        writer
3389            .submit(WriteRequest {
3390                label: "nodes-and-edge".to_owned(),
3391                nodes: vec![
3392                    NodeInsert {
3393                        row_id: "row-meeting".to_owned(),
3394                        logical_id: "meeting-1".to_owned(),
3395                        kind: "Meeting".to_owned(),
3396                        properties: "{}".to_owned(),
3397                        source_ref: Some("src-1".to_owned()),
3398                        upsert: false,
3399                        chunk_policy: ChunkPolicy::Preserve,
3400                        content_ref: None,
3401                    },
3402                    NodeInsert {
3403                        row_id: "row-task".to_owned(),
3404                        logical_id: "task-1".to_owned(),
3405                        kind: "Task".to_owned(),
3406                        properties: "{}".to_owned(),
3407                        source_ref: Some("src-1".to_owned()),
3408                        upsert: false,
3409                        chunk_policy: ChunkPolicy::Preserve,
3410                        content_ref: None,
3411                    },
3412                ],
3413                node_retires: vec![],
3414                edges: vec![EdgeInsert {
3415                    row_id: "edge-1".to_owned(),
3416                    logical_id: "edge-lg-1".to_owned(),
3417                    source_logical_id: "meeting-1".to_owned(),
3418                    target_logical_id: "task-1".to_owned(),
3419                    kind: "HAS_TASK".to_owned(),
3420                    properties: "{}".to_owned(),
3421                    source_ref: Some("src-1".to_owned()),
3422                    upsert: false,
3423                }],
3424                edge_retires: vec![],
3425                chunks: vec![],
3426                runs: vec![],
3427                steps: vec![],
3428                actions: vec![],
3429                optional_backfills: vec![],
3430                vec_inserts: vec![],
3431                operational_writes: vec![],
3432            })
3433            .expect("write receipt");
3434
3435        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3436        let (src, tgt, kind): (String, String, String) = conn
3437            .query_row(
3438                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3439                [],
3440                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3441            )
3442            .expect("edge row");
3443        assert_eq!(src, "meeting-1");
3444        assert_eq!(tgt, "task-1");
3445        assert_eq!(kind, "HAS_TASK");
3446    }
3447
3448    #[test]
3449    #[allow(clippy::too_many_lines)]
3450    fn writer_upsert_supersedes_prior_active_edge() {
3451        let db = NamedTempFile::new().expect("temporary db");
3452        let writer = WriterActor::start(
3453            db.path(),
3454            Arc::new(SchemaManager::new()),
3455            ProvenanceMode::Warn,
3456            Arc::new(TelemetryCounters::default()),
3457        )
3458        .expect("writer");
3459
3460        // Write two nodes
3461        writer
3462            .submit(WriteRequest {
3463                label: "nodes".to_owned(),
3464                nodes: vec![
3465                    NodeInsert {
3466                        row_id: "row-a".to_owned(),
3467                        logical_id: "node-a".to_owned(),
3468                        kind: "Meeting".to_owned(),
3469                        properties: "{}".to_owned(),
3470                        source_ref: Some("src-1".to_owned()),
3471                        upsert: false,
3472                        chunk_policy: ChunkPolicy::Preserve,
3473                        content_ref: None,
3474                    },
3475                    NodeInsert {
3476                        row_id: "row-b".to_owned(),
3477                        logical_id: "node-b".to_owned(),
3478                        kind: "Task".to_owned(),
3479                        properties: "{}".to_owned(),
3480                        source_ref: Some("src-1".to_owned()),
3481                        upsert: false,
3482                        chunk_policy: ChunkPolicy::Preserve,
3483                        content_ref: None,
3484                    },
3485                ],
3486                node_retires: vec![],
3487                edges: vec![],
3488                edge_retires: vec![],
3489                chunks: vec![],
3490                runs: vec![],
3491                steps: vec![],
3492                actions: vec![],
3493                optional_backfills: vec![],
3494                vec_inserts: vec![],
3495                operational_writes: vec![],
3496            })
3497            .expect("nodes write");
3498
3499        // Write v1 edge
3500        writer
3501            .submit(WriteRequest {
3502                label: "edge-v1".to_owned(),
3503                nodes: vec![],
3504                node_retires: vec![],
3505                edges: vec![EdgeInsert {
3506                    row_id: "edge-row-1".to_owned(),
3507                    logical_id: "edge-lg-1".to_owned(),
3508                    source_logical_id: "node-a".to_owned(),
3509                    target_logical_id: "node-b".to_owned(),
3510                    kind: "HAS_TASK".to_owned(),
3511                    properties: r#"{"weight":1}"#.to_owned(),
3512                    source_ref: Some("src-1".to_owned()),
3513                    upsert: false,
3514                }],
3515                edge_retires: vec![],
3516                chunks: vec![],
3517                runs: vec![],
3518                steps: vec![],
3519                actions: vec![],
3520                optional_backfills: vec![],
3521                vec_inserts: vec![],
3522                operational_writes: vec![],
3523            })
3524            .expect("edge v1 write");
3525
3526        // Upsert v2 edge
3527        writer
3528            .submit(WriteRequest {
3529                label: "edge-v2".to_owned(),
3530                nodes: vec![],
3531                node_retires: vec![],
3532                edges: vec![EdgeInsert {
3533                    row_id: "edge-row-2".to_owned(),
3534                    logical_id: "edge-lg-1".to_owned(),
3535                    source_logical_id: "node-a".to_owned(),
3536                    target_logical_id: "node-b".to_owned(),
3537                    kind: "HAS_TASK".to_owned(),
3538                    properties: r#"{"weight":2}"#.to_owned(),
3539                    source_ref: Some("src-2".to_owned()),
3540                    upsert: true,
3541                }],
3542                edge_retires: vec![],
3543                chunks: vec![],
3544                runs: vec![],
3545                steps: vec![],
3546                actions: vec![],
3547                optional_backfills: vec![],
3548                vec_inserts: vec![],
3549                operational_writes: vec![],
3550            })
3551            .expect("edge v2 upsert");
3552
3553        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3554        let (active_row_id, props): (String, String) = conn
3555            .query_row(
3556                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3557                [],
3558                |row| Ok((row.get(0)?, row.get(1)?)),
3559            )
3560            .expect("active edge");
3561        assert_eq!(active_row_id, "edge-row-2");
3562        assert!(props.contains("\"weight\":2"));
3563
3564        let superseded: i64 = conn
3565            .query_row(
3566                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3567                [],
3568                |row| row.get(0),
3569            )
3570            .expect("superseded count");
3571        assert_eq!(superseded, 1);
3572    }
3573
3574    #[test]
3575    fn writer_fts_rows_are_written_to_database() {
3576        let db = NamedTempFile::new().expect("temporary db");
3577        let writer = WriterActor::start(
3578            db.path(),
3579            Arc::new(SchemaManager::new()),
3580            ProvenanceMode::Warn,
3581            Arc::new(TelemetryCounters::default()),
3582        )
3583        .expect("writer");
3584
3585        writer
3586            .submit(WriteRequest {
3587                label: "seed".to_owned(),
3588                nodes: vec![NodeInsert {
3589                    row_id: "row-1".to_owned(),
3590                    logical_id: "logical-1".to_owned(),
3591                    kind: "Meeting".to_owned(),
3592                    properties: "{}".to_owned(),
3593                    source_ref: Some("src-1".to_owned()),
3594                    upsert: false,
3595                    chunk_policy: ChunkPolicy::Preserve,
3596                    content_ref: None,
3597                }],
3598                node_retires: vec![],
3599                edges: vec![],
3600                edge_retires: vec![],
3601                chunks: vec![ChunkInsert {
3602                    id: "chunk-1".to_owned(),
3603                    node_logical_id: "logical-1".to_owned(),
3604                    text_content: "budget discussion".to_owned(),
3605                    byte_start: None,
3606                    byte_end: None,
3607                    content_hash: None,
3608                }],
3609                runs: vec![],
3610                steps: vec![],
3611                actions: vec![],
3612                optional_backfills: vec![],
3613                vec_inserts: vec![],
3614                operational_writes: vec![],
3615            })
3616            .expect("write receipt");
3617
3618        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3619        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3620            conn.query_row(
3621                "SELECT chunk_id, node_logical_id, kind, text_content \
3622                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3623                [],
3624                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3625            )
3626            .expect("fts row");
3627        assert_eq!(chunk_id, "chunk-1");
3628        assert_eq!(node_logical_id, "logical-1");
3629        assert_eq!(kind, "Meeting");
3630        assert_eq!(text_content, "budget discussion");
3631    }
3632
3633    #[test]
3634    fn writer_receipt_warns_on_nodes_without_source_ref() {
3635        let db = NamedTempFile::new().expect("temporary db");
3636        let writer = WriterActor::start(
3637            db.path(),
3638            Arc::new(SchemaManager::new()),
3639            ProvenanceMode::Warn,
3640            Arc::new(TelemetryCounters::default()),
3641        )
3642        .expect("writer");
3643
3644        let receipt = writer
3645            .submit(WriteRequest {
3646                label: "no-source".to_owned(),
3647                nodes: vec![NodeInsert {
3648                    row_id: "row-1".to_owned(),
3649                    logical_id: "logical-1".to_owned(),
3650                    kind: "Meeting".to_owned(),
3651                    properties: "{}".to_owned(),
3652                    source_ref: None,
3653                    upsert: false,
3654                    chunk_policy: ChunkPolicy::Preserve,
3655                    content_ref: None,
3656                }],
3657                node_retires: vec![],
3658                edges: vec![],
3659                edge_retires: vec![],
3660                chunks: vec![],
3661                runs: vec![],
3662                steps: vec![],
3663                actions: vec![],
3664                optional_backfills: vec![],
3665                vec_inserts: vec![],
3666                operational_writes: vec![],
3667            })
3668            .expect("write receipt");
3669
3670        assert_eq!(receipt.provenance_warnings.len(), 1);
3671        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3672    }
3673
3674    #[test]
3675    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3676        let db = NamedTempFile::new().expect("temporary db");
3677        let writer = WriterActor::start(
3678            db.path(),
3679            Arc::new(SchemaManager::new()),
3680            ProvenanceMode::Warn,
3681            Arc::new(TelemetryCounters::default()),
3682        )
3683        .expect("writer");
3684
3685        let receipt = writer
3686            .submit(WriteRequest {
3687                label: "with-source".to_owned(),
3688                nodes: vec![NodeInsert {
3689                    row_id: "row-1".to_owned(),
3690                    logical_id: "logical-1".to_owned(),
3691                    kind: "Meeting".to_owned(),
3692                    properties: "{}".to_owned(),
3693                    source_ref: Some("src-1".to_owned()),
3694                    upsert: false,
3695                    chunk_policy: ChunkPolicy::Preserve,
3696                    content_ref: None,
3697                }],
3698                node_retires: vec![],
3699                edges: vec![],
3700                edge_retires: vec![],
3701                chunks: vec![],
3702                runs: vec![],
3703                steps: vec![],
3704                actions: vec![],
3705                optional_backfills: vec![],
3706                vec_inserts: vec![],
3707                operational_writes: vec![],
3708            })
3709            .expect("write receipt");
3710
3711        assert!(receipt.provenance_warnings.is_empty());
3712    }
3713
3714    #[test]
3715    fn writer_accepts_chunk_for_pre_existing_node() {
3716        let db = NamedTempFile::new().expect("temporary db");
3717        let writer = WriterActor::start(
3718            db.path(),
3719            Arc::new(SchemaManager::new()),
3720            ProvenanceMode::Warn,
3721            Arc::new(TelemetryCounters::default()),
3722        )
3723        .expect("writer");
3724
3725        // Request 1: submit node only
3726        writer
3727            .submit(WriteRequest {
3728                label: "r1".to_owned(),
3729                nodes: vec![NodeInsert {
3730                    row_id: "row-1".to_owned(),
3731                    logical_id: "logical-1".to_owned(),
3732                    kind: "Meeting".to_owned(),
3733                    properties: "{}".to_owned(),
3734                    source_ref: Some("src-1".to_owned()),
3735                    upsert: false,
3736                    chunk_policy: ChunkPolicy::Preserve,
3737                    content_ref: None,
3738                }],
3739                node_retires: vec![],
3740                edges: vec![],
3741                edge_retires: vec![],
3742                chunks: vec![],
3743                runs: vec![],
3744                steps: vec![],
3745                actions: vec![],
3746                optional_backfills: vec![],
3747                vec_inserts: vec![],
3748                operational_writes: vec![],
3749            })
3750            .expect("r1 write");
3751
3752        // Request 2: submit chunk for pre-existing node
3753        writer
3754            .submit(WriteRequest {
3755                label: "r2".to_owned(),
3756                nodes: vec![],
3757                node_retires: vec![],
3758                edges: vec![],
3759                edge_retires: vec![],
3760                chunks: vec![ChunkInsert {
3761                    id: "chunk-1".to_owned(),
3762                    node_logical_id: "logical-1".to_owned(),
3763                    text_content: "budget discussion".to_owned(),
3764                    byte_start: None,
3765                    byte_end: None,
3766                    content_hash: None,
3767                }],
3768                runs: vec![],
3769                steps: vec![],
3770                actions: vec![],
3771                optional_backfills: vec![],
3772                vec_inserts: vec![],
3773                operational_writes: vec![],
3774            })
3775            .expect("r2 write — chunk for pre-existing node");
3776
3777        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3778        let count: i64 = conn
3779            .query_row(
3780                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3781                [],
3782                |row| row.get(0),
3783            )
3784            .expect("fts count");
3785        assert_eq!(
3786            count, 1,
3787            "FTS row must exist for chunk attached to pre-existing node"
3788        );
3789    }
3790
3791    #[test]
3792    fn writer_rejects_chunk_for_completely_unknown_node() {
3793        let db = NamedTempFile::new().expect("temporary db");
3794        let writer = WriterActor::start(
3795            db.path(),
3796            Arc::new(SchemaManager::new()),
3797            ProvenanceMode::Warn,
3798            Arc::new(TelemetryCounters::default()),
3799        )
3800        .expect("writer");
3801
3802        let result = writer.submit(WriteRequest {
3803            label: "bad".to_owned(),
3804            nodes: vec![],
3805            node_retires: vec![],
3806            edges: vec![],
3807            edge_retires: vec![],
3808            chunks: vec![ChunkInsert {
3809                id: "chunk-1".to_owned(),
3810                node_logical_id: "nonexistent".to_owned(),
3811                text_content: "some text".to_owned(),
3812                byte_start: None,
3813                byte_end: None,
3814                content_hash: None,
3815            }],
3816            runs: vec![],
3817            steps: vec![],
3818            actions: vec![],
3819            optional_backfills: vec![],
3820            vec_inserts: vec![],
3821            operational_writes: vec![],
3822        });
3823
3824        assert!(
3825            matches!(result, Err(EngineError::InvalidWrite(_))),
3826            "completely unknown node must return InvalidWrite"
3827        );
3828    }
3829
3830    #[test]
3831    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3832        let db = NamedTempFile::new().expect("temporary db");
3833        let writer = WriterActor::start(
3834            db.path(),
3835            Arc::new(SchemaManager::new()),
3836            ProvenanceMode::Warn,
3837            Arc::new(TelemetryCounters::default()),
3838        )
3839        .expect("writer");
3840
3841        let receipt = writer
3842            .submit(WriteRequest {
3843                label: "seed".to_owned(),
3844                nodes: vec![NodeInsert {
3845                    row_id: "row-1".to_owned(),
3846                    logical_id: "logical-1".to_owned(),
3847                    kind: "Meeting".to_owned(),
3848                    properties: "{}".to_owned(),
3849                    source_ref: None,
3850                    upsert: false,
3851                    chunk_policy: ChunkPolicy::Preserve,
3852                    content_ref: None,
3853                }],
3854                node_retires: vec![],
3855                edges: vec![],
3856                edge_retires: vec![],
3857                chunks: vec![ChunkInsert {
3858                    id: "chunk-1".to_owned(),
3859                    node_logical_id: "logical-1".to_owned(),
3860                    text_content: "budget discussion".to_owned(),
3861                    byte_start: None,
3862                    byte_end: None,
3863                    content_hash: None,
3864                }],
3865                runs: vec![],
3866                steps: vec![],
3867                actions: vec![],
3868                optional_backfills: vec![],
3869                vec_inserts: vec![],
3870                operational_writes: vec![],
3871            })
3872            .expect("write receipt");
3873
3874        assert_eq!(receipt.label, "seed");
3875    }
3876
3877    #[test]
3878    fn writer_node_retire_supersedes_active_node() {
3879        let db = NamedTempFile::new().expect("temporary db");
3880        let writer = WriterActor::start(
3881            db.path(),
3882            Arc::new(SchemaManager::new()),
3883            ProvenanceMode::Warn,
3884            Arc::new(TelemetryCounters::default()),
3885        )
3886        .expect("writer");
3887
3888        writer
3889            .submit(WriteRequest {
3890                label: "seed".to_owned(),
3891                nodes: vec![NodeInsert {
3892                    row_id: "row-1".to_owned(),
3893                    logical_id: "meeting-1".to_owned(),
3894                    kind: "Meeting".to_owned(),
3895                    properties: "{}".to_owned(),
3896                    source_ref: Some("src-1".to_owned()),
3897                    upsert: false,
3898                    chunk_policy: ChunkPolicy::Preserve,
3899                    content_ref: None,
3900                }],
3901                node_retires: vec![],
3902                edges: vec![],
3903                edge_retires: vec![],
3904                chunks: vec![],
3905                runs: vec![],
3906                steps: vec![],
3907                actions: vec![],
3908                optional_backfills: vec![],
3909                vec_inserts: vec![],
3910                operational_writes: vec![],
3911            })
3912            .expect("seed write");
3913
3914        writer
3915            .submit(WriteRequest {
3916                label: "retire".to_owned(),
3917                nodes: vec![],
3918                node_retires: vec![NodeRetire {
3919                    logical_id: "meeting-1".to_owned(),
3920                    source_ref: Some("src-2".to_owned()),
3921                }],
3922                edges: vec![],
3923                edge_retires: vec![],
3924                chunks: vec![],
3925                runs: vec![],
3926                steps: vec![],
3927                actions: vec![],
3928                optional_backfills: vec![],
3929                vec_inserts: vec![],
3930                operational_writes: vec![],
3931            })
3932            .expect("retire write");
3933
3934        let conn = rusqlite::Connection::open(db.path()).expect("open");
3935        let active: i64 = conn
3936            .query_row(
3937                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3938                [],
3939                |r| r.get(0),
3940            )
3941            .expect("count active");
3942        let historical: i64 = conn
3943            .query_row(
3944                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3945                [],
3946                |r| r.get(0),
3947            )
3948            .expect("count historical");
3949
3950        assert_eq!(active, 0, "active count must be 0 after retire");
3951        assert_eq!(historical, 1, "historical count must be 1 after retire");
3952    }
3953
3954    #[test]
3955    fn writer_node_retire_preserves_chunks_and_clears_fts() {
3956        let db = NamedTempFile::new().expect("temporary db");
3957        let writer = WriterActor::start(
3958            db.path(),
3959            Arc::new(SchemaManager::new()),
3960            ProvenanceMode::Warn,
3961            Arc::new(TelemetryCounters::default()),
3962        )
3963        .expect("writer");
3964
3965        writer
3966            .submit(WriteRequest {
3967                label: "seed".to_owned(),
3968                nodes: vec![NodeInsert {
3969                    row_id: "row-1".to_owned(),
3970                    logical_id: "meeting-1".to_owned(),
3971                    kind: "Meeting".to_owned(),
3972                    properties: "{}".to_owned(),
3973                    source_ref: Some("src-1".to_owned()),
3974                    upsert: false,
3975                    chunk_policy: ChunkPolicy::Preserve,
3976                    content_ref: None,
3977                }],
3978                node_retires: vec![],
3979                edges: vec![],
3980                edge_retires: vec![],
3981                chunks: vec![ChunkInsert {
3982                    id: "chunk-1".to_owned(),
3983                    node_logical_id: "meeting-1".to_owned(),
3984                    text_content: "budget discussion".to_owned(),
3985                    byte_start: None,
3986                    byte_end: None,
3987                    content_hash: None,
3988                }],
3989                runs: vec![],
3990                steps: vec![],
3991                actions: vec![],
3992                optional_backfills: vec![],
3993                vec_inserts: vec![],
3994                operational_writes: vec![],
3995            })
3996            .expect("seed write");
3997
3998        writer
3999            .submit(WriteRequest {
4000                label: "retire".to_owned(),
4001                nodes: vec![],
4002                node_retires: vec![NodeRetire {
4003                    logical_id: "meeting-1".to_owned(),
4004                    source_ref: Some("src-2".to_owned()),
4005                }],
4006                edges: vec![],
4007                edge_retires: vec![],
4008                chunks: vec![],
4009                runs: vec![],
4010                steps: vec![],
4011                actions: vec![],
4012                optional_backfills: vec![],
4013                vec_inserts: vec![],
4014                operational_writes: vec![],
4015            })
4016            .expect("retire write");
4017
4018        let conn = rusqlite::Connection::open(db.path()).expect("open");
4019        let chunk_count: i64 = conn
4020            .query_row(
4021                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4022                [],
4023                |r| r.get(0),
4024            )
4025            .expect("chunk count");
4026        let fts_count: i64 = conn
4027            .query_row(
4028                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4029                [],
4030                |r| r.get(0),
4031            )
4032            .expect("fts count");
4033
4034        assert_eq!(
4035            chunk_count, 1,
4036            "chunks must remain after node retire so restore can re-establish content"
4037        );
4038        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4039    }
4040
4041    #[test]
4042    fn writer_edge_retire_supersedes_active_edge() {
4043        let db = NamedTempFile::new().expect("temporary db");
4044        let writer = WriterActor::start(
4045            db.path(),
4046            Arc::new(SchemaManager::new()),
4047            ProvenanceMode::Warn,
4048            Arc::new(TelemetryCounters::default()),
4049        )
4050        .expect("writer");
4051
4052        writer
4053            .submit(WriteRequest {
4054                label: "seed".to_owned(),
4055                nodes: vec![
4056                    NodeInsert {
4057                        row_id: "row-a".to_owned(),
4058                        logical_id: "node-a".to_owned(),
4059                        kind: "Meeting".to_owned(),
4060                        properties: "{}".to_owned(),
4061                        source_ref: Some("src-1".to_owned()),
4062                        upsert: false,
4063                        chunk_policy: ChunkPolicy::Preserve,
4064                        content_ref: None,
4065                    },
4066                    NodeInsert {
4067                        row_id: "row-b".to_owned(),
4068                        logical_id: "node-b".to_owned(),
4069                        kind: "Task".to_owned(),
4070                        properties: "{}".to_owned(),
4071                        source_ref: Some("src-1".to_owned()),
4072                        upsert: false,
4073                        chunk_policy: ChunkPolicy::Preserve,
4074                        content_ref: None,
4075                    },
4076                ],
4077                node_retires: vec![],
4078                edges: vec![EdgeInsert {
4079                    row_id: "edge-1".to_owned(),
4080                    logical_id: "edge-lg-1".to_owned(),
4081                    source_logical_id: "node-a".to_owned(),
4082                    target_logical_id: "node-b".to_owned(),
4083                    kind: "HAS_TASK".to_owned(),
4084                    properties: "{}".to_owned(),
4085                    source_ref: Some("src-1".to_owned()),
4086                    upsert: false,
4087                }],
4088                edge_retires: vec![],
4089                chunks: vec![],
4090                runs: vec![],
4091                steps: vec![],
4092                actions: vec![],
4093                optional_backfills: vec![],
4094                vec_inserts: vec![],
4095                operational_writes: vec![],
4096            })
4097            .expect("seed write");
4098
4099        writer
4100            .submit(WriteRequest {
4101                label: "retire-edge".to_owned(),
4102                nodes: vec![],
4103                node_retires: vec![],
4104                edges: vec![],
4105                edge_retires: vec![EdgeRetire {
4106                    logical_id: "edge-lg-1".to_owned(),
4107                    source_ref: Some("src-2".to_owned()),
4108                }],
4109                chunks: vec![],
4110                runs: vec![],
4111                steps: vec![],
4112                actions: vec![],
4113                optional_backfills: vec![],
4114                vec_inserts: vec![],
4115                operational_writes: vec![],
4116            })
4117            .expect("retire edge write");
4118
4119        let conn = rusqlite::Connection::open(db.path()).expect("open");
4120        let active: i64 = conn
4121            .query_row(
4122                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4123                [],
4124                |r| r.get(0),
4125            )
4126            .expect("active edge count");
4127
4128        assert_eq!(active, 0, "active edge count must be 0 after retire");
4129    }
4130
4131    #[test]
4132    fn writer_retire_without_source_ref_emits_provenance_warning() {
4133        let db = NamedTempFile::new().expect("temporary db");
4134        let writer = WriterActor::start(
4135            db.path(),
4136            Arc::new(SchemaManager::new()),
4137            ProvenanceMode::Warn,
4138            Arc::new(TelemetryCounters::default()),
4139        )
4140        .expect("writer");
4141
4142        writer
4143            .submit(WriteRequest {
4144                label: "seed".to_owned(),
4145                nodes: vec![NodeInsert {
4146                    row_id: "row-1".to_owned(),
4147                    logical_id: "meeting-1".to_owned(),
4148                    kind: "Meeting".to_owned(),
4149                    properties: "{}".to_owned(),
4150                    source_ref: Some("src-1".to_owned()),
4151                    upsert: false,
4152                    chunk_policy: ChunkPolicy::Preserve,
4153                    content_ref: None,
4154                }],
4155                node_retires: vec![],
4156                edges: vec![],
4157                edge_retires: vec![],
4158                chunks: vec![],
4159                runs: vec![],
4160                steps: vec![],
4161                actions: vec![],
4162                optional_backfills: vec![],
4163                vec_inserts: vec![],
4164                operational_writes: vec![],
4165            })
4166            .expect("seed write");
4167
4168        let receipt = writer
4169            .submit(WriteRequest {
4170                label: "retire-no-src".to_owned(),
4171                nodes: vec![],
4172                node_retires: vec![NodeRetire {
4173                    logical_id: "meeting-1".to_owned(),
4174                    source_ref: None,
4175                }],
4176                edges: vec![],
4177                edge_retires: vec![],
4178                chunks: vec![],
4179                runs: vec![],
4180                steps: vec![],
4181                actions: vec![],
4182                optional_backfills: vec![],
4183                vec_inserts: vec![],
4184                operational_writes: vec![],
4185            })
4186            .expect("retire write");
4187
4188        assert!(
4189            !receipt.provenance_warnings.is_empty(),
4190            "retire without source_ref must emit a provenance warning"
4191        );
4192    }
4193
4194    #[test]
4195    #[allow(clippy::too_many_lines)]
4196    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4197        let db = NamedTempFile::new().expect("temporary db");
4198        let writer = WriterActor::start(
4199            db.path(),
4200            Arc::new(SchemaManager::new()),
4201            ProvenanceMode::Warn,
4202            Arc::new(TelemetryCounters::default()),
4203        )
4204        .expect("writer");
4205
4206        writer
4207            .submit(WriteRequest {
4208                label: "v1".to_owned(),
4209                nodes: vec![NodeInsert {
4210                    row_id: "row-1".to_owned(),
4211                    logical_id: "meeting-1".to_owned(),
4212                    kind: "Meeting".to_owned(),
4213                    properties: "{}".to_owned(),
4214                    source_ref: Some("src-1".to_owned()),
4215                    upsert: false,
4216                    chunk_policy: ChunkPolicy::Preserve,
4217                    content_ref: None,
4218                }],
4219                node_retires: vec![],
4220                edges: vec![],
4221                edge_retires: vec![],
4222                chunks: vec![ChunkInsert {
4223                    id: "chunk-old".to_owned(),
4224                    node_logical_id: "meeting-1".to_owned(),
4225                    text_content: "old text".to_owned(),
4226                    byte_start: None,
4227                    byte_end: None,
4228                    content_hash: None,
4229                }],
4230                runs: vec![],
4231                steps: vec![],
4232                actions: vec![],
4233                optional_backfills: vec![],
4234                vec_inserts: vec![],
4235                operational_writes: vec![],
4236            })
4237            .expect("v1 write");
4238
4239        writer
4240            .submit(WriteRequest {
4241                label: "v2".to_owned(),
4242                nodes: vec![NodeInsert {
4243                    row_id: "row-2".to_owned(),
4244                    logical_id: "meeting-1".to_owned(),
4245                    kind: "Meeting".to_owned(),
4246                    properties: "{}".to_owned(),
4247                    source_ref: Some("src-2".to_owned()),
4248                    upsert: true,
4249                    chunk_policy: ChunkPolicy::Replace,
4250                    content_ref: None,
4251                }],
4252                node_retires: vec![],
4253                edges: vec![],
4254                edge_retires: vec![],
4255                chunks: vec![ChunkInsert {
4256                    id: "chunk-new".to_owned(),
4257                    node_logical_id: "meeting-1".to_owned(),
4258                    text_content: "new text".to_owned(),
4259                    byte_start: None,
4260                    byte_end: None,
4261                    content_hash: None,
4262                }],
4263                runs: vec![],
4264                steps: vec![],
4265                actions: vec![],
4266                optional_backfills: vec![],
4267                vec_inserts: vec![],
4268                operational_writes: vec![],
4269            })
4270            .expect("v2 write");
4271
4272        let conn = rusqlite::Connection::open(db.path()).expect("open");
4273        let old_chunk: i64 = conn
4274            .query_row(
4275                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4276                [],
4277                |r| r.get(0),
4278            )
4279            .expect("old chunk count");
4280        let new_chunk: i64 = conn
4281            .query_row(
4282                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4283                [],
4284                |r| r.get(0),
4285            )
4286            .expect("new chunk count");
4287        let fts_old: i64 = conn
4288            .query_row(
4289                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4290                [],
4291                |r| r.get(0),
4292            )
4293            .expect("old fts count");
4294
4295        assert_eq!(
4296            old_chunk, 0,
4297            "old chunk must be deleted by ChunkPolicy::Replace"
4298        );
4299        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4300        assert_eq!(
4301            fts_old, 0,
4302            "old FTS row must be deleted by ChunkPolicy::Replace"
4303        );
4304    }
4305
4306    #[test]
4307    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4308        let db = NamedTempFile::new().expect("temporary db");
4309        let writer = WriterActor::start(
4310            db.path(),
4311            Arc::new(SchemaManager::new()),
4312            ProvenanceMode::Warn,
4313            Arc::new(TelemetryCounters::default()),
4314        )
4315        .expect("writer");
4316
4317        writer
4318            .submit(WriteRequest {
4319                label: "v1".to_owned(),
4320                nodes: vec![NodeInsert {
4321                    row_id: "row-1".to_owned(),
4322                    logical_id: "meeting-1".to_owned(),
4323                    kind: "Meeting".to_owned(),
4324                    properties: "{}".to_owned(),
4325                    source_ref: Some("src-1".to_owned()),
4326                    upsert: false,
4327                    chunk_policy: ChunkPolicy::Preserve,
4328                    content_ref: None,
4329                }],
4330                node_retires: vec![],
4331                edges: vec![],
4332                edge_retires: vec![],
4333                chunks: vec![ChunkInsert {
4334                    id: "chunk-old".to_owned(),
4335                    node_logical_id: "meeting-1".to_owned(),
4336                    text_content: "old text".to_owned(),
4337                    byte_start: None,
4338                    byte_end: None,
4339                    content_hash: None,
4340                }],
4341                runs: vec![],
4342                steps: vec![],
4343                actions: vec![],
4344                optional_backfills: vec![],
4345                vec_inserts: vec![],
4346                operational_writes: vec![],
4347            })
4348            .expect("v1 write");
4349
4350        writer
4351            .submit(WriteRequest {
4352                label: "v2-props-only".to_owned(),
4353                nodes: vec![NodeInsert {
4354                    row_id: "row-2".to_owned(),
4355                    logical_id: "meeting-1".to_owned(),
4356                    kind: "Meeting".to_owned(),
4357                    properties: r#"{"status":"updated"}"#.to_owned(),
4358                    source_ref: Some("src-2".to_owned()),
4359                    upsert: true,
4360                    chunk_policy: ChunkPolicy::Preserve,
4361                    content_ref: None,
4362                }],
4363                node_retires: vec![],
4364                edges: vec![],
4365                edge_retires: vec![],
4366                chunks: vec![],
4367                runs: vec![],
4368                steps: vec![],
4369                actions: vec![],
4370                optional_backfills: vec![],
4371                vec_inserts: vec![],
4372                operational_writes: vec![],
4373            })
4374            .expect("v2 preserve write");
4375
4376        let conn = rusqlite::Connection::open(db.path()).expect("open");
4377        let old_chunk: i64 = conn
4378            .query_row(
4379                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4380                [],
4381                |r| r.get(0),
4382            )
4383            .expect("old chunk count");
4384
4385        assert_eq!(
4386            old_chunk, 1,
4387            "old chunk must be preserved by ChunkPolicy::Preserve"
4388        );
4389    }
4390
4391    #[test]
4392    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4393        let db = NamedTempFile::new().expect("temporary db");
4394        let writer = WriterActor::start(
4395            db.path(),
4396            Arc::new(SchemaManager::new()),
4397            ProvenanceMode::Warn,
4398            Arc::new(TelemetryCounters::default()),
4399        )
4400        .expect("writer");
4401
4402        writer
4403            .submit(WriteRequest {
4404                label: "v1".to_owned(),
4405                nodes: vec![NodeInsert {
4406                    row_id: "row-1".to_owned(),
4407                    logical_id: "meeting-1".to_owned(),
4408                    kind: "Meeting".to_owned(),
4409                    properties: "{}".to_owned(),
4410                    source_ref: Some("src-1".to_owned()),
4411                    upsert: false,
4412                    chunk_policy: ChunkPolicy::Preserve,
4413                    content_ref: None,
4414                }],
4415                node_retires: vec![],
4416                edges: vec![],
4417                edge_retires: vec![],
4418                chunks: vec![ChunkInsert {
4419                    id: "chunk-existing".to_owned(),
4420                    node_logical_id: "meeting-1".to_owned(),
4421                    text_content: "existing text".to_owned(),
4422                    byte_start: None,
4423                    byte_end: None,
4424                    content_hash: None,
4425                }],
4426                runs: vec![],
4427                steps: vec![],
4428                actions: vec![],
4429                optional_backfills: vec![],
4430                vec_inserts: vec![],
4431                operational_writes: vec![],
4432            })
4433            .expect("v1 write");
4434
4435        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4436        writer
4437            .submit(WriteRequest {
4438                label: "insert-no-upsert".to_owned(),
4439                nodes: vec![NodeInsert {
4440                    row_id: "row-2".to_owned(),
4441                    logical_id: "meeting-2".to_owned(),
4442                    kind: "Meeting".to_owned(),
4443                    properties: "{}".to_owned(),
4444                    source_ref: Some("src-2".to_owned()),
4445                    upsert: false,
4446                    chunk_policy: ChunkPolicy::Replace,
4447                    content_ref: None,
4448                }],
4449                node_retires: vec![],
4450                edges: vec![],
4451                edge_retires: vec![],
4452                chunks: vec![],
4453                runs: vec![],
4454                steps: vec![],
4455                actions: vec![],
4456                optional_backfills: vec![],
4457                vec_inserts: vec![],
4458                operational_writes: vec![],
4459            })
4460            .expect("insert no-upsert write");
4461
4462        let conn = rusqlite::Connection::open(db.path()).expect("open");
4463        let existing_chunk: i64 = conn
4464            .query_row(
4465                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4466                [],
4467                |r| r.get(0),
4468            )
4469            .expect("chunk count");
4470
4471        assert_eq!(
4472            existing_chunk, 1,
4473            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4474        );
4475    }
4476
4477    #[test]
4478    fn writer_run_upsert_supersedes_prior_active_run() {
4479        let db = NamedTempFile::new().expect("temporary db");
4480        let writer = WriterActor::start(
4481            db.path(),
4482            Arc::new(SchemaManager::new()),
4483            ProvenanceMode::Warn,
4484            Arc::new(TelemetryCounters::default()),
4485        )
4486        .expect("writer");
4487
4488        writer
4489            .submit(WriteRequest {
4490                label: "v1".to_owned(),
4491                nodes: vec![],
4492                node_retires: vec![],
4493                edges: vec![],
4494                edge_retires: vec![],
4495                chunks: vec![],
4496                runs: vec![RunInsert {
4497                    id: "run-v1".to_owned(),
4498                    kind: "session".to_owned(),
4499                    status: "completed".to_owned(),
4500                    properties: "{}".to_owned(),
4501                    source_ref: Some("src-1".to_owned()),
4502                    upsert: false,
4503                    supersedes_id: None,
4504                }],
4505                steps: vec![],
4506                actions: vec![],
4507                optional_backfills: vec![],
4508                vec_inserts: vec![],
4509                operational_writes: vec![],
4510            })
4511            .expect("v1 run write");
4512
4513        writer
4514            .submit(WriteRequest {
4515                label: "v2".to_owned(),
4516                nodes: vec![],
4517                node_retires: vec![],
4518                edges: vec![],
4519                edge_retires: vec![],
4520                chunks: vec![],
4521                runs: vec![RunInsert {
4522                    id: "run-v2".to_owned(),
4523                    kind: "session".to_owned(),
4524                    status: "completed".to_owned(),
4525                    properties: "{}".to_owned(),
4526                    source_ref: Some("src-2".to_owned()),
4527                    upsert: true,
4528                    supersedes_id: Some("run-v1".to_owned()),
4529                }],
4530                steps: vec![],
4531                actions: vec![],
4532                optional_backfills: vec![],
4533                vec_inserts: vec![],
4534                operational_writes: vec![],
4535            })
4536            .expect("v2 run write");
4537
4538        let conn = rusqlite::Connection::open(db.path()).expect("open");
4539        let v1_historical: i64 = conn
4540            .query_row(
4541                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4542                [],
4543                |r| r.get(0),
4544            )
4545            .expect("v1 historical count");
4546        let v2_active: i64 = conn
4547            .query_row(
4548                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4549                [],
4550                |r| r.get(0),
4551            )
4552            .expect("v2 active count");
4553
4554        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4555        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4556    }
4557
4558    #[test]
4559    fn writer_step_upsert_supersedes_prior_active_step() {
4560        let db = NamedTempFile::new().expect("temporary db");
4561        let writer = WriterActor::start(
4562            db.path(),
4563            Arc::new(SchemaManager::new()),
4564            ProvenanceMode::Warn,
4565            Arc::new(TelemetryCounters::default()),
4566        )
4567        .expect("writer");
4568
4569        writer
4570            .submit(WriteRequest {
4571                label: "v1".to_owned(),
4572                nodes: vec![],
4573                node_retires: vec![],
4574                edges: vec![],
4575                edge_retires: vec![],
4576                chunks: vec![],
4577                runs: vec![RunInsert {
4578                    id: "run-1".to_owned(),
4579                    kind: "session".to_owned(),
4580                    status: "completed".to_owned(),
4581                    properties: "{}".to_owned(),
4582                    source_ref: Some("src-1".to_owned()),
4583                    upsert: false,
4584                    supersedes_id: None,
4585                }],
4586                steps: vec![StepInsert {
4587                    id: "step-v1".to_owned(),
4588                    run_id: "run-1".to_owned(),
4589                    kind: "llm".to_owned(),
4590                    status: "completed".to_owned(),
4591                    properties: "{}".to_owned(),
4592                    source_ref: Some("src-1".to_owned()),
4593                    upsert: false,
4594                    supersedes_id: None,
4595                }],
4596                actions: vec![],
4597                optional_backfills: vec![],
4598                vec_inserts: vec![],
4599                operational_writes: vec![],
4600            })
4601            .expect("v1 step write");
4602
4603        writer
4604            .submit(WriteRequest {
4605                label: "v2".to_owned(),
4606                nodes: vec![],
4607                node_retires: vec![],
4608                edges: vec![],
4609                edge_retires: vec![],
4610                chunks: vec![],
4611                runs: vec![],
4612                steps: vec![StepInsert {
4613                    id: "step-v2".to_owned(),
4614                    run_id: "run-1".to_owned(),
4615                    kind: "llm".to_owned(),
4616                    status: "completed".to_owned(),
4617                    properties: "{}".to_owned(),
4618                    source_ref: Some("src-2".to_owned()),
4619                    upsert: true,
4620                    supersedes_id: Some("step-v1".to_owned()),
4621                }],
4622                actions: vec![],
4623                optional_backfills: vec![],
4624                vec_inserts: vec![],
4625                operational_writes: vec![],
4626            })
4627            .expect("v2 step write");
4628
4629        let conn = rusqlite::Connection::open(db.path()).expect("open");
4630        let v1_historical: i64 = conn
4631            .query_row(
4632                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4633                [],
4634                |r| r.get(0),
4635            )
4636            .expect("v1 historical count");
4637        let v2_active: i64 = conn
4638            .query_row(
4639                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4640                [],
4641                |r| r.get(0),
4642            )
4643            .expect("v2 active count");
4644
4645        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4646        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4647    }
4648
4649    #[test]
4650    fn writer_action_upsert_supersedes_prior_active_action() {
4651        let db = NamedTempFile::new().expect("temporary db");
4652        let writer = WriterActor::start(
4653            db.path(),
4654            Arc::new(SchemaManager::new()),
4655            ProvenanceMode::Warn,
4656            Arc::new(TelemetryCounters::default()),
4657        )
4658        .expect("writer");
4659
4660        writer
4661            .submit(WriteRequest {
4662                label: "v1".to_owned(),
4663                nodes: vec![],
4664                node_retires: vec![],
4665                edges: vec![],
4666                edge_retires: vec![],
4667                chunks: vec![],
4668                runs: vec![RunInsert {
4669                    id: "run-1".to_owned(),
4670                    kind: "session".to_owned(),
4671                    status: "completed".to_owned(),
4672                    properties: "{}".to_owned(),
4673                    source_ref: Some("src-1".to_owned()),
4674                    upsert: false,
4675                    supersedes_id: None,
4676                }],
4677                steps: vec![StepInsert {
4678                    id: "step-1".to_owned(),
4679                    run_id: "run-1".to_owned(),
4680                    kind: "llm".to_owned(),
4681                    status: "completed".to_owned(),
4682                    properties: "{}".to_owned(),
4683                    source_ref: Some("src-1".to_owned()),
4684                    upsert: false,
4685                    supersedes_id: None,
4686                }],
4687                actions: vec![ActionInsert {
4688                    id: "action-v1".to_owned(),
4689                    step_id: "step-1".to_owned(),
4690                    kind: "emit".to_owned(),
4691                    status: "completed".to_owned(),
4692                    properties: "{}".to_owned(),
4693                    source_ref: Some("src-1".to_owned()),
4694                    upsert: false,
4695                    supersedes_id: None,
4696                }],
4697                optional_backfills: vec![],
4698                vec_inserts: vec![],
4699                operational_writes: vec![],
4700            })
4701            .expect("v1 action write");
4702
4703        writer
4704            .submit(WriteRequest {
4705                label: "v2".to_owned(),
4706                nodes: vec![],
4707                node_retires: vec![],
4708                edges: vec![],
4709                edge_retires: vec![],
4710                chunks: vec![],
4711                runs: vec![],
4712                steps: vec![],
4713                actions: vec![ActionInsert {
4714                    id: "action-v2".to_owned(),
4715                    step_id: "step-1".to_owned(),
4716                    kind: "emit".to_owned(),
4717                    status: "completed".to_owned(),
4718                    properties: "{}".to_owned(),
4719                    source_ref: Some("src-2".to_owned()),
4720                    upsert: true,
4721                    supersedes_id: Some("action-v1".to_owned()),
4722                }],
4723                optional_backfills: vec![],
4724                vec_inserts: vec![],
4725                operational_writes: vec![],
4726            })
4727            .expect("v2 action write");
4728
4729        let conn = rusqlite::Connection::open(db.path()).expect("open");
4730        let v1_historical: i64 = conn
4731            .query_row(
4732                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4733                [],
4734                |r| r.get(0),
4735            )
4736            .expect("v1 historical count");
4737        let v2_active: i64 = conn
4738            .query_row(
4739                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4740                [],
4741                |r| r.get(0),
4742            )
4743            .expect("v2 active count");
4744
4745        assert_eq!(
4746            v1_historical, 1,
4747            "action-v1 must be historical after upsert"
4748        );
4749        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4750    }
4751
4752    // P0: runtime upsert without supersedes_id must be rejected
4753
4754    #[test]
4755    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4756        let db = NamedTempFile::new().expect("temporary db");
4757        let writer = WriterActor::start(
4758            db.path(),
4759            Arc::new(SchemaManager::new()),
4760            ProvenanceMode::Warn,
4761            Arc::new(TelemetryCounters::default()),
4762        )
4763        .expect("writer");
4764
4765        let result = writer.submit(WriteRequest {
4766            label: "bad".to_owned(),
4767            nodes: vec![],
4768            node_retires: vec![],
4769            edges: vec![],
4770            edge_retires: vec![],
4771            chunks: vec![],
4772            runs: vec![RunInsert {
4773                id: "run-1".to_owned(),
4774                kind: "session".to_owned(),
4775                status: "completed".to_owned(),
4776                properties: "{}".to_owned(),
4777                source_ref: None,
4778                upsert: true,
4779                supersedes_id: None,
4780            }],
4781            steps: vec![],
4782            actions: vec![],
4783            optional_backfills: vec![],
4784            vec_inserts: vec![],
4785            operational_writes: vec![],
4786        });
4787
4788        assert!(
4789            matches!(result, Err(EngineError::InvalidWrite(_))),
4790            "run upsert=true without supersedes_id must return InvalidWrite"
4791        );
4792    }
4793
4794    #[test]
4795    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4796        let db = NamedTempFile::new().expect("temporary db");
4797        let writer = WriterActor::start(
4798            db.path(),
4799            Arc::new(SchemaManager::new()),
4800            ProvenanceMode::Warn,
4801            Arc::new(TelemetryCounters::default()),
4802        )
4803        .expect("writer");
4804
4805        let result = writer.submit(WriteRequest {
4806            label: "bad".to_owned(),
4807            nodes: vec![],
4808            node_retires: vec![],
4809            edges: vec![],
4810            edge_retires: vec![],
4811            chunks: vec![],
4812            runs: vec![],
4813            steps: vec![StepInsert {
4814                id: "step-1".to_owned(),
4815                run_id: "run-1".to_owned(),
4816                kind: "llm".to_owned(),
4817                status: "completed".to_owned(),
4818                properties: "{}".to_owned(),
4819                source_ref: None,
4820                upsert: true,
4821                supersedes_id: None,
4822            }],
4823            actions: vec![],
4824            optional_backfills: vec![],
4825            vec_inserts: vec![],
4826            operational_writes: vec![],
4827        });
4828
4829        assert!(
4830            matches!(result, Err(EngineError::InvalidWrite(_))),
4831            "step upsert=true without supersedes_id must return InvalidWrite"
4832        );
4833    }
4834
4835    #[test]
4836    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4837        let db = NamedTempFile::new().expect("temporary db");
4838        let writer = WriterActor::start(
4839            db.path(),
4840            Arc::new(SchemaManager::new()),
4841            ProvenanceMode::Warn,
4842            Arc::new(TelemetryCounters::default()),
4843        )
4844        .expect("writer");
4845
4846        let result = writer.submit(WriteRequest {
4847            label: "bad".to_owned(),
4848            nodes: vec![],
4849            node_retires: vec![],
4850            edges: vec![],
4851            edge_retires: vec![],
4852            chunks: vec![],
4853            runs: vec![],
4854            steps: vec![],
4855            actions: vec![ActionInsert {
4856                id: "action-1".to_owned(),
4857                step_id: "step-1".to_owned(),
4858                kind: "emit".to_owned(),
4859                status: "completed".to_owned(),
4860                properties: "{}".to_owned(),
4861                source_ref: None,
4862                upsert: true,
4863                supersedes_id: None,
4864            }],
4865            optional_backfills: vec![],
4866            vec_inserts: vec![],
4867            operational_writes: vec![],
4868        });
4869
4870        assert!(
4871            matches!(result, Err(EngineError::InvalidWrite(_))),
4872            "action upsert=true without supersedes_id must return InvalidWrite"
4873        );
4874    }
4875
4876    // P1a/b: provenance warnings for edge inserts and runtime table inserts
4877
4878    #[test]
4879    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4880        let db = NamedTempFile::new().expect("temporary db");
4881        let writer = WriterActor::start(
4882            db.path(),
4883            Arc::new(SchemaManager::new()),
4884            ProvenanceMode::Warn,
4885            Arc::new(TelemetryCounters::default()),
4886        )
4887        .expect("writer");
4888
4889        let receipt = writer
4890            .submit(WriteRequest {
4891                label: "test".to_owned(),
4892                nodes: vec![
4893                    NodeInsert {
4894                        row_id: "row-a".to_owned(),
4895                        logical_id: "node-a".to_owned(),
4896                        kind: "Meeting".to_owned(),
4897                        properties: "{}".to_owned(),
4898                        source_ref: Some("src-1".to_owned()),
4899                        upsert: false,
4900                        chunk_policy: ChunkPolicy::Preserve,
4901                        content_ref: None,
4902                    },
4903                    NodeInsert {
4904                        row_id: "row-b".to_owned(),
4905                        logical_id: "node-b".to_owned(),
4906                        kind: "Task".to_owned(),
4907                        properties: "{}".to_owned(),
4908                        source_ref: Some("src-1".to_owned()),
4909                        upsert: false,
4910                        chunk_policy: ChunkPolicy::Preserve,
4911                        content_ref: None,
4912                    },
4913                ],
4914                node_retires: vec![],
4915                edges: vec![EdgeInsert {
4916                    row_id: "edge-1".to_owned(),
4917                    logical_id: "edge-lg-1".to_owned(),
4918                    source_logical_id: "node-a".to_owned(),
4919                    target_logical_id: "node-b".to_owned(),
4920                    kind: "HAS_TASK".to_owned(),
4921                    properties: "{}".to_owned(),
4922                    source_ref: None,
4923                    upsert: false,
4924                }],
4925                edge_retires: vec![],
4926                chunks: vec![],
4927                runs: vec![],
4928                steps: vec![],
4929                actions: vec![],
4930                optional_backfills: vec![],
4931                vec_inserts: vec![],
4932                operational_writes: vec![],
4933            })
4934            .expect("write");
4935
4936        assert!(
4937            !receipt.provenance_warnings.is_empty(),
4938            "edge insert without source_ref must emit a provenance warning"
4939        );
4940    }
4941
4942    #[test]
4943    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4944        let db = NamedTempFile::new().expect("temporary db");
4945        let writer = WriterActor::start(
4946            db.path(),
4947            Arc::new(SchemaManager::new()),
4948            ProvenanceMode::Warn,
4949            Arc::new(TelemetryCounters::default()),
4950        )
4951        .expect("writer");
4952
4953        let receipt = writer
4954            .submit(WriteRequest {
4955                label: "test".to_owned(),
4956                nodes: vec![],
4957                node_retires: vec![],
4958                edges: vec![],
4959                edge_retires: vec![],
4960                chunks: vec![],
4961                runs: vec![RunInsert {
4962                    id: "run-1".to_owned(),
4963                    kind: "session".to_owned(),
4964                    status: "completed".to_owned(),
4965                    properties: "{}".to_owned(),
4966                    source_ref: None,
4967                    upsert: false,
4968                    supersedes_id: None,
4969                }],
4970                steps: vec![],
4971                actions: vec![],
4972                optional_backfills: vec![],
4973                vec_inserts: vec![],
4974                operational_writes: vec![],
4975            })
4976            .expect("write");
4977
4978        assert!(
4979            !receipt.provenance_warnings.is_empty(),
4980            "run insert without source_ref must emit a provenance warning"
4981        );
4982    }
4983
4984    // P1c: retire a node AND submit chunks for the same logical_id in one request
4985
4986    #[test]
4987    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4988        let db = NamedTempFile::new().expect("temporary db");
4989        let writer = WriterActor::start(
4990            db.path(),
4991            Arc::new(SchemaManager::new()),
4992            ProvenanceMode::Warn,
4993            Arc::new(TelemetryCounters::default()),
4994        )
4995        .expect("writer");
4996
4997        // First seed the node so it exists
4998        writer
4999            .submit(WriteRequest {
5000                label: "seed".to_owned(),
5001                nodes: vec![NodeInsert {
5002                    row_id: "row-1".to_owned(),
5003                    logical_id: "meeting-1".to_owned(),
5004                    kind: "Meeting".to_owned(),
5005                    properties: "{}".to_owned(),
5006                    source_ref: Some("src-1".to_owned()),
5007                    upsert: false,
5008                    chunk_policy: ChunkPolicy::Preserve,
5009                    content_ref: None,
5010                }],
5011                node_retires: vec![],
5012                edges: vec![],
5013                edge_retires: vec![],
5014                chunks: vec![],
5015                runs: vec![],
5016                steps: vec![],
5017                actions: vec![],
5018                optional_backfills: vec![],
5019                vec_inserts: vec![],
5020                operational_writes: vec![],
5021            })
5022            .expect("seed write");
5023
5024        // Now try to retire it AND add a chunk for it in the same request
5025        let result = writer.submit(WriteRequest {
5026            label: "bad".to_owned(),
5027            nodes: vec![],
5028            node_retires: vec![NodeRetire {
5029                logical_id: "meeting-1".to_owned(),
5030                source_ref: Some("src-2".to_owned()),
5031            }],
5032            edges: vec![],
5033            edge_retires: vec![],
5034            chunks: vec![ChunkInsert {
5035                id: "chunk-bad".to_owned(),
5036                node_logical_id: "meeting-1".to_owned(),
5037                text_content: "some text".to_owned(),
5038                byte_start: None,
5039                byte_end: None,
5040                content_hash: None,
5041            }],
5042            runs: vec![],
5043            steps: vec![],
5044            actions: vec![],
5045            optional_backfills: vec![],
5046            vec_inserts: vec![],
5047            operational_writes: vec![],
5048        });
5049
5050        assert!(
5051            matches!(result, Err(EngineError::InvalidWrite(_))),
5052            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5053        );
5054    }
5055
5056    // --- Item 1: prepare_cached batch insert ---
5057
5058    #[test]
5059    fn writer_batch_insert_multiple_nodes() {
5060        let db = NamedTempFile::new().expect("temporary db");
5061        let writer = WriterActor::start(
5062            db.path(),
5063            Arc::new(SchemaManager::new()),
5064            ProvenanceMode::Warn,
5065            Arc::new(TelemetryCounters::default()),
5066        )
5067        .expect("writer");
5068
5069        let nodes: Vec<NodeInsert> = (0..100)
5070            .map(|i| NodeInsert {
5071                row_id: format!("row-{i}"),
5072                logical_id: format!("lg-{i}"),
5073                kind: "Note".to_owned(),
5074                properties: "{}".to_owned(),
5075                source_ref: Some("batch-src".to_owned()),
5076                upsert: false,
5077                chunk_policy: ChunkPolicy::Preserve,
5078                content_ref: None,
5079            })
5080            .collect();
5081
5082        writer
5083            .submit(WriteRequest {
5084                label: "batch".to_owned(),
5085                nodes,
5086                node_retires: vec![],
5087                edges: vec![],
5088                edge_retires: vec![],
5089                chunks: vec![],
5090                runs: vec![],
5091                steps: vec![],
5092                actions: vec![],
5093                optional_backfills: vec![],
5094                vec_inserts: vec![],
5095                operational_writes: vec![],
5096            })
5097            .expect("batch write");
5098
5099        let conn = rusqlite::Connection::open(db.path()).expect("open");
5100        let count: i64 = conn
5101            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5102            .expect("count nodes");
5103        assert_eq!(
5104            count, 100,
5105            "all 100 nodes must be present after batch insert"
5106        );
5107    }
5108
5109    // --- Item 2: ID validation ---
5110
5111    #[test]
5112    fn prepare_write_rejects_empty_node_row_id() {
5113        let db = NamedTempFile::new().expect("temporary db");
5114        let writer = WriterActor::start(
5115            db.path(),
5116            Arc::new(SchemaManager::new()),
5117            ProvenanceMode::Warn,
5118            Arc::new(TelemetryCounters::default()),
5119        )
5120        .expect("writer");
5121
5122        let result = writer.submit(WriteRequest {
5123            label: "test".to_owned(),
5124            nodes: vec![NodeInsert {
5125                row_id: String::new(),
5126                logical_id: "lg-1".to_owned(),
5127                kind: "Note".to_owned(),
5128                properties: "{}".to_owned(),
5129                source_ref: None,
5130                upsert: false,
5131                chunk_policy: ChunkPolicy::Preserve,
5132                content_ref: None,
5133            }],
5134            node_retires: vec![],
5135            edges: vec![],
5136            edge_retires: vec![],
5137            chunks: vec![],
5138            runs: vec![],
5139            steps: vec![],
5140            actions: vec![],
5141            optional_backfills: vec![],
5142            vec_inserts: vec![],
5143            operational_writes: vec![],
5144        });
5145
5146        assert!(
5147            matches!(result, Err(EngineError::InvalidWrite(_))),
5148            "empty row_id must be rejected"
5149        );
5150    }
5151
5152    #[test]
5153    fn prepare_write_rejects_empty_node_logical_id() {
5154        let db = NamedTempFile::new().expect("temporary db");
5155        let writer = WriterActor::start(
5156            db.path(),
5157            Arc::new(SchemaManager::new()),
5158            ProvenanceMode::Warn,
5159            Arc::new(TelemetryCounters::default()),
5160        )
5161        .expect("writer");
5162
5163        let result = writer.submit(WriteRequest {
5164            label: "test".to_owned(),
5165            nodes: vec![NodeInsert {
5166                row_id: "row-1".to_owned(),
5167                logical_id: String::new(),
5168                kind: "Note".to_owned(),
5169                properties: "{}".to_owned(),
5170                source_ref: None,
5171                upsert: false,
5172                chunk_policy: ChunkPolicy::Preserve,
5173                content_ref: None,
5174            }],
5175            node_retires: vec![],
5176            edges: vec![],
5177            edge_retires: vec![],
5178            chunks: vec![],
5179            runs: vec![],
5180            steps: vec![],
5181            actions: vec![],
5182            optional_backfills: vec![],
5183            vec_inserts: vec![],
5184            operational_writes: vec![],
5185        });
5186
5187        assert!(
5188            matches!(result, Err(EngineError::InvalidWrite(_))),
5189            "empty logical_id must be rejected"
5190        );
5191    }
5192
5193    #[test]
5194    fn prepare_write_rejects_duplicate_row_ids_in_request() {
5195        let db = NamedTempFile::new().expect("temporary db");
5196        let writer = WriterActor::start(
5197            db.path(),
5198            Arc::new(SchemaManager::new()),
5199            ProvenanceMode::Warn,
5200            Arc::new(TelemetryCounters::default()),
5201        )
5202        .expect("writer");
5203
5204        let result = writer.submit(WriteRequest {
5205            label: "test".to_owned(),
5206            nodes: vec![
5207                NodeInsert {
5208                    row_id: "row-1".to_owned(),
5209                    logical_id: "lg-1".to_owned(),
5210                    kind: "Note".to_owned(),
5211                    properties: "{}".to_owned(),
5212                    source_ref: None,
5213                    upsert: false,
5214                    chunk_policy: ChunkPolicy::Preserve,
5215                    content_ref: None,
5216                },
5217                NodeInsert {
5218                    row_id: "row-1".to_owned(), // duplicate
5219                    logical_id: "lg-2".to_owned(),
5220                    kind: "Note".to_owned(),
5221                    properties: "{}".to_owned(),
5222                    source_ref: None,
5223                    upsert: false,
5224                    chunk_policy: ChunkPolicy::Preserve,
5225                    content_ref: None,
5226                },
5227            ],
5228            node_retires: vec![],
5229            edges: vec![],
5230            edge_retires: vec![],
5231            chunks: vec![],
5232            runs: vec![],
5233            steps: vec![],
5234            actions: vec![],
5235            optional_backfills: vec![],
5236            vec_inserts: vec![],
5237            operational_writes: vec![],
5238        });
5239
5240        assert!(
5241            matches!(result, Err(EngineError::InvalidWrite(_))),
5242            "duplicate row_id within request must be rejected"
5243        );
5244    }
5245
5246    #[test]
5247    fn prepare_write_rejects_empty_chunk_id() {
5248        let db = NamedTempFile::new().expect("temporary db");
5249        let writer = WriterActor::start(
5250            db.path(),
5251            Arc::new(SchemaManager::new()),
5252            ProvenanceMode::Warn,
5253            Arc::new(TelemetryCounters::default()),
5254        )
5255        .expect("writer");
5256
5257        let result = writer.submit(WriteRequest {
5258            label: "test".to_owned(),
5259            nodes: vec![NodeInsert {
5260                row_id: "row-1".to_owned(),
5261                logical_id: "lg-1".to_owned(),
5262                kind: "Note".to_owned(),
5263                properties: "{}".to_owned(),
5264                source_ref: None,
5265                upsert: false,
5266                chunk_policy: ChunkPolicy::Preserve,
5267                content_ref: None,
5268            }],
5269            node_retires: vec![],
5270            edges: vec![],
5271            edge_retires: vec![],
5272            chunks: vec![ChunkInsert {
5273                id: String::new(),
5274                node_logical_id: "lg-1".to_owned(),
5275                text_content: "some text".to_owned(),
5276                byte_start: None,
5277                byte_end: None,
5278                content_hash: None,
5279            }],
5280            runs: vec![],
5281            steps: vec![],
5282            actions: vec![],
5283            optional_backfills: vec![],
5284            vec_inserts: vec![],
5285            operational_writes: vec![],
5286        });
5287
5288        assert!(
5289            matches!(result, Err(EngineError::InvalidWrite(_))),
5290            "empty chunk id must be rejected"
5291        );
5292    }
5293
5294    // --- Item 4: provenance warning coverage tests ---
5295
    // Provenance coverage for steps: under ProvenanceMode::Warn, a step row
    // written without a source_ref must succeed but surface a warning on the
    // returned WriteReceipt.
    #[test]
    fn writer_receipt_warns_on_step_without_source_ref() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // seed a run first so step FK is satisfied
        // (the seed run carries a source_ref, so any warning observed later
        // can only come from the step under test)
        writer
            .submit(WriteRequest {
                label: "seed-run".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed run");

        // The step under test: source_ref is deliberately omitted.
        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Warn mode lets the write through; the receipt must still flag
        // the missing provenance.
        assert!(
            !receipt.provenance_warnings.is_empty(),
            "step insert without source_ref must emit a provenance warning"
        );
    }
5364
    // Provenance coverage for actions: under ProvenanceMode::Warn, an action
    // row written without a source_ref must succeed but surface a warning on
    // the returned WriteReceipt.
    #[test]
    fn writer_receipt_warns_on_action_without_source_ref() {
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // seed run and step so action FK is satisfied
        // (both seeds carry source_refs, so any warning observed later can
        // only come from the action under test)
        writer
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![RunInsert {
                    id: "run-1".to_owned(),
                    kind: "session".to_owned(),
                    status: "active".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                steps: vec![StepInsert {
                    id: "step-1".to_owned(),
                    run_id: "run-1".to_owned(),
                    kind: "llm_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    supersedes_id: None,
                }],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed");

        // The action under test: source_ref is deliberately omitted.
        let receipt = writer
            .submit(WriteRequest {
                label: "test".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![ActionInsert {
                    id: "action-1".to_owned(),
                    step_id: "step-1".to_owned(),
                    kind: "tool_call".to_owned(),
                    status: "completed".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    supersedes_id: None,
                }],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Warn mode lets the write through; the receipt must still flag
        // the missing provenance.
        assert!(
            !receipt.provenance_warnings.is_empty(),
            "action insert without source_ref must emit a provenance warning"
        );
    }
5442
5443    #[test]
5444    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5445        let db = NamedTempFile::new().expect("temporary db");
5446        let writer = WriterActor::start(
5447            db.path(),
5448            Arc::new(SchemaManager::new()),
5449            ProvenanceMode::Warn,
5450            Arc::new(TelemetryCounters::default()),
5451        )
5452        .expect("writer");
5453
5454        let receipt = writer
5455            .submit(WriteRequest {
5456                label: "test".to_owned(),
5457                nodes: vec![NodeInsert {
5458                    row_id: "row-1".to_owned(),
5459                    logical_id: "node-1".to_owned(),
5460                    kind: "Note".to_owned(),
5461                    properties: "{}".to_owned(),
5462                    source_ref: Some("src-1".to_owned()),
5463                    upsert: false,
5464                    chunk_policy: ChunkPolicy::Preserve,
5465                    content_ref: None,
5466                }],
5467                node_retires: vec![],
5468                edges: vec![],
5469                edge_retires: vec![],
5470                chunks: vec![],
5471                runs: vec![RunInsert {
5472                    id: "run-1".to_owned(),
5473                    kind: "session".to_owned(),
5474                    status: "active".to_owned(),
5475                    properties: "{}".to_owned(),
5476                    source_ref: Some("src-1".to_owned()),
5477                    upsert: false,
5478                    supersedes_id: None,
5479                }],
5480                steps: vec![StepInsert {
5481                    id: "step-1".to_owned(),
5482                    run_id: "run-1".to_owned(),
5483                    kind: "llm_call".to_owned(),
5484                    status: "completed".to_owned(),
5485                    properties: "{}".to_owned(),
5486                    source_ref: Some("src-1".to_owned()),
5487                    upsert: false,
5488                    supersedes_id: None,
5489                }],
5490                actions: vec![ActionInsert {
5491                    id: "action-1".to_owned(),
5492                    step_id: "step-1".to_owned(),
5493                    kind: "tool_call".to_owned(),
5494                    status: "completed".to_owned(),
5495                    properties: "{}".to_owned(),
5496                    source_ref: Some("src-1".to_owned()),
5497                    upsert: false,
5498                    supersedes_id: None,
5499                }],
5500                optional_backfills: vec![],
5501                vec_inserts: vec![],
5502                operational_writes: vec![],
5503            })
5504            .expect("write");
5505
5506        assert!(
5507            receipt.provenance_warnings.is_empty(),
5508            "no warnings expected when all types have source_ref; got: {:?}",
5509            receipt.provenance_warnings
5510        );
5511    }
5512
5513    // --- Item 4 Task 2: ProvenanceMode tests ---
5514
5515    #[test]
5516    fn default_provenance_mode_is_warn() {
5517        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5518        let db = NamedTempFile::new().expect("temporary db");
5519        let writer = WriterActor::start(
5520            db.path(),
5521            Arc::new(SchemaManager::new()),
5522            ProvenanceMode::default(),
5523            Arc::new(TelemetryCounters::default()),
5524        )
5525        .expect("writer");
5526
5527        let receipt = writer
5528            .submit(WriteRequest {
5529                label: "test".to_owned(),
5530                nodes: vec![NodeInsert {
5531                    row_id: "row-1".to_owned(),
5532                    logical_id: "node-1".to_owned(),
5533                    kind: "Note".to_owned(),
5534                    properties: "{}".to_owned(),
5535                    source_ref: None,
5536                    upsert: false,
5537                    chunk_policy: ChunkPolicy::Preserve,
5538                    content_ref: None,
5539                }],
5540                node_retires: vec![],
5541                edges: vec![],
5542                edge_retires: vec![],
5543                chunks: vec![],
5544                runs: vec![],
5545                steps: vec![],
5546                actions: vec![],
5547                optional_backfills: vec![],
5548                vec_inserts: vec![],
5549                operational_writes: vec![],
5550            })
5551            .expect("Warn mode must not reject missing source_ref");
5552
5553        assert!(
5554            !receipt.provenance_warnings.is_empty(),
5555            "Warn mode must emit a warning instead of rejecting"
5556        );
5557    }
5558
5559    #[test]
5560    fn require_mode_rejects_node_without_source_ref() {
5561        let db = NamedTempFile::new().expect("temporary db");
5562        let writer = WriterActor::start(
5563            db.path(),
5564            Arc::new(SchemaManager::new()),
5565            ProvenanceMode::Require,
5566            Arc::new(TelemetryCounters::default()),
5567        )
5568        .expect("writer");
5569
5570        let result = writer.submit(WriteRequest {
5571            label: "test".to_owned(),
5572            nodes: vec![NodeInsert {
5573                row_id: "row-1".to_owned(),
5574                logical_id: "node-1".to_owned(),
5575                kind: "Note".to_owned(),
5576                properties: "{}".to_owned(),
5577                source_ref: None,
5578                upsert: false,
5579                chunk_policy: ChunkPolicy::Preserve,
5580                content_ref: None,
5581            }],
5582            node_retires: vec![],
5583            edges: vec![],
5584            edge_retires: vec![],
5585            chunks: vec![],
5586            runs: vec![],
5587            steps: vec![],
5588            actions: vec![],
5589            optional_backfills: vec![],
5590            vec_inserts: vec![],
5591            operational_writes: vec![],
5592        });
5593
5594        assert!(
5595            matches!(result, Err(EngineError::InvalidWrite(_))),
5596            "Require mode must reject node without source_ref"
5597        );
5598    }
5599
5600    #[test]
5601    fn require_mode_accepts_node_with_source_ref() {
5602        let db = NamedTempFile::new().expect("temporary db");
5603        let writer = WriterActor::start(
5604            db.path(),
5605            Arc::new(SchemaManager::new()),
5606            ProvenanceMode::Require,
5607            Arc::new(TelemetryCounters::default()),
5608        )
5609        .expect("writer");
5610
5611        let result = writer.submit(WriteRequest {
5612            label: "test".to_owned(),
5613            nodes: vec![NodeInsert {
5614                row_id: "row-1".to_owned(),
5615                logical_id: "node-1".to_owned(),
5616                kind: "Note".to_owned(),
5617                properties: "{}".to_owned(),
5618                source_ref: Some("src-1".to_owned()),
5619                upsert: false,
5620                chunk_policy: ChunkPolicy::Preserve,
5621                content_ref: None,
5622            }],
5623            node_retires: vec![],
5624            edges: vec![],
5625            edge_retires: vec![],
5626            chunks: vec![],
5627            runs: vec![],
5628            steps: vec![],
5629            actions: vec![],
5630            optional_backfills: vec![],
5631            vec_inserts: vec![],
5632            operational_writes: vec![],
5633        });
5634
5635        assert!(
5636            result.is_ok(),
5637            "Require mode must accept node with source_ref"
5638        );
5639    }
5640
5641    #[test]
5642    fn require_mode_rejects_edge_without_source_ref() {
5643        let db = NamedTempFile::new().expect("temporary db");
5644        let writer = WriterActor::start(
5645            db.path(),
5646            Arc::new(SchemaManager::new()),
5647            ProvenanceMode::Require,
5648            Arc::new(TelemetryCounters::default()),
5649        )
5650        .expect("writer");
5651
5652        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5653        let result = writer.submit(WriteRequest {
5654            label: "test".to_owned(),
5655            nodes: vec![
5656                NodeInsert {
5657                    row_id: "row-a".to_owned(),
5658                    logical_id: "node-a".to_owned(),
5659                    kind: "Note".to_owned(),
5660                    properties: "{}".to_owned(),
5661                    source_ref: Some("src-1".to_owned()),
5662                    upsert: false,
5663                    chunk_policy: ChunkPolicy::Preserve,
5664                    content_ref: None,
5665                },
5666                NodeInsert {
5667                    row_id: "row-b".to_owned(),
5668                    logical_id: "node-b".to_owned(),
5669                    kind: "Note".to_owned(),
5670                    properties: "{}".to_owned(),
5671                    source_ref: Some("src-1".to_owned()),
5672                    upsert: false,
5673                    chunk_policy: ChunkPolicy::Preserve,
5674                    content_ref: None,
5675                },
5676            ],
5677            node_retires: vec![],
5678            edges: vec![EdgeInsert {
5679                row_id: "edge-row-1".to_owned(),
5680                logical_id: "edge-1".to_owned(),
5681                source_logical_id: "node-a".to_owned(),
5682                target_logical_id: "node-b".to_owned(),
5683                kind: "LINKS_TO".to_owned(),
5684                properties: "{}".to_owned(),
5685                source_ref: None,
5686                upsert: false,
5687            }],
5688            edge_retires: vec![],
5689            chunks: vec![],
5690            runs: vec![],
5691            steps: vec![],
5692            actions: vec![],
5693            optional_backfills: vec![],
5694            vec_inserts: vec![],
5695            operational_writes: vec![],
5696        });
5697
5698        assert!(
5699            matches!(result, Err(EngineError::InvalidWrite(_))),
5700            "Require mode must reject edge without source_ref"
5701        );
5702    }
5703
5704    // --- Item 5: FTS projection coverage tests ---
5705
5706    #[test]
5707    fn fts_row_has_correct_kind_from_co_submitted_node() {
5708        let db = NamedTempFile::new().expect("temporary db");
5709        let writer = WriterActor::start(
5710            db.path(),
5711            Arc::new(SchemaManager::new()),
5712            ProvenanceMode::Warn,
5713            Arc::new(TelemetryCounters::default()),
5714        )
5715        .expect("writer");
5716
5717        writer
5718            .submit(WriteRequest {
5719                label: "test".to_owned(),
5720                nodes: vec![NodeInsert {
5721                    row_id: "row-1".to_owned(),
5722                    logical_id: "node-1".to_owned(),
5723                    kind: "Meeting".to_owned(),
5724                    properties: "{}".to_owned(),
5725                    source_ref: Some("src-1".to_owned()),
5726                    upsert: false,
5727                    chunk_policy: ChunkPolicy::Preserve,
5728                    content_ref: None,
5729                }],
5730                node_retires: vec![],
5731                edges: vec![],
5732                edge_retires: vec![],
5733                chunks: vec![ChunkInsert {
5734                    id: "chunk-1".to_owned(),
5735                    node_logical_id: "node-1".to_owned(),
5736                    text_content: "some text".to_owned(),
5737                    byte_start: None,
5738                    byte_end: None,
5739                    content_hash: None,
5740                }],
5741                runs: vec![],
5742                steps: vec![],
5743                actions: vec![],
5744                optional_backfills: vec![],
5745                vec_inserts: vec![],
5746                operational_writes: vec![],
5747            })
5748            .expect("write");
5749
5750        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5751        let kind: String = conn
5752            .query_row(
5753                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5754                [],
5755                |row| row.get(0),
5756            )
5757            .expect("fts row");
5758
5759        assert_eq!(kind, "Meeting");
5760    }
5761
5762    #[test]
5763    fn fts_row_has_correct_text_content() {
5764        let db = NamedTempFile::new().expect("temporary db");
5765        let writer = WriterActor::start(
5766            db.path(),
5767            Arc::new(SchemaManager::new()),
5768            ProvenanceMode::Warn,
5769            Arc::new(TelemetryCounters::default()),
5770        )
5771        .expect("writer");
5772
5773        writer
5774            .submit(WriteRequest {
5775                label: "test".to_owned(),
5776                nodes: vec![NodeInsert {
5777                    row_id: "row-1".to_owned(),
5778                    logical_id: "node-1".to_owned(),
5779                    kind: "Note".to_owned(),
5780                    properties: "{}".to_owned(),
5781                    source_ref: Some("src-1".to_owned()),
5782                    upsert: false,
5783                    chunk_policy: ChunkPolicy::Preserve,
5784                    content_ref: None,
5785                }],
5786                node_retires: vec![],
5787                edges: vec![],
5788                edge_retires: vec![],
5789                chunks: vec![ChunkInsert {
5790                    id: "chunk-1".to_owned(),
5791                    node_logical_id: "node-1".to_owned(),
5792                    text_content: "exactly this text".to_owned(),
5793                    byte_start: None,
5794                    byte_end: None,
5795                    content_hash: None,
5796                }],
5797                runs: vec![],
5798                steps: vec![],
5799                actions: vec![],
5800                optional_backfills: vec![],
5801                vec_inserts: vec![],
5802                operational_writes: vec![],
5803            })
5804            .expect("write");
5805
5806        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5807        let text: String = conn
5808            .query_row(
5809                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5810                [],
5811                |row| row.get(0),
5812            )
5813            .expect("fts row");
5814
5815        assert_eq!(text, "exactly this text");
5816    }
5817
5818    #[test]
5819    fn fts_row_has_correct_kind_from_pre_existing_node() {
5820        let db = NamedTempFile::new().expect("temporary db");
5821        let writer = WriterActor::start(
5822            db.path(),
5823            Arc::new(SchemaManager::new()),
5824            ProvenanceMode::Warn,
5825            Arc::new(TelemetryCounters::default()),
5826        )
5827        .expect("writer");
5828
5829        // Request 1: node only
5830        writer
5831            .submit(WriteRequest {
5832                label: "r1".to_owned(),
5833                nodes: vec![NodeInsert {
5834                    row_id: "row-1".to_owned(),
5835                    logical_id: "node-1".to_owned(),
5836                    kind: "Document".to_owned(),
5837                    properties: "{}".to_owned(),
5838                    source_ref: Some("src-1".to_owned()),
5839                    upsert: false,
5840                    chunk_policy: ChunkPolicy::Preserve,
5841                    content_ref: None,
5842                }],
5843                node_retires: vec![],
5844                edges: vec![],
5845                edge_retires: vec![],
5846                chunks: vec![],
5847                runs: vec![],
5848                steps: vec![],
5849                actions: vec![],
5850                optional_backfills: vec![],
5851                vec_inserts: vec![],
5852                operational_writes: vec![],
5853            })
5854            .expect("r1 write");
5855
5856        // Request 2: chunk for pre-existing node
5857        writer
5858            .submit(WriteRequest {
5859                label: "r2".to_owned(),
5860                nodes: vec![],
5861                node_retires: vec![],
5862                edges: vec![],
5863                edge_retires: vec![],
5864                chunks: vec![ChunkInsert {
5865                    id: "chunk-1".to_owned(),
5866                    node_logical_id: "node-1".to_owned(),
5867                    text_content: "some text".to_owned(),
5868                    byte_start: None,
5869                    byte_end: None,
5870                    content_hash: None,
5871                }],
5872                runs: vec![],
5873                steps: vec![],
5874                actions: vec![],
5875                optional_backfills: vec![],
5876                vec_inserts: vec![],
5877                operational_writes: vec![],
5878            })
5879            .expect("r2 write");
5880
5881        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5882        let kind: String = conn
5883            .query_row(
5884                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5885                [],
5886                |row| row.get(0),
5887            )
5888            .expect("fts row");
5889
5890        assert_eq!(kind, "Document");
5891    }
5892
5893    #[test]
5894    fn fts_derives_rows_for_multiple_chunks_per_node() {
5895        let db = NamedTempFile::new().expect("temporary db");
5896        let writer = WriterActor::start(
5897            db.path(),
5898            Arc::new(SchemaManager::new()),
5899            ProvenanceMode::Warn,
5900            Arc::new(TelemetryCounters::default()),
5901        )
5902        .expect("writer");
5903
5904        writer
5905            .submit(WriteRequest {
5906                label: "test".to_owned(),
5907                nodes: vec![NodeInsert {
5908                    row_id: "row-1".to_owned(),
5909                    logical_id: "node-1".to_owned(),
5910                    kind: "Meeting".to_owned(),
5911                    properties: "{}".to_owned(),
5912                    source_ref: Some("src-1".to_owned()),
5913                    upsert: false,
5914                    chunk_policy: ChunkPolicy::Preserve,
5915                    content_ref: None,
5916                }],
5917                node_retires: vec![],
5918                edges: vec![],
5919                edge_retires: vec![],
5920                chunks: vec![
5921                    ChunkInsert {
5922                        id: "chunk-a".to_owned(),
5923                        node_logical_id: "node-1".to_owned(),
5924                        text_content: "intro".to_owned(),
5925                        byte_start: None,
5926                        byte_end: None,
5927                        content_hash: None,
5928                    },
5929                    ChunkInsert {
5930                        id: "chunk-b".to_owned(),
5931                        node_logical_id: "node-1".to_owned(),
5932                        text_content: "body".to_owned(),
5933                        byte_start: None,
5934                        byte_end: None,
5935                        content_hash: None,
5936                    },
5937                    ChunkInsert {
5938                        id: "chunk-c".to_owned(),
5939                        node_logical_id: "node-1".to_owned(),
5940                        text_content: "conclusion".to_owned(),
5941                        byte_start: None,
5942                        byte_end: None,
5943                        content_hash: None,
5944                    },
5945                ],
5946                runs: vec![],
5947                steps: vec![],
5948                actions: vec![],
5949                optional_backfills: vec![],
5950                vec_inserts: vec![],
5951                operational_writes: vec![],
5952            })
5953            .expect("write");
5954
5955        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5956        let count: i64 = conn
5957            .query_row(
5958                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5959                [],
5960                |row| row.get(0),
5961            )
5962            .expect("fts count");
5963
5964        assert_eq!(count, 3, "three chunks must produce three FTS rows");
5965    }
5966
5967    #[test]
5968    fn fts_resolves_mixed_fast_and_db_paths() {
5969        let db = NamedTempFile::new().expect("temporary db");
5970        let writer = WriterActor::start(
5971            db.path(),
5972            Arc::new(SchemaManager::new()),
5973            ProvenanceMode::Warn,
5974            Arc::new(TelemetryCounters::default()),
5975        )
5976        .expect("writer");
5977
5978        // Seed pre-existing node
5979        writer
5980            .submit(WriteRequest {
5981                label: "seed".to_owned(),
5982                nodes: vec![NodeInsert {
5983                    row_id: "row-existing".to_owned(),
5984                    logical_id: "existing-node".to_owned(),
5985                    kind: "Archive".to_owned(),
5986                    properties: "{}".to_owned(),
5987                    source_ref: Some("src-1".to_owned()),
5988                    upsert: false,
5989                    chunk_policy: ChunkPolicy::Preserve,
5990                    content_ref: None,
5991                }],
5992                node_retires: vec![],
5993                edges: vec![],
5994                edge_retires: vec![],
5995                chunks: vec![],
5996                runs: vec![],
5997                steps: vec![],
5998                actions: vec![],
5999                optional_backfills: vec![],
6000                vec_inserts: vec![],
6001                operational_writes: vec![],
6002            })
6003            .expect("seed");
6004
6005        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
6006        writer
6007            .submit(WriteRequest {
6008                label: "mixed".to_owned(),
6009                nodes: vec![NodeInsert {
6010                    row_id: "row-new".to_owned(),
6011                    logical_id: "new-node".to_owned(),
6012                    kind: "Inbox".to_owned(),
6013                    properties: "{}".to_owned(),
6014                    source_ref: Some("src-2".to_owned()),
6015                    upsert: false,
6016                    chunk_policy: ChunkPolicy::Preserve,
6017                    content_ref: None,
6018                }],
6019                node_retires: vec![],
6020                edges: vec![],
6021                edge_retires: vec![],
6022                chunks: vec![
6023                    ChunkInsert {
6024                        id: "chunk-fast".to_owned(),
6025                        node_logical_id: "new-node".to_owned(),
6026                        text_content: "new content".to_owned(),
6027                        byte_start: None,
6028                        byte_end: None,
6029                        content_hash: None,
6030                    },
6031                    ChunkInsert {
6032                        id: "chunk-db".to_owned(),
6033                        node_logical_id: "existing-node".to_owned(),
6034                        text_content: "archive content".to_owned(),
6035                        byte_start: None,
6036                        byte_end: None,
6037                        content_hash: None,
6038                    },
6039                ],
6040                runs: vec![],
6041                steps: vec![],
6042                actions: vec![],
6043                optional_backfills: vec![],
6044                vec_inserts: vec![],
6045                operational_writes: vec![],
6046            })
6047            .expect("mixed write");
6048
6049        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6050        let fast_kind: String = conn
6051            .query_row(
6052                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6053                [],
6054                |row| row.get(0),
6055            )
6056            .expect("fast path fts row");
6057        let db_kind: String = conn
6058            .query_row(
6059                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6060                [],
6061                |row| row.get(0),
6062            )
6063            .expect("db path fts row");
6064
6065        assert_eq!(fast_kind, "Inbox");
6066        assert_eq!(db_kind, "Archive");
6067    }
6068
6069    #[test]
6070    fn prepare_write_rejects_empty_chunk_text() {
6071        let db = NamedTempFile::new().expect("temporary db");
6072        let writer = WriterActor::start(
6073            db.path(),
6074            Arc::new(SchemaManager::new()),
6075            ProvenanceMode::Warn,
6076            Arc::new(TelemetryCounters::default()),
6077        )
6078        .expect("writer");
6079
6080        let result = writer.submit(WriteRequest {
6081            label: "test".to_owned(),
6082            nodes: vec![NodeInsert {
6083                row_id: "row-1".to_owned(),
6084                logical_id: "node-1".to_owned(),
6085                kind: "Note".to_owned(),
6086                properties: "{}".to_owned(),
6087                source_ref: None,
6088                upsert: false,
6089                chunk_policy: ChunkPolicy::Preserve,
6090                content_ref: None,
6091            }],
6092            node_retires: vec![],
6093            edges: vec![],
6094            edge_retires: vec![],
6095            chunks: vec![ChunkInsert {
6096                id: "chunk-1".to_owned(),
6097                node_logical_id: "node-1".to_owned(),
6098                text_content: String::new(),
6099                byte_start: None,
6100                byte_end: None,
6101                content_hash: None,
6102            }],
6103            runs: vec![],
6104            steps: vec![],
6105            actions: vec![],
6106            optional_backfills: vec![],
6107            vec_inserts: vec![],
6108            operational_writes: vec![],
6109        });
6110
6111        assert!(
6112            matches!(result, Err(EngineError::InvalidWrite(_))),
6113            "empty text_content must be rejected"
6114        );
6115    }
6116
6117    #[test]
6118    fn receipt_reports_zero_backfills_when_none_submitted() {
6119        let db = NamedTempFile::new().expect("temporary db");
6120        let writer = WriterActor::start(
6121            db.path(),
6122            Arc::new(SchemaManager::new()),
6123            ProvenanceMode::Warn,
6124            Arc::new(TelemetryCounters::default()),
6125        )
6126        .expect("writer");
6127
6128        let receipt = writer
6129            .submit(WriteRequest {
6130                label: "test".to_owned(),
6131                nodes: vec![NodeInsert {
6132                    row_id: "row-1".to_owned(),
6133                    logical_id: "node-1".to_owned(),
6134                    kind: "Note".to_owned(),
6135                    properties: "{}".to_owned(),
6136                    source_ref: Some("src-1".to_owned()),
6137                    upsert: false,
6138                    chunk_policy: ChunkPolicy::Preserve,
6139                    content_ref: None,
6140                }],
6141                node_retires: vec![],
6142                edges: vec![],
6143                edge_retires: vec![],
6144                chunks: vec![],
6145                runs: vec![],
6146                steps: vec![],
6147                actions: vec![],
6148                optional_backfills: vec![],
6149                vec_inserts: vec![],
6150                operational_writes: vec![],
6151            })
6152            .expect("write");
6153
6154        assert_eq!(receipt.optional_backfill_count, 0);
6155    }
6156
6157    #[test]
6158    fn receipt_reports_correct_backfill_count() {
6159        let db = NamedTempFile::new().expect("temporary db");
6160        let writer = WriterActor::start(
6161            db.path(),
6162            Arc::new(SchemaManager::new()),
6163            ProvenanceMode::Warn,
6164            Arc::new(TelemetryCounters::default()),
6165        )
6166        .expect("writer");
6167
6168        let receipt = writer
6169            .submit(WriteRequest {
6170                label: "test".to_owned(),
6171                nodes: vec![NodeInsert {
6172                    row_id: "row-1".to_owned(),
6173                    logical_id: "node-1".to_owned(),
6174                    kind: "Note".to_owned(),
6175                    properties: "{}".to_owned(),
6176                    source_ref: Some("src-1".to_owned()),
6177                    upsert: false,
6178                    chunk_policy: ChunkPolicy::Preserve,
6179                    content_ref: None,
6180                }],
6181                node_retires: vec![],
6182                edges: vec![],
6183                edge_retires: vec![],
6184                chunks: vec![],
6185                runs: vec![],
6186                steps: vec![],
6187                actions: vec![],
6188                optional_backfills: vec![
6189                    OptionalProjectionTask {
6190                        target: ProjectionTarget::Fts,
6191                        payload: "p1".to_owned(),
6192                    },
6193                    OptionalProjectionTask {
6194                        target: ProjectionTarget::Vec,
6195                        payload: "p2".to_owned(),
6196                    },
6197                    OptionalProjectionTask {
6198                        target: ProjectionTarget::All,
6199                        payload: "p3".to_owned(),
6200                    },
6201                ],
6202                vec_inserts: vec![],
6203                operational_writes: vec![],
6204            })
6205            .expect("write");
6206
6207        assert_eq!(receipt.optional_backfill_count, 3);
6208    }
6209
6210    #[test]
6211    fn backfill_tasks_are_not_executed_during_write() {
6212        let db = NamedTempFile::new().expect("temporary db");
6213        let writer = WriterActor::start(
6214            db.path(),
6215            Arc::new(SchemaManager::new()),
6216            ProvenanceMode::Warn,
6217            Arc::new(TelemetryCounters::default()),
6218        )
6219        .expect("writer");
6220
6221        // Write a node + chunk. Submit a backfill task targeting FTS.
6222        // The write path must not create any extra FTS rows beyond the required one.
6223        writer
6224            .submit(WriteRequest {
6225                label: "test".to_owned(),
6226                nodes: vec![NodeInsert {
6227                    row_id: "row-1".to_owned(),
6228                    logical_id: "node-1".to_owned(),
6229                    kind: "Note".to_owned(),
6230                    properties: "{}".to_owned(),
6231                    source_ref: Some("src-1".to_owned()),
6232                    upsert: false,
6233                    chunk_policy: ChunkPolicy::Preserve,
6234                    content_ref: None,
6235                }],
6236                node_retires: vec![],
6237                edges: vec![],
6238                edge_retires: vec![],
6239                chunks: vec![ChunkInsert {
6240                    id: "chunk-1".to_owned(),
6241                    node_logical_id: "node-1".to_owned(),
6242                    text_content: "required text".to_owned(),
6243                    byte_start: None,
6244                    byte_end: None,
6245                    content_hash: None,
6246                }],
6247                runs: vec![],
6248                steps: vec![],
6249                actions: vec![],
6250                optional_backfills: vec![OptionalProjectionTask {
6251                    target: ProjectionTarget::Fts,
6252                    payload: "backfill-payload".to_owned(),
6253                }],
6254                vec_inserts: vec![],
6255                operational_writes: vec![],
6256            })
6257            .expect("write");
6258
6259        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6260        let count: i64 = conn
6261            .query_row(
6262                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6263                [],
6264                |row| row.get(0),
6265            )
6266            .expect("fts count");
6267
6268        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6269    }
6270
6271    #[test]
6272    fn fts_row_uses_new_kind_after_node_replace() {
6273        let db = NamedTempFile::new().expect("temporary db");
6274        let writer = WriterActor::start(
6275            db.path(),
6276            Arc::new(SchemaManager::new()),
6277            ProvenanceMode::Warn,
6278            Arc::new(TelemetryCounters::default()),
6279        )
6280        .expect("writer");
6281
6282        // Write original node as "Note"
6283        writer
6284            .submit(WriteRequest {
6285                label: "v1".to_owned(),
6286                nodes: vec![NodeInsert {
6287                    row_id: "row-1".to_owned(),
6288                    logical_id: "node-1".to_owned(),
6289                    kind: "Note".to_owned(),
6290                    properties: "{}".to_owned(),
6291                    source_ref: Some("src-1".to_owned()),
6292                    upsert: false,
6293                    chunk_policy: ChunkPolicy::Preserve,
6294                    content_ref: None,
6295                }],
6296                node_retires: vec![],
6297                edges: vec![],
6298                edge_retires: vec![],
6299                chunks: vec![ChunkInsert {
6300                    id: "chunk-v1".to_owned(),
6301                    node_logical_id: "node-1".to_owned(),
6302                    text_content: "original".to_owned(),
6303                    byte_start: None,
6304                    byte_end: None,
6305                    content_hash: None,
6306                }],
6307                runs: vec![],
6308                steps: vec![],
6309                actions: vec![],
6310                optional_backfills: vec![],
6311                vec_inserts: vec![],
6312                operational_writes: vec![],
6313            })
6314            .expect("v1 write");
6315
6316        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
6317        writer
6318            .submit(WriteRequest {
6319                label: "v2".to_owned(),
6320                nodes: vec![NodeInsert {
6321                    row_id: "row-2".to_owned(),
6322                    logical_id: "node-1".to_owned(),
6323                    kind: "Meeting".to_owned(),
6324                    properties: "{}".to_owned(),
6325                    source_ref: Some("src-2".to_owned()),
6326                    upsert: true,
6327                    chunk_policy: ChunkPolicy::Replace,
6328                    content_ref: None,
6329                }],
6330                node_retires: vec![],
6331                edges: vec![],
6332                edge_retires: vec![],
6333                chunks: vec![ChunkInsert {
6334                    id: "chunk-v2".to_owned(),
6335                    node_logical_id: "node-1".to_owned(),
6336                    text_content: "updated".to_owned(),
6337                    byte_start: None,
6338                    byte_end: None,
6339                    content_hash: None,
6340                }],
6341                runs: vec![],
6342                steps: vec![],
6343                actions: vec![],
6344                optional_backfills: vec![],
6345                vec_inserts: vec![],
6346                operational_writes: vec![],
6347            })
6348            .expect("v2 write");
6349
6350        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6351
6352        // Old FTS row must be gone
6353        let old_count: i64 = conn
6354            .query_row(
6355                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6356                [],
6357                |row| row.get(0),
6358            )
6359            .expect("old fts count");
6360        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6361
6362        // New FTS row must use the new kind
6363        let new_kind: String = conn
6364            .query_row(
6365                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6366                [],
6367                |row| row.get(0),
6368            )
6369            .expect("new fts row");
6370        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6371    }
6372
6373    // --- Item 3: VecInsert tests ---
6374
6375    #[test]
6376    fn vec_insert_empty_chunk_id_is_rejected() {
6377        let db = NamedTempFile::new().expect("temporary db");
6378        let writer = WriterActor::start(
6379            db.path(),
6380            Arc::new(SchemaManager::new()),
6381            ProvenanceMode::Warn,
6382            Arc::new(TelemetryCounters::default()),
6383        )
6384        .expect("writer");
6385        let result = writer.submit(WriteRequest {
6386            label: "vec-test".to_owned(),
6387            nodes: vec![],
6388            node_retires: vec![],
6389            edges: vec![],
6390            edge_retires: vec![],
6391            chunks: vec![],
6392            runs: vec![],
6393            steps: vec![],
6394            actions: vec![],
6395            optional_backfills: vec![],
6396            vec_inserts: vec![VecInsert {
6397                chunk_id: String::new(),
6398                embedding: vec![0.1, 0.2, 0.3],
6399            }],
6400            operational_writes: vec![],
6401        });
6402        assert!(
6403            matches!(result, Err(EngineError::InvalidWrite(_))),
6404            "empty chunk_id in VecInsert must be rejected"
6405        );
6406    }
6407
6408    #[test]
6409    fn vec_insert_empty_embedding_is_rejected() {
6410        let db = NamedTempFile::new().expect("temporary db");
6411        let writer = WriterActor::start(
6412            db.path(),
6413            Arc::new(SchemaManager::new()),
6414            ProvenanceMode::Warn,
6415            Arc::new(TelemetryCounters::default()),
6416        )
6417        .expect("writer");
6418        let result = writer.submit(WriteRequest {
6419            label: "vec-test".to_owned(),
6420            nodes: vec![],
6421            node_retires: vec![],
6422            edges: vec![],
6423            edge_retires: vec![],
6424            chunks: vec![],
6425            runs: vec![],
6426            steps: vec![],
6427            actions: vec![],
6428            optional_backfills: vec![],
6429            vec_inserts: vec![VecInsert {
6430                chunk_id: "chunk-1".to_owned(),
6431                embedding: vec![],
6432            }],
6433            operational_writes: vec![],
6434        });
6435        assert!(
6436            matches!(result, Err(EngineError::InvalidWrite(_))),
6437            "empty embedding in VecInsert must be rejected"
6438        );
6439    }
6440
6441    #[test]
6442    fn vec_insert_noop_without_feature() {
6443        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6444        // (no error) but not write to any vec table.
6445        let db = NamedTempFile::new().expect("temporary db");
6446        let writer = WriterActor::start(
6447            db.path(),
6448            Arc::new(SchemaManager::new()),
6449            ProvenanceMode::Warn,
6450            Arc::new(TelemetryCounters::default()),
6451        )
6452        .expect("writer");
6453        let result = writer.submit(WriteRequest {
6454            label: "vec-noop".to_owned(),
6455            nodes: vec![],
6456            node_retires: vec![],
6457            edges: vec![],
6458            edge_retires: vec![],
6459            chunks: vec![],
6460            runs: vec![],
6461            steps: vec![],
6462            actions: vec![],
6463            optional_backfills: vec![],
6464            vec_inserts: vec![VecInsert {
6465                chunk_id: "chunk-noop".to_owned(),
6466                embedding: vec![1.0, 2.0, 3.0],
6467            }],
6468            operational_writes: vec![],
6469        });
6470        #[cfg(not(feature = "sqlite-vec"))]
6471        result.expect("noop VecInsert without feature must succeed");
6472        // The result variable is used above; silence unused warning for cfg-on path.
6473        #[cfg(feature = "sqlite-vec")]
6474        let _ = result;
6475    }
6476
6477    #[cfg(feature = "sqlite-vec")]
6478    #[test]
6479    fn node_retire_preserves_vec_rows_for_later_restore() {
6480        use crate::sqlite::open_connection_with_vec;
6481
6482        let db = NamedTempFile::new().expect("temporary db");
6483        let schema_manager = Arc::new(SchemaManager::new());
6484
6485        {
6486            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6487            schema_manager.bootstrap(&conn).expect("bootstrap");
6488            schema_manager
6489                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6490                .expect("ensure profile");
6491        }
6492
6493        let writer = WriterActor::start(
6494            db.path(),
6495            Arc::clone(&schema_manager),
6496            ProvenanceMode::Warn,
6497            Arc::new(TelemetryCounters::default()),
6498        )
6499        .expect("writer");
6500
6501        // Insert node + chunk + vec row
6502        writer
6503            .submit(WriteRequest {
6504                label: "setup".to_owned(),
6505                nodes: vec![NodeInsert {
6506                    row_id: "row-retire-vec".to_owned(),
6507                    logical_id: "node-retire-vec".to_owned(),
6508                    kind: "Doc".to_owned(),
6509                    properties: "{}".to_owned(),
6510                    source_ref: Some("src".to_owned()),
6511                    upsert: false,
6512                    chunk_policy: ChunkPolicy::Preserve,
6513                    content_ref: None,
6514                }],
6515                node_retires: vec![],
6516                edges: vec![],
6517                edge_retires: vec![],
6518                chunks: vec![ChunkInsert {
6519                    id: "chunk-retire-vec".to_owned(),
6520                    node_logical_id: "node-retire-vec".to_owned(),
6521                    text_content: "text".to_owned(),
6522                    byte_start: None,
6523                    byte_end: None,
6524                    content_hash: None,
6525                }],
6526                runs: vec![],
6527                steps: vec![],
6528                actions: vec![],
6529                optional_backfills: vec![],
6530                vec_inserts: vec![VecInsert {
6531                    chunk_id: "chunk-retire-vec".to_owned(),
6532                    embedding: vec![0.1, 0.2, 0.3],
6533                }],
6534                operational_writes: vec![],
6535            })
6536            .expect("setup write");
6537
6538        // Retire the node
6539        writer
6540            .submit(WriteRequest {
6541                label: "retire".to_owned(),
6542                nodes: vec![],
6543                node_retires: vec![NodeRetire {
6544                    logical_id: "node-retire-vec".to_owned(),
6545                    source_ref: Some("src".to_owned()),
6546                }],
6547                edges: vec![],
6548                edge_retires: vec![],
6549                chunks: vec![],
6550                runs: vec![],
6551                steps: vec![],
6552                actions: vec![],
6553                optional_backfills: vec![],
6554                vec_inserts: vec![],
6555                operational_writes: vec![],
6556            })
6557            .expect("retire write");
6558
6559        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6560        let count: i64 = conn
6561            .query_row(
6562                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
6563                [],
6564                |row| row.get(0),
6565            )
6566            .expect("count");
6567        assert_eq!(
6568            count, 1,
6569            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6570        );
6571    }
6572
6573    #[cfg(feature = "sqlite-vec")]
6574    #[test]
6575    #[allow(clippy::too_many_lines)]
6576    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6577        use crate::sqlite::open_connection_with_vec;
6578
6579        let db = NamedTempFile::new().expect("temporary db");
6580        let schema_manager = Arc::new(SchemaManager::new());
6581
6582        {
6583            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6584            schema_manager.bootstrap(&conn).expect("bootstrap");
6585            schema_manager
6586                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6587                .expect("ensure profile");
6588        }
6589
6590        let writer = WriterActor::start(
6591            db.path(),
6592            Arc::clone(&schema_manager),
6593            ProvenanceMode::Warn,
6594            Arc::new(TelemetryCounters::default()),
6595        )
6596        .expect("writer");
6597
6598        // Insert node + chunk-A + vec-A
6599        writer
6600            .submit(WriteRequest {
6601                label: "v1".to_owned(),
6602                nodes: vec![NodeInsert {
6603                    row_id: "row-replace-v1".to_owned(),
6604                    logical_id: "node-replace-vec".to_owned(),
6605                    kind: "Doc".to_owned(),
6606                    properties: "{}".to_owned(),
6607                    source_ref: Some("src".to_owned()),
6608                    upsert: false,
6609                    chunk_policy: ChunkPolicy::Preserve,
6610                    content_ref: None,
6611                }],
6612                node_retires: vec![],
6613                edges: vec![],
6614                edge_retires: vec![],
6615                chunks: vec![ChunkInsert {
6616                    id: "chunk-replace-A".to_owned(),
6617                    node_logical_id: "node-replace-vec".to_owned(),
6618                    text_content: "version one".to_owned(),
6619                    byte_start: None,
6620                    byte_end: None,
6621                    content_hash: None,
6622                }],
6623                runs: vec![],
6624                steps: vec![],
6625                actions: vec![],
6626                optional_backfills: vec![],
6627                vec_inserts: vec![VecInsert {
6628                    chunk_id: "chunk-replace-A".to_owned(),
6629                    embedding: vec![0.1, 0.2, 0.3],
6630                }],
6631                operational_writes: vec![],
6632            })
6633            .expect("v1 write");
6634
6635        // Upsert with Replace + chunk-B + vec-B
6636        writer
6637            .submit(WriteRequest {
6638                label: "v2".to_owned(),
6639                nodes: vec![NodeInsert {
6640                    row_id: "row-replace-v2".to_owned(),
6641                    logical_id: "node-replace-vec".to_owned(),
6642                    kind: "Doc".to_owned(),
6643                    properties: "{}".to_owned(),
6644                    source_ref: Some("src".to_owned()),
6645                    upsert: true,
6646                    chunk_policy: ChunkPolicy::Replace,
6647                    content_ref: None,
6648                }],
6649                node_retires: vec![],
6650                edges: vec![],
6651                edge_retires: vec![],
6652                chunks: vec![ChunkInsert {
6653                    id: "chunk-replace-B".to_owned(),
6654                    node_logical_id: "node-replace-vec".to_owned(),
6655                    text_content: "version two".to_owned(),
6656                    byte_start: None,
6657                    byte_end: None,
6658                    content_hash: None,
6659                }],
6660                runs: vec![],
6661                steps: vec![],
6662                actions: vec![],
6663                optional_backfills: vec![],
6664                vec_inserts: vec![VecInsert {
6665                    chunk_id: "chunk-replace-B".to_owned(),
6666                    embedding: vec![0.4, 0.5, 0.6],
6667                }],
6668                operational_writes: vec![],
6669            })
6670            .expect("v2 write");
6671
6672        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6673        let count_a: i64 = conn
6674            .query_row(
6675                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6676                [],
6677                |row| row.get(0),
6678            )
6679            .expect("count A");
6680        let count_b: i64 = conn
6681            .query_row(
6682                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6683                [],
6684                |row| row.get(0),
6685            )
6686            .expect("count B");
6687        assert_eq!(
6688            count_a, 0,
6689            "old vec row (chunk-A) must be deleted on Replace"
6690        );
6691        assert_eq!(
6692            count_b, 1,
6693            "new vec row (chunk-B) must be present after Replace"
6694        );
6695    }
6696
6697    #[cfg(feature = "sqlite-vec")]
6698    #[test]
6699    fn vec_insert_is_persisted_when_feature_enabled() {
6700        use crate::sqlite::open_connection_with_vec;
6701
6702        let db = NamedTempFile::new().expect("temporary db");
6703        let schema_manager = Arc::new(SchemaManager::new());
6704
6705        // Open a vec-capable connection and bootstrap + ensure profile
6706        {
6707            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6708            schema_manager.bootstrap(&conn).expect("bootstrap");
6709            schema_manager
6710                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6711                .expect("ensure profile");
6712        }
6713
6714        let writer = WriterActor::start(
6715            db.path(),
6716            Arc::clone(&schema_manager),
6717            ProvenanceMode::Warn,
6718            Arc::new(TelemetryCounters::default()),
6719        )
6720        .expect("writer");
6721
6722        writer
6723            .submit(WriteRequest {
6724                label: "vec-insert".to_owned(),
6725                nodes: vec![],
6726                node_retires: vec![],
6727                edges: vec![],
6728                edge_retires: vec![],
6729                chunks: vec![],
6730                runs: vec![],
6731                steps: vec![],
6732                actions: vec![],
6733                optional_backfills: vec![],
6734                vec_inserts: vec![VecInsert {
6735                    chunk_id: "chunk-vec".to_owned(),
6736                    embedding: vec![0.1, 0.2, 0.3],
6737                }],
6738                operational_writes: vec![],
6739            })
6740            .expect("vec insert write");
6741
6742        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6743        let count: i64 = conn
6744            .query_row(
6745                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6746                [],
6747                |row| row.get(0),
6748            )
6749            .expect("count");
6750        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6751    }
6752
6753    // --- WriteRequest size validation tests ---
6754
6755    #[test]
6756    fn write_request_exceeding_node_limit_is_rejected() {
6757        let nodes: Vec<NodeInsert> = (0..10_001)
6758            .map(|i| NodeInsert {
6759                row_id: format!("row-{i}"),
6760                logical_id: format!("lg-{i}"),
6761                kind: "Note".to_owned(),
6762                properties: "{}".to_owned(),
6763                source_ref: None,
6764                upsert: false,
6765                chunk_policy: ChunkPolicy::Preserve,
6766                content_ref: None,
6767            })
6768            .collect();
6769
6770        let request = WriteRequest {
6771            label: "too-many-nodes".to_owned(),
6772            nodes,
6773            node_retires: vec![],
6774            edges: vec![],
6775            edge_retires: vec![],
6776            chunks: vec![],
6777            runs: vec![],
6778            steps: vec![],
6779            actions: vec![],
6780            optional_backfills: vec![],
6781            vec_inserts: vec![],
6782            operational_writes: vec![],
6783        };
6784
6785        let result = prepare_write(request, ProvenanceMode::Warn)
6786            .map(|_| ())
6787            .map_err(|e| format!("{e}"));
6788        assert!(
6789            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6790            "exceeding node limit must return InvalidWrite: got {result:?}"
6791        );
6792    }
6793
6794    #[test]
6795    fn write_request_exceeding_total_limit_is_rejected() {
6796        // Spread items across fields to exceed 100_000 total
6797        // without exceeding any single per-field limit.
6798        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
6799        let request = WriteRequest {
6800            label: "too-many-total".to_owned(),
6801            nodes: (0..10_000)
6802                .map(|i| NodeInsert {
6803                    row_id: format!("row-{i}"),
6804                    logical_id: format!("lg-{i}"),
6805                    kind: "Note".to_owned(),
6806                    properties: "{}".to_owned(),
6807                    source_ref: None,
6808                    upsert: false,
6809                    chunk_policy: ChunkPolicy::Preserve,
6810                    content_ref: None,
6811                })
6812                .collect(),
6813            node_retires: vec![],
6814            edges: (0..10_000)
6815                .map(|i| EdgeInsert {
6816                    row_id: format!("edge-row-{i}"),
6817                    logical_id: format!("edge-lg-{i}"),
6818                    kind: "link".to_owned(),
6819                    source_logical_id: format!("lg-{i}"),
6820                    target_logical_id: format!("lg-{}", i + 1),
6821                    properties: "{}".to_owned(),
6822                    source_ref: None,
6823                    upsert: false,
6824                })
6825                .collect(),
6826            edge_retires: vec![],
6827            chunks: (0..50_000)
6828                .map(|i| ChunkInsert {
6829                    id: format!("chunk-{i}"),
6830                    node_logical_id: "lg-0".to_owned(),
6831                    text_content: "text".to_owned(),
6832                    byte_start: None,
6833                    byte_end: None,
6834                    content_hash: None,
6835                })
6836                .collect(),
6837            runs: vec![],
6838            steps: vec![],
6839            actions: vec![],
6840            optional_backfills: vec![],
6841            vec_inserts: (0..20_001)
6842                .map(|i| VecInsert {
6843                    chunk_id: format!("vec-chunk-{i}"),
6844                    embedding: vec![0.1],
6845                })
6846                .collect(),
6847            operational_writes: (0..10_000)
6848                .map(|i| OperationalWrite::Append {
6849                    collection: format!("col-{i}"),
6850                    record_key: format!("key-{i}"),
6851                    payload_json: "{}".to_owned(),
6852                    source_ref: None,
6853                })
6854                .collect(),
6855        };
6856
6857        let result = prepare_write(request, ProvenanceMode::Warn)
6858            .map(|_| ())
6859            .map_err(|e| format!("{e}"));
6860        assert!(
6861            matches!(result, Err(ref msg) if msg.contains("too many total items")),
6862            "exceeding total item limit must return InvalidWrite: got {result:?}"
6863        );
6864    }
6865
6866    #[test]
6867    fn write_request_within_limits_succeeds() {
6868        let db = NamedTempFile::new().expect("temporary db");
6869        let writer = WriterActor::start(
6870            db.path(),
6871            Arc::new(SchemaManager::new()),
6872            ProvenanceMode::Warn,
6873            Arc::new(TelemetryCounters::default()),
6874        )
6875        .expect("writer");
6876
6877        let result = writer.submit(WriteRequest {
6878            label: "within-limits".to_owned(),
6879            nodes: vec![NodeInsert {
6880                row_id: "row-1".to_owned(),
6881                logical_id: "lg-1".to_owned(),
6882                kind: "Note".to_owned(),
6883                properties: "{}".to_owned(),
6884                source_ref: None,
6885                upsert: false,
6886                chunk_policy: ChunkPolicy::Preserve,
6887                content_ref: None,
6888            }],
6889            node_retires: vec![],
6890            edges: vec![],
6891            edge_retires: vec![],
6892            chunks: vec![],
6893            runs: vec![],
6894            steps: vec![],
6895            actions: vec![],
6896            optional_backfills: vec![],
6897            vec_inserts: vec![],
6898            operational_writes: vec![],
6899        });
6900
6901        assert!(
6902            result.is_ok(),
6903            "write request within limits must succeed: got {result:?}"
6904        );
6905    }
6906
6907    #[test]
6908    fn property_fts_rows_created_on_node_insert() {
6909        let db = NamedTempFile::new().expect("temporary db");
6910        // Bootstrap and register a property schema before starting the writer.
6911        let schema = Arc::new(SchemaManager::new());
6912        {
6913            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6914            schema.bootstrap(&conn).expect("bootstrap");
6915            conn.execute(
6916                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6917                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6918                [],
6919            )
6920            .expect("register schema");
6921        }
6922        let writer = WriterActor::start(
6923            db.path(),
6924            Arc::clone(&schema),
6925            ProvenanceMode::Warn,
6926            Arc::new(TelemetryCounters::default()),
6927        )
6928        .expect("writer");
6929
6930        writer
6931            .submit(WriteRequest {
6932                label: "goal-insert".to_owned(),
6933                nodes: vec![NodeInsert {
6934                    row_id: "row-1".to_owned(),
6935                    logical_id: "goal-1".to_owned(),
6936                    kind: "Goal".to_owned(),
6937                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6938                        .to_owned(),
6939                    source_ref: Some("src-1".to_owned()),
6940                    upsert: false,
6941                    chunk_policy: ChunkPolicy::Preserve,
6942                    content_ref: None,
6943                }],
6944                node_retires: vec![],
6945                edges: vec![],
6946                edge_retires: vec![],
6947                chunks: vec![],
6948                runs: vec![],
6949                steps: vec![],
6950                actions: vec![],
6951                optional_backfills: vec![],
6952                vec_inserts: vec![],
6953                operational_writes: vec![],
6954            })
6955            .expect("write");
6956
6957        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6958        let text: String = conn
6959            .query_row(
6960                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6961                [],
6962                |row| row.get(0),
6963            )
6964            .expect("property FTS row must exist");
6965        assert_eq!(text, "Ship v2 Launch the redesign");
6966    }
6967
6968    #[test]
6969    fn property_fts_rows_replaced_on_upsert() {
6970        let db = NamedTempFile::new().expect("temporary db");
6971        let schema = Arc::new(SchemaManager::new());
6972        {
6973            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6974            schema.bootstrap(&conn).expect("bootstrap");
6975            conn.execute(
6976                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6977                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6978                [],
6979            )
6980            .expect("register schema");
6981        }
6982        let writer = WriterActor::start(
6983            db.path(),
6984            Arc::clone(&schema),
6985            ProvenanceMode::Warn,
6986            Arc::new(TelemetryCounters::default()),
6987        )
6988        .expect("writer");
6989
6990        // Initial insert.
6991        writer
6992            .submit(WriteRequest {
6993                label: "insert".to_owned(),
6994                nodes: vec![NodeInsert {
6995                    row_id: "row-1".to_owned(),
6996                    logical_id: "goal-1".to_owned(),
6997                    kind: "Goal".to_owned(),
6998                    properties: r#"{"name":"Alpha"}"#.to_owned(),
6999                    source_ref: Some("src-1".to_owned()),
7000                    upsert: false,
7001                    chunk_policy: ChunkPolicy::Preserve,
7002                    content_ref: None,
7003                }],
7004                node_retires: vec![],
7005                edges: vec![],
7006                edge_retires: vec![],
7007                chunks: vec![],
7008                runs: vec![],
7009                steps: vec![],
7010                actions: vec![],
7011                optional_backfills: vec![],
7012                vec_inserts: vec![],
7013                operational_writes: vec![],
7014            })
7015            .expect("insert");
7016
7017        // Upsert with changed property.
7018        writer
7019            .submit(WriteRequest {
7020                label: "upsert".to_owned(),
7021                nodes: vec![NodeInsert {
7022                    row_id: "row-2".to_owned(),
7023                    logical_id: "goal-1".to_owned(),
7024                    kind: "Goal".to_owned(),
7025                    properties: r#"{"name":"Beta"}"#.to_owned(),
7026                    source_ref: Some("src-2".to_owned()),
7027                    upsert: true,
7028                    chunk_policy: ChunkPolicy::Preserve,
7029                    content_ref: None,
7030                }],
7031                node_retires: vec![],
7032                edges: vec![],
7033                edge_retires: vec![],
7034                chunks: vec![],
7035                runs: vec![],
7036                steps: vec![],
7037                actions: vec![],
7038                optional_backfills: vec![],
7039                vec_inserts: vec![],
7040                operational_writes: vec![],
7041            })
7042            .expect("upsert");
7043
7044        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7045        let count: i64 = conn
7046            .query_row(
7047                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7048                [],
7049                |row| row.get(0),
7050            )
7051            .expect("count");
7052        assert_eq!(
7053            count, 1,
7054            "must have exactly one property FTS row after upsert"
7055        );
7056
7057        let text: String = conn
7058            .query_row(
7059                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7060                [],
7061                |row| row.get(0),
7062            )
7063            .expect("text");
7064        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7065    }
7066
7067    #[test]
7068    fn property_fts_rows_deleted_on_retire() {
7069        let db = NamedTempFile::new().expect("temporary db");
7070        let schema = Arc::new(SchemaManager::new());
7071        {
7072            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7073            schema.bootstrap(&conn).expect("bootstrap");
7074            conn.execute(
7075                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7076                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7077                [],
7078            )
7079            .expect("register schema");
7080        }
7081        let writer = WriterActor::start(
7082            db.path(),
7083            Arc::clone(&schema),
7084            ProvenanceMode::Warn,
7085            Arc::new(TelemetryCounters::default()),
7086        )
7087        .expect("writer");
7088
7089        // Insert a node.
7090        writer
7091            .submit(WriteRequest {
7092                label: "insert".to_owned(),
7093                nodes: vec![NodeInsert {
7094                    row_id: "row-1".to_owned(),
7095                    logical_id: "goal-1".to_owned(),
7096                    kind: "Goal".to_owned(),
7097                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7098                    source_ref: Some("src-1".to_owned()),
7099                    upsert: false,
7100                    chunk_policy: ChunkPolicy::Preserve,
7101                    content_ref: None,
7102                }],
7103                node_retires: vec![],
7104                edges: vec![],
7105                edge_retires: vec![],
7106                chunks: vec![],
7107                runs: vec![],
7108                steps: vec![],
7109                actions: vec![],
7110                optional_backfills: vec![],
7111                vec_inserts: vec![],
7112                operational_writes: vec![],
7113            })
7114            .expect("insert");
7115
7116        // Retire the node.
7117        writer
7118            .submit(WriteRequest {
7119                label: "retire".to_owned(),
7120                nodes: vec![],
7121                node_retires: vec![NodeRetire {
7122                    logical_id: "goal-1".to_owned(),
7123                    source_ref: Some("forget-1".to_owned()),
7124                }],
7125                edges: vec![],
7126                edge_retires: vec![],
7127                chunks: vec![],
7128                runs: vec![],
7129                steps: vec![],
7130                actions: vec![],
7131                optional_backfills: vec![],
7132                vec_inserts: vec![],
7133                operational_writes: vec![],
7134            })
7135            .expect("retire");
7136
7137        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7138        let count: i64 = conn
7139            .query_row(
7140                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7141                [],
7142                |row| row.get(0),
7143            )
7144            .expect("count");
7145        assert_eq!(count, 0, "property FTS row must be deleted on retire");
7146    }
7147
7148    #[test]
7149    fn no_property_fts_row_for_unregistered_kind() {
7150        let db = NamedTempFile::new().expect("temporary db");
7151        let schema = Arc::new(SchemaManager::new());
7152        {
7153            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7154            schema.bootstrap(&conn).expect("bootstrap");
7155            // No schema registered for "Note" kind.
7156        }
7157        let writer = WriterActor::start(
7158            db.path(),
7159            Arc::clone(&schema),
7160            ProvenanceMode::Warn,
7161            Arc::new(TelemetryCounters::default()),
7162        )
7163        .expect("writer");
7164
7165        writer
7166            .submit(WriteRequest {
7167                label: "insert".to_owned(),
7168                nodes: vec![NodeInsert {
7169                    row_id: "row-1".to_owned(),
7170                    logical_id: "note-1".to_owned(),
7171                    kind: "Note".to_owned(),
7172                    properties: r#"{"title":"hello"}"#.to_owned(),
7173                    source_ref: Some("src-1".to_owned()),
7174                    upsert: false,
7175                    chunk_policy: ChunkPolicy::Preserve,
7176                    content_ref: None,
7177                }],
7178                node_retires: vec![],
7179                edges: vec![],
7180                edge_retires: vec![],
7181                chunks: vec![],
7182                runs: vec![],
7183                steps: vec![],
7184                actions: vec![],
7185                optional_backfills: vec![],
7186                vec_inserts: vec![],
7187                operational_writes: vec![],
7188            })
7189            .expect("insert");
7190
7191        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7192        let count: i64 = conn
7193            .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
7194                row.get(0)
7195            })
7196            .expect("count");
7197        assert_eq!(count, 0, "no property FTS rows for unregistered kind");
7198    }
7199
7200    mod extract_json_path_tests {
7201        use super::super::extract_json_path;
7202        use serde_json::json;
7203
7204        #[test]
7205        fn string_value() {
7206            let v = json!({"name": "alice"});
7207            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7208        }
7209
7210        #[test]
7211        fn number_value() {
7212            let v = json!({"age": 42});
7213            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7214        }
7215
7216        #[test]
7217        fn bool_value() {
7218            let v = json!({"active": true});
7219            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7220        }
7221
7222        #[test]
7223        fn null_value() {
7224            let v = json!({"x": null});
7225            assert!(extract_json_path(&v, "$.x").is_empty());
7226        }
7227
7228        #[test]
7229        fn missing_path() {
7230            let v = json!({"name": "a"});
7231            assert!(extract_json_path(&v, "$.missing").is_empty());
7232        }
7233
7234        #[test]
7235        fn nested_path() {
7236            let v = json!({"address": {"city": "NYC"}});
7237            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7238        }
7239
7240        #[test]
7241        fn array_of_strings() {
7242            let v = json!({"tags": ["a", "b", "c"]});
7243            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7244        }
7245
7246        #[test]
7247        fn array_mixed_scalars() {
7248            let v = json!({"vals": ["x", 1, true]});
7249            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7250        }
7251
7252        #[test]
7253        fn array_only_objects_returns_empty() {
7254            let v = json!({"data": [{"k": "v"}]});
7255            assert!(extract_json_path(&v, "$.data").is_empty());
7256        }
7257
7258        #[test]
7259        fn array_mixed_objects_and_scalars() {
7260            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7261            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7262        }
7263
7264        #[test]
7265        fn object_returns_empty() {
7266            let v = json!({"meta": {"k": "v"}});
7267            assert!(extract_json_path(&v, "$.meta").is_empty());
7268        }
7269
7270        #[test]
7271        fn no_prefix_returns_empty() {
7272            let v = json!({"name": "a"});
7273            assert!(extract_json_path(&v, "name").is_empty());
7274        }
7275    }
7276
    mod recursive_extraction_tests {
        // Tests for recursive property-FTS extraction: blob assembly,
        // position maps, and the depth/byte/exclude guardrails.
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        /// Schema with the given paths, a single-space separator, and no
        /// exclusions.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        /// Like [`schema`], but with explicit exclude paths.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        // Object keys are visited in lexicographic order so the blob and
        // position map are stable across runs.
        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        // Array elements get bracketed index paths: `$.tags[0]`, `$.tags[1]`.
        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        // The walk descends into objects nested inside arrays.
        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        // Numeric and boolean leaves are stringified into the blob.
        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        // Null leaves contribute neither blob bytes nor a position entry.
        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        // Depth guardrail: nesting deeper than MAX_RECURSIVE_DEPTH is clamped
        // and counted in stats, rather than walked.
        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            // The walk should stop before reaching the leaf (depth 8).
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            // Row is still indexed with whatever was emitted — even if
            // that's nothing. The contract is only that the walk clamped.
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        // Byte guardrail: blob is truncated at MAX_EXTRACTED_BYTES with no
        // partially-emitted leaves and no trailing separator.
        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            // Build an array of 4KB strings; total blob > 64KB.
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            // No partial leaves: every position end must fall inside blob length.
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                // Slice must equal some original leaf value; we only
                // verify it's not empty and doesn't straddle a separator.
                assert!(!slice.is_empty());
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            // Blob cannot end in a dangling separator.
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        // Exclude guardrail: an excluded subtree is pruned from both blob and
        // positions, and the pruning is counted in stats.
        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        // Every position entry must slice back to its exact leaf value, with
        // monotonically increasing, non-overlapping offsets.
        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            // Leaf paths in lexicographic order.
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            // Monotonic, non-overlapping offsets.
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        // Position maps are only produced for recursive paths; scalar paths
        // contribute blob text but no entries.
        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            // Sanity-check the type so the test fails loudly if the
            // signature changes.
            let _: Vec<PositionEntry> = positions;
        }

        /// Asserts no two position entries share a start offset (regression
        /// guard for empty-leaf bookkeeping in the tests below).
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        // Empty-string leaves must not emit bytes, separators, or positions;
        // the following tests cover the empty/non-empty orderings.
        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        // Payloads with no extractable content at all produce (None, []).
        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        // An empty leaf sandwiched between non-empty leaves must not add an
        // extra separator between the survivors.
        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            // The two non-empty leaves are separated by exactly one
            // LEAF_SEPARATOR; the empty middle leaf contributes neither
            // bytes nor a position.
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        // Null-only arrays behave like all-empty payloads: nothing emitted.
        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
7645}