Skip to main content

fathomdb_engine/
writer.rs

1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16    OperationalValidationMode, extract_secondary_index_entries_for_current,
17    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18    parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection backfill task submitted alongside a write.
///
/// Carried on [`WriteRequest::optional_backfills`]; the number of accepted
/// tasks is surfaced as [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Consulted per node via [`NodeInsert::chunk_policy`]; only meaningful when
/// the insert has `upsert = true`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Held by [`WriterActor`] and enforced caller-side (in `prepare_write` /
/// `prepare_touch_last_accessed`) before the request reaches the writer thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Unique id of this physical row (version). Must be non-empty and unique
    /// within the request (validated in `prepare_write`).
    pub row_id: String,
    /// Stable identity shared by all versions of this node; must be non-empty.
    pub logical_id: String,
    /// Node kind label; also drives per-kind FTS projection (see
    /// `PreparedWrite::node_kinds`).
    pub kind: String,
    /// Properties payload — treated as a JSON document by property-FTS
    /// extraction (see [`PropertyFtsSchema`]).
    pub properties: String,
    /// Provenance reference; required when [`ProvenanceMode::Require`] is active.
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
69
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Unique id of this physical row (version); non-empty, unique per request.
    pub row_id: String,
    /// Stable identity shared by all versions of this edge; must be non-empty.
    pub logical_id: String,
    /// `logical_id` of the source node.
    pub source_logical_id: String,
    /// `logical_id` of the target node.
    pub target_logical_id: String,
    /// Edge kind label.
    pub kind: String,
    /// Properties payload (JSON string).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
84
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Stable identity of the node to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
91
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Stable identity of the edge to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk id; must be non-empty (validated in `prepare_write`).
    pub id: String,
    /// `logical_id` of the node this chunk belongs to.
    pub node_logical_id: String,
    /// Chunk text; must be non-empty — empty chunks are rejected in `prepare_write`.
    pub text_content: String,
    // Optional byte range of this chunk within its source content.
    // NOTE(review): inclusive vs half-open semantics not established here — confirm.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
110
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Target chunk id; must be non-empty.
    pub chunk_id: String,
    /// Embedding vector; must be non-empty. (`f32` is why this type is not `Eq`.)
    pub embedding: Vec<f32>,
}
122
/// A mutation to an operational collection submitted as part of a write request.
///
/// `collection` and `record_key` must be non-empty; `Append`/`Put` also require
/// a non-empty `payload_json` (all validated in `prepare_write`).
///
/// NOTE(review): the exact Append-vs-Put semantics depend on the collection's
/// [`OperationalCollectionKind`], which is resolved elsewhere — confirm there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Add a record under `record_key` in `collection`.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Set the record for `record_key` in `collection`.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Remove the record for `record_key` in `collection` (no payload).
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run id; must be non-empty.
    pub id: String,
    /// Run kind label.
    pub kind: String,
    /// Run status label.
    pub status: String,
    /// Properties payload (JSON string).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (validated in `prepare_write`).
    pub upsert: bool,
    /// Id of the run this row supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
156
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step id; must be non-empty.
    pub id: String,
    /// Id of the parent run.
    pub run_id: String,
    /// Step kind label.
    pub kind: String,
    /// Step status label.
    pub status: String,
    /// Properties payload (JSON string).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (validated in `prepare_write`).
    pub upsert: bool,
    /// Id of the step this row supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
169
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action id; must be non-empty.
    pub id: String,
    /// Id of the parent step.
    pub step_id: String,
    /// Action kind label.
    pub kind: String,
    /// Action status label.
    pub status: String,
    /// Properties payload (JSON string).
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (validated in `prepare_write`).
    pub upsert: bool,
    /// Id of the action this row supersedes; required when `upsert` is true.
    pub supersedes_id: Option<String>,
}
182
// --- Write-request size limits ---
// Per-category caps enforced by `validate_request_size`.
// NOTE(review): `vec_inserts` has no per-category cap (bounded only via
// MAX_TOTAL_ITEMS) and `optional_backfills` is not counted toward any
// limit — confirm both are intentional.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
/// Combined cap across node retires and edge retires.
const MAX_RETIRES: usize = 10_000;
/// Combined cap across runs, steps, and actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
/// Cap on the sum of all counted item categories (including `vec_inserts`).
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
///
/// Size-limited by `validate_request_size`; validated and converted into a
/// `PreparedWrite` by `prepare_write` before reaching the writer thread.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Label carried through to the [`WriteReceipt`].
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfills; not counted toward request size limits.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label from the originating [`WriteRequest`].
    pub label: String,
    /// Number of [`OptionalProjectionTask`]s accepted with this write.
    pub optional_backfill_count: usize,
    /// Non-fatal warnings produced while applying the write.
    pub warnings: Vec<String>,
    // Missing-`source_ref` warnings — presumably populated under
    // `ProvenanceMode::Warn`; TODO confirm against the writer loop.
    pub provenance_warnings: Vec<String>,
}
227
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Node identities to touch; must be non-empty with no blank entries
    /// (validated in `prepare_touch_last_accessed`).
    pub logical_ids: Vec<String>,
    // Timestamp to record. NOTE(review): unit/epoch not established in this
    // file — confirm (looks like a Unix timestamp).
    pub touched_at: i64,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
235
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical ids touched.
    pub touched_logical_ids: usize,
    // Timestamp that was recorded — presumably echoes the request's
    // `touched_at`; verify against the writer loop.
    pub touched_at: i64,
}
242
/// One row destined for the chunk-FTS projection, resolved by the writer
/// thread before BEGIN IMMEDIATE (see `PreparedWrite::required_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    kind: String,
    text_content: String,
}
250
/// One row destined for the property-FTS projection, resolved by the writer
/// thread before BEGIN IMMEDIATE (see
/// `PreparedWrite::required_property_fts_rows`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    /// Extracted blob: leaf values joined with the schema's separator.
    text_content: String,
    /// Byte-range → leaf-path map into `text_content` (see [`PositionEntry`]).
    positions: Vec<PositionEntry>,
}
258
/// Maximum depth the recursive property-FTS extraction walk descends before
/// clamping. A walk that reaches this depth will not emit leaves below it.
/// Each clamp is counted in [`ExtractStats::depth_cap_hit`].
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Maximum blob size the recursive property-FTS extraction walk will emit.
/// When the next complete leaf would push the blob past this cap, the walk
/// stops on the preceding leaf boundary — the existing blob is indexed, the
/// truncated leaf is not partially emitted. Truncation is surfaced via
/// [`ExtractStats::byte_cap_reached`].
///
/// This budget applies only to the recursive-walk portion of the blob.
/// Scalar-prefix emissions and the scalar↔recursive [`LEAF_SEPARATOR`] token
/// are not counted against it; the cap governs how much nested-subtree
/// content is serialized, not the final blob length.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

/// Hard phrase-break separator inserted between two adjacent recursive
/// leaves in the concatenated blob.
///
/// FTS5 phrase queries match on consecutive token positions, so simply
/// inserting non-token whitespace or control characters does NOT prevent a
/// phrase like `"foo bar"` from matching when `foo` ends one leaf and
/// `bar` starts the next — the tokenizer would discard the control
/// character and the two content tokens would still be adjacent.
///
/// To create a real position gap we embed a sentinel *token*
/// `fathomdbphrasebreaksentinel` between leaves. Under `porter
/// unicode61 remove_diacritics 2` this survives tokenization as its own
/// position, so the content tokens on either side are separated by at
/// least one intervening position and phrase queries spanning the gap
/// cannot match. The sentinel is long, lowercase, and obviously
/// synthetic to minimize the chance of accidental collisions with real
/// content. It remains searchable as a word — callers should avoid
/// passing it to `text_search` directly.
///
/// Validated by the integration test
/// `leaf_separator_is_hard_phrase_break_under_unicode61_porter`.
///
/// Note: because the sentinel is a real token it may appear in FTS5
/// `snippet()` output when the snippet window spans a leaf boundary.
/// Phase 5 presentation-layer snippet post-processing is responsible for
/// stripping it before the snippet is surfaced to callers.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
301
/// Whether a registered property-FTS path extracts a single scalar value
/// or recursively walks the subtree rooted at the path.
///
/// Defaults to [`PropertyPathMode::Scalar`] (the pre-Phase-4 behavior).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Legacy scalar extraction — resolve the path, append the scalar
    /// (flattening arrays of scalars as before Phase 4).
    #[default]
    Scalar,
    /// Recursively walk scalar leaves rooted at the path, emitting one
    /// leaf per scalar with position-map tracking.
    Recursive,
}
314
/// A single registered property-FTS path with its extraction mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyPathEntry {
    /// Path from the node's property root (`$.…` syntax), matched exactly —
    /// see [`PropertyFtsSchema::paths`].
    pub path: String,
    /// Scalar vs recursive extraction — see [`PropertyPathMode`].
    pub mode: PropertyPathMode,
}
321
322impl PropertyPathEntry {
323    pub(crate) fn scalar(path: impl Into<String>) -> Self {
324        Self {
325            path: path.into(),
326            mode: PropertyPathMode::Scalar,
327        }
328    }
329
330    #[cfg(test)]
331    pub(crate) fn recursive(path: impl Into<String>) -> Self {
332        Self {
333            path: path.into(),
334            mode: PropertyPathMode::Recursive,
335        }
336    }
337}
338
/// Parsed property-FTS schema definition for a single kind.
///
/// Drives extraction of [`FtsPropertyProjectionRow`]s for nodes of that kind.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Property paths to extract. Each entry is matched exactly against
    /// the JSON tree from the node's root; see [`PropertyPathMode`].
    pub paths: Vec<PropertyPathEntry>,
    /// Separator inserted between adjacent leaf values in the extracted
    /// blob. Acts as a hard phrase break under FTS5.
    pub separator: String,
    /// JSON paths to skip during recursive extraction. Matched as an
    /// *exact* path equality — the walk visits each object/array before
    /// descending into it, so listing `$.x.y` suppresses the entire
    /// subtree rooted at `$.x.y`. Prefix matching is NOT supported: an
    /// exclude of `$.x` does not implicitly exclude `$.x.y`, and
    /// `$.payload.priv` does not implicitly exclude `$.payload.priv.inner`
    /// via prefix — the walker will still descend into any subtree that
    /// is not itself listed exactly in `exclude_paths`.
    pub exclude_paths: Vec<String>,
}
358
/// One position-map entry: a half-open `[start, end)` byte range within the
/// extracted blob plus the `JSONPath` of the scalar leaf whose value occupies
/// that range.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive start byte offset into the extracted blob.
    pub start_offset: usize,
    /// Exclusive end byte offset into the extracted blob.
    pub end_offset: usize,
    /// `JSONPath` of the leaf occupying `[start_offset, end_offset)`.
    pub leaf_path: String,
}
368
/// Per-kind extraction stats accumulated by a recursive walk. Surface-level
/// Phase 4 logging only; Phase 5 may expose these to callers.
///
/// Combined across walks via [`ExtractStats::merge`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Number of times the walk hit [`MAX_RECURSIVE_DEPTH`] and clamped.
    pub depth_cap_hit: usize,
    /// `true` once the extracted blob has reached `MAX_EXTRACTED_BYTES` and
    /// the walker has stopped accepting additional leaves. Unlike the
    /// depth counter this is a boolean: the walker's `stopped` guard
    /// blocks further `emit_leaf` calls after the first truncation.
    pub byte_cap_reached: bool,
    /// Number of subtrees skipped via `exclude_paths`.
    pub excluded_subtree: usize,
}
381
382impl ExtractStats {
383    fn merge(&mut self, other: ExtractStats) {
384        self.depth_cap_hit += other.depth_cap_hit;
385        self.byte_cap_reached |= other.byte_cap_reached;
386        self.excluded_subtree += other.excluded_subtree;
387    }
388}
389
/// Caller-side-validated form of a [`WriteRequest`], produced by
/// `prepare_write` and handed to the writer thread over the channel.
/// The two `required_*` vectors are filled in on the writer thread
/// (see field notes).
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Collection name → resolved kind for operational writes in this request.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Collection name → registered filter fields for operational writes.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Non-fatal validation warnings from operational-payload contracts.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
417
/// Messages accepted by the writer thread's receive loop.
enum WriteMessage {
    /// Apply a prepared write batch; the outcome is sent back on `reply`.
    Submit {
        /// Boxed so the large `PreparedWrite` doesn't inflate the enum
        /// carried on the channel.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` for a batch of nodes.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
428
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Send side of the bounded writer channel. Wrapped in `ManuallyDrop` so
    /// the `Drop` impl can close the channel *before* joining the thread.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// Writer thread join handle; taken (set to `None`) only during `Drop`.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance enforcement applied during caller-side validation.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
441
impl WriterActor {
    /// Spawn the writer thread and return the actor handle that owns it.
    ///
    /// The channel is bounded (256 queued messages), so senders block once
    /// the writer falls that far behind — backpressure rather than unbounded
    /// queueing.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the writer thread cannot be spawned.
    pub fn start(
        path: impl AsRef<Path>,
        schema_manager: Arc<SchemaManager>,
        provenance_mode: ProvenanceMode,
        telemetry: Arc<TelemetryCounters>,
    ) -> Result<Self, EngineError> {
        let database_path = path.as_ref().to_path_buf();
        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);

        // The loop gets its own Arc; `telemetry` itself is retained on the
        // actor (see `_telemetry`).
        let writer_telemetry = Arc::clone(&telemetry);
        let handle = thread::Builder::new()
            .name("fathomdb-writer".to_owned())
            .spawn(move || {
                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
            })
            // `Builder::spawn` fails with an io::Error (e.g. thread limits).
            .map_err(EngineError::Io)?;

        Ok(Self {
            // ManuallyDrop lets `Drop` close the channel before the join.
            sender: ManuallyDrop::new(sender),
            thread_handle: Some(handle),
            provenance_mode,
            _telemetry: telemetry,
        })
    }

    /// Returns `true` if the writer thread is still running.
    fn is_thread_alive(&self) -> bool {
        self.thread_handle
            .as_ref()
            .is_some_and(|h| !h.is_finished())
    }

    /// Returns `WriterRejected` if the writer thread has exited.
    ///
    /// Best-effort early check: the thread may still exit between this check
    /// and a subsequent `send`, which is why the send's error path also maps
    /// to `WriterRejected`.
    fn check_thread_alive(&self) -> Result<(), EngineError> {
        if self.is_thread_alive() {
            Ok(())
        } else {
            Err(EngineError::WriterRejected(
                "writer thread has exited".to_owned(),
            ))
        }
    }

    /// Validate `request` on the caller thread, hand it to the writer thread,
    /// and wait (bounded by `WRITER_REPLY_TIMEOUT`) for the receipt.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
    /// down, or the underlying `SQLite` transaction fails.
    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
        self.check_thread_alive()?;
        // All size/ID/provenance validation runs here, off the writer thread.
        let prepared = prepare_write(request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::Submit {
                prepared: Box::new(prepared),
                reply: reply_tx,
            })
            // `send` fails only when the writer thread has disconnected.
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }

    /// Validate and forward a last-access touch batch to the writer thread.
    ///
    /// # Errors
    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
    /// or the underlying `SQLite` transaction fails.
    pub fn touch_last_accessed(
        &self,
        request: LastAccessTouchRequest,
    ) -> Result<LastAccessTouchReport, EngineError> {
        self.check_thread_alive()?;
        prepare_touch_last_accessed(&request, self.provenance_mode)?;
        let (reply_tx, reply_rx) = mpsc::channel();
        self.sender
            .send(WriteMessage::TouchLastAccessed {
                request,
                reply: reply_tx,
            })
            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;

        recv_with_timeout(&reply_rx)
    }
}
525
/// Fallback shutdown-panic notice printed to stderr when the `tracing`
/// feature is absent (see the `Drop` impl for [`WriterActor`]).
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    let notice = "fathomdb-writer panicked during shutdown (suppressed: already panicking)";
    eprintln!("{notice}");
}
531
impl Drop for WriterActor {
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    // Re-raising a panic while already unwinding would abort
                    // the process (double panic), so log and swallow instead.
                    if std::thread::panicking() {
                        // `trace_warn!` is a crate-local macro; presumably a
                        // no-op unless the `tracing` feature is enabled —
                        // hence the stderr fallback below.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Not already unwinding: surface the writer thread's
                        // panic to the dropping thread.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
561
562/// Wait for a reply from the writer thread, with a timeout.
563fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
564    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
565        .map_err(|error| match error {
566            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
567                "write timed out waiting for writer thread reply — the write may still commit"
568                    .to_owned(),
569            ),
570            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
571        })
572        .and_then(|result| result)
573}
574
575fn prepare_touch_last_accessed(
576    request: &LastAccessTouchRequest,
577    mode: ProvenanceMode,
578) -> Result<(), EngineError> {
579    if request.logical_ids.is_empty() {
580        return Err(EngineError::InvalidWrite(
581            "touch_last_accessed requires at least one logical_id".to_owned(),
582        ));
583    }
584    for logical_id in &request.logical_ids {
585        if logical_id.trim().is_empty() {
586            return Err(EngineError::InvalidWrite(
587                "touch_last_accessed requires non-empty logical_ids".to_owned(),
588            ));
589        }
590    }
591    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
592        return Err(EngineError::InvalidWrite(
593            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
594                .to_owned(),
595        ));
596    }
597    Ok(())
598}
599
600fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
601    let missing: Vec<String> = request
602        .nodes
603        .iter()
604        .filter(|n| n.source_ref.is_none())
605        .map(|n| format!("node '{}'", n.logical_id))
606        .chain(
607            request
608                .node_retires
609                .iter()
610                .filter(|r| r.source_ref.is_none())
611                .map(|r| format!("node retire '{}'", r.logical_id)),
612        )
613        .chain(
614            request
615                .edges
616                .iter()
617                .filter(|e| e.source_ref.is_none())
618                .map(|e| format!("edge '{}'", e.logical_id)),
619        )
620        .chain(
621            request
622                .edge_retires
623                .iter()
624                .filter(|r| r.source_ref.is_none())
625                .map(|r| format!("edge retire '{}'", r.logical_id)),
626        )
627        .chain(
628            request
629                .runs
630                .iter()
631                .filter(|r| r.source_ref.is_none())
632                .map(|r| format!("run '{}'", r.id)),
633        )
634        .chain(
635            request
636                .steps
637                .iter()
638                .filter(|s| s.source_ref.is_none())
639                .map(|s| format!("step '{}'", s.id)),
640        )
641        .chain(
642            request
643                .actions
644                .iter()
645                .filter(|a| a.source_ref.is_none())
646                .map(|a| format!("action '{}'", a.id)),
647        )
648        .chain(
649            request
650                .operational_writes
651                .iter()
652                .filter(|write| operational_write_source_ref(write).is_none())
653                .map(|write| {
654                    format!(
655                        "operational {} '{}:{}'",
656                        operational_write_kind(write),
657                        operational_write_collection(write),
658                        operational_write_record_key(write)
659                    )
660                }),
661        )
662        .collect();
663
664    if missing.is_empty() {
665        Ok(())
666    } else {
667        Err(EngineError::InvalidWrite(format!(
668            "ProvenanceMode::Require: missing source_ref on: {}",
669            missing.join(", ")
670        )))
671    }
672}
673
674fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
675    if request.nodes.len() > MAX_NODES {
676        return Err(EngineError::InvalidWrite(format!(
677            "too many nodes: {} exceeds limit of {MAX_NODES}",
678            request.nodes.len()
679        )));
680    }
681    if request.edges.len() > MAX_EDGES {
682        return Err(EngineError::InvalidWrite(format!(
683            "too many edges: {} exceeds limit of {MAX_EDGES}",
684            request.edges.len()
685        )));
686    }
687    if request.chunks.len() > MAX_CHUNKS {
688        return Err(EngineError::InvalidWrite(format!(
689            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
690            request.chunks.len()
691        )));
692    }
693    let retires = request.node_retires.len() + request.edge_retires.len();
694    if retires > MAX_RETIRES {
695        return Err(EngineError::InvalidWrite(format!(
696            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
697        )));
698    }
699    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
700    if runtime_items > MAX_RUNTIME_ITEMS {
701        return Err(EngineError::InvalidWrite(format!(
702            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
703        )));
704    }
705    if request.operational_writes.len() > MAX_OPERATIONAL {
706        return Err(EngineError::InvalidWrite(format!(
707            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
708            request.operational_writes.len()
709        )));
710    }
711    let total = request.nodes.len()
712        + request.node_retires.len()
713        + request.edges.len()
714        + request.edge_retires.len()
715        + request.chunks.len()
716        + request.runs.len()
717        + request.steps.len()
718        + request.actions.len()
719        + request.vec_inserts.len()
720        + request.operational_writes.len();
721    if total > MAX_TOTAL_ITEMS {
722        return Err(EngineError::InvalidWrite(format!(
723            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
724        )));
725    }
726    Ok(())
727}
728
729#[allow(clippy::too_many_lines)]
730fn prepare_write(
731    request: WriteRequest,
732    mode: ProvenanceMode,
733) -> Result<PreparedWrite, EngineError> {
734    validate_request_size(&request)?;
735
736    // --- ID validation: reject empty IDs ---
737    for node in &request.nodes {
738        if node.row_id.is_empty() {
739            return Err(EngineError::InvalidWrite(
740                "NodeInsert has empty row_id".to_owned(),
741            ));
742        }
743        if node.logical_id.is_empty() {
744            return Err(EngineError::InvalidWrite(
745                "NodeInsert has empty logical_id".to_owned(),
746            ));
747        }
748    }
749    for edge in &request.edges {
750        if edge.row_id.is_empty() {
751            return Err(EngineError::InvalidWrite(
752                "EdgeInsert has empty row_id".to_owned(),
753            ));
754        }
755        if edge.logical_id.is_empty() {
756            return Err(EngineError::InvalidWrite(
757                "EdgeInsert has empty logical_id".to_owned(),
758            ));
759        }
760    }
761    for chunk in &request.chunks {
762        if chunk.id.is_empty() {
763            return Err(EngineError::InvalidWrite(
764                "ChunkInsert has empty id".to_owned(),
765            ));
766        }
767        if chunk.text_content.is_empty() {
768            return Err(EngineError::InvalidWrite(format!(
769                "chunk '{}' has empty text_content; empty chunks are not allowed",
770                chunk.id
771            )));
772        }
773    }
774    for run in &request.runs {
775        if run.id.is_empty() {
776            return Err(EngineError::InvalidWrite(
777                "RunInsert has empty id".to_owned(),
778            ));
779        }
780    }
781    for step in &request.steps {
782        if step.id.is_empty() {
783            return Err(EngineError::InvalidWrite(
784                "StepInsert has empty id".to_owned(),
785            ));
786        }
787    }
788    for action in &request.actions {
789        if action.id.is_empty() {
790            return Err(EngineError::InvalidWrite(
791                "ActionInsert has empty id".to_owned(),
792            ));
793        }
794    }
795    for vi in &request.vec_inserts {
796        if vi.chunk_id.is_empty() {
797            return Err(EngineError::InvalidWrite(
798                "VecInsert has empty chunk_id".to_owned(),
799            ));
800        }
801        if vi.embedding.is_empty() {
802            return Err(EngineError::InvalidWrite(format!(
803                "VecInsert for chunk '{}' has empty embedding",
804                vi.chunk_id
805            )));
806        }
807    }
808    for operational in &request.operational_writes {
809        if operational_write_collection(operational).is_empty() {
810            return Err(EngineError::InvalidWrite(
811                "OperationalWrite has empty collection".to_owned(),
812            ));
813        }
814        if operational_write_record_key(operational).is_empty() {
815            return Err(EngineError::InvalidWrite(format!(
816                "OperationalWrite for collection '{}' has empty record_key",
817                operational_write_collection(operational)
818            )));
819        }
820        match operational {
821            OperationalWrite::Append { payload_json, .. }
822            | OperationalWrite::Put { payload_json, .. } => {
823                if payload_json.is_empty() {
824                    return Err(EngineError::InvalidWrite(format!(
825                        "OperationalWrite {} '{}:{}' has empty payload_json",
826                        operational_write_kind(operational),
827                        operational_write_collection(operational),
828                        operational_write_record_key(operational)
829                    )));
830                }
831            }
832            OperationalWrite::Delete { .. } => {}
833        }
834    }
835
836    // --- ID validation: reject duplicate row_ids within the request ---
837    {
838        let mut seen = std::collections::HashSet::new();
839        for node in &request.nodes {
840            if !seen.insert(node.row_id.as_str()) {
841                return Err(EngineError::InvalidWrite(format!(
842                    "duplicate row_id '{}' within the same WriteRequest",
843                    node.row_id
844                )));
845            }
846        }
847        for edge in &request.edges {
848            if !seen.insert(edge.row_id.as_str()) {
849                return Err(EngineError::InvalidWrite(format!(
850                    "duplicate row_id '{}' within the same WriteRequest",
851                    edge.row_id
852                )));
853            }
854        }
855    }
856
857    // --- ProvenanceMode::Require enforcement ---
858    if mode == ProvenanceMode::Require {
859        check_require_provenance(&request)?;
860    }
861
862    // --- Runtime table upsert validation ---
863    for run in &request.runs {
864        if run.upsert && run.supersedes_id.is_none() {
865            return Err(EngineError::InvalidWrite(format!(
866                "run '{}': upsert=true requires supersedes_id to be set",
867                run.id
868            )));
869        }
870    }
871    for step in &request.steps {
872        if step.upsert && step.supersedes_id.is_none() {
873            return Err(EngineError::InvalidWrite(format!(
874                "step '{}': upsert=true requires supersedes_id to be set",
875                step.id
876            )));
877        }
878    }
879    for action in &request.actions {
880        if action.upsert && action.supersedes_id.is_none() {
881            return Err(EngineError::InvalidWrite(format!(
882                "action '{}': upsert=true requires supersedes_id to be set",
883                action.id
884            )));
885        }
886    }
887
888    let node_kinds = request
889        .nodes
890        .iter()
891        .map(|node| (node.logical_id.clone(), node.kind.clone()))
892        .collect::<HashMap<_, _>>();
893
894    Ok(PreparedWrite {
895        label: request.label,
896        nodes: request.nodes,
897        node_retires: request.node_retires,
898        edges: request.edges,
899        edge_retires: request.edge_retires,
900        chunks: request.chunks,
901        runs: request.runs,
902        steps: request.steps,
903        actions: request.actions,
904        vec_inserts: request.vec_inserts,
905        operational_writes: request.operational_writes,
906        operational_collection_kinds: HashMap::new(),
907        operational_collection_filter_fields: HashMap::new(),
908        operational_validation_warnings: Vec::new(),
909        node_kinds,
910        required_fts_rows: Vec::new(),
911        required_property_fts_rows: Vec::new(),
912        optional_backfills: request.optional_backfills,
913    })
914}
915
/// Dedicated writer-thread main loop: owns the single mutable SQLite
/// connection and serially applies every [`WriteMessage`] received over
/// `receiver`.
///
/// If opening the connection or bootstrapping the schema fails, every queued
/// and future message is rejected with the failure text via [`reject_all`]
/// and the thread exits. The loop ends once all senders are dropped.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // Fail every pending and future request instead of hanging callers.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` exists only with the `tracing` feature; it is
                // consumed solely inside `trace_info!` below, which presumably
                // compiles away otherwise — NOTE(review): confirm.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one poisoned write cannot kill the writer
                // thread (and with it every future write).
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    // `error` is only read by the tracing macro, hence the allow.
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        // Telemetry counts nodes + edges + chunks as "rows".
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Caller may have gone away; a failed send is not an error.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches count as a write event with zero rows.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
998
999fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1000    for message in receiver {
1001        match message {
1002            WriteMessage::Submit { reply, .. } => {
1003                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1004            }
1005            WriteMessage::TouchLastAccessed { reply, .. } => {
1006                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1007            }
1008        }
1009    }
1010}
1011
1012/// Resolve FTS projection rows before the write transaction begins.
1013///
1014/// For each chunk in the prepared write, determines the node `kind` needed
1015/// to populate the FTS index. If the node was co-submitted in the same
1016/// request its kind is taken from `prepared.node_kinds` without a DB query.
1017/// If the node is pre-existing it is looked up in the database. If the node
1018/// cannot be found in either place, an `InvalidWrite` error is returned.
1019fn resolve_fts_rows(
1020    conn: &rusqlite::Connection,
1021    prepared: &mut PreparedWrite,
1022) -> Result<(), EngineError> {
1023    let retiring_ids: std::collections::HashSet<&str> = prepared
1024        .node_retires
1025        .iter()
1026        .map(|r| r.logical_id.as_str())
1027        .collect();
1028    for chunk in &prepared.chunks {
1029        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1030            return Err(EngineError::InvalidWrite(format!(
1031                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1032                 WriteRequest; retire and chunk insertion for the same node must not be combined",
1033                chunk.id, chunk.node_logical_id
1034            )));
1035        }
1036    }
1037    for chunk in &prepared.chunks {
1038        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1039            k.clone()
1040        } else {
1041            match conn.query_row(
1042                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1043                params![chunk.node_logical_id],
1044                |row| row.get::<_, String>(0),
1045            ) {
1046                Ok(kind) => kind,
1047                Err(rusqlite::Error::QueryReturnedNoRows) => {
1048                    return Err(EngineError::InvalidWrite(format!(
1049                        "chunk '{}' references node_logical_id '{}' that is not present in this \
1050                         write request or the database \
1051                         (v1 limitation: chunks and their nodes must be submitted together or the \
1052                         node must already exist)",
1053                        chunk.id, chunk.node_logical_id
1054                    )));
1055                }
1056                Err(e) => return Err(EngineError::Sqlite(e)),
1057            }
1058        };
1059        prepared.required_fts_rows.push(FtsProjectionRow {
1060            chunk_id: chunk.id.clone(),
1061            node_logical_id: chunk.node_logical_id.clone(),
1062            kind,
1063            text_content: chunk.text_content.clone(),
1064        });
1065    }
1066    trace_debug!(
1067        fts_rows = prepared.required_fts_rows.len(),
1068        chunks_processed = prepared.chunks.len(),
1069        "fts row resolution completed"
1070    );
1071    Ok(())
1072}
1073
1074/// Load FTS property schemas from the database and extract property values from
1075/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
1076fn resolve_property_fts_rows(
1077    conn: &rusqlite::Connection,
1078    prepared: &mut PreparedWrite,
1079) -> Result<(), EngineError> {
1080    if prepared.nodes.is_empty() {
1081        return Ok(());
1082    }
1083
1084    let schemas: HashMap<String, PropertyFtsSchema> =
1085        load_fts_property_schemas(conn)?.into_iter().collect();
1086
1087    if schemas.is_empty() {
1088        return Ok(());
1089    }
1090
1091    let mut combined_stats = ExtractStats::default();
1092    for node in &prepared.nodes {
1093        let Some(schema) = schemas.get(&node.kind) else {
1094            continue;
1095        };
1096        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1097        let (text_content, positions, stats) = extract_property_fts(&props, schema);
1098        combined_stats.merge(stats);
1099        if let Some(text_content) = text_content {
1100            prepared
1101                .required_property_fts_rows
1102                .push(FtsPropertyProjectionRow {
1103                    node_logical_id: node.logical_id.clone(),
1104                    kind: node.kind.clone(),
1105                    text_content,
1106                    positions,
1107                });
1108        }
1109    }
1110    if combined_stats != ExtractStats::default() {
1111        trace_debug!(
1112            depth_cap_hit = combined_stats.depth_cap_hit,
1113            byte_cap_reached = combined_stats.byte_cap_reached,
1114            excluded_subtree = combined_stats.excluded_subtree,
1115            "property fts recursive extraction guardrails engaged"
1116        );
1117    }
1118    trace_debug!(
1119        property_fts_rows = prepared.required_property_fts_rows.len(),
1120        nodes_processed = prepared.nodes.len(),
1121        "property fts row resolution completed"
1122    );
1123    Ok(())
1124}
1125
1126/// Extract scalar values from a JSON object at a `$.field` path.
1127/// Supports simple dot-notation paths like `$.name`, `$.address.city`.
1128/// Returns individual scalars so callers can join them with the configured separator.
1129pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1130    let Some(path) = path.strip_prefix("$.") else {
1131        return Vec::new();
1132    };
1133    let mut current = value;
1134    for segment in path.split('.') {
1135        match current.get(segment) {
1136            Some(v) => current = v,
1137            None => return Vec::new(),
1138        }
1139    }
1140    match current {
1141        serde_json::Value::String(s) => vec![s.clone()],
1142        serde_json::Value::Number(n) => vec![n.to_string()],
1143        serde_json::Value::Bool(b) => vec![b.to_string()],
1144        serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1145        serde_json::Value::Array(arr) => arr
1146            .iter()
1147            .filter_map(|v| match v {
1148                serde_json::Value::String(s) => Some(s.clone()),
1149                serde_json::Value::Number(n) => Some(n.to_string()),
1150                serde_json::Value::Bool(b) => Some(b.to_string()),
1151                _ => None,
1152            })
1153            .collect(),
1154    }
1155}
1156
1157/// Core property-FTS extraction. Produces the concatenated blob, the
1158/// position map identifying which byte range in the blob came from which
1159/// JSON leaf path, and the guardrail stats for the walk.
1160///
1161/// Emission rules:
1162/// - Path entries are processed in their registered order.
1163/// - Scalar-mode entries append the scalar value(s) directly (arrays of
1164///   scalars flatten, matching pre-Phase-4 behavior). Scalar emissions do
1165///   not produce position-map entries — the position map is only populated
1166///   when a recursive path contributes a leaf.
1167/// - Recursive-mode entries walk every descendant scalar leaf in stable
1168///   lexicographic key order. Each leaf emits a position-map entry whose
1169///   `[start, end)` spans the leaf value's bytes in the blob (NOT the
1170///   trailing separator).
1171/// - Between any two emitted values (whether from the same or different
1172///   path entries) the walk inserts [`LEAF_SEPARATOR`], a hard phrase
1173///   break under FTS5's `porter unicode61` tokenizer. Scalar-mode emissions
1174///   between each other use the schema's configured `separator` for
1175///   backwards compatibility with Phase 0.
1176pub(crate) fn extract_property_fts(
1177    props: &serde_json::Value,
1178    schema: &PropertyFtsSchema,
1179) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
1180    let mut walker = RecursiveWalker {
1181        blob: String::new(),
1182        positions: Vec::new(),
1183        stats: ExtractStats::default(),
1184        exclude_paths: schema.exclude_paths.clone(),
1185        stopped: false,
1186    };
1187
1188    let mut scalar_parts: Vec<String> = Vec::new();
1189
1190    for entry in &schema.paths {
1191        match entry.mode {
1192            PropertyPathMode::Scalar => {
1193                scalar_parts.extend(extract_json_path(props, &entry.path));
1194            }
1195            PropertyPathMode::Recursive => {
1196                let root = resolve_path_root(props, &entry.path);
1197                if let Some(root) = root {
1198                    walker.walk(&entry.path, root, 0);
1199                }
1200            }
1201        }
1202    }
1203
1204    // Flush scalar parts into the blob. Scalar emissions go first so that
1205    // their byte offsets are predictable relative to any recursive leaves
1206    // that follow. Scalars use the configured `separator`; the boundary
1207    // between scalars and recursive leaves uses `LEAF_SEPARATOR` (so phrase
1208    // queries cannot cross it).
1209    let scalar_text = if scalar_parts.is_empty() {
1210        None
1211    } else {
1212        Some(scalar_parts.join(&schema.separator))
1213    };
1214
1215    let combined = match (scalar_text, walker.blob.is_empty()) {
1216        (None, true) => None,
1217        (None, false) => Some(walker.blob.clone()),
1218        (Some(s), true) => Some(s),
1219        (Some(mut s), false) => {
1220            // Shift recursive positions by the prefix length so they stay
1221            // correct relative to the combined blob.
1222            let offset = s.len() + LEAF_SEPARATOR.len();
1223            for pos in &mut walker.positions {
1224                pos.start_offset += offset;
1225                pos.end_offset += offset;
1226            }
1227            s.push_str(LEAF_SEPARATOR);
1228            s.push_str(&walker.blob);
1229            Some(s)
1230        }
1231    };
1232
1233    (combined, walker.positions, walker.stats)
1234}
1235
1236fn resolve_path_root<'a>(
1237    value: &'a serde_json::Value,
1238    path: &str,
1239) -> Option<&'a serde_json::Value> {
1240    let stripped = path.strip_prefix("$.")?;
1241    let mut current = value;
1242    for segment in stripped.split('.') {
1243        current = current.get(segment)?;
1244    }
1245    Some(current)
1246}
1247
/// Depth-first JSON walker that flattens scalar leaves into a single FTS
/// blob while recording byte-range provenance for each emitted leaf.
struct RecursiveWalker {
    /// Accumulated text; emitted leaves are joined by `LEAF_SEPARATOR`.
    blob: String,
    /// One `[start, end)` byte range per emitted leaf, relative to `blob`.
    positions: Vec<PositionEntry>,
    /// Guardrail counters (depth cap, byte cap, excluded subtrees).
    stats: ExtractStats,
    /// Exact paths whose entire subtree is skipped during the walk.
    exclude_paths: Vec<String>,
    /// Once the byte cap is hit, the walker refuses to emit any further
    /// leaves. Subsequent walks still account for depth/exclude stats but
    /// do not append.
    stopped: bool,
}
1258
impl RecursiveWalker {
    /// Recursively visit `value`, emitting every scalar leaf into the blob.
    ///
    /// `current_path` is the JSON-path-style location of `value` (used for
    /// exclude matching and position-map provenance); `depth` guards against
    /// deeply nested documents via `MAX_RECURSIVE_DEPTH`.
    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
        // Byte cap already hit: nothing more is emitted anywhere in the walk.
        if self.stopped {
            return;
        }
        // Exact-match exclusion prunes the entire subtree, counted once.
        if self.exclude_paths.iter().any(|p| p == current_path) {
            self.stats.excluded_subtree += 1;
            return;
        }
        match value {
            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
            // Nulls contribute nothing and are not counted as leaves.
            serde_json::Value::Null => {}
            serde_json::Value::Object(map) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                // Sort keys so emission order (and thus blob bytes) is stable.
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    if self.stopped {
                        return;
                    }
                    let child_path = format!("{current_path}.{key}");
                    if let Some(child) = map.get(key) {
                        self.walk(&child_path, child, depth + 1);
                    }
                }
            }
            serde_json::Value::Array(items) => {
                if depth >= MAX_RECURSIVE_DEPTH {
                    self.stats.depth_cap_hit += 1;
                    return;
                }
                for (idx, item) in items.iter().enumerate() {
                    if self.stopped {
                        return;
                    }
                    // Array elements get `path[idx]`-style child paths.
                    let child_path = format!("{current_path}[{idx}]");
                    self.walk(&child_path, item, depth + 1);
                }
            }
        }
    }

    /// Append one scalar leaf to the blob — preceded by `LEAF_SEPARATOR`
    /// when the blob is non-empty — and record its `[start, end)` byte
    /// range (value bytes only, not the separator) in the position map.
    ///
    /// If accepting the leaf would push the blob past `MAX_EXTRACTED_BYTES`,
    /// the walker stops permanently instead of emitting a truncated value.
    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
        if self.stopped {
            return;
        }
        // Empty values would create zero-width position entries; skip them.
        if value.is_empty() {
            return;
        }
        // Compute the projected blob size if we accept this leaf.
        // If we already emitted at least one leaf, we must account for
        // the separator that precedes this one.
        let sep_len = if self.blob.is_empty() {
            0
        } else {
            LEAF_SEPARATOR.len()
        };
        let projected_len = self.blob.len() + sep_len + value.len();
        if projected_len > MAX_EXTRACTED_BYTES {
            self.stats.byte_cap_reached = true;
            self.stopped = true;
            return;
        }
        if !self.blob.is_empty() {
            self.blob.push_str(LEAF_SEPARATOR);
        }
        let start_offset = self.blob.len();
        self.blob.push_str(value);
        let end_offset = self.blob.len();
        self.positions.push(PositionEntry {
            start_offset,
            end_offset,
            leaf_path: leaf_path.to_owned(),
        });
    }
}
1340
1341/// Load all registered FTS property schemas from the database, tolerating
1342/// both the legacy JSON shape (array of bare path strings = scalar mode)
1343/// and the Phase 4 shape (objects carrying `path`, `mode`, optional
1344/// `exclude_paths`, or a top-level object carrying `paths` + global
1345/// `exclude_paths`).
1346pub(crate) fn load_fts_property_schemas(
1347    conn: &rusqlite::Connection,
1348) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1349    let mut stmt =
1350        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1351    stmt.query_map([], |row| {
1352        let kind: String = row.get(0)?;
1353        let paths_json: String = row.get(1)?;
1354        let separator: String = row.get(2)?;
1355        let schema = parse_property_schema_json(&paths_json, &separator);
1356        Ok((kind, schema))
1357    })?
1358    .collect::<Result<Vec<_>, _>>()
1359}
1360
1361pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1362    let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1363    let mut paths = Vec::new();
1364    let mut exclude_paths: Vec<String> = Vec::new();
1365
1366    let path_values: Vec<serde_json::Value> = match value {
1367        serde_json::Value::Array(arr) => arr,
1368        serde_json::Value::Object(map) => {
1369            if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1370                exclude_paths = excl
1371                    .iter()
1372                    .filter_map(|v| v.as_str().map(str::to_owned))
1373                    .collect();
1374            }
1375            match map.get("paths") {
1376                Some(serde_json::Value::Array(arr)) => arr.clone(),
1377                _ => Vec::new(),
1378            }
1379        }
1380        _ => Vec::new(),
1381    };
1382
1383    for entry in path_values {
1384        match entry {
1385            serde_json::Value::String(path) => {
1386                paths.push(PropertyPathEntry::scalar(path));
1387            }
1388            serde_json::Value::Object(map) => {
1389                let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1390                    continue;
1391                };
1392                let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1393                    PropertyPathMode::Scalar,
1394                    |m| match m {
1395                        "recursive" => PropertyPathMode::Recursive,
1396                        _ => PropertyPathMode::Scalar,
1397                    },
1398                );
1399                paths.push(PropertyPathEntry {
1400                    path: path.to_owned(),
1401                    mode,
1402                });
1403                if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1404                    for p in excl {
1405                        if let Some(s) = p.as_str() {
1406                            exclude_paths.push(s.to_owned());
1407                        }
1408                    }
1409                }
1410            }
1411            _ => {}
1412        }
1413    }
1414
1415    PropertyFtsSchema {
1416        paths,
1417        separator: separator.to_owned(),
1418        exclude_paths,
1419    }
1420}
1421
/// Validate every operational write against the registered collection
/// metadata and cache per-collection kind and filter-field definitions on
/// `prepared` for the apply phase.
///
/// Per collection (looked up in `operational_collections` at most once):
/// - the collection must be registered and not disabled;
/// - its kind must accept the write variant (`append_only_log` accepts only
///   `Append`; `latest_state` accepts only `Put`/`Delete`);
/// - if a validation contract is configured, the payload is checked here.
///
/// NOTE(review): `collection_validation_contracts` is built but never stored
/// on `prepared`, and the contract check's success value is discarded —
/// presumably `operational_validation_warnings` is populated elsewhere;
/// confirm against the apply path.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // First time this collection appears: load and validate its metadata.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Enforce kind/variant compatibility; the first arm lists the legal
        // pairings, the remaining arms produce kind-specific errors.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Outer Option: collection present in the cache; inner Option: a
        // contract is actually configured for it. `?` still propagates
        // contract violations even though the Ok value is dropped.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1489
1490fn parse_operational_filter_fields(
1491    filter_fields_json: &str,
1492) -> Result<Vec<OperationalFilterField>, EngineError> {
1493    let fields: Vec<OperationalFilterField> =
1494        serde_json::from_str(filter_fields_json).map_err(|error| {
1495            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1496        })?;
1497    let mut seen = std::collections::HashSet::new();
1498    for field in &fields {
1499        if field.name.trim().is_empty() {
1500            return Err(EngineError::InvalidWrite(
1501                "filter_fields_json field names must not be empty".to_owned(),
1502            ));
1503        }
1504        if !seen.insert(field.name.as_str()) {
1505            return Err(EngineError::InvalidWrite(format!(
1506                "filter_fields_json contains duplicate field '{}'",
1507                field.name
1508            )));
1509        }
1510        if field.modes.is_empty() {
1511            return Err(EngineError::InvalidWrite(format!(
1512                "filter_fields_json field '{}' must declare at least one mode",
1513                field.name
1514            )));
1515        }
1516        if field.modes.contains(&OperationalFilterMode::Prefix)
1517            && field.field_type != OperationalFilterFieldType::String
1518        {
1519            return Err(EngineError::InvalidWrite(format!(
1520                "filter field '{}' only supports prefix for string types",
1521                field.name
1522            )));
1523        }
1524    }
1525    Ok(fields)
1526}
1527
/// One extracted filter value from an operational write payload.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    /// Name of the declared filter field this value belongs to.
    field_name: String,
    /// Set for string-typed fields; `None` otherwise.
    string_value: Option<String>,
    /// Set for integer- and timestamp-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1534
1535fn extract_operational_filter_values(
1536    filter_fields: &[OperationalFilterField],
1537    payload_json: &str,
1538) -> Vec<OperationalFilterValueRow> {
1539    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1540        return Vec::new();
1541    };
1542    let Some(object) = parsed.as_object() else {
1543        return Vec::new();
1544    };
1545
1546    filter_fields
1547        .iter()
1548        .filter_map(|field| {
1549            let value = object.get(&field.name)?;
1550            match field.field_type {
1551                OperationalFilterFieldType::String => {
1552                    value
1553                        .as_str()
1554                        .map(|string_value| OperationalFilterValueRow {
1555                            field_name: field.name.clone(),
1556                            string_value: Some(string_value.to_owned()),
1557                            integer_value: None,
1558                        })
1559                }
1560                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1561                    value
1562                        .as_i64()
1563                        .map(|integer_value| OperationalFilterValueRow {
1564                            field_name: field.name.clone(),
1565                            string_value: None,
1566                            integer_value: Some(integer_value),
1567                        })
1568                }
1569            }
1570        })
1571        .collect()
1572}
1573
1574fn resolve_and_apply(
1575    conn: &mut rusqlite::Connection,
1576    prepared: &mut PreparedWrite,
1577) -> Result<WriteReceipt, EngineError> {
1578    resolve_fts_rows(conn, prepared)?;
1579    resolve_property_fts_rows(conn, prepared)?;
1580    resolve_operational_writes(conn, prepared)?;
1581    apply_write(conn, prepared)
1582}
1583
/// Bump `last_accessed_at` for a batch of node logical ids and record a
/// `node_last_accessed_touched` provenance event per id, atomically.
///
/// Duplicate ids in the request are collapsed (first occurrence wins).
/// Every id must name an active (non-superseded) node; otherwise the
/// whole batch is rejected with [`EngineError::InvalidWrite`] and nothing
/// is written.
fn apply_touch_last_accessed(
    conn: &mut rusqlite::Connection,
    request: &LastAccessTouchRequest,
) -> Result<LastAccessTouchReport, EngineError> {
    // Deduplicate while preserving the caller's ordering.
    let mut seen = std::collections::HashSet::new();
    let logical_ids = request
        .logical_ids
        .iter()
        .filter(|logical_id| seen.insert(logical_id.as_str()))
        .cloned()
        .collect::<Vec<_>>();
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Validate every id up front so the batch is all-or-nothing.
    for logical_id in &logical_ids {
        let exists = tx
            .query_row(
                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
                params![logical_id],
                |row| row.get::<_, i64>(0),
            )
            .optional()?
            .is_some();
        if !exists {
            return Err(EngineError::InvalidWrite(format!(
                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
            )));
        }
    }

    // Scope the cached statements so their borrow of `tx` ends before
    // `tx.commit()` consumes the transaction.
    {
        let mut upsert_metadata = tx.prepare_cached(
            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
             VALUES (?1, ?2, ?2) \
             ON CONFLICT(logical_id) DO UPDATE SET \
                 last_accessed_at = excluded.last_accessed_at, \
                 updated_at = excluded.updated_at",
        )?;
        let mut insert_provenance = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
        )?;
        for logical_id in &logical_ids {
            upsert_metadata.execute(params![logical_id, request.touched_at])?;
            insert_provenance.execute(params![
                new_id(),
                logical_id,
                request.source_ref.as_deref(),
                format!("{{\"touched_at\":{}}}", request.touched_at),
            ])?;
        }
    }

    tx.commit()?;
    Ok(LastAccessTouchReport {
        touched_logical_ids: logical_ids.len(),
        touched_at: request.touched_at,
    })
}
1642
1643fn ensure_operational_collections_writable(
1644    tx: &rusqlite::Transaction<'_>,
1645    prepared: &PreparedWrite,
1646) -> Result<(), EngineError> {
1647    for collection in prepared.operational_collection_kinds.keys() {
1648        let disabled_at: Option<Option<i64>> = tx
1649            .query_row(
1650                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1651                params![collection],
1652                |row| row.get::<_, Option<i64>>(0),
1653            )
1654            .optional()?;
1655        match disabled_at {
1656            Some(Some(_)) => {
1657                return Err(EngineError::InvalidWrite(format!(
1658                    "operational collection '{collection}' is disabled"
1659                )));
1660            }
1661            Some(None) => {}
1662            None => {
1663                return Err(EngineError::InvalidWrite(format!(
1664                    "operational collection '{collection}' is not registered"
1665                )));
1666            }
1667        }
1668    }
1669    Ok(())
1670}
1671
1672fn validate_operational_writes_against_live_contracts(
1673    tx: &rusqlite::Transaction<'_>,
1674    prepared: &PreparedWrite,
1675) -> Result<Vec<String>, EngineError> {
1676    let mut collection_validation_contracts =
1677        HashMap::<String, Option<OperationalValidationContract>>::new();
1678    for collection in prepared.operational_collection_kinds.keys() {
1679        let validation_json: String = tx
1680            .query_row(
1681                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1682                params![collection],
1683                |row| row.get(0),
1684            )
1685            .map_err(EngineError::Sqlite)?;
1686        let validation_contract = parse_operational_validation_contract(&validation_json)
1687            .map_err(EngineError::InvalidWrite)?;
1688        collection_validation_contracts.insert(collection.clone(), validation_contract);
1689    }
1690
1691    let mut warnings = Vec::new();
1692    for write in &prepared.operational_writes {
1693        if let Some(Some(contract)) =
1694            collection_validation_contracts.get(operational_write_collection(write))
1695            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1696        {
1697            warnings.push(warning);
1698        }
1699    }
1700
1701    Ok(warnings)
1702}
1703
1704fn load_live_operational_secondary_indexes(
1705    tx: &rusqlite::Transaction<'_>,
1706    prepared: &PreparedWrite,
1707) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1708    let mut collection_indexes = HashMap::new();
1709    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1710        let secondary_indexes_json: String = tx
1711            .query_row(
1712                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1713                params![collection],
1714                |row| row.get(0),
1715            )
1716            .map_err(EngineError::Sqlite)?;
1717        let indexes =
1718            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1719                .map_err(EngineError::InvalidWrite)?;
1720        collection_indexes.insert(collection.clone(), indexes);
1721    }
1722    Ok(collection_indexes)
1723}
1724
1725fn check_operational_write_against_contract(
1726    write: &OperationalWrite,
1727    contract: &OperationalValidationContract,
1728) -> Result<Option<String>, EngineError> {
1729    if contract.mode == OperationalValidationMode::Disabled {
1730        return Ok(None);
1731    }
1732
1733    let (payload_json, collection, record_key) = match write {
1734        OperationalWrite::Append {
1735            collection,
1736            record_key,
1737            payload_json,
1738            ..
1739        }
1740        | OperationalWrite::Put {
1741            collection,
1742            record_key,
1743            payload_json,
1744            ..
1745        } => (
1746            payload_json.as_str(),
1747            collection.as_str(),
1748            record_key.as_str(),
1749        ),
1750        OperationalWrite::Delete { .. } => return Ok(None),
1751    };
1752
1753    match validate_operational_payload_against_contract(contract, payload_json) {
1754        Ok(()) => Ok(None),
1755        Err(message) => match contract.mode {
1756            OperationalValidationMode::Disabled => Ok(None),
1757            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1758                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1759                kind = operational_write_kind(write)
1760            ))),
1761            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1762                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1763                kind = operational_write_kind(write)
1764            ))),
1765        },
1766    }
1767}
1768
/// Apply every prepared mutation inside a single immediate transaction,
/// then assemble the [`WriteReceipt`].
///
/// Order within the transaction: node/edge retires first, then node,
/// edge, chunk, run, step, and action inserts, then operational mutation
/// log + current rows, and finally the derived FTS / staging / vec rows.
/// Provenance warnings are computed after commit purely from `prepared`;
/// they are advisory and do not undo the committed write.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
    // preserve chunks/vec for possible restore, mark superseded, record audit event.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        // Double-write cleanup: also remove staging rows for this node so that a
        // rebuild-in-progress does not re-insert a retired node at swap time.
        let mut del_staging = tx.prepare_cached(
            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
        )?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            del_prop_positions.execute(params![retire.logical_id])?;
            del_staging.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: mark superseded, record audit event.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts (with optional upsert + chunk-policy handling).
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
                            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        // The vec table only exists under certain profiles; treat its
        // absence as "no vec rows to delete" rather than an error.
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                // Property FTS rows are always replaced on upsert since properties change.
                del_prop_fts.execute(params![node.logical_id])?;
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts (with optional upsert).
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts.
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run inserts (with optional upsert).
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            // Runs are superseded by explicit prior id, not by logical id.
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts (with optional upsert).
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts (with optional upsert).
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational mutation log writes and latest-state current rows.
    {
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Mutation order is a single monotonically increasing sequence
        // across all collections, continued from the current maximum.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
                 payload_json = excluded.payload_json, \
                 updated_at = excluded.updated_at, \
                 last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Per-mutation secondary-index entries (subject_kind = 'mutation').
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // Latest-state collections additionally maintain operational_current
            // plus its 'current' secondary-index entries.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Re-read the row just upserted so index entries
                            // carry the stored updated_at / last_mutation_id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // FTS row inserts.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property FTS row inserts.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        // Delete any stale position rows for these nodes; writer already
        // deleted the `fts_node_properties` rows above.
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
            for pos in &row.positions {
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    // Offsets are stored as i64; clamp to i64::MAX on the
                    // (practically impossible) overflow instead of panicking.
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // Double-write to staging: for nodes whose kind has an active rebuild
    // (state IN ('PENDING','BUILDING','SWAPPING')), upsert the precomputed
    // text_content into fts_property_rebuild_staging.  Both this upsert and
    // the fts_node_properties insert above happen inside the same transaction.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_staging = tx.prepare_cached(
            "INSERT INTO fts_property_rebuild_staging \
             (kind, node_logical_id, text_content) \
             VALUES (?1, ?2, ?3) \
             ON CONFLICT(kind, node_logical_id) DO UPDATE \
             SET text_content = excluded.text_content",
        )?;
        let mut check_rebuild = tx.prepare_cached(
            "SELECT 1 FROM fts_property_rebuild_state \
             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
        )?;
        for row in &prepared.required_property_fts_rows {
            let in_rebuild: bool = check_rebuild
                .query_row(params![row.kind], |_| Ok(true))
                .optional()?
                .unwrap_or(false);
            if in_rebuild {
                ins_staging.execute(params![row.kind, row.node_logical_id, row.text_content])?;
            }
        }
    }

    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    // Embeddings are serialized as little-endian f32 bytes.
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
                // vec profile absent: vec inserts are silently skipped.
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Post-commit bookkeeping: collect one advisory warning per canonical
    // item that was written without a source_ref.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // The receipt's `warnings` list combines provenance warnings with the
    // validation warnings gathered inside the transaction.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2313
2314fn operational_write_collection(write: &OperationalWrite) -> &str {
2315    match write {
2316        OperationalWrite::Append { collection, .. }
2317        | OperationalWrite::Put { collection, .. }
2318        | OperationalWrite::Delete { collection, .. } => collection,
2319    }
2320}
2321
2322fn operational_write_record_key(write: &OperationalWrite) -> &str {
2323    match write {
2324        OperationalWrite::Append { record_key, .. }
2325        | OperationalWrite::Put { record_key, .. }
2326        | OperationalWrite::Delete { record_key, .. } => record_key,
2327    }
2328}
2329
2330fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2331    match write {
2332        OperationalWrite::Append { .. } => "append",
2333        OperationalWrite::Put { .. } => "put",
2334        OperationalWrite::Delete { .. } => "delete",
2335    }
2336}
2337
2338fn operational_write_payload(write: &OperationalWrite) -> &str {
2339    match write {
2340        OperationalWrite::Append { payload_json, .. }
2341        | OperationalWrite::Put { payload_json, .. } => payload_json,
2342        OperationalWrite::Delete { .. } => "null",
2343    }
2344}
2345
2346fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2347    match write {
2348        OperationalWrite::Append { source_ref, .. }
2349        | OperationalWrite::Put { source_ref, .. }
2350        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2351    }
2352}
2353
2354#[cfg(test)]
2355#[allow(clippy::expect_used)]
2356mod tests {
2357    use std::sync::Arc;
2358
2359    use fathomdb_schema::SchemaManager;
2360    use tempfile::NamedTempFile;
2361
2362    use super::{apply_write, prepare_write, resolve_operational_writes};
2363    use crate::{
2364        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2365        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2366        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2367        projection::ProjectionTarget,
2368    };
2369
2370    #[test]
2371    fn writer_executes_runtime_table_rows() {
2372        let db = NamedTempFile::new().expect("temporary db");
2373        let writer = WriterActor::start(
2374            db.path(),
2375            Arc::new(SchemaManager::new()),
2376            ProvenanceMode::Warn,
2377            Arc::new(TelemetryCounters::default()),
2378        )
2379        .expect("writer");
2380
2381        let receipt = writer
2382            .submit(WriteRequest {
2383                label: "runtime".to_owned(),
2384                nodes: vec![],
2385                node_retires: vec![],
2386                edges: vec![],
2387                edge_retires: vec![],
2388                chunks: vec![],
2389                runs: vec![RunInsert {
2390                    id: "run-1".to_owned(),
2391                    kind: "session".to_owned(),
2392                    status: "completed".to_owned(),
2393                    properties: "{}".to_owned(),
2394                    source_ref: Some("src-1".to_owned()),
2395                    upsert: false,
2396                    supersedes_id: None,
2397                }],
2398                steps: vec![StepInsert {
2399                    id: "step-1".to_owned(),
2400                    run_id: "run-1".to_owned(),
2401                    kind: "llm".to_owned(),
2402                    status: "completed".to_owned(),
2403                    properties: "{}".to_owned(),
2404                    source_ref: Some("src-1".to_owned()),
2405                    upsert: false,
2406                    supersedes_id: None,
2407                }],
2408                actions: vec![ActionInsert {
2409                    id: "action-1".to_owned(),
2410                    step_id: "step-1".to_owned(),
2411                    kind: "emit".to_owned(),
2412                    status: "completed".to_owned(),
2413                    properties: "{}".to_owned(),
2414                    source_ref: Some("src-1".to_owned()),
2415                    upsert: false,
2416                    supersedes_id: None,
2417                }],
2418                optional_backfills: vec![],
2419                vec_inserts: vec![],
2420                operational_writes: vec![],
2421            })
2422            .expect("write receipt");
2423
2424        assert_eq!(receipt.label, "runtime");
2425    }
2426
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        // A mixed batch (canonical node insert + operational `Put`) must land
        // both sides: the node row, one row of mutation history, and an
        // `operational_current` row holding the put payload.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Register the target collection; operational writes against
        // unregistered collections are rejected.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its
        // own connection to the same file.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Re-open the database to verify what was actually persisted.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // The `Put` becomes the current row, payload preserved verbatim.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2506
    #[test]
    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
        // With the validation contract's mode set to "disabled", a payload
        // that violates the declared field schema is still accepted and
        // persisted unchanged as the current row.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract requires `status` in {ok, failed}, but mode is "disabled",
        // so the contract must not be enforced.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "disabled-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // `bogus` is not a declared field and `status` is missing —
                // invalid under the contract, but validation is disabled.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"bogus":true}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"bogus":true}"#);
    }
2560
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        // In "report_only" mode, an invalid payload is persisted anyway, but
        // the receipt carries exactly one validation warning (distinct from
        // the provenance warnings, which stay empty here).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract: `status` must be one of {ok, failed}; mode "report_only".
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" violates the enum constraint on `status`.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // The warning is a validation warning, not a provenance warning.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // Despite the warning, the invalid payload still became current.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2625
2626    #[test]
2627    fn writer_rejects_operational_write_for_missing_collection() {
2628        let db = NamedTempFile::new().expect("temporary db");
2629        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2630        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2631        drop(conn);
2632        let writer = WriterActor::start(
2633            db.path(),
2634            Arc::new(SchemaManager::new()),
2635            ProvenanceMode::Warn,
2636            Arc::new(TelemetryCounters::default()),
2637        )
2638        .expect("writer");
2639
2640        let result = writer.submit(WriteRequest {
2641            label: "missing-operational-collection".to_owned(),
2642            nodes: vec![],
2643            node_retires: vec![],
2644            edges: vec![],
2645            edge_retires: vec![],
2646            chunks: vec![],
2647            runs: vec![],
2648            steps: vec![],
2649            actions: vec![],
2650            optional_backfills: vec![],
2651            vec_inserts: vec![],
2652            operational_writes: vec![OperationalWrite::Put {
2653                collection: "connector_health".to_owned(),
2654                record_key: "gmail".to_owned(),
2655                payload_json: r#"{"status":"ok"}"#.to_owned(),
2656                source_ref: Some("src-1".to_owned()),
2657            }],
2658        });
2659
2660        assert!(
2661            matches!(result, Err(EngineError::InvalidWrite(_))),
2662            "missing operational collection must return InvalidWrite"
2663        );
2664    }
2665
    #[test]
    fn writer_append_operational_write_records_history_without_current_row() {
        // For an `append_only_log` collection, an `Append` must write a
        // mutation-history row but must NOT materialize a row in
        // `operational_current`.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Register an append-only collection.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "append-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"type":"sync"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // History row exists, tagged "append", payload preserved verbatim.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation: (String, String) = conn
            .query_row(
                "SELECT op_kind, payload_json FROM operational_mutations \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("mutation row");
        assert_eq!(mutation.0, "append");
        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
        // No current-state row is created for append-only collections.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2729
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        // In "enforce" mode, an invalid append must fail with `InvalidWrite`
        // and leave NO rows behind — neither in `operational_mutations` nor
        // in `operational_filter_values` (i.e. the rejection is atomic).
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Collection with both a filter field on `status` and an enforcing
        // contract restricting `status` to {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // "bogus" violates the enum constraint on `status`.
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Nothing may have been persisted by the rejected batch.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2794
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        // Put then Delete on the same record: the current row disappears, but
        // the mutation history keeps both operations in order.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First batch: establish a current row via Put.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second batch: delete the same record.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // History retains both operations, ordered by `mutation_order`.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The current-state row must be gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2883
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        // A `Delete` carries no payload, so it must succeed even when the
        // collection has an enforcing validation contract.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Enforcing contract: `status` required and restricted to {ok, failed}.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Seed a valid current row first (this Put passes the contract).
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The payload-less Delete must not be rejected by the contract.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // After the delete the record has no current row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2957
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        // With two secondary indexes defined (one single-field, one
        // composite), a Put must create one 'current' index entry per index,
        // and a Delete must remove them all.
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Index 1: latest_state_field on `status`.
        // Index 2: latest_state_composite on (`tenant`, `category`).
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer actor opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                // Payload supplies values for both indexes.
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per index definition, both tagged subject_kind='current'.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        // Close this verification connection before submitting again.
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // Deleting the record must drop all of its 'current' index entries.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3046
3047    #[test]
3048    fn writer_latest_state_operational_writes_persist_mutation_order() {
3049        let db = NamedTempFile::new().expect("temporary db");
3050        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3051        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3052        conn.execute(
3053            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3054             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3055            [],
3056        )
3057        .expect("seed collection");
3058        drop(conn);
3059
3060        let writer = WriterActor::start(
3061            db.path(),
3062            Arc::new(SchemaManager::new()),
3063            ProvenanceMode::Warn,
3064            Arc::new(TelemetryCounters::default()),
3065        )
3066        .expect("writer");
3067
3068        writer
3069            .submit(WriteRequest {
3070                label: "ordered-operational-batch".to_owned(),
3071                nodes: vec![],
3072                node_retires: vec![],
3073                edges: vec![],
3074                edge_retires: vec![],
3075                chunks: vec![],
3076                runs: vec![],
3077                steps: vec![],
3078                actions: vec![],
3079                optional_backfills: vec![],
3080                vec_inserts: vec![],
3081                operational_writes: vec![
3082                    OperationalWrite::Put {
3083                        collection: "connector_health".to_owned(),
3084                        record_key: "gmail".to_owned(),
3085                        payload_json: r#"{"status":"old"}"#.to_owned(),
3086                        source_ref: Some("src-1".to_owned()),
3087                    },
3088                    OperationalWrite::Delete {
3089                        collection: "connector_health".to_owned(),
3090                        record_key: "gmail".to_owned(),
3091                        source_ref: Some("src-2".to_owned()),
3092                    },
3093                    OperationalWrite::Put {
3094                        collection: "connector_health".to_owned(),
3095                        record_key: "gmail".to_owned(),
3096                        payload_json: r#"{"status":"new"}"#.to_owned(),
3097                        source_ref: Some("src-3".to_owned()),
3098                    },
3099                ],
3100            })
3101            .expect("write receipt");
3102
3103        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3104        let rows: Vec<(String, i64)> = {
3105            let mut stmt = conn
3106                .prepare(
3107                    "SELECT op_kind, mutation_order FROM operational_mutations \
3108                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3109                     ORDER BY mutation_order ASC",
3110                )
3111                .expect("stmt");
3112            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
3113                .expect("rows")
3114                .collect::<Result<_, _>>()
3115                .expect("collect")
3116        };
3117        assert_eq!(
3118            rows,
3119            vec![
3120                ("put".to_owned(), 1),
3121                ("delete".to_owned(), 2),
3122                ("put".to_owned(), 3),
3123            ]
3124        );
3125        let payload: String = conn
3126            .query_row(
3127                "SELECT payload_json FROM operational_current \
3128                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3129                [],
3130                |row| row.get(0),
3131            )
3132            .expect("current payload");
3133        assert_eq!(payload, r#"{"status":"new"}"#);
3134    }
3135
3136    #[test]
3137    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
3138        let db = NamedTempFile::new().expect("temporary db");
3139        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
3140        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3141        conn.execute(
3142            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3143             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3144            [],
3145        )
3146        .expect("seed collection");
3147
3148        let request = WriteRequest {
3149            label: "disabled-race".to_owned(),
3150            nodes: vec![],
3151            node_retires: vec![],
3152            edges: vec![],
3153            edge_retires: vec![],
3154            chunks: vec![],
3155            runs: vec![],
3156            steps: vec![],
3157            actions: vec![],
3158            optional_backfills: vec![],
3159            vec_inserts: vec![],
3160            operational_writes: vec![OperationalWrite::Put {
3161                collection: "connector_health".to_owned(),
3162                record_key: "gmail".to_owned(),
3163                payload_json: r#"{"status":"ok"}"#.to_owned(),
3164                source_ref: Some("src-1".to_owned()),
3165            }],
3166        };
3167        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
3168        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
3169
3170        conn.execute(
3171            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
3172            [],
3173        )
3174        .expect("disable collection after preflight");
3175
3176        let error =
3177            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
3178        assert!(matches!(error, EngineError::InvalidWrite(_)));
3179        assert!(error.to_string().contains("is disabled"));
3180
3181        let mutation_count: i64 = conn
3182            .query_row(
3183                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3184                [],
3185                |row| row.get(0),
3186            )
3187            .expect("mutation count");
3188        assert_eq!(mutation_count, 0);
3189
3190        let current_count: i64 = conn
3191            .query_row(
3192                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3193                [],
3194                |row| row.get(0),
3195            )
3196            .expect("current count");
3197        assert_eq!(current_count, 0);
3198    }
3199
3200    #[test]
3201    fn writer_enforce_validation_rejects_invalid_put_atomically() {
3202        let db = NamedTempFile::new().expect("temporary db");
3203        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3204        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3205        conn.execute(
3206            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3207             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3208            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3209        )
3210        .expect("seed collection");
3211        drop(conn);
3212        let writer = WriterActor::start(
3213            db.path(),
3214            Arc::new(SchemaManager::new()),
3215            ProvenanceMode::Warn,
3216            Arc::new(TelemetryCounters::default()),
3217        )
3218        .expect("writer");
3219
3220        let error = writer
3221            .submit(WriteRequest {
3222                label: "invalid-put".to_owned(),
3223                nodes: vec![NodeInsert {
3224                    row_id: "row-1".to_owned(),
3225                    logical_id: "lg-1".to_owned(),
3226                    kind: "Meeting".to_owned(),
3227                    properties: "{}".to_owned(),
3228                    source_ref: Some("src-1".to_owned()),
3229                    upsert: false,
3230                    chunk_policy: ChunkPolicy::Preserve,
3231                    content_ref: None,
3232                }],
3233                node_retires: vec![],
3234                edges: vec![],
3235                edge_retires: vec![],
3236                chunks: vec![],
3237                runs: vec![],
3238                steps: vec![],
3239                actions: vec![],
3240                optional_backfills: vec![],
3241                vec_inserts: vec![],
3242                operational_writes: vec![OperationalWrite::Put {
3243                    collection: "connector_health".to_owned(),
3244                    record_key: "gmail".to_owned(),
3245                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
3246                    source_ref: Some("src-1".to_owned()),
3247                }],
3248            })
3249            .expect_err("invalid put must reject");
3250        assert!(matches!(error, EngineError::InvalidWrite(_)));
3251        assert!(error.to_string().contains("must be one of"));
3252
3253        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3254        let node_count: i64 = conn
3255            .query_row(
3256                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3257                [],
3258                |row| row.get(0),
3259            )
3260            .expect("node count");
3261        assert_eq!(node_count, 0);
3262        let mutation_count: i64 = conn
3263            .query_row(
3264                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3265                [],
3266                |row| row.get(0),
3267            )
3268            .expect("mutation count");
3269        assert_eq!(mutation_count, 0);
3270        let current_count: i64 = conn
3271            .query_row(
3272                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3273                [],
3274                |row| row.get(0),
3275            )
3276            .expect("current count");
3277        assert_eq!(current_count, 0);
3278    }
3279
3280    #[test]
3281    fn writer_rejects_append_against_latest_state_collection() {
3282        let db = NamedTempFile::new().expect("temporary db");
3283        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3284        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3285        conn.execute(
3286            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3287             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3288            [],
3289        )
3290        .expect("seed collection");
3291        drop(conn);
3292        let writer = WriterActor::start(
3293            db.path(),
3294            Arc::new(SchemaManager::new()),
3295            ProvenanceMode::Warn,
3296            Arc::new(TelemetryCounters::default()),
3297        )
3298        .expect("writer");
3299
3300        let result = writer.submit(WriteRequest {
3301            label: "bad-append".to_owned(),
3302            nodes: vec![],
3303            node_retires: vec![],
3304            edges: vec![],
3305            edge_retires: vec![],
3306            chunks: vec![],
3307            runs: vec![],
3308            steps: vec![],
3309            actions: vec![],
3310            optional_backfills: vec![],
3311            vec_inserts: vec![],
3312            operational_writes: vec![OperationalWrite::Append {
3313                collection: "connector_health".to_owned(),
3314                record_key: "gmail".to_owned(),
3315                payload_json: r#"{"status":"ok"}"#.to_owned(),
3316                source_ref: Some("src-1".to_owned()),
3317            }],
3318        });
3319
3320        assert!(
3321            matches!(result, Err(EngineError::InvalidWrite(_))),
3322            "latest_state collection must reject Append"
3323        );
3324    }
3325
3326    #[test]
3327    fn writer_upsert_supersedes_prior_active_node() {
3328        let db = NamedTempFile::new().expect("temporary db");
3329        let writer = WriterActor::start(
3330            db.path(),
3331            Arc::new(SchemaManager::new()),
3332            ProvenanceMode::Warn,
3333            Arc::new(TelemetryCounters::default()),
3334        )
3335        .expect("writer");
3336
3337        writer
3338            .submit(WriteRequest {
3339                label: "v1".to_owned(),
3340                nodes: vec![NodeInsert {
3341                    row_id: "row-1".to_owned(),
3342                    logical_id: "lg-1".to_owned(),
3343                    kind: "Meeting".to_owned(),
3344                    properties: r#"{"version":1}"#.to_owned(),
3345                    source_ref: Some("src-1".to_owned()),
3346                    upsert: false,
3347                    chunk_policy: ChunkPolicy::Preserve,
3348                    content_ref: None,
3349                }],
3350                node_retires: vec![],
3351                edges: vec![],
3352                edge_retires: vec![],
3353                chunks: vec![],
3354                runs: vec![],
3355                steps: vec![],
3356                actions: vec![],
3357                optional_backfills: vec![],
3358                vec_inserts: vec![],
3359                operational_writes: vec![],
3360            })
3361            .expect("v1 write");
3362
3363        writer
3364            .submit(WriteRequest {
3365                label: "v2".to_owned(),
3366                nodes: vec![NodeInsert {
3367                    row_id: "row-2".to_owned(),
3368                    logical_id: "lg-1".to_owned(),
3369                    kind: "Meeting".to_owned(),
3370                    properties: r#"{"version":2}"#.to_owned(),
3371                    source_ref: Some("src-2".to_owned()),
3372                    upsert: true,
3373                    chunk_policy: ChunkPolicy::Preserve,
3374                    content_ref: None,
3375                }],
3376                node_retires: vec![],
3377                edges: vec![],
3378                edge_retires: vec![],
3379                chunks: vec![],
3380                runs: vec![],
3381                steps: vec![],
3382                actions: vec![],
3383                optional_backfills: vec![],
3384                vec_inserts: vec![],
3385                operational_writes: vec![],
3386            })
3387            .expect("v2 upsert write");
3388
3389        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3390        let (active_row_id, props): (String, String) = conn
3391            .query_row(
3392                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3393                [],
3394                |row| Ok((row.get(0)?, row.get(1)?)),
3395            )
3396            .expect("active row");
3397        assert_eq!(active_row_id, "row-2");
3398        assert!(props.contains("\"version\":2"));
3399
3400        let superseded: i64 = conn
3401            .query_row(
3402                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3403                [],
3404                |row| row.get(0),
3405            )
3406            .expect("superseded count");
3407        assert_eq!(superseded, 1);
3408    }
3409
3410    #[test]
3411    fn writer_inserts_edge_between_two_nodes() {
3412        let db = NamedTempFile::new().expect("temporary db");
3413        let writer = WriterActor::start(
3414            db.path(),
3415            Arc::new(SchemaManager::new()),
3416            ProvenanceMode::Warn,
3417            Arc::new(TelemetryCounters::default()),
3418        )
3419        .expect("writer");
3420
3421        writer
3422            .submit(WriteRequest {
3423                label: "nodes-and-edge".to_owned(),
3424                nodes: vec![
3425                    NodeInsert {
3426                        row_id: "row-meeting".to_owned(),
3427                        logical_id: "meeting-1".to_owned(),
3428                        kind: "Meeting".to_owned(),
3429                        properties: "{}".to_owned(),
3430                        source_ref: Some("src-1".to_owned()),
3431                        upsert: false,
3432                        chunk_policy: ChunkPolicy::Preserve,
3433                        content_ref: None,
3434                    },
3435                    NodeInsert {
3436                        row_id: "row-task".to_owned(),
3437                        logical_id: "task-1".to_owned(),
3438                        kind: "Task".to_owned(),
3439                        properties: "{}".to_owned(),
3440                        source_ref: Some("src-1".to_owned()),
3441                        upsert: false,
3442                        chunk_policy: ChunkPolicy::Preserve,
3443                        content_ref: None,
3444                    },
3445                ],
3446                node_retires: vec![],
3447                edges: vec![EdgeInsert {
3448                    row_id: "edge-1".to_owned(),
3449                    logical_id: "edge-lg-1".to_owned(),
3450                    source_logical_id: "meeting-1".to_owned(),
3451                    target_logical_id: "task-1".to_owned(),
3452                    kind: "HAS_TASK".to_owned(),
3453                    properties: "{}".to_owned(),
3454                    source_ref: Some("src-1".to_owned()),
3455                    upsert: false,
3456                }],
3457                edge_retires: vec![],
3458                chunks: vec![],
3459                runs: vec![],
3460                steps: vec![],
3461                actions: vec![],
3462                optional_backfills: vec![],
3463                vec_inserts: vec![],
3464                operational_writes: vec![],
3465            })
3466            .expect("write receipt");
3467
3468        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3469        let (src, tgt, kind): (String, String, String) = conn
3470            .query_row(
3471                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3472                [],
3473                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3474            )
3475            .expect("edge row");
3476        assert_eq!(src, "meeting-1");
3477        assert_eq!(tgt, "task-1");
3478        assert_eq!(kind, "HAS_TASK");
3479    }
3480
3481    #[test]
3482    #[allow(clippy::too_many_lines)]
3483    fn writer_upsert_supersedes_prior_active_edge() {
3484        let db = NamedTempFile::new().expect("temporary db");
3485        let writer = WriterActor::start(
3486            db.path(),
3487            Arc::new(SchemaManager::new()),
3488            ProvenanceMode::Warn,
3489            Arc::new(TelemetryCounters::default()),
3490        )
3491        .expect("writer");
3492
3493        // Write two nodes
3494        writer
3495            .submit(WriteRequest {
3496                label: "nodes".to_owned(),
3497                nodes: vec![
3498                    NodeInsert {
3499                        row_id: "row-a".to_owned(),
3500                        logical_id: "node-a".to_owned(),
3501                        kind: "Meeting".to_owned(),
3502                        properties: "{}".to_owned(),
3503                        source_ref: Some("src-1".to_owned()),
3504                        upsert: false,
3505                        chunk_policy: ChunkPolicy::Preserve,
3506                        content_ref: None,
3507                    },
3508                    NodeInsert {
3509                        row_id: "row-b".to_owned(),
3510                        logical_id: "node-b".to_owned(),
3511                        kind: "Task".to_owned(),
3512                        properties: "{}".to_owned(),
3513                        source_ref: Some("src-1".to_owned()),
3514                        upsert: false,
3515                        chunk_policy: ChunkPolicy::Preserve,
3516                        content_ref: None,
3517                    },
3518                ],
3519                node_retires: vec![],
3520                edges: vec![],
3521                edge_retires: vec![],
3522                chunks: vec![],
3523                runs: vec![],
3524                steps: vec![],
3525                actions: vec![],
3526                optional_backfills: vec![],
3527                vec_inserts: vec![],
3528                operational_writes: vec![],
3529            })
3530            .expect("nodes write");
3531
3532        // Write v1 edge
3533        writer
3534            .submit(WriteRequest {
3535                label: "edge-v1".to_owned(),
3536                nodes: vec![],
3537                node_retires: vec![],
3538                edges: vec![EdgeInsert {
3539                    row_id: "edge-row-1".to_owned(),
3540                    logical_id: "edge-lg-1".to_owned(),
3541                    source_logical_id: "node-a".to_owned(),
3542                    target_logical_id: "node-b".to_owned(),
3543                    kind: "HAS_TASK".to_owned(),
3544                    properties: r#"{"weight":1}"#.to_owned(),
3545                    source_ref: Some("src-1".to_owned()),
3546                    upsert: false,
3547                }],
3548                edge_retires: vec![],
3549                chunks: vec![],
3550                runs: vec![],
3551                steps: vec![],
3552                actions: vec![],
3553                optional_backfills: vec![],
3554                vec_inserts: vec![],
3555                operational_writes: vec![],
3556            })
3557            .expect("edge v1 write");
3558
3559        // Upsert v2 edge
3560        writer
3561            .submit(WriteRequest {
3562                label: "edge-v2".to_owned(),
3563                nodes: vec![],
3564                node_retires: vec![],
3565                edges: vec![EdgeInsert {
3566                    row_id: "edge-row-2".to_owned(),
3567                    logical_id: "edge-lg-1".to_owned(),
3568                    source_logical_id: "node-a".to_owned(),
3569                    target_logical_id: "node-b".to_owned(),
3570                    kind: "HAS_TASK".to_owned(),
3571                    properties: r#"{"weight":2}"#.to_owned(),
3572                    source_ref: Some("src-2".to_owned()),
3573                    upsert: true,
3574                }],
3575                edge_retires: vec![],
3576                chunks: vec![],
3577                runs: vec![],
3578                steps: vec![],
3579                actions: vec![],
3580                optional_backfills: vec![],
3581                vec_inserts: vec![],
3582                operational_writes: vec![],
3583            })
3584            .expect("edge v2 upsert");
3585
3586        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3587        let (active_row_id, props): (String, String) = conn
3588            .query_row(
3589                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3590                [],
3591                |row| Ok((row.get(0)?, row.get(1)?)),
3592            )
3593            .expect("active edge");
3594        assert_eq!(active_row_id, "edge-row-2");
3595        assert!(props.contains("\"weight\":2"));
3596
3597        let superseded: i64 = conn
3598            .query_row(
3599                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3600                [],
3601                |row| row.get(0),
3602            )
3603            .expect("superseded count");
3604        assert_eq!(superseded, 1);
3605    }
3606
3607    #[test]
3608    fn writer_fts_rows_are_written_to_database() {
3609        let db = NamedTempFile::new().expect("temporary db");
3610        let writer = WriterActor::start(
3611            db.path(),
3612            Arc::new(SchemaManager::new()),
3613            ProvenanceMode::Warn,
3614            Arc::new(TelemetryCounters::default()),
3615        )
3616        .expect("writer");
3617
3618        writer
3619            .submit(WriteRequest {
3620                label: "seed".to_owned(),
3621                nodes: vec![NodeInsert {
3622                    row_id: "row-1".to_owned(),
3623                    logical_id: "logical-1".to_owned(),
3624                    kind: "Meeting".to_owned(),
3625                    properties: "{}".to_owned(),
3626                    source_ref: Some("src-1".to_owned()),
3627                    upsert: false,
3628                    chunk_policy: ChunkPolicy::Preserve,
3629                    content_ref: None,
3630                }],
3631                node_retires: vec![],
3632                edges: vec![],
3633                edge_retires: vec![],
3634                chunks: vec![ChunkInsert {
3635                    id: "chunk-1".to_owned(),
3636                    node_logical_id: "logical-1".to_owned(),
3637                    text_content: "budget discussion".to_owned(),
3638                    byte_start: None,
3639                    byte_end: None,
3640                    content_hash: None,
3641                }],
3642                runs: vec![],
3643                steps: vec![],
3644                actions: vec![],
3645                optional_backfills: vec![],
3646                vec_inserts: vec![],
3647                operational_writes: vec![],
3648            })
3649            .expect("write receipt");
3650
3651        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3652        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3653            conn.query_row(
3654                "SELECT chunk_id, node_logical_id, kind, text_content \
3655                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3656                [],
3657                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3658            )
3659            .expect("fts row");
3660        assert_eq!(chunk_id, "chunk-1");
3661        assert_eq!(node_logical_id, "logical-1");
3662        assert_eq!(kind, "Meeting");
3663        assert_eq!(text_content, "budget discussion");
3664    }
3665
3666    #[test]
3667    fn writer_receipt_warns_on_nodes_without_source_ref() {
3668        let db = NamedTempFile::new().expect("temporary db");
3669        let writer = WriterActor::start(
3670            db.path(),
3671            Arc::new(SchemaManager::new()),
3672            ProvenanceMode::Warn,
3673            Arc::new(TelemetryCounters::default()),
3674        )
3675        .expect("writer");
3676
3677        let receipt = writer
3678            .submit(WriteRequest {
3679                label: "no-source".to_owned(),
3680                nodes: vec![NodeInsert {
3681                    row_id: "row-1".to_owned(),
3682                    logical_id: "logical-1".to_owned(),
3683                    kind: "Meeting".to_owned(),
3684                    properties: "{}".to_owned(),
3685                    source_ref: None,
3686                    upsert: false,
3687                    chunk_policy: ChunkPolicy::Preserve,
3688                    content_ref: None,
3689                }],
3690                node_retires: vec![],
3691                edges: vec![],
3692                edge_retires: vec![],
3693                chunks: vec![],
3694                runs: vec![],
3695                steps: vec![],
3696                actions: vec![],
3697                optional_backfills: vec![],
3698                vec_inserts: vec![],
3699                operational_writes: vec![],
3700            })
3701            .expect("write receipt");
3702
3703        assert_eq!(receipt.provenance_warnings.len(), 1);
3704        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3705    }
3706
3707    #[test]
3708    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3709        let db = NamedTempFile::new().expect("temporary db");
3710        let writer = WriterActor::start(
3711            db.path(),
3712            Arc::new(SchemaManager::new()),
3713            ProvenanceMode::Warn,
3714            Arc::new(TelemetryCounters::default()),
3715        )
3716        .expect("writer");
3717
3718        let receipt = writer
3719            .submit(WriteRequest {
3720                label: "with-source".to_owned(),
3721                nodes: vec![NodeInsert {
3722                    row_id: "row-1".to_owned(),
3723                    logical_id: "logical-1".to_owned(),
3724                    kind: "Meeting".to_owned(),
3725                    properties: "{}".to_owned(),
3726                    source_ref: Some("src-1".to_owned()),
3727                    upsert: false,
3728                    chunk_policy: ChunkPolicy::Preserve,
3729                    content_ref: None,
3730                }],
3731                node_retires: vec![],
3732                edges: vec![],
3733                edge_retires: vec![],
3734                chunks: vec![],
3735                runs: vec![],
3736                steps: vec![],
3737                actions: vec![],
3738                optional_backfills: vec![],
3739                vec_inserts: vec![],
3740                operational_writes: vec![],
3741            })
3742            .expect("write receipt");
3743
3744        assert!(receipt.provenance_warnings.is_empty());
3745    }
3746
3747    #[test]
3748    fn writer_accepts_chunk_for_pre_existing_node() {
3749        let db = NamedTempFile::new().expect("temporary db");
3750        let writer = WriterActor::start(
3751            db.path(),
3752            Arc::new(SchemaManager::new()),
3753            ProvenanceMode::Warn,
3754            Arc::new(TelemetryCounters::default()),
3755        )
3756        .expect("writer");
3757
3758        // Request 1: submit node only
3759        writer
3760            .submit(WriteRequest {
3761                label: "r1".to_owned(),
3762                nodes: vec![NodeInsert {
3763                    row_id: "row-1".to_owned(),
3764                    logical_id: "logical-1".to_owned(),
3765                    kind: "Meeting".to_owned(),
3766                    properties: "{}".to_owned(),
3767                    source_ref: Some("src-1".to_owned()),
3768                    upsert: false,
3769                    chunk_policy: ChunkPolicy::Preserve,
3770                    content_ref: None,
3771                }],
3772                node_retires: vec![],
3773                edges: vec![],
3774                edge_retires: vec![],
3775                chunks: vec![],
3776                runs: vec![],
3777                steps: vec![],
3778                actions: vec![],
3779                optional_backfills: vec![],
3780                vec_inserts: vec![],
3781                operational_writes: vec![],
3782            })
3783            .expect("r1 write");
3784
3785        // Request 2: submit chunk for pre-existing node
3786        writer
3787            .submit(WriteRequest {
3788                label: "r2".to_owned(),
3789                nodes: vec![],
3790                node_retires: vec![],
3791                edges: vec![],
3792                edge_retires: vec![],
3793                chunks: vec![ChunkInsert {
3794                    id: "chunk-1".to_owned(),
3795                    node_logical_id: "logical-1".to_owned(),
3796                    text_content: "budget discussion".to_owned(),
3797                    byte_start: None,
3798                    byte_end: None,
3799                    content_hash: None,
3800                }],
3801                runs: vec![],
3802                steps: vec![],
3803                actions: vec![],
3804                optional_backfills: vec![],
3805                vec_inserts: vec![],
3806                operational_writes: vec![],
3807            })
3808            .expect("r2 write — chunk for pre-existing node");
3809
3810        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3811        let count: i64 = conn
3812            .query_row(
3813                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3814                [],
3815                |row| row.get(0),
3816            )
3817            .expect("fts count");
3818        assert_eq!(
3819            count, 1,
3820            "FTS row must exist for chunk attached to pre-existing node"
3821        );
3822    }
3823
3824    #[test]
3825    fn writer_rejects_chunk_for_completely_unknown_node() {
3826        let db = NamedTempFile::new().expect("temporary db");
3827        let writer = WriterActor::start(
3828            db.path(),
3829            Arc::new(SchemaManager::new()),
3830            ProvenanceMode::Warn,
3831            Arc::new(TelemetryCounters::default()),
3832        )
3833        .expect("writer");
3834
3835        let result = writer.submit(WriteRequest {
3836            label: "bad".to_owned(),
3837            nodes: vec![],
3838            node_retires: vec![],
3839            edges: vec![],
3840            edge_retires: vec![],
3841            chunks: vec![ChunkInsert {
3842                id: "chunk-1".to_owned(),
3843                node_logical_id: "nonexistent".to_owned(),
3844                text_content: "some text".to_owned(),
3845                byte_start: None,
3846                byte_end: None,
3847                content_hash: None,
3848            }],
3849            runs: vec![],
3850            steps: vec![],
3851            actions: vec![],
3852            optional_backfills: vec![],
3853            vec_inserts: vec![],
3854            operational_writes: vec![],
3855        });
3856
3857        assert!(
3858            matches!(result, Err(EngineError::InvalidWrite(_))),
3859            "completely unknown node must return InvalidWrite"
3860        );
3861    }
3862
3863    #[test]
3864    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3865        let db = NamedTempFile::new().expect("temporary db");
3866        let writer = WriterActor::start(
3867            db.path(),
3868            Arc::new(SchemaManager::new()),
3869            ProvenanceMode::Warn,
3870            Arc::new(TelemetryCounters::default()),
3871        )
3872        .expect("writer");
3873
3874        let receipt = writer
3875            .submit(WriteRequest {
3876                label: "seed".to_owned(),
3877                nodes: vec![NodeInsert {
3878                    row_id: "row-1".to_owned(),
3879                    logical_id: "logical-1".to_owned(),
3880                    kind: "Meeting".to_owned(),
3881                    properties: "{}".to_owned(),
3882                    source_ref: None,
3883                    upsert: false,
3884                    chunk_policy: ChunkPolicy::Preserve,
3885                    content_ref: None,
3886                }],
3887                node_retires: vec![],
3888                edges: vec![],
3889                edge_retires: vec![],
3890                chunks: vec![ChunkInsert {
3891                    id: "chunk-1".to_owned(),
3892                    node_logical_id: "logical-1".to_owned(),
3893                    text_content: "budget discussion".to_owned(),
3894                    byte_start: None,
3895                    byte_end: None,
3896                    content_hash: None,
3897                }],
3898                runs: vec![],
3899                steps: vec![],
3900                actions: vec![],
3901                optional_backfills: vec![],
3902                vec_inserts: vec![],
3903                operational_writes: vec![],
3904            })
3905            .expect("write receipt");
3906
3907        assert_eq!(receipt.label, "seed");
3908    }
3909
3910    #[test]
3911    fn writer_node_retire_supersedes_active_node() {
3912        let db = NamedTempFile::new().expect("temporary db");
3913        let writer = WriterActor::start(
3914            db.path(),
3915            Arc::new(SchemaManager::new()),
3916            ProvenanceMode::Warn,
3917            Arc::new(TelemetryCounters::default()),
3918        )
3919        .expect("writer");
3920
3921        writer
3922            .submit(WriteRequest {
3923                label: "seed".to_owned(),
3924                nodes: vec![NodeInsert {
3925                    row_id: "row-1".to_owned(),
3926                    logical_id: "meeting-1".to_owned(),
3927                    kind: "Meeting".to_owned(),
3928                    properties: "{}".to_owned(),
3929                    source_ref: Some("src-1".to_owned()),
3930                    upsert: false,
3931                    chunk_policy: ChunkPolicy::Preserve,
3932                    content_ref: None,
3933                }],
3934                node_retires: vec![],
3935                edges: vec![],
3936                edge_retires: vec![],
3937                chunks: vec![],
3938                runs: vec![],
3939                steps: vec![],
3940                actions: vec![],
3941                optional_backfills: vec![],
3942                vec_inserts: vec![],
3943                operational_writes: vec![],
3944            })
3945            .expect("seed write");
3946
3947        writer
3948            .submit(WriteRequest {
3949                label: "retire".to_owned(),
3950                nodes: vec![],
3951                node_retires: vec![NodeRetire {
3952                    logical_id: "meeting-1".to_owned(),
3953                    source_ref: Some("src-2".to_owned()),
3954                }],
3955                edges: vec![],
3956                edge_retires: vec![],
3957                chunks: vec![],
3958                runs: vec![],
3959                steps: vec![],
3960                actions: vec![],
3961                optional_backfills: vec![],
3962                vec_inserts: vec![],
3963                operational_writes: vec![],
3964            })
3965            .expect("retire write");
3966
3967        let conn = rusqlite::Connection::open(db.path()).expect("open");
3968        let active: i64 = conn
3969            .query_row(
3970                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3971                [],
3972                |r| r.get(0),
3973            )
3974            .expect("count active");
3975        let historical: i64 = conn
3976            .query_row(
3977                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3978                [],
3979                |r| r.get(0),
3980            )
3981            .expect("count historical");
3982
3983        assert_eq!(active, 0, "active count must be 0 after retire");
3984        assert_eq!(historical, 1, "historical count must be 1 after retire");
3985    }
3986
3987    #[test]
3988    fn writer_node_retire_preserves_chunks_and_clears_fts() {
3989        let db = NamedTempFile::new().expect("temporary db");
3990        let writer = WriterActor::start(
3991            db.path(),
3992            Arc::new(SchemaManager::new()),
3993            ProvenanceMode::Warn,
3994            Arc::new(TelemetryCounters::default()),
3995        )
3996        .expect("writer");
3997
3998        writer
3999            .submit(WriteRequest {
4000                label: "seed".to_owned(),
4001                nodes: vec![NodeInsert {
4002                    row_id: "row-1".to_owned(),
4003                    logical_id: "meeting-1".to_owned(),
4004                    kind: "Meeting".to_owned(),
4005                    properties: "{}".to_owned(),
4006                    source_ref: Some("src-1".to_owned()),
4007                    upsert: false,
4008                    chunk_policy: ChunkPolicy::Preserve,
4009                    content_ref: None,
4010                }],
4011                node_retires: vec![],
4012                edges: vec![],
4013                edge_retires: vec![],
4014                chunks: vec![ChunkInsert {
4015                    id: "chunk-1".to_owned(),
4016                    node_logical_id: "meeting-1".to_owned(),
4017                    text_content: "budget discussion".to_owned(),
4018                    byte_start: None,
4019                    byte_end: None,
4020                    content_hash: None,
4021                }],
4022                runs: vec![],
4023                steps: vec![],
4024                actions: vec![],
4025                optional_backfills: vec![],
4026                vec_inserts: vec![],
4027                operational_writes: vec![],
4028            })
4029            .expect("seed write");
4030
4031        writer
4032            .submit(WriteRequest {
4033                label: "retire".to_owned(),
4034                nodes: vec![],
4035                node_retires: vec![NodeRetire {
4036                    logical_id: "meeting-1".to_owned(),
4037                    source_ref: Some("src-2".to_owned()),
4038                }],
4039                edges: vec![],
4040                edge_retires: vec![],
4041                chunks: vec![],
4042                runs: vec![],
4043                steps: vec![],
4044                actions: vec![],
4045                optional_backfills: vec![],
4046                vec_inserts: vec![],
4047                operational_writes: vec![],
4048            })
4049            .expect("retire write");
4050
4051        let conn = rusqlite::Connection::open(db.path()).expect("open");
4052        let chunk_count: i64 = conn
4053            .query_row(
4054                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4055                [],
4056                |r| r.get(0),
4057            )
4058            .expect("chunk count");
4059        let fts_count: i64 = conn
4060            .query_row(
4061                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4062                [],
4063                |r| r.get(0),
4064            )
4065            .expect("fts count");
4066
4067        assert_eq!(
4068            chunk_count, 1,
4069            "chunks must remain after node retire so restore can re-establish content"
4070        );
4071        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4072    }
4073
4074    #[test]
4075    fn writer_edge_retire_supersedes_active_edge() {
4076        let db = NamedTempFile::new().expect("temporary db");
4077        let writer = WriterActor::start(
4078            db.path(),
4079            Arc::new(SchemaManager::new()),
4080            ProvenanceMode::Warn,
4081            Arc::new(TelemetryCounters::default()),
4082        )
4083        .expect("writer");
4084
4085        writer
4086            .submit(WriteRequest {
4087                label: "seed".to_owned(),
4088                nodes: vec![
4089                    NodeInsert {
4090                        row_id: "row-a".to_owned(),
4091                        logical_id: "node-a".to_owned(),
4092                        kind: "Meeting".to_owned(),
4093                        properties: "{}".to_owned(),
4094                        source_ref: Some("src-1".to_owned()),
4095                        upsert: false,
4096                        chunk_policy: ChunkPolicy::Preserve,
4097                        content_ref: None,
4098                    },
4099                    NodeInsert {
4100                        row_id: "row-b".to_owned(),
4101                        logical_id: "node-b".to_owned(),
4102                        kind: "Task".to_owned(),
4103                        properties: "{}".to_owned(),
4104                        source_ref: Some("src-1".to_owned()),
4105                        upsert: false,
4106                        chunk_policy: ChunkPolicy::Preserve,
4107                        content_ref: None,
4108                    },
4109                ],
4110                node_retires: vec![],
4111                edges: vec![EdgeInsert {
4112                    row_id: "edge-1".to_owned(),
4113                    logical_id: "edge-lg-1".to_owned(),
4114                    source_logical_id: "node-a".to_owned(),
4115                    target_logical_id: "node-b".to_owned(),
4116                    kind: "HAS_TASK".to_owned(),
4117                    properties: "{}".to_owned(),
4118                    source_ref: Some("src-1".to_owned()),
4119                    upsert: false,
4120                }],
4121                edge_retires: vec![],
4122                chunks: vec![],
4123                runs: vec![],
4124                steps: vec![],
4125                actions: vec![],
4126                optional_backfills: vec![],
4127                vec_inserts: vec![],
4128                operational_writes: vec![],
4129            })
4130            .expect("seed write");
4131
4132        writer
4133            .submit(WriteRequest {
4134                label: "retire-edge".to_owned(),
4135                nodes: vec![],
4136                node_retires: vec![],
4137                edges: vec![],
4138                edge_retires: vec![EdgeRetire {
4139                    logical_id: "edge-lg-1".to_owned(),
4140                    source_ref: Some("src-2".to_owned()),
4141                }],
4142                chunks: vec![],
4143                runs: vec![],
4144                steps: vec![],
4145                actions: vec![],
4146                optional_backfills: vec![],
4147                vec_inserts: vec![],
4148                operational_writes: vec![],
4149            })
4150            .expect("retire edge write");
4151
4152        let conn = rusqlite::Connection::open(db.path()).expect("open");
4153        let active: i64 = conn
4154            .query_row(
4155                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4156                [],
4157                |r| r.get(0),
4158            )
4159            .expect("active edge count");
4160
4161        assert_eq!(active, 0, "active edge count must be 0 after retire");
4162    }
4163
4164    #[test]
4165    fn writer_retire_without_source_ref_emits_provenance_warning() {
4166        let db = NamedTempFile::new().expect("temporary db");
4167        let writer = WriterActor::start(
4168            db.path(),
4169            Arc::new(SchemaManager::new()),
4170            ProvenanceMode::Warn,
4171            Arc::new(TelemetryCounters::default()),
4172        )
4173        .expect("writer");
4174
4175        writer
4176            .submit(WriteRequest {
4177                label: "seed".to_owned(),
4178                nodes: vec![NodeInsert {
4179                    row_id: "row-1".to_owned(),
4180                    logical_id: "meeting-1".to_owned(),
4181                    kind: "Meeting".to_owned(),
4182                    properties: "{}".to_owned(),
4183                    source_ref: Some("src-1".to_owned()),
4184                    upsert: false,
4185                    chunk_policy: ChunkPolicy::Preserve,
4186                    content_ref: None,
4187                }],
4188                node_retires: vec![],
4189                edges: vec![],
4190                edge_retires: vec![],
4191                chunks: vec![],
4192                runs: vec![],
4193                steps: vec![],
4194                actions: vec![],
4195                optional_backfills: vec![],
4196                vec_inserts: vec![],
4197                operational_writes: vec![],
4198            })
4199            .expect("seed write");
4200
4201        let receipt = writer
4202            .submit(WriteRequest {
4203                label: "retire-no-src".to_owned(),
4204                nodes: vec![],
4205                node_retires: vec![NodeRetire {
4206                    logical_id: "meeting-1".to_owned(),
4207                    source_ref: None,
4208                }],
4209                edges: vec![],
4210                edge_retires: vec![],
4211                chunks: vec![],
4212                runs: vec![],
4213                steps: vec![],
4214                actions: vec![],
4215                optional_backfills: vec![],
4216                vec_inserts: vec![],
4217                operational_writes: vec![],
4218            })
4219            .expect("retire write");
4220
4221        assert!(
4222            !receipt.provenance_warnings.is_empty(),
4223            "retire without source_ref must emit a provenance warning"
4224        );
4225    }
4226
4227    #[test]
4228    #[allow(clippy::too_many_lines)]
4229    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4230        let db = NamedTempFile::new().expect("temporary db");
4231        let writer = WriterActor::start(
4232            db.path(),
4233            Arc::new(SchemaManager::new()),
4234            ProvenanceMode::Warn,
4235            Arc::new(TelemetryCounters::default()),
4236        )
4237        .expect("writer");
4238
4239        writer
4240            .submit(WriteRequest {
4241                label: "v1".to_owned(),
4242                nodes: vec![NodeInsert {
4243                    row_id: "row-1".to_owned(),
4244                    logical_id: "meeting-1".to_owned(),
4245                    kind: "Meeting".to_owned(),
4246                    properties: "{}".to_owned(),
4247                    source_ref: Some("src-1".to_owned()),
4248                    upsert: false,
4249                    chunk_policy: ChunkPolicy::Preserve,
4250                    content_ref: None,
4251                }],
4252                node_retires: vec![],
4253                edges: vec![],
4254                edge_retires: vec![],
4255                chunks: vec![ChunkInsert {
4256                    id: "chunk-old".to_owned(),
4257                    node_logical_id: "meeting-1".to_owned(),
4258                    text_content: "old text".to_owned(),
4259                    byte_start: None,
4260                    byte_end: None,
4261                    content_hash: None,
4262                }],
4263                runs: vec![],
4264                steps: vec![],
4265                actions: vec![],
4266                optional_backfills: vec![],
4267                vec_inserts: vec![],
4268                operational_writes: vec![],
4269            })
4270            .expect("v1 write");
4271
4272        writer
4273            .submit(WriteRequest {
4274                label: "v2".to_owned(),
4275                nodes: vec![NodeInsert {
4276                    row_id: "row-2".to_owned(),
4277                    logical_id: "meeting-1".to_owned(),
4278                    kind: "Meeting".to_owned(),
4279                    properties: "{}".to_owned(),
4280                    source_ref: Some("src-2".to_owned()),
4281                    upsert: true,
4282                    chunk_policy: ChunkPolicy::Replace,
4283                    content_ref: None,
4284                }],
4285                node_retires: vec![],
4286                edges: vec![],
4287                edge_retires: vec![],
4288                chunks: vec![ChunkInsert {
4289                    id: "chunk-new".to_owned(),
4290                    node_logical_id: "meeting-1".to_owned(),
4291                    text_content: "new text".to_owned(),
4292                    byte_start: None,
4293                    byte_end: None,
4294                    content_hash: None,
4295                }],
4296                runs: vec![],
4297                steps: vec![],
4298                actions: vec![],
4299                optional_backfills: vec![],
4300                vec_inserts: vec![],
4301                operational_writes: vec![],
4302            })
4303            .expect("v2 write");
4304
4305        let conn = rusqlite::Connection::open(db.path()).expect("open");
4306        let old_chunk: i64 = conn
4307            .query_row(
4308                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4309                [],
4310                |r| r.get(0),
4311            )
4312            .expect("old chunk count");
4313        let new_chunk: i64 = conn
4314            .query_row(
4315                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4316                [],
4317                |r| r.get(0),
4318            )
4319            .expect("new chunk count");
4320        let fts_old: i64 = conn
4321            .query_row(
4322                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4323                [],
4324                |r| r.get(0),
4325            )
4326            .expect("old fts count");
4327
4328        assert_eq!(
4329            old_chunk, 0,
4330            "old chunk must be deleted by ChunkPolicy::Replace"
4331        );
4332        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4333        assert_eq!(
4334            fts_old, 0,
4335            "old FTS row must be deleted by ChunkPolicy::Replace"
4336        );
4337    }
4338
4339    #[test]
4340    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4341        let db = NamedTempFile::new().expect("temporary db");
4342        let writer = WriterActor::start(
4343            db.path(),
4344            Arc::new(SchemaManager::new()),
4345            ProvenanceMode::Warn,
4346            Arc::new(TelemetryCounters::default()),
4347        )
4348        .expect("writer");
4349
4350        writer
4351            .submit(WriteRequest {
4352                label: "v1".to_owned(),
4353                nodes: vec![NodeInsert {
4354                    row_id: "row-1".to_owned(),
4355                    logical_id: "meeting-1".to_owned(),
4356                    kind: "Meeting".to_owned(),
4357                    properties: "{}".to_owned(),
4358                    source_ref: Some("src-1".to_owned()),
4359                    upsert: false,
4360                    chunk_policy: ChunkPolicy::Preserve,
4361                    content_ref: None,
4362                }],
4363                node_retires: vec![],
4364                edges: vec![],
4365                edge_retires: vec![],
4366                chunks: vec![ChunkInsert {
4367                    id: "chunk-old".to_owned(),
4368                    node_logical_id: "meeting-1".to_owned(),
4369                    text_content: "old text".to_owned(),
4370                    byte_start: None,
4371                    byte_end: None,
4372                    content_hash: None,
4373                }],
4374                runs: vec![],
4375                steps: vec![],
4376                actions: vec![],
4377                optional_backfills: vec![],
4378                vec_inserts: vec![],
4379                operational_writes: vec![],
4380            })
4381            .expect("v1 write");
4382
4383        writer
4384            .submit(WriteRequest {
4385                label: "v2-props-only".to_owned(),
4386                nodes: vec![NodeInsert {
4387                    row_id: "row-2".to_owned(),
4388                    logical_id: "meeting-1".to_owned(),
4389                    kind: "Meeting".to_owned(),
4390                    properties: r#"{"status":"updated"}"#.to_owned(),
4391                    source_ref: Some("src-2".to_owned()),
4392                    upsert: true,
4393                    chunk_policy: ChunkPolicy::Preserve,
4394                    content_ref: None,
4395                }],
4396                node_retires: vec![],
4397                edges: vec![],
4398                edge_retires: vec![],
4399                chunks: vec![],
4400                runs: vec![],
4401                steps: vec![],
4402                actions: vec![],
4403                optional_backfills: vec![],
4404                vec_inserts: vec![],
4405                operational_writes: vec![],
4406            })
4407            .expect("v2 preserve write");
4408
4409        let conn = rusqlite::Connection::open(db.path()).expect("open");
4410        let old_chunk: i64 = conn
4411            .query_row(
4412                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4413                [],
4414                |r| r.get(0),
4415            )
4416            .expect("old chunk count");
4417
4418        assert_eq!(
4419            old_chunk, 1,
4420            "old chunk must be preserved by ChunkPolicy::Preserve"
4421        );
4422    }
4423
4424    #[test]
4425    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4426        let db = NamedTempFile::new().expect("temporary db");
4427        let writer = WriterActor::start(
4428            db.path(),
4429            Arc::new(SchemaManager::new()),
4430            ProvenanceMode::Warn,
4431            Arc::new(TelemetryCounters::default()),
4432        )
4433        .expect("writer");
4434
4435        writer
4436            .submit(WriteRequest {
4437                label: "v1".to_owned(),
4438                nodes: vec![NodeInsert {
4439                    row_id: "row-1".to_owned(),
4440                    logical_id: "meeting-1".to_owned(),
4441                    kind: "Meeting".to_owned(),
4442                    properties: "{}".to_owned(),
4443                    source_ref: Some("src-1".to_owned()),
4444                    upsert: false,
4445                    chunk_policy: ChunkPolicy::Preserve,
4446                    content_ref: None,
4447                }],
4448                node_retires: vec![],
4449                edges: vec![],
4450                edge_retires: vec![],
4451                chunks: vec![ChunkInsert {
4452                    id: "chunk-existing".to_owned(),
4453                    node_logical_id: "meeting-1".to_owned(),
4454                    text_content: "existing text".to_owned(),
4455                    byte_start: None,
4456                    byte_end: None,
4457                    content_hash: None,
4458                }],
4459                runs: vec![],
4460                steps: vec![],
4461                actions: vec![],
4462                optional_backfills: vec![],
4463                vec_inserts: vec![],
4464                operational_writes: vec![],
4465            })
4466            .expect("v1 write");
4467
4468        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4469        writer
4470            .submit(WriteRequest {
4471                label: "insert-no-upsert".to_owned(),
4472                nodes: vec![NodeInsert {
4473                    row_id: "row-2".to_owned(),
4474                    logical_id: "meeting-2".to_owned(),
4475                    kind: "Meeting".to_owned(),
4476                    properties: "{}".to_owned(),
4477                    source_ref: Some("src-2".to_owned()),
4478                    upsert: false,
4479                    chunk_policy: ChunkPolicy::Replace,
4480                    content_ref: None,
4481                }],
4482                node_retires: vec![],
4483                edges: vec![],
4484                edge_retires: vec![],
4485                chunks: vec![],
4486                runs: vec![],
4487                steps: vec![],
4488                actions: vec![],
4489                optional_backfills: vec![],
4490                vec_inserts: vec![],
4491                operational_writes: vec![],
4492            })
4493            .expect("insert no-upsert write");
4494
4495        let conn = rusqlite::Connection::open(db.path()).expect("open");
4496        let existing_chunk: i64 = conn
4497            .query_row(
4498                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4499                [],
4500                |r| r.get(0),
4501            )
4502            .expect("chunk count");
4503
4504        assert_eq!(
4505            existing_chunk, 1,
4506            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4507        );
4508    }
4509
4510    #[test]
4511    fn writer_run_upsert_supersedes_prior_active_run() {
4512        let db = NamedTempFile::new().expect("temporary db");
4513        let writer = WriterActor::start(
4514            db.path(),
4515            Arc::new(SchemaManager::new()),
4516            ProvenanceMode::Warn,
4517            Arc::new(TelemetryCounters::default()),
4518        )
4519        .expect("writer");
4520
4521        writer
4522            .submit(WriteRequest {
4523                label: "v1".to_owned(),
4524                nodes: vec![],
4525                node_retires: vec![],
4526                edges: vec![],
4527                edge_retires: vec![],
4528                chunks: vec![],
4529                runs: vec![RunInsert {
4530                    id: "run-v1".to_owned(),
4531                    kind: "session".to_owned(),
4532                    status: "completed".to_owned(),
4533                    properties: "{}".to_owned(),
4534                    source_ref: Some("src-1".to_owned()),
4535                    upsert: false,
4536                    supersedes_id: None,
4537                }],
4538                steps: vec![],
4539                actions: vec![],
4540                optional_backfills: vec![],
4541                vec_inserts: vec![],
4542                operational_writes: vec![],
4543            })
4544            .expect("v1 run write");
4545
4546        writer
4547            .submit(WriteRequest {
4548                label: "v2".to_owned(),
4549                nodes: vec![],
4550                node_retires: vec![],
4551                edges: vec![],
4552                edge_retires: vec![],
4553                chunks: vec![],
4554                runs: vec![RunInsert {
4555                    id: "run-v2".to_owned(),
4556                    kind: "session".to_owned(),
4557                    status: "completed".to_owned(),
4558                    properties: "{}".to_owned(),
4559                    source_ref: Some("src-2".to_owned()),
4560                    upsert: true,
4561                    supersedes_id: Some("run-v1".to_owned()),
4562                }],
4563                steps: vec![],
4564                actions: vec![],
4565                optional_backfills: vec![],
4566                vec_inserts: vec![],
4567                operational_writes: vec![],
4568            })
4569            .expect("v2 run write");
4570
4571        let conn = rusqlite::Connection::open(db.path()).expect("open");
4572        let v1_historical: i64 = conn
4573            .query_row(
4574                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4575                [],
4576                |r| r.get(0),
4577            )
4578            .expect("v1 historical count");
4579        let v2_active: i64 = conn
4580            .query_row(
4581                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4582                [],
4583                |r| r.get(0),
4584            )
4585            .expect("v2 active count");
4586
4587        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4588        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4589    }
4590
4591    #[test]
4592    fn writer_step_upsert_supersedes_prior_active_step() {
4593        let db = NamedTempFile::new().expect("temporary db");
4594        let writer = WriterActor::start(
4595            db.path(),
4596            Arc::new(SchemaManager::new()),
4597            ProvenanceMode::Warn,
4598            Arc::new(TelemetryCounters::default()),
4599        )
4600        .expect("writer");
4601
4602        writer
4603            .submit(WriteRequest {
4604                label: "v1".to_owned(),
4605                nodes: vec![],
4606                node_retires: vec![],
4607                edges: vec![],
4608                edge_retires: vec![],
4609                chunks: vec![],
4610                runs: vec![RunInsert {
4611                    id: "run-1".to_owned(),
4612                    kind: "session".to_owned(),
4613                    status: "completed".to_owned(),
4614                    properties: "{}".to_owned(),
4615                    source_ref: Some("src-1".to_owned()),
4616                    upsert: false,
4617                    supersedes_id: None,
4618                }],
4619                steps: vec![StepInsert {
4620                    id: "step-v1".to_owned(),
4621                    run_id: "run-1".to_owned(),
4622                    kind: "llm".to_owned(),
4623                    status: "completed".to_owned(),
4624                    properties: "{}".to_owned(),
4625                    source_ref: Some("src-1".to_owned()),
4626                    upsert: false,
4627                    supersedes_id: None,
4628                }],
4629                actions: vec![],
4630                optional_backfills: vec![],
4631                vec_inserts: vec![],
4632                operational_writes: vec![],
4633            })
4634            .expect("v1 step write");
4635
4636        writer
4637            .submit(WriteRequest {
4638                label: "v2".to_owned(),
4639                nodes: vec![],
4640                node_retires: vec![],
4641                edges: vec![],
4642                edge_retires: vec![],
4643                chunks: vec![],
4644                runs: vec![],
4645                steps: vec![StepInsert {
4646                    id: "step-v2".to_owned(),
4647                    run_id: "run-1".to_owned(),
4648                    kind: "llm".to_owned(),
4649                    status: "completed".to_owned(),
4650                    properties: "{}".to_owned(),
4651                    source_ref: Some("src-2".to_owned()),
4652                    upsert: true,
4653                    supersedes_id: Some("step-v1".to_owned()),
4654                }],
4655                actions: vec![],
4656                optional_backfills: vec![],
4657                vec_inserts: vec![],
4658                operational_writes: vec![],
4659            })
4660            .expect("v2 step write");
4661
4662        let conn = rusqlite::Connection::open(db.path()).expect("open");
4663        let v1_historical: i64 = conn
4664            .query_row(
4665                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4666                [],
4667                |r| r.get(0),
4668            )
4669            .expect("v1 historical count");
4670        let v2_active: i64 = conn
4671            .query_row(
4672                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4673                [],
4674                |r| r.get(0),
4675            )
4676            .expect("v2 active count");
4677
4678        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4679        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4680    }
4681
4682    #[test]
4683    fn writer_action_upsert_supersedes_prior_active_action() {
4684        let db = NamedTempFile::new().expect("temporary db");
4685        let writer = WriterActor::start(
4686            db.path(),
4687            Arc::new(SchemaManager::new()),
4688            ProvenanceMode::Warn,
4689            Arc::new(TelemetryCounters::default()),
4690        )
4691        .expect("writer");
4692
4693        writer
4694            .submit(WriteRequest {
4695                label: "v1".to_owned(),
4696                nodes: vec![],
4697                node_retires: vec![],
4698                edges: vec![],
4699                edge_retires: vec![],
4700                chunks: vec![],
4701                runs: vec![RunInsert {
4702                    id: "run-1".to_owned(),
4703                    kind: "session".to_owned(),
4704                    status: "completed".to_owned(),
4705                    properties: "{}".to_owned(),
4706                    source_ref: Some("src-1".to_owned()),
4707                    upsert: false,
4708                    supersedes_id: None,
4709                }],
4710                steps: vec![StepInsert {
4711                    id: "step-1".to_owned(),
4712                    run_id: "run-1".to_owned(),
4713                    kind: "llm".to_owned(),
4714                    status: "completed".to_owned(),
4715                    properties: "{}".to_owned(),
4716                    source_ref: Some("src-1".to_owned()),
4717                    upsert: false,
4718                    supersedes_id: None,
4719                }],
4720                actions: vec![ActionInsert {
4721                    id: "action-v1".to_owned(),
4722                    step_id: "step-1".to_owned(),
4723                    kind: "emit".to_owned(),
4724                    status: "completed".to_owned(),
4725                    properties: "{}".to_owned(),
4726                    source_ref: Some("src-1".to_owned()),
4727                    upsert: false,
4728                    supersedes_id: None,
4729                }],
4730                optional_backfills: vec![],
4731                vec_inserts: vec![],
4732                operational_writes: vec![],
4733            })
4734            .expect("v1 action write");
4735
4736        writer
4737            .submit(WriteRequest {
4738                label: "v2".to_owned(),
4739                nodes: vec![],
4740                node_retires: vec![],
4741                edges: vec![],
4742                edge_retires: vec![],
4743                chunks: vec![],
4744                runs: vec![],
4745                steps: vec![],
4746                actions: vec![ActionInsert {
4747                    id: "action-v2".to_owned(),
4748                    step_id: "step-1".to_owned(),
4749                    kind: "emit".to_owned(),
4750                    status: "completed".to_owned(),
4751                    properties: "{}".to_owned(),
4752                    source_ref: Some("src-2".to_owned()),
4753                    upsert: true,
4754                    supersedes_id: Some("action-v1".to_owned()),
4755                }],
4756                optional_backfills: vec![],
4757                vec_inserts: vec![],
4758                operational_writes: vec![],
4759            })
4760            .expect("v2 action write");
4761
4762        let conn = rusqlite::Connection::open(db.path()).expect("open");
4763        let v1_historical: i64 = conn
4764            .query_row(
4765                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4766                [],
4767                |r| r.get(0),
4768            )
4769            .expect("v1 historical count");
4770        let v2_active: i64 = conn
4771            .query_row(
4772                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4773                [],
4774                |r| r.get(0),
4775            )
4776            .expect("v2 active count");
4777
4778        assert_eq!(
4779            v1_historical, 1,
4780            "action-v1 must be historical after upsert"
4781        );
4782        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4783    }
4784
4785    // P0: runtime upsert without supersedes_id must be rejected
4786
4787    #[test]
4788    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4789        let db = NamedTempFile::new().expect("temporary db");
4790        let writer = WriterActor::start(
4791            db.path(),
4792            Arc::new(SchemaManager::new()),
4793            ProvenanceMode::Warn,
4794            Arc::new(TelemetryCounters::default()),
4795        )
4796        .expect("writer");
4797
4798        let result = writer.submit(WriteRequest {
4799            label: "bad".to_owned(),
4800            nodes: vec![],
4801            node_retires: vec![],
4802            edges: vec![],
4803            edge_retires: vec![],
4804            chunks: vec![],
4805            runs: vec![RunInsert {
4806                id: "run-1".to_owned(),
4807                kind: "session".to_owned(),
4808                status: "completed".to_owned(),
4809                properties: "{}".to_owned(),
4810                source_ref: None,
4811                upsert: true,
4812                supersedes_id: None,
4813            }],
4814            steps: vec![],
4815            actions: vec![],
4816            optional_backfills: vec![],
4817            vec_inserts: vec![],
4818            operational_writes: vec![],
4819        });
4820
4821        assert!(
4822            matches!(result, Err(EngineError::InvalidWrite(_))),
4823            "run upsert=true without supersedes_id must return InvalidWrite"
4824        );
4825    }
4826
4827    #[test]
4828    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4829        let db = NamedTempFile::new().expect("temporary db");
4830        let writer = WriterActor::start(
4831            db.path(),
4832            Arc::new(SchemaManager::new()),
4833            ProvenanceMode::Warn,
4834            Arc::new(TelemetryCounters::default()),
4835        )
4836        .expect("writer");
4837
4838        let result = writer.submit(WriteRequest {
4839            label: "bad".to_owned(),
4840            nodes: vec![],
4841            node_retires: vec![],
4842            edges: vec![],
4843            edge_retires: vec![],
4844            chunks: vec![],
4845            runs: vec![],
4846            steps: vec![StepInsert {
4847                id: "step-1".to_owned(),
4848                run_id: "run-1".to_owned(),
4849                kind: "llm".to_owned(),
4850                status: "completed".to_owned(),
4851                properties: "{}".to_owned(),
4852                source_ref: None,
4853                upsert: true,
4854                supersedes_id: None,
4855            }],
4856            actions: vec![],
4857            optional_backfills: vec![],
4858            vec_inserts: vec![],
4859            operational_writes: vec![],
4860        });
4861
4862        assert!(
4863            matches!(result, Err(EngineError::InvalidWrite(_))),
4864            "step upsert=true without supersedes_id must return InvalidWrite"
4865        );
4866    }
4867
4868    #[test]
4869    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4870        let db = NamedTempFile::new().expect("temporary db");
4871        let writer = WriterActor::start(
4872            db.path(),
4873            Arc::new(SchemaManager::new()),
4874            ProvenanceMode::Warn,
4875            Arc::new(TelemetryCounters::default()),
4876        )
4877        .expect("writer");
4878
4879        let result = writer.submit(WriteRequest {
4880            label: "bad".to_owned(),
4881            nodes: vec![],
4882            node_retires: vec![],
4883            edges: vec![],
4884            edge_retires: vec![],
4885            chunks: vec![],
4886            runs: vec![],
4887            steps: vec![],
4888            actions: vec![ActionInsert {
4889                id: "action-1".to_owned(),
4890                step_id: "step-1".to_owned(),
4891                kind: "emit".to_owned(),
4892                status: "completed".to_owned(),
4893                properties: "{}".to_owned(),
4894                source_ref: None,
4895                upsert: true,
4896                supersedes_id: None,
4897            }],
4898            optional_backfills: vec![],
4899            vec_inserts: vec![],
4900            operational_writes: vec![],
4901        });
4902
4903        assert!(
4904            matches!(result, Err(EngineError::InvalidWrite(_))),
4905            "action upsert=true without supersedes_id must return InvalidWrite"
4906        );
4907    }
4908
4909    // P1a/b: provenance warnings for edge inserts and runtime table inserts
4910
4911    #[test]
4912    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4913        let db = NamedTempFile::new().expect("temporary db");
4914        let writer = WriterActor::start(
4915            db.path(),
4916            Arc::new(SchemaManager::new()),
4917            ProvenanceMode::Warn,
4918            Arc::new(TelemetryCounters::default()),
4919        )
4920        .expect("writer");
4921
4922        let receipt = writer
4923            .submit(WriteRequest {
4924                label: "test".to_owned(),
4925                nodes: vec![
4926                    NodeInsert {
4927                        row_id: "row-a".to_owned(),
4928                        logical_id: "node-a".to_owned(),
4929                        kind: "Meeting".to_owned(),
4930                        properties: "{}".to_owned(),
4931                        source_ref: Some("src-1".to_owned()),
4932                        upsert: false,
4933                        chunk_policy: ChunkPolicy::Preserve,
4934                        content_ref: None,
4935                    },
4936                    NodeInsert {
4937                        row_id: "row-b".to_owned(),
4938                        logical_id: "node-b".to_owned(),
4939                        kind: "Task".to_owned(),
4940                        properties: "{}".to_owned(),
4941                        source_ref: Some("src-1".to_owned()),
4942                        upsert: false,
4943                        chunk_policy: ChunkPolicy::Preserve,
4944                        content_ref: None,
4945                    },
4946                ],
4947                node_retires: vec![],
4948                edges: vec![EdgeInsert {
4949                    row_id: "edge-1".to_owned(),
4950                    logical_id: "edge-lg-1".to_owned(),
4951                    source_logical_id: "node-a".to_owned(),
4952                    target_logical_id: "node-b".to_owned(),
4953                    kind: "HAS_TASK".to_owned(),
4954                    properties: "{}".to_owned(),
4955                    source_ref: None,
4956                    upsert: false,
4957                }],
4958                edge_retires: vec![],
4959                chunks: vec![],
4960                runs: vec![],
4961                steps: vec![],
4962                actions: vec![],
4963                optional_backfills: vec![],
4964                vec_inserts: vec![],
4965                operational_writes: vec![],
4966            })
4967            .expect("write");
4968
4969        assert!(
4970            !receipt.provenance_warnings.is_empty(),
4971            "edge insert without source_ref must emit a provenance warning"
4972        );
4973    }
4974
4975    #[test]
4976    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4977        let db = NamedTempFile::new().expect("temporary db");
4978        let writer = WriterActor::start(
4979            db.path(),
4980            Arc::new(SchemaManager::new()),
4981            ProvenanceMode::Warn,
4982            Arc::new(TelemetryCounters::default()),
4983        )
4984        .expect("writer");
4985
4986        let receipt = writer
4987            .submit(WriteRequest {
4988                label: "test".to_owned(),
4989                nodes: vec![],
4990                node_retires: vec![],
4991                edges: vec![],
4992                edge_retires: vec![],
4993                chunks: vec![],
4994                runs: vec![RunInsert {
4995                    id: "run-1".to_owned(),
4996                    kind: "session".to_owned(),
4997                    status: "completed".to_owned(),
4998                    properties: "{}".to_owned(),
4999                    source_ref: None,
5000                    upsert: false,
5001                    supersedes_id: None,
5002                }],
5003                steps: vec![],
5004                actions: vec![],
5005                optional_backfills: vec![],
5006                vec_inserts: vec![],
5007                operational_writes: vec![],
5008            })
5009            .expect("write");
5010
5011        assert!(
5012            !receipt.provenance_warnings.is_empty(),
5013            "run insert without source_ref must emit a provenance warning"
5014        );
5015    }
5016
5017    // P1c: retire a node AND submit chunks for the same logical_id in one request
5018
5019    #[test]
5020    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5021        let db = NamedTempFile::new().expect("temporary db");
5022        let writer = WriterActor::start(
5023            db.path(),
5024            Arc::new(SchemaManager::new()),
5025            ProvenanceMode::Warn,
5026            Arc::new(TelemetryCounters::default()),
5027        )
5028        .expect("writer");
5029
5030        // First seed the node so it exists
5031        writer
5032            .submit(WriteRequest {
5033                label: "seed".to_owned(),
5034                nodes: vec![NodeInsert {
5035                    row_id: "row-1".to_owned(),
5036                    logical_id: "meeting-1".to_owned(),
5037                    kind: "Meeting".to_owned(),
5038                    properties: "{}".to_owned(),
5039                    source_ref: Some("src-1".to_owned()),
5040                    upsert: false,
5041                    chunk_policy: ChunkPolicy::Preserve,
5042                    content_ref: None,
5043                }],
5044                node_retires: vec![],
5045                edges: vec![],
5046                edge_retires: vec![],
5047                chunks: vec![],
5048                runs: vec![],
5049                steps: vec![],
5050                actions: vec![],
5051                optional_backfills: vec![],
5052                vec_inserts: vec![],
5053                operational_writes: vec![],
5054            })
5055            .expect("seed write");
5056
5057        // Now try to retire it AND add a chunk for it in the same request
5058        let result = writer.submit(WriteRequest {
5059            label: "bad".to_owned(),
5060            nodes: vec![],
5061            node_retires: vec![NodeRetire {
5062                logical_id: "meeting-1".to_owned(),
5063                source_ref: Some("src-2".to_owned()),
5064            }],
5065            edges: vec![],
5066            edge_retires: vec![],
5067            chunks: vec![ChunkInsert {
5068                id: "chunk-bad".to_owned(),
5069                node_logical_id: "meeting-1".to_owned(),
5070                text_content: "some text".to_owned(),
5071                byte_start: None,
5072                byte_end: None,
5073                content_hash: None,
5074            }],
5075            runs: vec![],
5076            steps: vec![],
5077            actions: vec![],
5078            optional_backfills: vec![],
5079            vec_inserts: vec![],
5080            operational_writes: vec![],
5081        });
5082
5083        assert!(
5084            matches!(result, Err(EngineError::InvalidWrite(_))),
5085            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5086        );
5087    }
5088
5089    // --- Item 1: prepare_cached batch insert ---
5090
5091    #[test]
5092    fn writer_batch_insert_multiple_nodes() {
5093        let db = NamedTempFile::new().expect("temporary db");
5094        let writer = WriterActor::start(
5095            db.path(),
5096            Arc::new(SchemaManager::new()),
5097            ProvenanceMode::Warn,
5098            Arc::new(TelemetryCounters::default()),
5099        )
5100        .expect("writer");
5101
5102        let nodes: Vec<NodeInsert> = (0..100)
5103            .map(|i| NodeInsert {
5104                row_id: format!("row-{i}"),
5105                logical_id: format!("lg-{i}"),
5106                kind: "Note".to_owned(),
5107                properties: "{}".to_owned(),
5108                source_ref: Some("batch-src".to_owned()),
5109                upsert: false,
5110                chunk_policy: ChunkPolicy::Preserve,
5111                content_ref: None,
5112            })
5113            .collect();
5114
5115        writer
5116            .submit(WriteRequest {
5117                label: "batch".to_owned(),
5118                nodes,
5119                node_retires: vec![],
5120                edges: vec![],
5121                edge_retires: vec![],
5122                chunks: vec![],
5123                runs: vec![],
5124                steps: vec![],
5125                actions: vec![],
5126                optional_backfills: vec![],
5127                vec_inserts: vec![],
5128                operational_writes: vec![],
5129            })
5130            .expect("batch write");
5131
5132        let conn = rusqlite::Connection::open(db.path()).expect("open");
5133        let count: i64 = conn
5134            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5135            .expect("count nodes");
5136        assert_eq!(
5137            count, 100,
5138            "all 100 nodes must be present after batch insert"
5139        );
5140    }
5141
5142    // --- Item 2: ID validation ---
5143
5144    #[test]
5145    fn prepare_write_rejects_empty_node_row_id() {
5146        let db = NamedTempFile::new().expect("temporary db");
5147        let writer = WriterActor::start(
5148            db.path(),
5149            Arc::new(SchemaManager::new()),
5150            ProvenanceMode::Warn,
5151            Arc::new(TelemetryCounters::default()),
5152        )
5153        .expect("writer");
5154
5155        let result = writer.submit(WriteRequest {
5156            label: "test".to_owned(),
5157            nodes: vec![NodeInsert {
5158                row_id: String::new(),
5159                logical_id: "lg-1".to_owned(),
5160                kind: "Note".to_owned(),
5161                properties: "{}".to_owned(),
5162                source_ref: None,
5163                upsert: false,
5164                chunk_policy: ChunkPolicy::Preserve,
5165                content_ref: None,
5166            }],
5167            node_retires: vec![],
5168            edges: vec![],
5169            edge_retires: vec![],
5170            chunks: vec![],
5171            runs: vec![],
5172            steps: vec![],
5173            actions: vec![],
5174            optional_backfills: vec![],
5175            vec_inserts: vec![],
5176            operational_writes: vec![],
5177        });
5178
5179        assert!(
5180            matches!(result, Err(EngineError::InvalidWrite(_))),
5181            "empty row_id must be rejected"
5182        );
5183    }
5184
5185    #[test]
5186    fn prepare_write_rejects_empty_node_logical_id() {
5187        let db = NamedTempFile::new().expect("temporary db");
5188        let writer = WriterActor::start(
5189            db.path(),
5190            Arc::new(SchemaManager::new()),
5191            ProvenanceMode::Warn,
5192            Arc::new(TelemetryCounters::default()),
5193        )
5194        .expect("writer");
5195
5196        let result = writer.submit(WriteRequest {
5197            label: "test".to_owned(),
5198            nodes: vec![NodeInsert {
5199                row_id: "row-1".to_owned(),
5200                logical_id: String::new(),
5201                kind: "Note".to_owned(),
5202                properties: "{}".to_owned(),
5203                source_ref: None,
5204                upsert: false,
5205                chunk_policy: ChunkPolicy::Preserve,
5206                content_ref: None,
5207            }],
5208            node_retires: vec![],
5209            edges: vec![],
5210            edge_retires: vec![],
5211            chunks: vec![],
5212            runs: vec![],
5213            steps: vec![],
5214            actions: vec![],
5215            optional_backfills: vec![],
5216            vec_inserts: vec![],
5217            operational_writes: vec![],
5218        });
5219
5220        assert!(
5221            matches!(result, Err(EngineError::InvalidWrite(_))),
5222            "empty logical_id must be rejected"
5223        );
5224    }
5225
5226    #[test]
5227    fn prepare_write_rejects_duplicate_row_ids_in_request() {
5228        let db = NamedTempFile::new().expect("temporary db");
5229        let writer = WriterActor::start(
5230            db.path(),
5231            Arc::new(SchemaManager::new()),
5232            ProvenanceMode::Warn,
5233            Arc::new(TelemetryCounters::default()),
5234        )
5235        .expect("writer");
5236
5237        let result = writer.submit(WriteRequest {
5238            label: "test".to_owned(),
5239            nodes: vec![
5240                NodeInsert {
5241                    row_id: "row-1".to_owned(),
5242                    logical_id: "lg-1".to_owned(),
5243                    kind: "Note".to_owned(),
5244                    properties: "{}".to_owned(),
5245                    source_ref: None,
5246                    upsert: false,
5247                    chunk_policy: ChunkPolicy::Preserve,
5248                    content_ref: None,
5249                },
5250                NodeInsert {
5251                    row_id: "row-1".to_owned(), // duplicate
5252                    logical_id: "lg-2".to_owned(),
5253                    kind: "Note".to_owned(),
5254                    properties: "{}".to_owned(),
5255                    source_ref: None,
5256                    upsert: false,
5257                    chunk_policy: ChunkPolicy::Preserve,
5258                    content_ref: None,
5259                },
5260            ],
5261            node_retires: vec![],
5262            edges: vec![],
5263            edge_retires: vec![],
5264            chunks: vec![],
5265            runs: vec![],
5266            steps: vec![],
5267            actions: vec![],
5268            optional_backfills: vec![],
5269            vec_inserts: vec![],
5270            operational_writes: vec![],
5271        });
5272
5273        assert!(
5274            matches!(result, Err(EngineError::InvalidWrite(_))),
5275            "duplicate row_id within request must be rejected"
5276        );
5277    }
5278
5279    #[test]
5280    fn prepare_write_rejects_empty_chunk_id() {
5281        let db = NamedTempFile::new().expect("temporary db");
5282        let writer = WriterActor::start(
5283            db.path(),
5284            Arc::new(SchemaManager::new()),
5285            ProvenanceMode::Warn,
5286            Arc::new(TelemetryCounters::default()),
5287        )
5288        .expect("writer");
5289
5290        let result = writer.submit(WriteRequest {
5291            label: "test".to_owned(),
5292            nodes: vec![NodeInsert {
5293                row_id: "row-1".to_owned(),
5294                logical_id: "lg-1".to_owned(),
5295                kind: "Note".to_owned(),
5296                properties: "{}".to_owned(),
5297                source_ref: None,
5298                upsert: false,
5299                chunk_policy: ChunkPolicy::Preserve,
5300                content_ref: None,
5301            }],
5302            node_retires: vec![],
5303            edges: vec![],
5304            edge_retires: vec![],
5305            chunks: vec![ChunkInsert {
5306                id: String::new(),
5307                node_logical_id: "lg-1".to_owned(),
5308                text_content: "some text".to_owned(),
5309                byte_start: None,
5310                byte_end: None,
5311                content_hash: None,
5312            }],
5313            runs: vec![],
5314            steps: vec![],
5315            actions: vec![],
5316            optional_backfills: vec![],
5317            vec_inserts: vec![],
5318            operational_writes: vec![],
5319        });
5320
5321        assert!(
5322            matches!(result, Err(EngineError::InvalidWrite(_))),
5323            "empty chunk id must be rejected"
5324        );
5325    }
5326
5327    // --- Item 4: provenance warning coverage tests ---
5328
5329    #[test]
5330    fn writer_receipt_warns_on_step_without_source_ref() {
5331        let db = NamedTempFile::new().expect("temporary db");
5332        let writer = WriterActor::start(
5333            db.path(),
5334            Arc::new(SchemaManager::new()),
5335            ProvenanceMode::Warn,
5336            Arc::new(TelemetryCounters::default()),
5337        )
5338        .expect("writer");
5339
5340        // seed a run first so step FK is satisfied
5341        writer
5342            .submit(WriteRequest {
5343                label: "seed-run".to_owned(),
5344                nodes: vec![],
5345                node_retires: vec![],
5346                edges: vec![],
5347                edge_retires: vec![],
5348                chunks: vec![],
5349                runs: vec![RunInsert {
5350                    id: "run-1".to_owned(),
5351                    kind: "session".to_owned(),
5352                    status: "active".to_owned(),
5353                    properties: "{}".to_owned(),
5354                    source_ref: Some("src-1".to_owned()),
5355                    upsert: false,
5356                    supersedes_id: None,
5357                }],
5358                steps: vec![],
5359                actions: vec![],
5360                optional_backfills: vec![],
5361                vec_inserts: vec![],
5362                operational_writes: vec![],
5363            })
5364            .expect("seed run");
5365
5366        let receipt = writer
5367            .submit(WriteRequest {
5368                label: "test".to_owned(),
5369                nodes: vec![],
5370                node_retires: vec![],
5371                edges: vec![],
5372                edge_retires: vec![],
5373                chunks: vec![],
5374                runs: vec![],
5375                steps: vec![StepInsert {
5376                    id: "step-1".to_owned(),
5377                    run_id: "run-1".to_owned(),
5378                    kind: "llm_call".to_owned(),
5379                    status: "completed".to_owned(),
5380                    properties: "{}".to_owned(),
5381                    source_ref: None,
5382                    upsert: false,
5383                    supersedes_id: None,
5384                }],
5385                actions: vec![],
5386                optional_backfills: vec![],
5387                vec_inserts: vec![],
5388                operational_writes: vec![],
5389            })
5390            .expect("write");
5391
5392        assert!(
5393            !receipt.provenance_warnings.is_empty(),
5394            "step insert without source_ref must emit a provenance warning"
5395        );
5396    }
5397
5398    #[test]
5399    fn writer_receipt_warns_on_action_without_source_ref() {
5400        let db = NamedTempFile::new().expect("temporary db");
5401        let writer = WriterActor::start(
5402            db.path(),
5403            Arc::new(SchemaManager::new()),
5404            ProvenanceMode::Warn,
5405            Arc::new(TelemetryCounters::default()),
5406        )
5407        .expect("writer");
5408
5409        // seed run and step so action FK is satisfied
5410        writer
5411            .submit(WriteRequest {
5412                label: "seed".to_owned(),
5413                nodes: vec![],
5414                node_retires: vec![],
5415                edges: vec![],
5416                edge_retires: vec![],
5417                chunks: vec![],
5418                runs: vec![RunInsert {
5419                    id: "run-1".to_owned(),
5420                    kind: "session".to_owned(),
5421                    status: "active".to_owned(),
5422                    properties: "{}".to_owned(),
5423                    source_ref: Some("src-1".to_owned()),
5424                    upsert: false,
5425                    supersedes_id: None,
5426                }],
5427                steps: vec![StepInsert {
5428                    id: "step-1".to_owned(),
5429                    run_id: "run-1".to_owned(),
5430                    kind: "llm_call".to_owned(),
5431                    status: "completed".to_owned(),
5432                    properties: "{}".to_owned(),
5433                    source_ref: Some("src-1".to_owned()),
5434                    upsert: false,
5435                    supersedes_id: None,
5436                }],
5437                actions: vec![],
5438                optional_backfills: vec![],
5439                vec_inserts: vec![],
5440                operational_writes: vec![],
5441            })
5442            .expect("seed");
5443
5444        let receipt = writer
5445            .submit(WriteRequest {
5446                label: "test".to_owned(),
5447                nodes: vec![],
5448                node_retires: vec![],
5449                edges: vec![],
5450                edge_retires: vec![],
5451                chunks: vec![],
5452                runs: vec![],
5453                steps: vec![],
5454                actions: vec![ActionInsert {
5455                    id: "action-1".to_owned(),
5456                    step_id: "step-1".to_owned(),
5457                    kind: "tool_call".to_owned(),
5458                    status: "completed".to_owned(),
5459                    properties: "{}".to_owned(),
5460                    source_ref: None,
5461                    upsert: false,
5462                    supersedes_id: None,
5463                }],
5464                optional_backfills: vec![],
5465                vec_inserts: vec![],
5466                operational_writes: vec![],
5467            })
5468            .expect("write");
5469
5470        assert!(
5471            !receipt.provenance_warnings.is_empty(),
5472            "action insert without source_ref must emit a provenance warning"
5473        );
5474    }
5475
5476    #[test]
5477    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5478        let db = NamedTempFile::new().expect("temporary db");
5479        let writer = WriterActor::start(
5480            db.path(),
5481            Arc::new(SchemaManager::new()),
5482            ProvenanceMode::Warn,
5483            Arc::new(TelemetryCounters::default()),
5484        )
5485        .expect("writer");
5486
5487        let receipt = writer
5488            .submit(WriteRequest {
5489                label: "test".to_owned(),
5490                nodes: vec![NodeInsert {
5491                    row_id: "row-1".to_owned(),
5492                    logical_id: "node-1".to_owned(),
5493                    kind: "Note".to_owned(),
5494                    properties: "{}".to_owned(),
5495                    source_ref: Some("src-1".to_owned()),
5496                    upsert: false,
5497                    chunk_policy: ChunkPolicy::Preserve,
5498                    content_ref: None,
5499                }],
5500                node_retires: vec![],
5501                edges: vec![],
5502                edge_retires: vec![],
5503                chunks: vec![],
5504                runs: vec![RunInsert {
5505                    id: "run-1".to_owned(),
5506                    kind: "session".to_owned(),
5507                    status: "active".to_owned(),
5508                    properties: "{}".to_owned(),
5509                    source_ref: Some("src-1".to_owned()),
5510                    upsert: false,
5511                    supersedes_id: None,
5512                }],
5513                steps: vec![StepInsert {
5514                    id: "step-1".to_owned(),
5515                    run_id: "run-1".to_owned(),
5516                    kind: "llm_call".to_owned(),
5517                    status: "completed".to_owned(),
5518                    properties: "{}".to_owned(),
5519                    source_ref: Some("src-1".to_owned()),
5520                    upsert: false,
5521                    supersedes_id: None,
5522                }],
5523                actions: vec![ActionInsert {
5524                    id: "action-1".to_owned(),
5525                    step_id: "step-1".to_owned(),
5526                    kind: "tool_call".to_owned(),
5527                    status: "completed".to_owned(),
5528                    properties: "{}".to_owned(),
5529                    source_ref: Some("src-1".to_owned()),
5530                    upsert: false,
5531                    supersedes_id: None,
5532                }],
5533                optional_backfills: vec![],
5534                vec_inserts: vec![],
5535                operational_writes: vec![],
5536            })
5537            .expect("write");
5538
5539        assert!(
5540            receipt.provenance_warnings.is_empty(),
5541            "no warnings expected when all types have source_ref; got: {:?}",
5542            receipt.provenance_warnings
5543        );
5544    }
5545
5546    // --- Item 4 Task 2: ProvenanceMode tests ---
5547
5548    #[test]
5549    fn default_provenance_mode_is_warn() {
5550        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5551        let db = NamedTempFile::new().expect("temporary db");
5552        let writer = WriterActor::start(
5553            db.path(),
5554            Arc::new(SchemaManager::new()),
5555            ProvenanceMode::default(),
5556            Arc::new(TelemetryCounters::default()),
5557        )
5558        .expect("writer");
5559
5560        let receipt = writer
5561            .submit(WriteRequest {
5562                label: "test".to_owned(),
5563                nodes: vec![NodeInsert {
5564                    row_id: "row-1".to_owned(),
5565                    logical_id: "node-1".to_owned(),
5566                    kind: "Note".to_owned(),
5567                    properties: "{}".to_owned(),
5568                    source_ref: None,
5569                    upsert: false,
5570                    chunk_policy: ChunkPolicy::Preserve,
5571                    content_ref: None,
5572                }],
5573                node_retires: vec![],
5574                edges: vec![],
5575                edge_retires: vec![],
5576                chunks: vec![],
5577                runs: vec![],
5578                steps: vec![],
5579                actions: vec![],
5580                optional_backfills: vec![],
5581                vec_inserts: vec![],
5582                operational_writes: vec![],
5583            })
5584            .expect("Warn mode must not reject missing source_ref");
5585
5586        assert!(
5587            !receipt.provenance_warnings.is_empty(),
5588            "Warn mode must emit a warning instead of rejecting"
5589        );
5590    }
5591
5592    #[test]
5593    fn require_mode_rejects_node_without_source_ref() {
5594        let db = NamedTempFile::new().expect("temporary db");
5595        let writer = WriterActor::start(
5596            db.path(),
5597            Arc::new(SchemaManager::new()),
5598            ProvenanceMode::Require,
5599            Arc::new(TelemetryCounters::default()),
5600        )
5601        .expect("writer");
5602
5603        let result = writer.submit(WriteRequest {
5604            label: "test".to_owned(),
5605            nodes: vec![NodeInsert {
5606                row_id: "row-1".to_owned(),
5607                logical_id: "node-1".to_owned(),
5608                kind: "Note".to_owned(),
5609                properties: "{}".to_owned(),
5610                source_ref: None,
5611                upsert: false,
5612                chunk_policy: ChunkPolicy::Preserve,
5613                content_ref: None,
5614            }],
5615            node_retires: vec![],
5616            edges: vec![],
5617            edge_retires: vec![],
5618            chunks: vec![],
5619            runs: vec![],
5620            steps: vec![],
5621            actions: vec![],
5622            optional_backfills: vec![],
5623            vec_inserts: vec![],
5624            operational_writes: vec![],
5625        });
5626
5627        assert!(
5628            matches!(result, Err(EngineError::InvalidWrite(_))),
5629            "Require mode must reject node without source_ref"
5630        );
5631    }
5632
5633    #[test]
5634    fn require_mode_accepts_node_with_source_ref() {
5635        let db = NamedTempFile::new().expect("temporary db");
5636        let writer = WriterActor::start(
5637            db.path(),
5638            Arc::new(SchemaManager::new()),
5639            ProvenanceMode::Require,
5640            Arc::new(TelemetryCounters::default()),
5641        )
5642        .expect("writer");
5643
5644        let result = writer.submit(WriteRequest {
5645            label: "test".to_owned(),
5646            nodes: vec![NodeInsert {
5647                row_id: "row-1".to_owned(),
5648                logical_id: "node-1".to_owned(),
5649                kind: "Note".to_owned(),
5650                properties: "{}".to_owned(),
5651                source_ref: Some("src-1".to_owned()),
5652                upsert: false,
5653                chunk_policy: ChunkPolicy::Preserve,
5654                content_ref: None,
5655            }],
5656            node_retires: vec![],
5657            edges: vec![],
5658            edge_retires: vec![],
5659            chunks: vec![],
5660            runs: vec![],
5661            steps: vec![],
5662            actions: vec![],
5663            optional_backfills: vec![],
5664            vec_inserts: vec![],
5665            operational_writes: vec![],
5666        });
5667
5668        assert!(
5669            result.is_ok(),
5670            "Require mode must accept node with source_ref"
5671        );
5672    }
5673
5674    #[test]
5675    fn require_mode_rejects_edge_without_source_ref() {
5676        let db = NamedTempFile::new().expect("temporary db");
5677        let writer = WriterActor::start(
5678            db.path(),
5679            Arc::new(SchemaManager::new()),
5680            ProvenanceMode::Require,
5681            Arc::new(TelemetryCounters::default()),
5682        )
5683        .expect("writer");
5684
5685        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5686        let result = writer.submit(WriteRequest {
5687            label: "test".to_owned(),
5688            nodes: vec![
5689                NodeInsert {
5690                    row_id: "row-a".to_owned(),
5691                    logical_id: "node-a".to_owned(),
5692                    kind: "Note".to_owned(),
5693                    properties: "{}".to_owned(),
5694                    source_ref: Some("src-1".to_owned()),
5695                    upsert: false,
5696                    chunk_policy: ChunkPolicy::Preserve,
5697                    content_ref: None,
5698                },
5699                NodeInsert {
5700                    row_id: "row-b".to_owned(),
5701                    logical_id: "node-b".to_owned(),
5702                    kind: "Note".to_owned(),
5703                    properties: "{}".to_owned(),
5704                    source_ref: Some("src-1".to_owned()),
5705                    upsert: false,
5706                    chunk_policy: ChunkPolicy::Preserve,
5707                    content_ref: None,
5708                },
5709            ],
5710            node_retires: vec![],
5711            edges: vec![EdgeInsert {
5712                row_id: "edge-row-1".to_owned(),
5713                logical_id: "edge-1".to_owned(),
5714                source_logical_id: "node-a".to_owned(),
5715                target_logical_id: "node-b".to_owned(),
5716                kind: "LINKS_TO".to_owned(),
5717                properties: "{}".to_owned(),
5718                source_ref: None,
5719                upsert: false,
5720            }],
5721            edge_retires: vec![],
5722            chunks: vec![],
5723            runs: vec![],
5724            steps: vec![],
5725            actions: vec![],
5726            optional_backfills: vec![],
5727            vec_inserts: vec![],
5728            operational_writes: vec![],
5729        });
5730
5731        assert!(
5732            matches!(result, Err(EngineError::InvalidWrite(_))),
5733            "Require mode must reject edge without source_ref"
5734        );
5735    }
5736
5737    // --- Item 5: FTS projection coverage tests ---
5738
5739    #[test]
5740    fn fts_row_has_correct_kind_from_co_submitted_node() {
5741        let db = NamedTempFile::new().expect("temporary db");
5742        let writer = WriterActor::start(
5743            db.path(),
5744            Arc::new(SchemaManager::new()),
5745            ProvenanceMode::Warn,
5746            Arc::new(TelemetryCounters::default()),
5747        )
5748        .expect("writer");
5749
5750        writer
5751            .submit(WriteRequest {
5752                label: "test".to_owned(),
5753                nodes: vec![NodeInsert {
5754                    row_id: "row-1".to_owned(),
5755                    logical_id: "node-1".to_owned(),
5756                    kind: "Meeting".to_owned(),
5757                    properties: "{}".to_owned(),
5758                    source_ref: Some("src-1".to_owned()),
5759                    upsert: false,
5760                    chunk_policy: ChunkPolicy::Preserve,
5761                    content_ref: None,
5762                }],
5763                node_retires: vec![],
5764                edges: vec![],
5765                edge_retires: vec![],
5766                chunks: vec![ChunkInsert {
5767                    id: "chunk-1".to_owned(),
5768                    node_logical_id: "node-1".to_owned(),
5769                    text_content: "some text".to_owned(),
5770                    byte_start: None,
5771                    byte_end: None,
5772                    content_hash: None,
5773                }],
5774                runs: vec![],
5775                steps: vec![],
5776                actions: vec![],
5777                optional_backfills: vec![],
5778                vec_inserts: vec![],
5779                operational_writes: vec![],
5780            })
5781            .expect("write");
5782
5783        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5784        let kind: String = conn
5785            .query_row(
5786                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5787                [],
5788                |row| row.get(0),
5789            )
5790            .expect("fts row");
5791
5792        assert_eq!(kind, "Meeting");
5793    }
5794
5795    #[test]
5796    fn fts_row_has_correct_text_content() {
5797        let db = NamedTempFile::new().expect("temporary db");
5798        let writer = WriterActor::start(
5799            db.path(),
5800            Arc::new(SchemaManager::new()),
5801            ProvenanceMode::Warn,
5802            Arc::new(TelemetryCounters::default()),
5803        )
5804        .expect("writer");
5805
5806        writer
5807            .submit(WriteRequest {
5808                label: "test".to_owned(),
5809                nodes: vec![NodeInsert {
5810                    row_id: "row-1".to_owned(),
5811                    logical_id: "node-1".to_owned(),
5812                    kind: "Note".to_owned(),
5813                    properties: "{}".to_owned(),
5814                    source_ref: Some("src-1".to_owned()),
5815                    upsert: false,
5816                    chunk_policy: ChunkPolicy::Preserve,
5817                    content_ref: None,
5818                }],
5819                node_retires: vec![],
5820                edges: vec![],
5821                edge_retires: vec![],
5822                chunks: vec![ChunkInsert {
5823                    id: "chunk-1".to_owned(),
5824                    node_logical_id: "node-1".to_owned(),
5825                    text_content: "exactly this text".to_owned(),
5826                    byte_start: None,
5827                    byte_end: None,
5828                    content_hash: None,
5829                }],
5830                runs: vec![],
5831                steps: vec![],
5832                actions: vec![],
5833                optional_backfills: vec![],
5834                vec_inserts: vec![],
5835                operational_writes: vec![],
5836            })
5837            .expect("write");
5838
5839        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5840        let text: String = conn
5841            .query_row(
5842                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5843                [],
5844                |row| row.get(0),
5845            )
5846            .expect("fts row");
5847
5848        assert_eq!(text, "exactly this text");
5849    }
5850
5851    #[test]
5852    fn fts_row_has_correct_kind_from_pre_existing_node() {
5853        let db = NamedTempFile::new().expect("temporary db");
5854        let writer = WriterActor::start(
5855            db.path(),
5856            Arc::new(SchemaManager::new()),
5857            ProvenanceMode::Warn,
5858            Arc::new(TelemetryCounters::default()),
5859        )
5860        .expect("writer");
5861
5862        // Request 1: node only
5863        writer
5864            .submit(WriteRequest {
5865                label: "r1".to_owned(),
5866                nodes: vec![NodeInsert {
5867                    row_id: "row-1".to_owned(),
5868                    logical_id: "node-1".to_owned(),
5869                    kind: "Document".to_owned(),
5870                    properties: "{}".to_owned(),
5871                    source_ref: Some("src-1".to_owned()),
5872                    upsert: false,
5873                    chunk_policy: ChunkPolicy::Preserve,
5874                    content_ref: None,
5875                }],
5876                node_retires: vec![],
5877                edges: vec![],
5878                edge_retires: vec![],
5879                chunks: vec![],
5880                runs: vec![],
5881                steps: vec![],
5882                actions: vec![],
5883                optional_backfills: vec![],
5884                vec_inserts: vec![],
5885                operational_writes: vec![],
5886            })
5887            .expect("r1 write");
5888
5889        // Request 2: chunk for pre-existing node
5890        writer
5891            .submit(WriteRequest {
5892                label: "r2".to_owned(),
5893                nodes: vec![],
5894                node_retires: vec![],
5895                edges: vec![],
5896                edge_retires: vec![],
5897                chunks: vec![ChunkInsert {
5898                    id: "chunk-1".to_owned(),
5899                    node_logical_id: "node-1".to_owned(),
5900                    text_content: "some text".to_owned(),
5901                    byte_start: None,
5902                    byte_end: None,
5903                    content_hash: None,
5904                }],
5905                runs: vec![],
5906                steps: vec![],
5907                actions: vec![],
5908                optional_backfills: vec![],
5909                vec_inserts: vec![],
5910                operational_writes: vec![],
5911            })
5912            .expect("r2 write");
5913
5914        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5915        let kind: String = conn
5916            .query_row(
5917                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5918                [],
5919                |row| row.get(0),
5920            )
5921            .expect("fts row");
5922
5923        assert_eq!(kind, "Document");
5924    }
5925
5926    #[test]
5927    fn fts_derives_rows_for_multiple_chunks_per_node() {
5928        let db = NamedTempFile::new().expect("temporary db");
5929        let writer = WriterActor::start(
5930            db.path(),
5931            Arc::new(SchemaManager::new()),
5932            ProvenanceMode::Warn,
5933            Arc::new(TelemetryCounters::default()),
5934        )
5935        .expect("writer");
5936
5937        writer
5938            .submit(WriteRequest {
5939                label: "test".to_owned(),
5940                nodes: vec![NodeInsert {
5941                    row_id: "row-1".to_owned(),
5942                    logical_id: "node-1".to_owned(),
5943                    kind: "Meeting".to_owned(),
5944                    properties: "{}".to_owned(),
5945                    source_ref: Some("src-1".to_owned()),
5946                    upsert: false,
5947                    chunk_policy: ChunkPolicy::Preserve,
5948                    content_ref: None,
5949                }],
5950                node_retires: vec![],
5951                edges: vec![],
5952                edge_retires: vec![],
5953                chunks: vec![
5954                    ChunkInsert {
5955                        id: "chunk-a".to_owned(),
5956                        node_logical_id: "node-1".to_owned(),
5957                        text_content: "intro".to_owned(),
5958                        byte_start: None,
5959                        byte_end: None,
5960                        content_hash: None,
5961                    },
5962                    ChunkInsert {
5963                        id: "chunk-b".to_owned(),
5964                        node_logical_id: "node-1".to_owned(),
5965                        text_content: "body".to_owned(),
5966                        byte_start: None,
5967                        byte_end: None,
5968                        content_hash: None,
5969                    },
5970                    ChunkInsert {
5971                        id: "chunk-c".to_owned(),
5972                        node_logical_id: "node-1".to_owned(),
5973                        text_content: "conclusion".to_owned(),
5974                        byte_start: None,
5975                        byte_end: None,
5976                        content_hash: None,
5977                    },
5978                ],
5979                runs: vec![],
5980                steps: vec![],
5981                actions: vec![],
5982                optional_backfills: vec![],
5983                vec_inserts: vec![],
5984                operational_writes: vec![],
5985            })
5986            .expect("write");
5987
5988        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5989        let count: i64 = conn
5990            .query_row(
5991                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5992                [],
5993                |row| row.get(0),
5994            )
5995            .expect("fts count");
5996
5997        assert_eq!(count, 3, "three chunks must produce three FTS rows");
5998    }
5999
6000    #[test]
6001    fn fts_resolves_mixed_fast_and_db_paths() {
6002        let db = NamedTempFile::new().expect("temporary db");
6003        let writer = WriterActor::start(
6004            db.path(),
6005            Arc::new(SchemaManager::new()),
6006            ProvenanceMode::Warn,
6007            Arc::new(TelemetryCounters::default()),
6008        )
6009        .expect("writer");
6010
6011        // Seed pre-existing node
6012        writer
6013            .submit(WriteRequest {
6014                label: "seed".to_owned(),
6015                nodes: vec![NodeInsert {
6016                    row_id: "row-existing".to_owned(),
6017                    logical_id: "existing-node".to_owned(),
6018                    kind: "Archive".to_owned(),
6019                    properties: "{}".to_owned(),
6020                    source_ref: Some("src-1".to_owned()),
6021                    upsert: false,
6022                    chunk_policy: ChunkPolicy::Preserve,
6023                    content_ref: None,
6024                }],
6025                node_retires: vec![],
6026                edges: vec![],
6027                edge_retires: vec![],
6028                chunks: vec![],
6029                runs: vec![],
6030                steps: vec![],
6031                actions: vec![],
6032                optional_backfills: vec![],
6033                vec_inserts: vec![],
6034                operational_writes: vec![],
6035            })
6036            .expect("seed");
6037
6038        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
6039        writer
6040            .submit(WriteRequest {
6041                label: "mixed".to_owned(),
6042                nodes: vec![NodeInsert {
6043                    row_id: "row-new".to_owned(),
6044                    logical_id: "new-node".to_owned(),
6045                    kind: "Inbox".to_owned(),
6046                    properties: "{}".to_owned(),
6047                    source_ref: Some("src-2".to_owned()),
6048                    upsert: false,
6049                    chunk_policy: ChunkPolicy::Preserve,
6050                    content_ref: None,
6051                }],
6052                node_retires: vec![],
6053                edges: vec![],
6054                edge_retires: vec![],
6055                chunks: vec![
6056                    ChunkInsert {
6057                        id: "chunk-fast".to_owned(),
6058                        node_logical_id: "new-node".to_owned(),
6059                        text_content: "new content".to_owned(),
6060                        byte_start: None,
6061                        byte_end: None,
6062                        content_hash: None,
6063                    },
6064                    ChunkInsert {
6065                        id: "chunk-db".to_owned(),
6066                        node_logical_id: "existing-node".to_owned(),
6067                        text_content: "archive content".to_owned(),
6068                        byte_start: None,
6069                        byte_end: None,
6070                        content_hash: None,
6071                    },
6072                ],
6073                runs: vec![],
6074                steps: vec![],
6075                actions: vec![],
6076                optional_backfills: vec![],
6077                vec_inserts: vec![],
6078                operational_writes: vec![],
6079            })
6080            .expect("mixed write");
6081
6082        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6083        let fast_kind: String = conn
6084            .query_row(
6085                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6086                [],
6087                |row| row.get(0),
6088            )
6089            .expect("fast path fts row");
6090        let db_kind: String = conn
6091            .query_row(
6092                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6093                [],
6094                |row| row.get(0),
6095            )
6096            .expect("db path fts row");
6097
6098        assert_eq!(fast_kind, "Inbox");
6099        assert_eq!(db_kind, "Archive");
6100    }
6101
6102    #[test]
6103    fn prepare_write_rejects_empty_chunk_text() {
6104        let db = NamedTempFile::new().expect("temporary db");
6105        let writer = WriterActor::start(
6106            db.path(),
6107            Arc::new(SchemaManager::new()),
6108            ProvenanceMode::Warn,
6109            Arc::new(TelemetryCounters::default()),
6110        )
6111        .expect("writer");
6112
6113        let result = writer.submit(WriteRequest {
6114            label: "test".to_owned(),
6115            nodes: vec![NodeInsert {
6116                row_id: "row-1".to_owned(),
6117                logical_id: "node-1".to_owned(),
6118                kind: "Note".to_owned(),
6119                properties: "{}".to_owned(),
6120                source_ref: None,
6121                upsert: false,
6122                chunk_policy: ChunkPolicy::Preserve,
6123                content_ref: None,
6124            }],
6125            node_retires: vec![],
6126            edges: vec![],
6127            edge_retires: vec![],
6128            chunks: vec![ChunkInsert {
6129                id: "chunk-1".to_owned(),
6130                node_logical_id: "node-1".to_owned(),
6131                text_content: String::new(),
6132                byte_start: None,
6133                byte_end: None,
6134                content_hash: None,
6135            }],
6136            runs: vec![],
6137            steps: vec![],
6138            actions: vec![],
6139            optional_backfills: vec![],
6140            vec_inserts: vec![],
6141            operational_writes: vec![],
6142        });
6143
6144        assert!(
6145            matches!(result, Err(EngineError::InvalidWrite(_))),
6146            "empty text_content must be rejected"
6147        );
6148    }
6149
6150    #[test]
6151    fn receipt_reports_zero_backfills_when_none_submitted() {
6152        let db = NamedTempFile::new().expect("temporary db");
6153        let writer = WriterActor::start(
6154            db.path(),
6155            Arc::new(SchemaManager::new()),
6156            ProvenanceMode::Warn,
6157            Arc::new(TelemetryCounters::default()),
6158        )
6159        .expect("writer");
6160
6161        let receipt = writer
6162            .submit(WriteRequest {
6163                label: "test".to_owned(),
6164                nodes: vec![NodeInsert {
6165                    row_id: "row-1".to_owned(),
6166                    logical_id: "node-1".to_owned(),
6167                    kind: "Note".to_owned(),
6168                    properties: "{}".to_owned(),
6169                    source_ref: Some("src-1".to_owned()),
6170                    upsert: false,
6171                    chunk_policy: ChunkPolicy::Preserve,
6172                    content_ref: None,
6173                }],
6174                node_retires: vec![],
6175                edges: vec![],
6176                edge_retires: vec![],
6177                chunks: vec![],
6178                runs: vec![],
6179                steps: vec![],
6180                actions: vec![],
6181                optional_backfills: vec![],
6182                vec_inserts: vec![],
6183                operational_writes: vec![],
6184            })
6185            .expect("write");
6186
6187        assert_eq!(receipt.optional_backfill_count, 0);
6188    }
6189
6190    #[test]
6191    fn receipt_reports_correct_backfill_count() {
6192        let db = NamedTempFile::new().expect("temporary db");
6193        let writer = WriterActor::start(
6194            db.path(),
6195            Arc::new(SchemaManager::new()),
6196            ProvenanceMode::Warn,
6197            Arc::new(TelemetryCounters::default()),
6198        )
6199        .expect("writer");
6200
6201        let receipt = writer
6202            .submit(WriteRequest {
6203                label: "test".to_owned(),
6204                nodes: vec![NodeInsert {
6205                    row_id: "row-1".to_owned(),
6206                    logical_id: "node-1".to_owned(),
6207                    kind: "Note".to_owned(),
6208                    properties: "{}".to_owned(),
6209                    source_ref: Some("src-1".to_owned()),
6210                    upsert: false,
6211                    chunk_policy: ChunkPolicy::Preserve,
6212                    content_ref: None,
6213                }],
6214                node_retires: vec![],
6215                edges: vec![],
6216                edge_retires: vec![],
6217                chunks: vec![],
6218                runs: vec![],
6219                steps: vec![],
6220                actions: vec![],
6221                optional_backfills: vec![
6222                    OptionalProjectionTask {
6223                        target: ProjectionTarget::Fts,
6224                        payload: "p1".to_owned(),
6225                    },
6226                    OptionalProjectionTask {
6227                        target: ProjectionTarget::Vec,
6228                        payload: "p2".to_owned(),
6229                    },
6230                    OptionalProjectionTask {
6231                        target: ProjectionTarget::All,
6232                        payload: "p3".to_owned(),
6233                    },
6234                ],
6235                vec_inserts: vec![],
6236                operational_writes: vec![],
6237            })
6238            .expect("write");
6239
6240        assert_eq!(receipt.optional_backfill_count, 3);
6241    }
6242
6243    #[test]
6244    fn backfill_tasks_are_not_executed_during_write() {
6245        let db = NamedTempFile::new().expect("temporary db");
6246        let writer = WriterActor::start(
6247            db.path(),
6248            Arc::new(SchemaManager::new()),
6249            ProvenanceMode::Warn,
6250            Arc::new(TelemetryCounters::default()),
6251        )
6252        .expect("writer");
6253
6254        // Write a node + chunk. Submit a backfill task targeting FTS.
6255        // The write path must not create any extra FTS rows beyond the required one.
6256        writer
6257            .submit(WriteRequest {
6258                label: "test".to_owned(),
6259                nodes: vec![NodeInsert {
6260                    row_id: "row-1".to_owned(),
6261                    logical_id: "node-1".to_owned(),
6262                    kind: "Note".to_owned(),
6263                    properties: "{}".to_owned(),
6264                    source_ref: Some("src-1".to_owned()),
6265                    upsert: false,
6266                    chunk_policy: ChunkPolicy::Preserve,
6267                    content_ref: None,
6268                }],
6269                node_retires: vec![],
6270                edges: vec![],
6271                edge_retires: vec![],
6272                chunks: vec![ChunkInsert {
6273                    id: "chunk-1".to_owned(),
6274                    node_logical_id: "node-1".to_owned(),
6275                    text_content: "required text".to_owned(),
6276                    byte_start: None,
6277                    byte_end: None,
6278                    content_hash: None,
6279                }],
6280                runs: vec![],
6281                steps: vec![],
6282                actions: vec![],
6283                optional_backfills: vec![OptionalProjectionTask {
6284                    target: ProjectionTarget::Fts,
6285                    payload: "backfill-payload".to_owned(),
6286                }],
6287                vec_inserts: vec![],
6288                operational_writes: vec![],
6289            })
6290            .expect("write");
6291
6292        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6293        let count: i64 = conn
6294            .query_row(
6295                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6296                [],
6297                |row| row.get(0),
6298            )
6299            .expect("fts count");
6300
6301        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6302    }
6303
    #[test]
    fn fts_row_uses_new_kind_after_node_replace() {
        // Regression scenario: upsert a node under a new `kind` with
        // ChunkPolicy::Replace and verify the FTS projection both drops the
        // stale row for the old chunk and indexes the new chunk under the
        // updated kind.
        let db = NamedTempFile::new().expect("temporary db");
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Write original node as "Note"
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-v1".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "original".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
        // (same logical_id, new row_id — an upsert of the same logical node).
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "node-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-v2".to_owned(),
                    node_logical_id: "node-1".to_owned(),
                    text_content: "updated".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Inspect the FTS table through a plain connection.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");

        // Old FTS row must be gone
        let old_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
                [],
                |row| row.get(0),
            )
            .expect("old fts count");
        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");

        // New FTS row must use the new kind
        let new_kind: String = conn
            .query_row(
                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
                [],
                |row| row.get(0),
            )
            .expect("new fts row");
        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
    }
6405
6406    // --- Item 3: VecInsert tests ---
6407
6408    #[test]
6409    fn vec_insert_empty_chunk_id_is_rejected() {
6410        let db = NamedTempFile::new().expect("temporary db");
6411        let writer = WriterActor::start(
6412            db.path(),
6413            Arc::new(SchemaManager::new()),
6414            ProvenanceMode::Warn,
6415            Arc::new(TelemetryCounters::default()),
6416        )
6417        .expect("writer");
6418        let result = writer.submit(WriteRequest {
6419            label: "vec-test".to_owned(),
6420            nodes: vec![],
6421            node_retires: vec![],
6422            edges: vec![],
6423            edge_retires: vec![],
6424            chunks: vec![],
6425            runs: vec![],
6426            steps: vec![],
6427            actions: vec![],
6428            optional_backfills: vec![],
6429            vec_inserts: vec![VecInsert {
6430                chunk_id: String::new(),
6431                embedding: vec![0.1, 0.2, 0.3],
6432            }],
6433            operational_writes: vec![],
6434        });
6435        assert!(
6436            matches!(result, Err(EngineError::InvalidWrite(_))),
6437            "empty chunk_id in VecInsert must be rejected"
6438        );
6439    }
6440
6441    #[test]
6442    fn vec_insert_empty_embedding_is_rejected() {
6443        let db = NamedTempFile::new().expect("temporary db");
6444        let writer = WriterActor::start(
6445            db.path(),
6446            Arc::new(SchemaManager::new()),
6447            ProvenanceMode::Warn,
6448            Arc::new(TelemetryCounters::default()),
6449        )
6450        .expect("writer");
6451        let result = writer.submit(WriteRequest {
6452            label: "vec-test".to_owned(),
6453            nodes: vec![],
6454            node_retires: vec![],
6455            edges: vec![],
6456            edge_retires: vec![],
6457            chunks: vec![],
6458            runs: vec![],
6459            steps: vec![],
6460            actions: vec![],
6461            optional_backfills: vec![],
6462            vec_inserts: vec![VecInsert {
6463                chunk_id: "chunk-1".to_owned(),
6464                embedding: vec![],
6465            }],
6466            operational_writes: vec![],
6467        });
6468        assert!(
6469            matches!(result, Err(EngineError::InvalidWrite(_))),
6470            "empty embedding in VecInsert must be rejected"
6471        );
6472    }
6473
6474    #[test]
6475    fn vec_insert_noop_without_feature() {
6476        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6477        // (no error) but not write to any vec table.
6478        let db = NamedTempFile::new().expect("temporary db");
6479        let writer = WriterActor::start(
6480            db.path(),
6481            Arc::new(SchemaManager::new()),
6482            ProvenanceMode::Warn,
6483            Arc::new(TelemetryCounters::default()),
6484        )
6485        .expect("writer");
6486        let result = writer.submit(WriteRequest {
6487            label: "vec-noop".to_owned(),
6488            nodes: vec![],
6489            node_retires: vec![],
6490            edges: vec![],
6491            edge_retires: vec![],
6492            chunks: vec![],
6493            runs: vec![],
6494            steps: vec![],
6495            actions: vec![],
6496            optional_backfills: vec![],
6497            vec_inserts: vec![VecInsert {
6498                chunk_id: "chunk-noop".to_owned(),
6499                embedding: vec![1.0, 2.0, 3.0],
6500            }],
6501            operational_writes: vec![],
6502        });
6503        #[cfg(not(feature = "sqlite-vec"))]
6504        result.expect("noop VecInsert without feature must succeed");
6505        // The result variable is used above; silence unused warning for cfg-on path.
6506        #[cfg(feature = "sqlite-vec")]
6507        let _ = result;
6508    }
6509
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn node_retire_preserves_vec_rows_for_later_restore() {
        // Retiring a node must NOT delete its vector rows: a later restore
        // relies on them still being present in vec_nodes_active.
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the schema and register a 3-dimension vector profile on a
        // vec-capable connection before the writer starts (scoped so the
        // connection closes first).
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert node + chunk + vec row
        writer
            .submit(WriteRequest {
                label: "setup".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-retire-vec".to_owned(),
                    logical_id: "node-retire-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-retire-vec".to_owned(),
                    node_logical_id: "node-retire-vec".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-retire-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("setup write");

        // Retire the node
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "node-retire-vec".to_owned(),
                    source_ref: Some("src".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire write");

        // The vec row must have survived the retire.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
        );
    }
6605
    #[cfg(feature = "sqlite-vec")]
    #[test]
    #[allow(clippy::too_many_lines)]
    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
        // When a node is upserted with ChunkPolicy::Replace, the vec rows for
        // its replaced chunks must be deleted along with the chunks, while the
        // vec rows for the new chunks are written.
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap the schema and register a 3-dimension vector profile on a
        // vec-capable connection before the writer starts.
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert node + chunk-A + vec-A
        writer
            .submit(WriteRequest {
                label: "v1".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v1".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-A".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version one".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-A".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("v1 write");

        // Upsert with Replace + chunk-B + vec-B
        writer
            .submit(WriteRequest {
                label: "v2".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-replace-v2".to_owned(),
                    logical_id: "node-replace-vec".to_owned(),
                    kind: "Doc".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Replace,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "chunk-replace-B".to_owned(),
                    node_logical_id: "node-replace-vec".to_owned(),
                    text_content: "version two".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-replace-B".to_owned(),
                    embedding: vec![0.4, 0.5, 0.6],
                }],
                operational_writes: vec![],
            })
            .expect("v2 write");

        // Verify: vec-A removed by the Replace, vec-B present.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count_a: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
                [],
                |row| row.get(0),
            )
            .expect("count A");
        let count_b: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
                [],
                |row| row.get(0),
            )
            .expect("count B");
        assert_eq!(
            count_a, 0,
            "old vec row (chunk-A) must be deleted on Replace"
        );
        assert_eq!(
            count_b, 1,
            "new vec row (chunk-B) must be present after Replace"
        );
    }
6729
    #[cfg(feature = "sqlite-vec")]
    #[test]
    fn vec_insert_is_persisted_when_feature_enabled() {
        // With the sqlite-vec feature and a registered vector profile, a
        // VecInsert must land as a real row in vec_nodes_active.
        use crate::sqlite::open_connection_with_vec;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Open a vec-capable connection and bootstrap + ensure profile
        {
            let conn = open_connection_with_vec(db.path()).expect("vec connection");
            schema_manager.bootstrap(&conn).expect("bootstrap");
            schema_manager
                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
                .expect("ensure profile");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Write only a vec insert — no node/chunk needed for this check.
        writer
            .submit(WriteRequest {
                label: "vec-insert".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![VecInsert {
                    chunk_id: "chunk-vec".to_owned(),
                    embedding: vec![0.1, 0.2, 0.3],
                }],
                operational_writes: vec![],
            })
            .expect("vec insert write");

        // The row must be visible through a fresh connection.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
    }
6785
6786    // --- WriteRequest size validation tests ---
6787
6788    #[test]
6789    fn write_request_exceeding_node_limit_is_rejected() {
6790        let nodes: Vec<NodeInsert> = (0..10_001)
6791            .map(|i| NodeInsert {
6792                row_id: format!("row-{i}"),
6793                logical_id: format!("lg-{i}"),
6794                kind: "Note".to_owned(),
6795                properties: "{}".to_owned(),
6796                source_ref: None,
6797                upsert: false,
6798                chunk_policy: ChunkPolicy::Preserve,
6799                content_ref: None,
6800            })
6801            .collect();
6802
6803        let request = WriteRequest {
6804            label: "too-many-nodes".to_owned(),
6805            nodes,
6806            node_retires: vec![],
6807            edges: vec![],
6808            edge_retires: vec![],
6809            chunks: vec![],
6810            runs: vec![],
6811            steps: vec![],
6812            actions: vec![],
6813            optional_backfills: vec![],
6814            vec_inserts: vec![],
6815            operational_writes: vec![],
6816        };
6817
6818        let result = prepare_write(request, ProvenanceMode::Warn)
6819            .map(|_| ())
6820            .map_err(|e| format!("{e}"));
6821        assert!(
6822            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6823            "exceeding node limit must return InvalidWrite: got {result:?}"
6824        );
6825    }
6826
    #[test]
    fn write_request_exceeding_total_limit_is_rejected() {
        // Spread items across fields to exceed 100_000 total
        // without exceeding any single per-field limit.
        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
        let request = WriteRequest {
            label: "too-many-total".to_owned(),
            // 10_000 nodes — exactly at the per-field cap.
            nodes: (0..10_000)
                .map(|i| NodeInsert {
                    row_id: format!("row-{i}"),
                    logical_id: format!("lg-{i}"),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                })
                .collect(),
            node_retires: vec![],
            // 10_000 edges linking consecutive logical ids.
            edges: (0..10_000)
                .map(|i| EdgeInsert {
                    row_id: format!("edge-row-{i}"),
                    logical_id: format!("edge-lg-{i}"),
                    kind: "link".to_owned(),
                    source_logical_id: format!("lg-{i}"),
                    target_logical_id: format!("lg-{}", i + 1),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                })
                .collect(),
            edge_retires: vec![],
            // 50_000 chunks, all attached to the first node.
            chunks: (0..50_000)
                .map(|i| ChunkInsert {
                    id: format!("chunk-{i}"),
                    node_logical_id: "lg-0".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                })
                .collect(),
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            // 20_001 vec inserts — this is the field that tips the total
            // to 100_001, one past the aggregate limit.
            vec_inserts: (0..20_001)
                .map(|i| VecInsert {
                    chunk_id: format!("vec-chunk-{i}"),
                    embedding: vec![0.1],
                })
                .collect(),
            // 10_000 operational appends.
            operational_writes: (0..10_000)
                .map(|i| OperationalWrite::Append {
                    collection: format!("col-{i}"),
                    record_key: format!("key-{i}"),
                    payload_json: "{}".to_owned(),
                    source_ref: None,
                })
                .collect(),
        };

        // Validate via prepare_write directly; stringify the error so the
        // assertion can match on its message text.
        let result = prepare_write(request, ProvenanceMode::Warn)
            .map(|_| ())
            .map_err(|e| format!("{e}"));
        assert!(
            matches!(result, Err(ref msg) if msg.contains("too many total items")),
            "exceeding total item limit must return InvalidWrite: got {result:?}"
        );
    }
6898
6899    #[test]
6900    fn write_request_within_limits_succeeds() {
6901        let db = NamedTempFile::new().expect("temporary db");
6902        let writer = WriterActor::start(
6903            db.path(),
6904            Arc::new(SchemaManager::new()),
6905            ProvenanceMode::Warn,
6906            Arc::new(TelemetryCounters::default()),
6907        )
6908        .expect("writer");
6909
6910        let result = writer.submit(WriteRequest {
6911            label: "within-limits".to_owned(),
6912            nodes: vec![NodeInsert {
6913                row_id: "row-1".to_owned(),
6914                logical_id: "lg-1".to_owned(),
6915                kind: "Note".to_owned(),
6916                properties: "{}".to_owned(),
6917                source_ref: None,
6918                upsert: false,
6919                chunk_policy: ChunkPolicy::Preserve,
6920                content_ref: None,
6921            }],
6922            node_retires: vec![],
6923            edges: vec![],
6924            edge_retires: vec![],
6925            chunks: vec![],
6926            runs: vec![],
6927            steps: vec![],
6928            actions: vec![],
6929            optional_backfills: vec![],
6930            vec_inserts: vec![],
6931            operational_writes: vec![],
6932        });
6933
6934        assert!(
6935            result.is_ok(),
6936            "write request within limits must succeed: got {result:?}"
6937        );
6938    }
6939
    #[test]
    fn property_fts_rows_created_on_node_insert() {
        // A kind with a registered property schema must get its listed JSON
        // property paths extracted, joined with the configured separator, and
        // indexed into fts_node_properties at insert time.
        let db = NamedTempFile::new().expect("temporary db");
        // Bootstrap and register a property schema before starting the writer.
        let schema = Arc::new(SchemaManager::new());
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            // Index $.name and $.description for kind "Goal", space-joined.
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "goal-insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // The extracted text is "name" then "description", separator-joined.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let text: String = conn
            .query_row(
                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("property FTS row must exist");
        assert_eq!(text, "Ship v2 Launch the redesign");
    }
7000
7001    #[test]
7002    fn property_fts_rows_replaced_on_upsert() {
7003        let db = NamedTempFile::new().expect("temporary db");
7004        let schema = Arc::new(SchemaManager::new());
7005        {
7006            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7007            schema.bootstrap(&conn).expect("bootstrap");
7008            conn.execute(
7009                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7010                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7011                [],
7012            )
7013            .expect("register schema");
7014        }
7015        let writer = WriterActor::start(
7016            db.path(),
7017            Arc::clone(&schema),
7018            ProvenanceMode::Warn,
7019            Arc::new(TelemetryCounters::default()),
7020        )
7021        .expect("writer");
7022
7023        // Initial insert.
7024        writer
7025            .submit(WriteRequest {
7026                label: "insert".to_owned(),
7027                nodes: vec![NodeInsert {
7028                    row_id: "row-1".to_owned(),
7029                    logical_id: "goal-1".to_owned(),
7030                    kind: "Goal".to_owned(),
7031                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7032                    source_ref: Some("src-1".to_owned()),
7033                    upsert: false,
7034                    chunk_policy: ChunkPolicy::Preserve,
7035                    content_ref: None,
7036                }],
7037                node_retires: vec![],
7038                edges: vec![],
7039                edge_retires: vec![],
7040                chunks: vec![],
7041                runs: vec![],
7042                steps: vec![],
7043                actions: vec![],
7044                optional_backfills: vec![],
7045                vec_inserts: vec![],
7046                operational_writes: vec![],
7047            })
7048            .expect("insert");
7049
7050        // Upsert with changed property.
7051        writer
7052            .submit(WriteRequest {
7053                label: "upsert".to_owned(),
7054                nodes: vec![NodeInsert {
7055                    row_id: "row-2".to_owned(),
7056                    logical_id: "goal-1".to_owned(),
7057                    kind: "Goal".to_owned(),
7058                    properties: r#"{"name":"Beta"}"#.to_owned(),
7059                    source_ref: Some("src-2".to_owned()),
7060                    upsert: true,
7061                    chunk_policy: ChunkPolicy::Preserve,
7062                    content_ref: None,
7063                }],
7064                node_retires: vec![],
7065                edges: vec![],
7066                edge_retires: vec![],
7067                chunks: vec![],
7068                runs: vec![],
7069                steps: vec![],
7070                actions: vec![],
7071                optional_backfills: vec![],
7072                vec_inserts: vec![],
7073                operational_writes: vec![],
7074            })
7075            .expect("upsert");
7076
7077        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7078        let count: i64 = conn
7079            .query_row(
7080                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7081                [],
7082                |row| row.get(0),
7083            )
7084            .expect("count");
7085        assert_eq!(
7086            count, 1,
7087            "must have exactly one property FTS row after upsert"
7088        );
7089
7090        let text: String = conn
7091            .query_row(
7092                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7093                [],
7094                |row| row.get(0),
7095            )
7096            .expect("text");
7097        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7098    }
7099
7100    #[test]
7101    fn property_fts_rows_deleted_on_retire() {
7102        let db = NamedTempFile::new().expect("temporary db");
7103        let schema = Arc::new(SchemaManager::new());
7104        {
7105            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7106            schema.bootstrap(&conn).expect("bootstrap");
7107            conn.execute(
7108                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7109                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7110                [],
7111            )
7112            .expect("register schema");
7113        }
7114        let writer = WriterActor::start(
7115            db.path(),
7116            Arc::clone(&schema),
7117            ProvenanceMode::Warn,
7118            Arc::new(TelemetryCounters::default()),
7119        )
7120        .expect("writer");
7121
7122        // Insert a node.
7123        writer
7124            .submit(WriteRequest {
7125                label: "insert".to_owned(),
7126                nodes: vec![NodeInsert {
7127                    row_id: "row-1".to_owned(),
7128                    logical_id: "goal-1".to_owned(),
7129                    kind: "Goal".to_owned(),
7130                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7131                    source_ref: Some("src-1".to_owned()),
7132                    upsert: false,
7133                    chunk_policy: ChunkPolicy::Preserve,
7134                    content_ref: None,
7135                }],
7136                node_retires: vec![],
7137                edges: vec![],
7138                edge_retires: vec![],
7139                chunks: vec![],
7140                runs: vec![],
7141                steps: vec![],
7142                actions: vec![],
7143                optional_backfills: vec![],
7144                vec_inserts: vec![],
7145                operational_writes: vec![],
7146            })
7147            .expect("insert");
7148
7149        // Retire the node.
7150        writer
7151            .submit(WriteRequest {
7152                label: "retire".to_owned(),
7153                nodes: vec![],
7154                node_retires: vec![NodeRetire {
7155                    logical_id: "goal-1".to_owned(),
7156                    source_ref: Some("forget-1".to_owned()),
7157                }],
7158                edges: vec![],
7159                edge_retires: vec![],
7160                chunks: vec![],
7161                runs: vec![],
7162                steps: vec![],
7163                actions: vec![],
7164                optional_backfills: vec![],
7165                vec_inserts: vec![],
7166                operational_writes: vec![],
7167            })
7168            .expect("retire");
7169
7170        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7171        let count: i64 = conn
7172            .query_row(
7173                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7174                [],
7175                |row| row.get(0),
7176            )
7177            .expect("count");
7178        assert_eq!(count, 0, "property FTS row must be deleted on retire");
7179    }
7180
7181    #[test]
7182    fn no_property_fts_row_for_unregistered_kind() {
7183        let db = NamedTempFile::new().expect("temporary db");
7184        let schema = Arc::new(SchemaManager::new());
7185        {
7186            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7187            schema.bootstrap(&conn).expect("bootstrap");
7188            // No schema registered for "Note" kind.
7189        }
7190        let writer = WriterActor::start(
7191            db.path(),
7192            Arc::clone(&schema),
7193            ProvenanceMode::Warn,
7194            Arc::new(TelemetryCounters::default()),
7195        )
7196        .expect("writer");
7197
7198        writer
7199            .submit(WriteRequest {
7200                label: "insert".to_owned(),
7201                nodes: vec![NodeInsert {
7202                    row_id: "row-1".to_owned(),
7203                    logical_id: "note-1".to_owned(),
7204                    kind: "Note".to_owned(),
7205                    properties: r#"{"title":"hello"}"#.to_owned(),
7206                    source_ref: Some("src-1".to_owned()),
7207                    upsert: false,
7208                    chunk_policy: ChunkPolicy::Preserve,
7209                    content_ref: None,
7210                }],
7211                node_retires: vec![],
7212                edges: vec![],
7213                edge_retires: vec![],
7214                chunks: vec![],
7215                runs: vec![],
7216                steps: vec![],
7217                actions: vec![],
7218                optional_backfills: vec![],
7219                vec_inserts: vec![],
7220                operational_writes: vec![],
7221            })
7222            .expect("insert");
7223
7224        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7225        let count: i64 = conn
7226            .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
7227                row.get(0)
7228            })
7229            .expect("count");
7230        assert_eq!(count, 0, "no property FTS rows for unregistered kind");
7231    }
7232
7233    mod extract_json_path_tests {
7234        use super::super::extract_json_path;
7235        use serde_json::json;
7236
7237        #[test]
7238        fn string_value() {
7239            let v = json!({"name": "alice"});
7240            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7241        }
7242
7243        #[test]
7244        fn number_value() {
7245            let v = json!({"age": 42});
7246            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7247        }
7248
7249        #[test]
7250        fn bool_value() {
7251            let v = json!({"active": true});
7252            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7253        }
7254
7255        #[test]
7256        fn null_value() {
7257            let v = json!({"x": null});
7258            assert!(extract_json_path(&v, "$.x").is_empty());
7259        }
7260
7261        #[test]
7262        fn missing_path() {
7263            let v = json!({"name": "a"});
7264            assert!(extract_json_path(&v, "$.missing").is_empty());
7265        }
7266
7267        #[test]
7268        fn nested_path() {
7269            let v = json!({"address": {"city": "NYC"}});
7270            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7271        }
7272
7273        #[test]
7274        fn array_of_strings() {
7275            let v = json!({"tags": ["a", "b", "c"]});
7276            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7277        }
7278
7279        #[test]
7280        fn array_mixed_scalars() {
7281            let v = json!({"vals": ["x", 1, true]});
7282            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7283        }
7284
7285        #[test]
7286        fn array_only_objects_returns_empty() {
7287            let v = json!({"data": [{"k": "v"}]});
7288            assert!(extract_json_path(&v, "$.data").is_empty());
7289        }
7290
7291        #[test]
7292        fn array_mixed_objects_and_scalars() {
7293            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7294            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7295        }
7296
7297        #[test]
7298        fn object_returns_empty() {
7299            let v = json!({"meta": {"k": "v"}});
7300            assert!(extract_json_path(&v, "$.meta").is_empty());
7301        }
7302
7303        #[test]
7304        fn no_prefix_returns_empty() {
7305            let v = json!({"name": "a"});
7306            assert!(extract_json_path(&v, "name").is_empty());
7307        }
7308    }
7309
    mod recursive_extraction_tests {
        use super::super::{
            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
        };
        use serde_json::json;

        // Build a schema with the given path entries, a single-space
        // combined-blob separator, and no exclusions.
        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: Vec::new(),
            }
        }

        // Like `schema`, but with explicit exclude paths for subtree pruning.
        fn schema_with_excludes(
            paths: Vec<PropertyPathEntry>,
            excludes: Vec<String>,
        ) -> PropertyFtsSchema {
            PropertyFtsSchema {
                paths,
                separator: " ".to_owned(),
                exclude_paths: excludes,
            }
        }

        #[test]
        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
            // Keys are given out of lexicographic order on purpose; the walk
            // must still visit "a" before "b".
            let props = json!({"payload": {"b": "two", "a": "one"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = blob.expect("blob emitted");
            let idx_one = blob.find("one").expect("contains 'one'");
            let idx_two = blob.find("two").expect("contains 'two'");
            assert!(
                idx_one < idx_two,
                "lex order: 'one' (key a) before 'two' (key b)"
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.a");
            assert_eq!(positions[1].leaf_path, "$.payload.b");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_scalars() {
            // Array elements get bracketed index segments in their leaf paths.
            let props = json!({"tags": ["red", "blue"]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.tags[0]");
            assert_eq!(positions[1].leaf_path, "$.tags[1]");
        }

        #[test]
        fn recursive_extraction_walks_arrays_of_objects() {
            // Objects inside arrays are descended into; leaf paths combine
            // the index segment and the nested key.
            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
            let (_blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.items")]),
            );
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.items[0].name");
            assert_eq!(positions[1].leaf_path, "$.items[1].name");
        }

        #[test]
        fn recursive_extraction_stringifies_numbers_and_bools() {
            let props = json!({"root": {"n": 42, "ok": true}});
            let (blob, _positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("42"));
            assert!(blob.contains("true"));
        }

        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            // Null leaves contribute neither blob text nor a position entry.
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert!(!blob.contains("null"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }

        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            // The walk should stop before reaching the leaf (depth 8).
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            // Row is still indexed with whatever was emitted — even if
            // that's nothing. The contract is only that the walk clamped.
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }

        #[test]
        fn recursive_extraction_respects_max_bytes_guardrail() {
            // Build an array of 4KB strings; total blob > 64KB.
            let leaves: Vec<String> = (0..40)
                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
                .collect();
            let props = json!({ "root": leaves });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
            let blob = blob.expect("blob must still be emitted");
            assert!(
                blob.len() <= MAX_EXTRACTED_BYTES,
                "blob must not exceed MAX_EXTRACTED_BYTES"
            );
            // No partial leaves: every position end must fall inside blob length.
            for pos in &positions {
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                // Slice must equal some original leaf value; we only
                // verify it's not empty and doesn't straddle a separator.
                assert!(!slice.is_empty());
                assert!(!slice.contains(LEAF_SEPARATOR));
            }
            // Blob cannot end in a dangling separator.
            assert!(!blob.ends_with(LEAF_SEPARATOR));
        }

        #[test]
        fn recursive_extraction_respects_exclude_paths() {
            // Excluded subtrees contribute nothing and bump the stat counter.
            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema_with_excludes(
                    vec![PropertyPathEntry::recursive("$.payload")],
                    vec!["$.payload.priv".to_owned()],
                ),
            );
            let blob = blob.expect("blob emitted");
            assert!(blob.contains("yes"));
            assert!(!blob.contains("no"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.pub");
            assert!(stats.excluded_subtree > 0);
        }

        #[test]
        fn position_map_entries_match_emitted_leaves_in_order() {
            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            assert_eq!(positions.len(), 3);
            // Leaf paths in lexicographic order.
            assert_eq!(positions[0].leaf_path, "$.root.a");
            assert_eq!(positions[1].leaf_path, "$.root.b");
            assert_eq!(positions[2].leaf_path, "$.root.c");
            // Monotonic, non-overlapping offsets.
            let mut prev_end: usize = 0;
            for (i, pos) in positions.iter().enumerate() {
                assert!(pos.start_offset >= prev_end);
                assert!(pos.end_offset > pos.start_offset);
                assert!(pos.end_offset <= blob.len());
                let slice = &blob[pos.start_offset..pos.end_offset];
                match i {
                    0 => assert_eq!(slice, "alpha"),
                    1 => assert_eq!(slice, "bravo"),
                    2 => assert_eq!(slice, "charlie"),
                    _ => unreachable!(),
                }
                prev_end = pos.end_offset;
            }
        }

        #[test]
        fn scalar_only_schema_produces_empty_position_map() {
            // Position entries are a recursive-mode feature; plain scalar
            // paths produce only the combined blob.
            let props = json!({"name": "alpha", "title": "beta"});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![
                    PropertyPathEntry::scalar("$.name"),
                    PropertyPathEntry::scalar("$.title"),
                ]),
            );
            assert_eq!(blob.as_deref(), Some("alpha beta"));
            assert!(
                positions.is_empty(),
                "scalar-only schema must emit no position entries"
            );
            // Sanity-check the type so the test fails loudly if the
            // signature changes.
            let _: Vec<PositionEntry> = positions;
        }

        // Every emitted position must begin at a distinct byte offset;
        // a duplicate would mean two leaves were mapped onto one span.
        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
            let mut seen = std::collections::HashSet::new();
            for pos in positions {
                let start = pos.start_offset;
                assert!(
                    seen.insert(start),
                    "duplicate start_offset {start} in positions {positions:?}"
                );
            }
        }

        // The following tests pin offset bookkeeping around empty-string
        // leaves: an empty leaf must contribute neither bytes, a separator,
        // nor a position entry, regardless of where it appears.
        #[test]
        fn recursive_extraction_empty_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_two_empties_then_nonempty_in_array() {
            let props = json!({"payload": {"xs": ["", "", "x"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"a": "", "b": "x"}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert_eq!(combined.as_deref(), Some("x"));
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
            // Payloads with no non-empty leaves must produce None, not
            // Some("") — the node then gets no property FTS row at all.
            let cases = vec![
                json!({"payload": {}}),
                json!({"payload": {"a": ""}}),
                json!({"payload": {"xs": []}}),
                json!({"payload": {"xs": [""]}}),
                json!({"payload": {"xs": ["", ""]}}),
                json!({"payload": {"xs": ["", "", ""]}}),
            ];
            for case in cases {
                let (combined, positions, _stats) = extract_property_fts(
                    &case,
                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
                );
                assert!(
                    combined.is_none(),
                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
                );
                assert!(
                    positions.is_empty(),
                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
                );
            }
        }

        #[test]
        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
            let props = json!({"payload": {"xs": ["x", "", "y"]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            let blob = combined.expect("blob emitted");
            assert_eq!(positions.len(), 2);
            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
            assert_eq!(positions[0].start_offset, 0);
            assert_eq!(positions[0].end_offset, 1);
            // The two non-empty leaves are separated by exactly one
            // LEAF_SEPARATOR; the empty middle leaf contributes neither
            // bytes nor a position.
            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
            assert_eq!(
                &blob[positions[0].start_offset..positions[0].end_offset],
                "x"
            );
            assert_eq!(
                &blob[positions[1].start_offset..positions[1].end_offset],
                "y"
            );
            assert_unique_start_offsets(&positions);
        }

        #[test]
        fn recursive_extraction_null_leaves_unchanged() {
            // An array of only nulls behaves like an all-empty payload.
            let props = json!({"payload": {"xs": [null, null]}});
            let (combined, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
            );
            assert!(combined.is_none());
            assert!(positions.is_empty());
        }
    }
7678}