Skip to main content

fathomdb_engine/
writer.rs

1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::SchemaManager;
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16    OperationalValidationMode, extract_secondary_index_entries_for_current,
17    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18    parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
23/// A deferred projection backfill task submitted alongside a write.
/// A deferred projection backfill task submitted alongside a write.
///
/// Carried on [`WriteRequest::optional_backfills`]; see
/// [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is `true`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Also enforced for [`WriterActor::touch_last_accessed`], which requires a
/// `source_ref` when [`ProvenanceMode::Require`] is active.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Unique row (version) id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable identity shared by all versions of this node; must be non-empty.
    pub logical_id: String,
    /// Node kind tag.
    pub kind: String,
    /// Serialized properties; property-FTS extraction walks these as a JSON tree.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
69
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Unique row (version) id; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable identity shared by all versions of this edge; must be non-empty.
    pub logical_id: String,
    /// `logical_id` of the source node.
    pub source_logical_id: String,
    /// `logical_id` of the target node.
    pub target_logical_id: String,
    /// Edge kind tag.
    pub kind: String,
    /// Serialized properties payload.
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
84
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Stable identity of the node to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
91
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Stable identity of the edge to retire.
    pub logical_id: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// A text chunk to be inserted in a [`WriteRequest`].
///
/// `id` and `text_content` must be non-empty (enforced in `prepare_write`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Unique chunk id.
    pub id: String,
    /// `logical_id` of the node this chunk belongs to.
    pub node_logical_id: String,
    /// Chunk text; must be non-empty.
    pub text_content: String,
    /// Optional byte range of the chunk — presumably offsets into the external
    /// content; not validated here. TODO(review): confirm against producers.
    pub byte_start: Option<i64>,
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
110
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Chunk the embedding belongs to; must be non-empty.
    pub chunk_id: String,
    /// Embedding vector; must be non-empty (enforced in `prepare_write`).
    pub embedding: Vec<f32>,
}
122
/// A mutation to an operational collection submitted as part of a write request.
///
/// `collection` and `record_key` must be non-empty for every variant, and
/// `payload_json` must be non-empty for `Append` and `Put` (all enforced in
/// `prepare_write`). `source_ref` is required under [`ProvenanceMode::Require`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Append a payload for `record_key` in `collection`.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Store a payload for `record_key` in `collection`.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        source_ref: Option<String>,
    },
    /// Delete the record stored under `record_key` in `collection`.
    Delete {
        collection: String,
        record_key: String,
        source_ref: Option<String>,
    },
}
144
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Unique run id; must be non-empty.
    pub id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (enforced in `prepare_write`).
    pub upsert: bool,
    /// Id of the prior run row this one supersedes.
    pub supersedes_id: Option<String>,
}
156
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Unique step id; must be non-empty.
    pub id: String,
    /// Id of the run this step belongs to.
    pub run_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (enforced in `prepare_write`).
    pub upsert: bool,
    /// Id of the prior step row this one supersedes.
    pub supersedes_id: Option<String>,
}
169
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Unique action id; must be non-empty.
    pub id: String,
    /// Id of the step this action belongs to.
    pub step_id: String,
    pub kind: String,
    pub status: String,
    pub properties: String,
    /// Provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must also be set (enforced in `prepare_write`).
    pub upsert: bool,
    /// Id of the prior action row this one supersedes.
    pub supersedes_id: Option<String>,
}
182
// --- Write-request size limits ---
//
// Checked by `validate_request_size`; exceeding any limit rejects the whole
// request with `EngineError::InvalidWrite` before it reaches the writer thread.
const MAX_NODES: usize = 10_000;
const MAX_EDGES: usize = 10_000;
const MAX_CHUNKS: usize = 50_000;
// Combined limit across node retires + edge retires.
const MAX_RETIRES: usize = 10_000;
// Combined limit across runs + steps + actions.
const MAX_RUNTIME_ITEMS: usize = 10_000;
const MAX_OPERATIONAL: usize = 10_000;
// Combined limit across all item categories (including vec inserts).
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
///
/// Size limits are enforced by `validate_request_size` and ID/provenance
/// validation by `prepare_write`, both before the batch reaches the writer thread.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Human-readable label for this batch (also present on [`WriteReceipt`]).
    pub label: String,
    pub nodes: Vec<NodeInsert>,
    pub node_retires: Vec<NodeRetire>,
    pub edges: Vec<EdgeInsert>,
    pub edge_retires: Vec<EdgeRetire>,
    pub chunks: Vec<ChunkInsert>,
    pub runs: Vec<RunInsert>,
    pub steps: Vec<StepInsert>,
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfills; see [`WriteReceipt::optional_backfill_count`].
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label of the [`WriteRequest`] this receipt answers.
    pub label: String,
    /// Number of deferred projection backfills accepted with the request.
    pub optional_backfill_count: usize,
    /// Non-fatal issues encountered while applying the write.
    pub warnings: Vec<String>,
    /// Missing-`source_ref` notices emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
227
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
///
/// `logical_ids` must be non-empty and contain no blank entries; `source_ref`
/// is required under [`ProvenanceMode::Require`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Nodes to touch, by stable logical id.
    pub logical_ids: Vec<String>,
    /// Timestamp to record.
    pub touched_at: i64,
    /// Provenance reference for the touch.
    pub source_ref: Option<String>,
}
235
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Number of logical ids whose timestamp was updated.
    pub touched_logical_ids: usize,
    /// The timestamp that was recorded.
    pub touched_at: i64,
}
242
/// One row destined for the chunk-FTS projection.
///
/// Resolved by `resolve_fts_rows` on the writer thread before the transaction
/// begins (see [`PreparedWrite::required_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    chunk_id: String,
    node_logical_id: String,
    /// Kind of the owning node.
    kind: String,
    text_content: String,
}
250
/// One row destined for the property-FTS projection: a node's extracted text
/// blob plus the position map of its leaf ranges.
///
/// Resolved by `resolve_property_fts_rows` on the writer thread before the
/// transaction begins (see [`PreparedWrite::required_property_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    node_logical_id: String,
    kind: String,
    text_content: String,
    /// Byte ranges within `text_content` mapping back to JSON leaf paths.
    positions: Vec<PositionEntry>,
}
258
/// Maximum depth the recursive property-FTS extraction walk descends before
/// clamping. A walk that reaches this depth will not emit leaves below it.
/// Each clamp increments [`ExtractStats::depth_cap_hit`].
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Maximum blob size the recursive property-FTS extraction walk will emit.
/// When the next complete leaf would push the blob past this cap, the walk
/// stops on the preceding leaf boundary — the existing blob is indexed, the
/// truncated leaf is not partially emitted.
///
/// This budget applies only to the recursive-walk portion of the blob.
/// Scalar-prefix emissions and the scalar↔recursive [`LEAF_SEPARATOR`] token
/// are not counted against it; the cap governs how much nested-subtree
/// content is serialized, not the final blob length.
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

/// Hard phrase-break separator inserted between two adjacent recursive
/// leaves in the concatenated blob.
///
/// FTS5 phrase queries match on consecutive token positions, so simply
/// inserting non-token whitespace or control characters does NOT prevent a
/// phrase like `"foo bar"` from matching when `foo` ends one leaf and
/// `bar` starts the next — the tokenizer would discard the control
/// character and the two content tokens would still be adjacent.
///
/// To create a real position gap we embed a sentinel *token*
/// `fathomdbphrasebreaksentinel` between leaves. Under `porter
/// unicode61 remove_diacritics 2` this survives tokenization as its own
/// position, so the content tokens on either side are separated by at
/// least one intervening position and phrase queries spanning the gap
/// cannot match. The sentinel is long, lowercase, and obviously
/// synthetic to minimize the chance of accidental collisions with real
/// content. It remains searchable as a word — callers should avoid
/// passing it to `text_search` directly.
///
/// Validated by the integration test
/// `leaf_separator_is_hard_phrase_break_under_unicode61_porter`.
///
/// Note: because the sentinel is a real token it may appear in FTS5
/// `snippet()` output when the snippet window spans a leaf boundary.
/// Phase 5 presentation-layer snippet post-processing is responsible for
/// stripping it before the snippet is surfaced to callers.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
301
/// Whether a registered property-FTS path extracts a single scalar value
/// or recursively walks the subtree rooted at the path.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Legacy scalar extraction — resolve the path, append the scalar
    /// (flattening arrays of scalars as before Phase 4).
    #[default]
    Scalar,
    /// Recursively walk scalar leaves rooted at the path, emitting one
    /// leaf per scalar with position-map tracking.
    Recursive,
}

/// A single registered property-FTS path with its extraction mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyPathEntry {
    pub path: String,
    pub mode: PropertyPathMode,
}

impl PropertyPathEntry {
    /// Entry that performs legacy scalar extraction at `path`.
    pub(crate) fn scalar(path: impl Into<String>) -> Self {
        Self {
            mode: PropertyPathMode::Scalar,
            path: path.into(),
        }
    }

    /// Entry that recursively walks the subtree rooted at `path`.
    #[cfg(test)]
    pub(crate) fn recursive(path: impl Into<String>) -> Self {
        Self {
            mode: PropertyPathMode::Recursive,
            path: path.into(),
        }
    }
}
338
/// Parsed property-FTS schema definition for a single kind.
///
/// See [`PropertyPathMode`] for the per-path extraction modes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Property paths to extract. Each entry is matched exactly against
    /// the JSON tree from the node's root; see [`PropertyPathMode`].
    pub paths: Vec<PropertyPathEntry>,
    /// Separator inserted between adjacent leaf values in the extracted
    /// blob. Acts as a hard phrase break under FTS5 (see [`LEAF_SEPARATOR`]).
    pub separator: String,
    /// JSON paths to skip during recursive extraction. Matched as an
    /// *exact* path equality — the walk visits each object/array before
    /// descending into it, so listing `$.x.y` suppresses the entire
    /// subtree rooted at `$.x.y`. Prefix matching is NOT supported: an
    /// exclude of `$.x` does not implicitly exclude `$.x.y`, and
    /// `$.payload.priv` does not implicitly exclude `$.payload.priv.inner`
    /// via prefix — the walker will still descend into any subtree that
    /// is not itself listed exactly in `exclude_paths`.
    pub exclude_paths: Vec<String>,
}
358
/// One position-map entry: a half-open `[start, end)` byte range within the
/// extracted blob plus the `JSONPath` of the scalar leaf whose value occupies
/// that range.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive start byte offset within the blob.
    pub start_offset: usize,
    /// Exclusive end byte offset within the blob.
    pub end_offset: usize,
    /// `JSONPath` of the scalar leaf occupying `[start_offset, end_offset)`.
    pub leaf_path: String,
}
368
/// Per-kind extraction stats accumulated by a recursive walk. Surface-level
/// Phase 4 logging only; Phase 5 may expose these to callers.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Times a walk was clamped at the depth cap (see `MAX_RECURSIVE_DEPTH`).
    pub depth_cap_hit: usize,
    /// `true` once the extracted blob has reached `MAX_EXTRACTED_BYTES` and
    /// the walker has stopped accepting additional leaves. Unlike the
    /// depth counter this is a boolean: the walker's `stopped` guard
    /// blocks further `emit_leaf` calls after the first truncation.
    pub byte_cap_reached: bool,
    /// Times a subtree was skipped via `exclude_paths`.
    pub excluded_subtree: usize,
}

impl ExtractStats {
    /// Fold `other` into `self`: counters add, and the byte-cap flag is
    /// sticky — once any merged walk truncated, it stays `true`.
    fn merge(&mut self, other: ExtractStats) {
        let ExtractStats {
            depth_cap_hit,
            byte_cap_reached,
            excluded_subtree,
        } = other;
        self.depth_cap_hit += depth_cap_hit;
        self.byte_cap_reached = self.byte_cap_reached || byte_cap_reached;
        self.excluded_subtree += excluded_subtree;
    }
}
389
/// Validated, writer-ready form of a [`WriteRequest`].
///
/// Produced by `prepare_write` on the caller's thread and shipped to the
/// writer thread inside `WriteMessage::Submit`.
struct PreparedWrite {
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Non-fatal validation notices surfaced in the [`WriteReceipt`] warnings.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    optional_backfills: Vec<OptionalProjectionTask>,
}
417
/// Messages accepted by the writer thread's receive loop.
enum WriteMessage {
    /// Apply a prepared write batch and reply with its [`WriteReceipt`].
    Submit {
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` timestamps and reply with a report.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
428
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Channel into the writer thread. Wrapped in [`ManuallyDrop`] so the
    /// `Drop` impl can close the channel *before* joining the thread.
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// Join handle for the writer thread; `None` once joined in `Drop`.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance policy applied while preparing requests.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
441
442impl WriterActor {
443    /// # Errors
444    /// Returns [`EngineError`] if the writer thread cannot be spawned.
445    pub fn start(
446        path: impl AsRef<Path>,
447        schema_manager: Arc<SchemaManager>,
448        provenance_mode: ProvenanceMode,
449        telemetry: Arc<TelemetryCounters>,
450    ) -> Result<Self, EngineError> {
451        let database_path = path.as_ref().to_path_buf();
452        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
453
454        let writer_telemetry = Arc::clone(&telemetry);
455        let handle = thread::Builder::new()
456            .name("fathomdb-writer".to_owned())
457            .spawn(move || {
458                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
459            })
460            .map_err(EngineError::Io)?;
461
462        Ok(Self {
463            sender: ManuallyDrop::new(sender),
464            thread_handle: Some(handle),
465            provenance_mode,
466            _telemetry: telemetry,
467        })
468    }
469
470    /// Returns `true` if the writer thread is still running.
471    fn is_thread_alive(&self) -> bool {
472        self.thread_handle
473            .as_ref()
474            .is_some_and(|h| !h.is_finished())
475    }
476
477    /// Returns `WriterRejected` if the writer thread has exited.
478    fn check_thread_alive(&self) -> Result<(), EngineError> {
479        if self.is_thread_alive() {
480            Ok(())
481        } else {
482            Err(EngineError::WriterRejected(
483                "writer thread has exited".to_owned(),
484            ))
485        }
486    }
487
488    /// # Errors
489    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
490    /// down, or the underlying `SQLite` transaction fails.
491    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
492        self.check_thread_alive()?;
493        let prepared = prepare_write(request, self.provenance_mode)?;
494        let (reply_tx, reply_rx) = mpsc::channel();
495        self.sender
496            .send(WriteMessage::Submit {
497                prepared: Box::new(prepared),
498                reply: reply_tx,
499            })
500            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
501
502        recv_with_timeout(&reply_rx)
503    }
504
505    /// # Errors
506    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
507    /// or the underlying `SQLite` transaction fails.
508    pub fn touch_last_accessed(
509        &self,
510        request: LastAccessTouchRequest,
511    ) -> Result<LastAccessTouchReport, EngineError> {
512        self.check_thread_alive()?;
513        prepare_touch_last_accessed(&request, self.provenance_mode)?;
514        let (reply_tx, reply_rx) = mpsc::channel();
515        self.sender
516            .send(WriteMessage::TouchLastAccessed {
517                request,
518                reply: reply_tx,
519            })
520            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
521
522        recv_with_timeout(&reply_rx)
523    }
524}
525
/// Stderr fallback for the shutdown-panic notice when the `tracing`
/// feature is disabled (so `trace_warn!` has nowhere to go).
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)]
fn stderr_panic_notice() {
    let notice = "fathomdb-writer panicked during shutdown (suppressed: already panicking)";
    eprintln!("{notice}");
}
531
impl Drop for WriterActor {
    /// Shut down the writer: close the channel, then join the thread.
    ///
    /// A panic from the writer thread is re-raised here unless this drop is
    /// itself running during an unwind, in which case it is logged and
    /// suppressed (a second panic during unwinding would abort the process).
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // NOTE(review): `trace_warn!` is presumably a crate-local
                        // logging macro that no-ops without `tracing` — confirm.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
561
562/// Wait for a reply from the writer thread, with a timeout.
563fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
564    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
565        .map_err(|error| match error {
566            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
567                "write timed out waiting for writer thread reply — the write may still commit"
568                    .to_owned(),
569            ),
570            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
571        })
572        .and_then(|result| result)
573}
574
575fn prepare_touch_last_accessed(
576    request: &LastAccessTouchRequest,
577    mode: ProvenanceMode,
578) -> Result<(), EngineError> {
579    if request.logical_ids.is_empty() {
580        return Err(EngineError::InvalidWrite(
581            "touch_last_accessed requires at least one logical_id".to_owned(),
582        ));
583    }
584    for logical_id in &request.logical_ids {
585        if logical_id.trim().is_empty() {
586            return Err(EngineError::InvalidWrite(
587                "touch_last_accessed requires non-empty logical_ids".to_owned(),
588            ));
589        }
590    }
591    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
592        return Err(EngineError::InvalidWrite(
593            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
594                .to_owned(),
595        ));
596    }
597    Ok(())
598}
599
600fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
601    let missing: Vec<String> = request
602        .nodes
603        .iter()
604        .filter(|n| n.source_ref.is_none())
605        .map(|n| format!("node '{}'", n.logical_id))
606        .chain(
607            request
608                .node_retires
609                .iter()
610                .filter(|r| r.source_ref.is_none())
611                .map(|r| format!("node retire '{}'", r.logical_id)),
612        )
613        .chain(
614            request
615                .edges
616                .iter()
617                .filter(|e| e.source_ref.is_none())
618                .map(|e| format!("edge '{}'", e.logical_id)),
619        )
620        .chain(
621            request
622                .edge_retires
623                .iter()
624                .filter(|r| r.source_ref.is_none())
625                .map(|r| format!("edge retire '{}'", r.logical_id)),
626        )
627        .chain(
628            request
629                .runs
630                .iter()
631                .filter(|r| r.source_ref.is_none())
632                .map(|r| format!("run '{}'", r.id)),
633        )
634        .chain(
635            request
636                .steps
637                .iter()
638                .filter(|s| s.source_ref.is_none())
639                .map(|s| format!("step '{}'", s.id)),
640        )
641        .chain(
642            request
643                .actions
644                .iter()
645                .filter(|a| a.source_ref.is_none())
646                .map(|a| format!("action '{}'", a.id)),
647        )
648        .chain(
649            request
650                .operational_writes
651                .iter()
652                .filter(|write| operational_write_source_ref(write).is_none())
653                .map(|write| {
654                    format!(
655                        "operational {} '{}:{}'",
656                        operational_write_kind(write),
657                        operational_write_collection(write),
658                        operational_write_record_key(write)
659                    )
660                }),
661        )
662        .collect();
663
664    if missing.is_empty() {
665        Ok(())
666    } else {
667        Err(EngineError::InvalidWrite(format!(
668            "ProvenanceMode::Require: missing source_ref on: {}",
669            missing.join(", ")
670        )))
671    }
672}
673
674fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
675    if request.nodes.len() > MAX_NODES {
676        return Err(EngineError::InvalidWrite(format!(
677            "too many nodes: {} exceeds limit of {MAX_NODES}",
678            request.nodes.len()
679        )));
680    }
681    if request.edges.len() > MAX_EDGES {
682        return Err(EngineError::InvalidWrite(format!(
683            "too many edges: {} exceeds limit of {MAX_EDGES}",
684            request.edges.len()
685        )));
686    }
687    if request.chunks.len() > MAX_CHUNKS {
688        return Err(EngineError::InvalidWrite(format!(
689            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
690            request.chunks.len()
691        )));
692    }
693    let retires = request.node_retires.len() + request.edge_retires.len();
694    if retires > MAX_RETIRES {
695        return Err(EngineError::InvalidWrite(format!(
696            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
697        )));
698    }
699    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
700    if runtime_items > MAX_RUNTIME_ITEMS {
701        return Err(EngineError::InvalidWrite(format!(
702            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
703        )));
704    }
705    if request.operational_writes.len() > MAX_OPERATIONAL {
706        return Err(EngineError::InvalidWrite(format!(
707            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
708            request.operational_writes.len()
709        )));
710    }
711    let total = request.nodes.len()
712        + request.node_retires.len()
713        + request.edges.len()
714        + request.edge_retires.len()
715        + request.chunks.len()
716        + request.runs.len()
717        + request.steps.len()
718        + request.actions.len()
719        + request.vec_inserts.len()
720        + request.operational_writes.len();
721    if total > MAX_TOTAL_ITEMS {
722        return Err(EngineError::InvalidWrite(format!(
723            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
724        )));
725    }
726    Ok(())
727}
728
729#[allow(clippy::too_many_lines)]
730fn prepare_write(
731    request: WriteRequest,
732    mode: ProvenanceMode,
733) -> Result<PreparedWrite, EngineError> {
734    validate_request_size(&request)?;
735
736    // --- ID validation: reject empty IDs ---
737    for node in &request.nodes {
738        if node.row_id.is_empty() {
739            return Err(EngineError::InvalidWrite(
740                "NodeInsert has empty row_id".to_owned(),
741            ));
742        }
743        if node.logical_id.is_empty() {
744            return Err(EngineError::InvalidWrite(
745                "NodeInsert has empty logical_id".to_owned(),
746            ));
747        }
748    }
749    for edge in &request.edges {
750        if edge.row_id.is_empty() {
751            return Err(EngineError::InvalidWrite(
752                "EdgeInsert has empty row_id".to_owned(),
753            ));
754        }
755        if edge.logical_id.is_empty() {
756            return Err(EngineError::InvalidWrite(
757                "EdgeInsert has empty logical_id".to_owned(),
758            ));
759        }
760    }
761    for chunk in &request.chunks {
762        if chunk.id.is_empty() {
763            return Err(EngineError::InvalidWrite(
764                "ChunkInsert has empty id".to_owned(),
765            ));
766        }
767        if chunk.text_content.is_empty() {
768            return Err(EngineError::InvalidWrite(format!(
769                "chunk '{}' has empty text_content; empty chunks are not allowed",
770                chunk.id
771            )));
772        }
773    }
774    for run in &request.runs {
775        if run.id.is_empty() {
776            return Err(EngineError::InvalidWrite(
777                "RunInsert has empty id".to_owned(),
778            ));
779        }
780    }
781    for step in &request.steps {
782        if step.id.is_empty() {
783            return Err(EngineError::InvalidWrite(
784                "StepInsert has empty id".to_owned(),
785            ));
786        }
787    }
788    for action in &request.actions {
789        if action.id.is_empty() {
790            return Err(EngineError::InvalidWrite(
791                "ActionInsert has empty id".to_owned(),
792            ));
793        }
794    }
795    for vi in &request.vec_inserts {
796        if vi.chunk_id.is_empty() {
797            return Err(EngineError::InvalidWrite(
798                "VecInsert has empty chunk_id".to_owned(),
799            ));
800        }
801        if vi.embedding.is_empty() {
802            return Err(EngineError::InvalidWrite(format!(
803                "VecInsert for chunk '{}' has empty embedding",
804                vi.chunk_id
805            )));
806        }
807    }
808    for operational in &request.operational_writes {
809        if operational_write_collection(operational).is_empty() {
810            return Err(EngineError::InvalidWrite(
811                "OperationalWrite has empty collection".to_owned(),
812            ));
813        }
814        if operational_write_record_key(operational).is_empty() {
815            return Err(EngineError::InvalidWrite(format!(
816                "OperationalWrite for collection '{}' has empty record_key",
817                operational_write_collection(operational)
818            )));
819        }
820        match operational {
821            OperationalWrite::Append { payload_json, .. }
822            | OperationalWrite::Put { payload_json, .. } => {
823                if payload_json.is_empty() {
824                    return Err(EngineError::InvalidWrite(format!(
825                        "OperationalWrite {} '{}:{}' has empty payload_json",
826                        operational_write_kind(operational),
827                        operational_write_collection(operational),
828                        operational_write_record_key(operational)
829                    )));
830                }
831            }
832            OperationalWrite::Delete { .. } => {}
833        }
834    }
835
836    // --- ID validation: reject duplicate row_ids within the request ---
837    {
838        let mut seen = std::collections::HashSet::new();
839        for node in &request.nodes {
840            if !seen.insert(node.row_id.as_str()) {
841                return Err(EngineError::InvalidWrite(format!(
842                    "duplicate row_id '{}' within the same WriteRequest",
843                    node.row_id
844                )));
845            }
846        }
847        for edge in &request.edges {
848            if !seen.insert(edge.row_id.as_str()) {
849                return Err(EngineError::InvalidWrite(format!(
850                    "duplicate row_id '{}' within the same WriteRequest",
851                    edge.row_id
852                )));
853            }
854        }
855    }
856
857    // --- ProvenanceMode::Require enforcement ---
858    if mode == ProvenanceMode::Require {
859        check_require_provenance(&request)?;
860    }
861
862    // --- Runtime table upsert validation ---
863    for run in &request.runs {
864        if run.upsert && run.supersedes_id.is_none() {
865            return Err(EngineError::InvalidWrite(format!(
866                "run '{}': upsert=true requires supersedes_id to be set",
867                run.id
868            )));
869        }
870    }
871    for step in &request.steps {
872        if step.upsert && step.supersedes_id.is_none() {
873            return Err(EngineError::InvalidWrite(format!(
874                "step '{}': upsert=true requires supersedes_id to be set",
875                step.id
876            )));
877        }
878    }
879    for action in &request.actions {
880        if action.upsert && action.supersedes_id.is_none() {
881            return Err(EngineError::InvalidWrite(format!(
882                "action '{}': upsert=true requires supersedes_id to be set",
883                action.id
884            )));
885        }
886    }
887
888    let node_kinds = request
889        .nodes
890        .iter()
891        .map(|node| (node.logical_id.clone(), node.kind.clone()))
892        .collect::<HashMap<_, _>>();
893
894    Ok(PreparedWrite {
895        label: request.label,
896        nodes: request.nodes,
897        node_retires: request.node_retires,
898        edges: request.edges,
899        edge_retires: request.edge_retires,
900        chunks: request.chunks,
901        runs: request.runs,
902        steps: request.steps,
903        actions: request.actions,
904        vec_inserts: request.vec_inserts,
905        operational_writes: request.operational_writes,
906        operational_collection_kinds: HashMap::new(),
907        operational_collection_filter_fields: HashMap::new(),
908        operational_validation_warnings: Vec::new(),
909        node_kinds,
910        required_fts_rows: Vec::new(),
911        required_property_fts_rows: Vec::new(),
912        optional_backfills: request.optional_backfills,
913    })
914}
915
/// Dedicated writer-thread main loop.
///
/// Opens its own SQLite connection, bootstraps the schema, then serves
/// messages until the channel closes (all senders dropped). If the
/// connection or bootstrap fails, every queued and future message is
/// answered with a `WriterRejected` error via [`reject_all`] and the
/// thread exits.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // No usable connection: drain the channel, rejecting everything.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Loop ends when every sender handle has been dropped.
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // NOTE(review): `start` exists only with the `tracing`
                // feature; the `trace_info!` below that reads it is
                // presumed to compile away otherwise — confirm against
                // the macro definition.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // Catch panics so one bad write cannot kill the writer
                // thread and strand every queued sender.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        // Row count covers canonical rows only (nodes,
                        // edges, chunks); runtime/operational rows are
                        // not counted here.
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // The caller may have hung up; a failed send is ignored.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Touches add no canonical rows; record the event only.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
998
999fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1000    for message in receiver {
1001        match message {
1002            WriteMessage::Submit { reply, .. } => {
1003                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1004            }
1005            WriteMessage::TouchLastAccessed { reply, .. } => {
1006                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1007            }
1008        }
1009    }
1010}
1011
1012/// Resolve FTS projection rows before the write transaction begins.
1013///
1014/// For each chunk in the prepared write, determines the node `kind` needed
1015/// to populate the FTS index. If the node was co-submitted in the same
1016/// request its kind is taken from `prepared.node_kinds` without a DB query.
1017/// If the node is pre-existing it is looked up in the database. If the node
1018/// cannot be found in either place, an `InvalidWrite` error is returned.
1019fn resolve_fts_rows(
1020    conn: &rusqlite::Connection,
1021    prepared: &mut PreparedWrite,
1022) -> Result<(), EngineError> {
1023    let retiring_ids: std::collections::HashSet<&str> = prepared
1024        .node_retires
1025        .iter()
1026        .map(|r| r.logical_id.as_str())
1027        .collect();
1028    for chunk in &prepared.chunks {
1029        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1030            return Err(EngineError::InvalidWrite(format!(
1031                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1032                 WriteRequest; retire and chunk insertion for the same node must not be combined",
1033                chunk.id, chunk.node_logical_id
1034            )));
1035        }
1036    }
1037    for chunk in &prepared.chunks {
1038        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1039            k.clone()
1040        } else {
1041            match conn.query_row(
1042                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1043                params![chunk.node_logical_id],
1044                |row| row.get::<_, String>(0),
1045            ) {
1046                Ok(kind) => kind,
1047                Err(rusqlite::Error::QueryReturnedNoRows) => {
1048                    return Err(EngineError::InvalidWrite(format!(
1049                        "chunk '{}' references node_logical_id '{}' that is not present in this \
1050                         write request or the database \
1051                         (v1 limitation: chunks and their nodes must be submitted together or the \
1052                         node must already exist)",
1053                        chunk.id, chunk.node_logical_id
1054                    )));
1055                }
1056                Err(e) => return Err(EngineError::Sqlite(e)),
1057            }
1058        };
1059        prepared.required_fts_rows.push(FtsProjectionRow {
1060            chunk_id: chunk.id.clone(),
1061            node_logical_id: chunk.node_logical_id.clone(),
1062            kind,
1063            text_content: chunk.text_content.clone(),
1064        });
1065    }
1066    trace_debug!(
1067        fts_rows = prepared.required_fts_rows.len(),
1068        chunks_processed = prepared.chunks.len(),
1069        "fts row resolution completed"
1070    );
1071    Ok(())
1072}
1073
1074/// Load FTS property schemas from the database and extract property values from
1075/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
1076fn resolve_property_fts_rows(
1077    conn: &rusqlite::Connection,
1078    prepared: &mut PreparedWrite,
1079) -> Result<(), EngineError> {
1080    if prepared.nodes.is_empty() {
1081        return Ok(());
1082    }
1083
1084    let schemas: HashMap<String, PropertyFtsSchema> =
1085        load_fts_property_schemas(conn)?.into_iter().collect();
1086
1087    if schemas.is_empty() {
1088        return Ok(());
1089    }
1090
1091    let mut combined_stats = ExtractStats::default();
1092    for node in &prepared.nodes {
1093        let Some(schema) = schemas.get(&node.kind) else {
1094            continue;
1095        };
1096        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1097        let (text_content, positions, stats) = extract_property_fts(&props, schema);
1098        combined_stats.merge(stats);
1099        if let Some(text_content) = text_content {
1100            prepared
1101                .required_property_fts_rows
1102                .push(FtsPropertyProjectionRow {
1103                    node_logical_id: node.logical_id.clone(),
1104                    kind: node.kind.clone(),
1105                    text_content,
1106                    positions,
1107                });
1108        }
1109    }
1110    if combined_stats != ExtractStats::default() {
1111        trace_debug!(
1112            depth_cap_hit = combined_stats.depth_cap_hit,
1113            byte_cap_reached = combined_stats.byte_cap_reached,
1114            excluded_subtree = combined_stats.excluded_subtree,
1115            "property fts recursive extraction guardrails engaged"
1116        );
1117    }
1118    trace_debug!(
1119        property_fts_rows = prepared.required_property_fts_rows.len(),
1120        nodes_processed = prepared.nodes.len(),
1121        "property fts row resolution completed"
1122    );
1123    Ok(())
1124}
1125
1126/// Extract scalar values from a JSON object at a `$.field` path.
1127/// Supports simple dot-notation paths like `$.name`, `$.address.city`.
1128/// Returns individual scalars so callers can join them with the configured separator.
1129pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1130    let Some(path) = path.strip_prefix("$.") else {
1131        return Vec::new();
1132    };
1133    let mut current = value;
1134    for segment in path.split('.') {
1135        match current.get(segment) {
1136            Some(v) => current = v,
1137            None => return Vec::new(),
1138        }
1139    }
1140    match current {
1141        serde_json::Value::String(s) => vec![s.clone()],
1142        serde_json::Value::Number(n) => vec![n.to_string()],
1143        serde_json::Value::Bool(b) => vec![b.to_string()],
1144        serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1145        serde_json::Value::Array(arr) => arr
1146            .iter()
1147            .filter_map(|v| match v {
1148                serde_json::Value::String(s) => Some(s.clone()),
1149                serde_json::Value::Number(n) => Some(n.to_string()),
1150                serde_json::Value::Bool(b) => Some(b.to_string()),
1151                _ => None,
1152            })
1153            .collect(),
1154    }
1155}
1156
1157/// Core property-FTS extraction. Produces the concatenated blob, the
1158/// position map identifying which byte range in the blob came from which
1159/// JSON leaf path, and the guardrail stats for the walk.
1160///
1161/// Emission rules:
1162/// - Path entries are processed in their registered order.
1163/// - Scalar-mode entries append the scalar value(s) directly (arrays of
1164///   scalars flatten, matching pre-Phase-4 behavior). Scalar emissions do
1165///   not produce position-map entries — the position map is only populated
1166///   when a recursive path contributes a leaf.
1167/// - Recursive-mode entries walk every descendant scalar leaf in stable
1168///   lexicographic key order. Each leaf emits a position-map entry whose
1169///   `[start, end)` spans the leaf value's bytes in the blob (NOT the
1170///   trailing separator).
1171/// - Between any two emitted values (whether from the same or different
1172///   path entries) the walk inserts [`LEAF_SEPARATOR`], a hard phrase
1173///   break under FTS5's `porter unicode61` tokenizer. Scalar-mode emissions
1174///   between each other use the schema's configured `separator` for
1175///   backwards compatibility with Phase 0.
1176pub(crate) fn extract_property_fts(
1177    props: &serde_json::Value,
1178    schema: &PropertyFtsSchema,
1179) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
1180    let mut walker = RecursiveWalker {
1181        blob: String::new(),
1182        positions: Vec::new(),
1183        stats: ExtractStats::default(),
1184        exclude_paths: schema.exclude_paths.clone(),
1185        stopped: false,
1186    };
1187
1188    let mut scalar_parts: Vec<String> = Vec::new();
1189
1190    for entry in &schema.paths {
1191        match entry.mode {
1192            PropertyPathMode::Scalar => {
1193                scalar_parts.extend(extract_json_path(props, &entry.path));
1194            }
1195            PropertyPathMode::Recursive => {
1196                let root = resolve_path_root(props, &entry.path);
1197                if let Some(root) = root {
1198                    walker.walk(&entry.path, root, 0);
1199                }
1200            }
1201        }
1202    }
1203
1204    // Flush scalar parts into the blob. Scalar emissions go first so that
1205    // their byte offsets are predictable relative to any recursive leaves
1206    // that follow. Scalars use the configured `separator`; the boundary
1207    // between scalars and recursive leaves uses `LEAF_SEPARATOR` (so phrase
1208    // queries cannot cross it).
1209    let scalar_text = if scalar_parts.is_empty() {
1210        None
1211    } else {
1212        Some(scalar_parts.join(&schema.separator))
1213    };
1214
1215    let combined = match (scalar_text, walker.blob.is_empty()) {
1216        (None, true) => None,
1217        (None, false) => Some(walker.blob.clone()),
1218        (Some(s), true) => Some(s),
1219        (Some(mut s), false) => {
1220            // Shift recursive positions by the prefix length so they stay
1221            // correct relative to the combined blob.
1222            let offset = s.len() + LEAF_SEPARATOR.len();
1223            for pos in &mut walker.positions {
1224                pos.start_offset += offset;
1225                pos.end_offset += offset;
1226            }
1227            s.push_str(LEAF_SEPARATOR);
1228            s.push_str(&walker.blob);
1229            Some(s)
1230        }
1231    };
1232
1233    (combined, walker.positions, walker.stats)
1234}
1235
1236fn resolve_path_root<'a>(
1237    value: &'a serde_json::Value,
1238    path: &str,
1239) -> Option<&'a serde_json::Value> {
1240    let stripped = path.strip_prefix("$.")?;
1241    let mut current = value;
1242    for segment in stripped.split('.') {
1243        current = current.get(segment)?;
1244    }
1245    Some(current)
1246}
1247
/// Accumulator state for the recursive property-FTS walk: builds the text
/// blob and its byte-range position map while tracking guardrail stats.
struct RecursiveWalker {
    /// Concatenated leaf values, separated by `LEAF_SEPARATOR`.
    blob: String,
    /// One `[start, end)` byte span into `blob` per emitted leaf.
    positions: Vec<PositionEntry>,
    /// Guardrail counters: depth cap, byte cap, excluded subtrees.
    stats: ExtractStats,
    /// Fully-qualified paths whose subtrees are skipped during the walk.
    exclude_paths: Vec<String>,
    /// Once the byte cap is hit, the walker refuses to emit any further
    /// leaves. Subsequent walks still account for depth/exclude stats but
    /// do not append.
    stopped: bool,
}
1258
1259impl RecursiveWalker {
1260    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
1261        if self.stopped {
1262            return;
1263        }
1264        if self.exclude_paths.iter().any(|p| p == current_path) {
1265            self.stats.excluded_subtree += 1;
1266            return;
1267        }
1268        match value {
1269            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
1270            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
1271            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
1272            serde_json::Value::Null => {}
1273            serde_json::Value::Object(map) => {
1274                if depth >= MAX_RECURSIVE_DEPTH {
1275                    self.stats.depth_cap_hit += 1;
1276                    return;
1277                }
1278                let mut keys: Vec<&String> = map.keys().collect();
1279                keys.sort();
1280                for key in keys {
1281                    if self.stopped {
1282                        return;
1283                    }
1284                    let child_path = format!("{current_path}.{key}");
1285                    if let Some(child) = map.get(key) {
1286                        self.walk(&child_path, child, depth + 1);
1287                    }
1288                }
1289            }
1290            serde_json::Value::Array(items) => {
1291                if depth >= MAX_RECURSIVE_DEPTH {
1292                    self.stats.depth_cap_hit += 1;
1293                    return;
1294                }
1295                for (idx, item) in items.iter().enumerate() {
1296                    if self.stopped {
1297                        return;
1298                    }
1299                    let child_path = format!("{current_path}[{idx}]");
1300                    self.walk(&child_path, item, depth + 1);
1301                }
1302            }
1303        }
1304    }
1305
1306    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
1307        if self.stopped {
1308            return;
1309        }
1310        // Compute the projected blob size if we accept this leaf.
1311        // If we already emitted at least one leaf, we must account for
1312        // the separator that precedes this one.
1313        let sep_len = if self.blob.is_empty() {
1314            0
1315        } else {
1316            LEAF_SEPARATOR.len()
1317        };
1318        let projected_len = self.blob.len() + sep_len + value.len();
1319        if projected_len > MAX_EXTRACTED_BYTES {
1320            self.stats.byte_cap_reached = true;
1321            self.stopped = true;
1322            return;
1323        }
1324        if !self.blob.is_empty() {
1325            self.blob.push_str(LEAF_SEPARATOR);
1326        }
1327        let start_offset = self.blob.len();
1328        self.blob.push_str(value);
1329        let end_offset = self.blob.len();
1330        self.positions.push(PositionEntry {
1331            start_offset,
1332            end_offset,
1333            leaf_path: leaf_path.to_owned(),
1334        });
1335    }
1336}
1337
1338/// Load all registered FTS property schemas from the database, tolerating
1339/// both the legacy JSON shape (array of bare path strings = scalar mode)
1340/// and the Phase 4 shape (objects carrying `path`, `mode`, optional
1341/// `exclude_paths`, or a top-level object carrying `paths` + global
1342/// `exclude_paths`).
1343pub(crate) fn load_fts_property_schemas(
1344    conn: &rusqlite::Connection,
1345) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1346    let mut stmt =
1347        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1348    stmt.query_map([], |row| {
1349        let kind: String = row.get(0)?;
1350        let paths_json: String = row.get(1)?;
1351        let separator: String = row.get(2)?;
1352        let schema = parse_property_schema_json(&paths_json, &separator);
1353        Ok((kind, schema))
1354    })?
1355    .collect::<Result<Vec<_>, _>>()
1356}
1357
1358pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1359    let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1360    let mut paths = Vec::new();
1361    let mut exclude_paths: Vec<String> = Vec::new();
1362
1363    let path_values: Vec<serde_json::Value> = match value {
1364        serde_json::Value::Array(arr) => arr,
1365        serde_json::Value::Object(map) => {
1366            if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1367                exclude_paths = excl
1368                    .iter()
1369                    .filter_map(|v| v.as_str().map(str::to_owned))
1370                    .collect();
1371            }
1372            match map.get("paths") {
1373                Some(serde_json::Value::Array(arr)) => arr.clone(),
1374                _ => Vec::new(),
1375            }
1376        }
1377        _ => Vec::new(),
1378    };
1379
1380    for entry in path_values {
1381        match entry {
1382            serde_json::Value::String(path) => {
1383                paths.push(PropertyPathEntry::scalar(path));
1384            }
1385            serde_json::Value::Object(map) => {
1386                let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1387                    continue;
1388                };
1389                let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1390                    PropertyPathMode::Scalar,
1391                    |m| match m {
1392                        "recursive" => PropertyPathMode::Recursive,
1393                        _ => PropertyPathMode::Scalar,
1394                    },
1395                );
1396                paths.push(PropertyPathEntry {
1397                    path: path.to_owned(),
1398                    mode,
1399                });
1400                if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1401                    for p in excl {
1402                        if let Some(s) = p.as_str() {
1403                            exclude_paths.push(s.to_owned());
1404                        }
1405                    }
1406                }
1407            }
1408            _ => {}
1409        }
1410    }
1411
1412    PropertyFtsSchema {
1413        paths,
1414        separator: separator.to_owned(),
1415        exclude_paths,
1416    }
1417}
1418
/// Resolve and validate every operational write before the transaction.
///
/// For each distinct collection referenced by the request, loads (once)
/// its registration row — kind, disabled flag, filter-field schema, and
/// validation contract — from `operational_collections`, rejecting writes
/// against unregistered or disabled collections. Each write's variant
/// must then match the collection kind (`append_only_log` accepts only
/// `Append`; `latest_state` only `Put`/`Delete`), and writes are checked
/// against the collection's validation contract when one is registered.
/// The resolved kind and filter-field maps are stored on `prepared` for
/// the apply phase.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // First write against a collection triggers its metadata load;
        // subsequent writes reuse the cached entries.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // The insert above guarantees presence; the fallback error is a
        // defensive invariant guard.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Kind/variant compatibility check.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // Outer `Some`: metadata was loaded for this collection; inner
        // `Some`: a validation contract is actually registered for it.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            // NOTE(review): the helper's Ok value is discarded here — if
            // it returns warnings, they never reach
            // `prepared.operational_validation_warnings` (which stays
            // empty in this path); confirm that is intended.
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1486
1487fn parse_operational_filter_fields(
1488    filter_fields_json: &str,
1489) -> Result<Vec<OperationalFilterField>, EngineError> {
1490    let fields: Vec<OperationalFilterField> =
1491        serde_json::from_str(filter_fields_json).map_err(|error| {
1492            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1493        })?;
1494    let mut seen = std::collections::HashSet::new();
1495    for field in &fields {
1496        if field.name.trim().is_empty() {
1497            return Err(EngineError::InvalidWrite(
1498                "filter_fields_json field names must not be empty".to_owned(),
1499            ));
1500        }
1501        if !seen.insert(field.name.as_str()) {
1502            return Err(EngineError::InvalidWrite(format!(
1503                "filter_fields_json contains duplicate field '{}'",
1504                field.name
1505            )));
1506        }
1507        if field.modes.is_empty() {
1508            return Err(EngineError::InvalidWrite(format!(
1509                "filter_fields_json field '{}' must declare at least one mode",
1510                field.name
1511            )));
1512        }
1513        if field.modes.contains(&OperationalFilterMode::Prefix)
1514            && field.field_type != OperationalFilterFieldType::String
1515        {
1516            return Err(EngineError::InvalidWrite(format!(
1517                "filter field '{}' only supports prefix for string types",
1518                field.name
1519            )));
1520        }
1521    }
1522    Ok(fields)
1523}
1524
/// One extracted filter value for an operational record. At most one of
/// `string_value` / `integer_value` is populated, matching the declared
/// field type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    /// Name of the declared filter field this value belongs to.
    field_name: String,
    /// Populated for `String`-typed fields.
    string_value: Option<String>,
    /// Populated for `Integer`/`Timestamp`-typed fields.
    integer_value: Option<i64>,
}
1531
1532fn extract_operational_filter_values(
1533    filter_fields: &[OperationalFilterField],
1534    payload_json: &str,
1535) -> Vec<OperationalFilterValueRow> {
1536    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1537        return Vec::new();
1538    };
1539    let Some(object) = parsed.as_object() else {
1540        return Vec::new();
1541    };
1542
1543    filter_fields
1544        .iter()
1545        .filter_map(|field| {
1546            let value = object.get(&field.name)?;
1547            match field.field_type {
1548                OperationalFilterFieldType::String => {
1549                    value
1550                        .as_str()
1551                        .map(|string_value| OperationalFilterValueRow {
1552                            field_name: field.name.clone(),
1553                            string_value: Some(string_value.to_owned()),
1554                            integer_value: None,
1555                        })
1556                }
1557                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1558                    value
1559                        .as_i64()
1560                        .map(|integer_value| OperationalFilterValueRow {
1561                            field_name: field.name.clone(),
1562                            string_value: None,
1563                            integer_value: Some(integer_value),
1564                        })
1565                }
1566            }
1567        })
1568        .collect()
1569}
1570
/// Full pipeline for one prepared write: resolve chunk FTS rows, property
/// FTS rows, and operational collection metadata (each resolution step can
/// reject the request before any transaction opens), then hand the fully
/// resolved request to `apply_write`.
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1580
1581fn apply_touch_last_accessed(
1582    conn: &mut rusqlite::Connection,
1583    request: &LastAccessTouchRequest,
1584) -> Result<LastAccessTouchReport, EngineError> {
1585    let mut seen = std::collections::HashSet::new();
1586    let logical_ids = request
1587        .logical_ids
1588        .iter()
1589        .filter(|logical_id| seen.insert(logical_id.as_str()))
1590        .cloned()
1591        .collect::<Vec<_>>();
1592    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1593
1594    for logical_id in &logical_ids {
1595        let exists = tx
1596            .query_row(
1597                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1598                params![logical_id],
1599                |row| row.get::<_, i64>(0),
1600            )
1601            .optional()?
1602            .is_some();
1603        if !exists {
1604            return Err(EngineError::InvalidWrite(format!(
1605                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1606            )));
1607        }
1608    }
1609
1610    {
1611        let mut upsert_metadata = tx.prepare_cached(
1612            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1613             VALUES (?1, ?2, ?2) \
1614             ON CONFLICT(logical_id) DO UPDATE SET \
1615                 last_accessed_at = excluded.last_accessed_at, \
1616                 updated_at = excluded.updated_at",
1617        )?;
1618        let mut insert_provenance = tx.prepare_cached(
1619            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1620             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1621        )?;
1622        for logical_id in &logical_ids {
1623            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1624            insert_provenance.execute(params![
1625                new_id(),
1626                logical_id,
1627                request.source_ref.as_deref(),
1628                format!("{{\"touched_at\":{}}}", request.touched_at),
1629            ])?;
1630        }
1631    }
1632
1633    tx.commit()?;
1634    Ok(LastAccessTouchReport {
1635        touched_logical_ids: logical_ids.len(),
1636        touched_at: request.touched_at,
1637    })
1638}
1639
1640fn ensure_operational_collections_writable(
1641    tx: &rusqlite::Transaction<'_>,
1642    prepared: &PreparedWrite,
1643) -> Result<(), EngineError> {
1644    for collection in prepared.operational_collection_kinds.keys() {
1645        let disabled_at: Option<Option<i64>> = tx
1646            .query_row(
1647                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1648                params![collection],
1649                |row| row.get::<_, Option<i64>>(0),
1650            )
1651            .optional()?;
1652        match disabled_at {
1653            Some(Some(_)) => {
1654                return Err(EngineError::InvalidWrite(format!(
1655                    "operational collection '{collection}' is disabled"
1656                )));
1657            }
1658            Some(None) => {}
1659            None => {
1660                return Err(EngineError::InvalidWrite(format!(
1661                    "operational collection '{collection}' is not registered"
1662                )));
1663            }
1664        }
1665    }
1666    Ok(())
1667}
1668
1669fn validate_operational_writes_against_live_contracts(
1670    tx: &rusqlite::Transaction<'_>,
1671    prepared: &PreparedWrite,
1672) -> Result<Vec<String>, EngineError> {
1673    let mut collection_validation_contracts =
1674        HashMap::<String, Option<OperationalValidationContract>>::new();
1675    for collection in prepared.operational_collection_kinds.keys() {
1676        let validation_json: String = tx
1677            .query_row(
1678                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1679                params![collection],
1680                |row| row.get(0),
1681            )
1682            .map_err(EngineError::Sqlite)?;
1683        let validation_contract = parse_operational_validation_contract(&validation_json)
1684            .map_err(EngineError::InvalidWrite)?;
1685        collection_validation_contracts.insert(collection.clone(), validation_contract);
1686    }
1687
1688    let mut warnings = Vec::new();
1689    for write in &prepared.operational_writes {
1690        if let Some(Some(contract)) =
1691            collection_validation_contracts.get(operational_write_collection(write))
1692            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1693        {
1694            warnings.push(warning);
1695        }
1696    }
1697
1698    Ok(warnings)
1699}
1700
1701fn load_live_operational_secondary_indexes(
1702    tx: &rusqlite::Transaction<'_>,
1703    prepared: &PreparedWrite,
1704) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1705    let mut collection_indexes = HashMap::new();
1706    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1707        let secondary_indexes_json: String = tx
1708            .query_row(
1709                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1710                params![collection],
1711                |row| row.get(0),
1712            )
1713            .map_err(EngineError::Sqlite)?;
1714        let indexes =
1715            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1716                .map_err(EngineError::InvalidWrite)?;
1717        collection_indexes.insert(collection.clone(), indexes);
1718    }
1719    Ok(collection_indexes)
1720}
1721
1722fn check_operational_write_against_contract(
1723    write: &OperationalWrite,
1724    contract: &OperationalValidationContract,
1725) -> Result<Option<String>, EngineError> {
1726    if contract.mode == OperationalValidationMode::Disabled {
1727        return Ok(None);
1728    }
1729
1730    let (payload_json, collection, record_key) = match write {
1731        OperationalWrite::Append {
1732            collection,
1733            record_key,
1734            payload_json,
1735            ..
1736        }
1737        | OperationalWrite::Put {
1738            collection,
1739            record_key,
1740            payload_json,
1741            ..
1742        } => (
1743            payload_json.as_str(),
1744            collection.as_str(),
1745            record_key.as_str(),
1746        ),
1747        OperationalWrite::Delete { .. } => return Ok(None),
1748    };
1749
1750    match validate_operational_payload_against_contract(contract, payload_json) {
1751        Ok(()) => Ok(None),
1752        Err(message) => match contract.mode {
1753            OperationalValidationMode::Disabled => Ok(None),
1754            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1755                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1756                kind = operational_write_kind(write)
1757            ))),
1758            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1759                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1760                kind = operational_write_kind(write)
1761            ))),
1762        },
1763    }
1764}
1765
/// Applies a fully-resolved [`PreparedWrite`] inside a single immediate
/// transaction: retires, inserts (nodes, edges, chunks, runs, steps,
/// actions), operational mutations, FTS and vec projection rows, then
/// assembles the [`WriteReceipt`] with provenance and validation
/// warnings.
///
/// Statement order is significant throughout: retires and upsert
/// cleanup run before the matching inserts so replacement rows land on
/// a clean slate, and FTS/vec rows are inserted after the chunk rows
/// they reference.
#[allow(clippy::too_many_lines)]
fn apply_write(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;

    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
    // preserve chunks/vec for possible restore, mark superseded, record audit event.
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'node_retire', ?2, ?3)",
        )?;
        for retire in &prepared.node_retires {
            del_fts.execute(params![retire.logical_id])?;
            del_prop_fts.execute(params![retire.logical_id])?;
            del_prop_positions.execute(params![retire.logical_id])?;
            sup_node.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Edge retires: mark superseded, record audit event.
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_event = tx.prepare_cached(
            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
             VALUES (?1, 'edge_retire', ?2, ?3)",
        )?;
        for retire in &prepared.edge_retires {
            sup_edge.execute(params![retire.logical_id])?;
            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
        }
    }

    // Node inserts (with optional upsert + chunk-policy handling).
    {
        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
        let mut del_prop_fts =
            tx.prepare_cached("DELETE FROM fts_node_properties WHERE node_logical_id = ?1")?;
        let mut del_prop_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
        let mut sup_node = tx.prepare_cached(
            "UPDATE nodes SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_node = tx.prepare_cached(
            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
        )?;
        // Vec rows hang off chunk ids, so they must be deleted before the
        // chunks themselves when the chunk policy is Replace.
        #[cfg(feature = "sqlite-vec")]
        let vec_del_sql2 = "DELETE FROM vec_nodes_active WHERE chunk_id IN \
                            (SELECT id FROM chunks WHERE node_logical_id = ?1)";
        // A missing vec table is tolerated (None); any other prepare
        // error is fatal.
        #[cfg(feature = "sqlite-vec")]
        let mut del_vec = match tx.prepare_cached(vec_del_sql2) {
            Ok(stmt) => Some(stmt),
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => None,
            Err(e) => return Err(e.into()),
        };
        for node in &prepared.nodes {
            if node.upsert {
                // Property FTS rows are always replaced on upsert since properties change.
                del_prop_fts.execute(params![node.logical_id])?;
                del_prop_positions.execute(params![node.logical_id])?;
                if node.chunk_policy == ChunkPolicy::Replace {
                    #[cfg(feature = "sqlite-vec")]
                    if let Some(ref mut stmt) = del_vec {
                        stmt.execute(params![node.logical_id])?;
                    }
                    del_fts.execute(params![node.logical_id])?;
                    del_chunks.execute(params![node.logical_id])?;
                }
                sup_node.execute(params![node.logical_id])?;
            }
            ins_node.execute(params![
                node.row_id,
                node.logical_id,
                node.kind,
                node.properties,
                node.source_ref,
                node.content_ref,
            ])?;
        }
    }

    // Edge inserts (with optional upsert).
    {
        let mut sup_edge = tx.prepare_cached(
            "UPDATE edges SET superseded_at = unixepoch() \
             WHERE logical_id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_edge = tx.prepare_cached(
            "INSERT INTO edges \
             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        for edge in &prepared.edges {
            if edge.upsert {
                sup_edge.execute(params![edge.logical_id])?;
            }
            ins_edge.execute(params![
                edge.row_id,
                edge.logical_id,
                edge.source_logical_id,
                edge.target_logical_id,
                edge.kind,
                edge.properties,
                edge.source_ref,
            ])?;
        }
    }

    // Chunk inserts.
    {
        let mut ins_chunk = tx.prepare_cached(
            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for chunk in &prepared.chunks {
            ins_chunk.execute(params![
                chunk.id,
                chunk.node_logical_id,
                chunk.text_content,
                chunk.byte_start,
                chunk.byte_end,
                chunk.content_hash,
            ])?;
        }
    }

    // Run inserts (with optional upsert).
    {
        let mut sup_run = tx.prepare_cached(
            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_run = tx.prepare_cached(
            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
        )?;
        for run in &prepared.runs {
            // Runs supersede by explicit prior id, not by their own id.
            if run.upsert
                && let Some(ref prior_id) = run.supersedes_id
            {
                sup_run.execute(params![prior_id])?;
            }
            ins_run.execute(params![
                run.id,
                run.kind,
                run.status,
                run.properties,
                run.source_ref
            ])?;
        }
    }

    // Step inserts (with optional upsert).
    {
        let mut sup_step = tx.prepare_cached(
            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_step = tx.prepare_cached(
            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for step in &prepared.steps {
            if step.upsert
                && let Some(ref prior_id) = step.supersedes_id
            {
                sup_step.execute(params![prior_id])?;
            }
            ins_step.execute(params![
                step.id,
                step.run_id,
                step.kind,
                step.status,
                step.properties,
                step.source_ref,
            ])?;
        }
    }

    // Action inserts (with optional upsert).
    {
        let mut sup_action = tx.prepare_cached(
            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
        )?;
        let mut ins_action = tx.prepare_cached(
            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
        )?;
        for action in &prepared.actions {
            if action.upsert
                && let Some(ref prior_id) = action.supersedes_id
            {
                sup_action.execute(params![prior_id])?;
            }
            ins_action.execute(params![
                action.id,
                action.step_id,
                action.kind,
                action.status,
                action.properties,
                action.source_ref,
            ])?;
        }
    }

    // Operational mutation log writes and latest-state current rows.
    {
        // Collection checks and contract validation run against the live
        // rows inside this transaction, not the snapshot taken at prepare
        // time; validation warnings are stashed on `prepared` for the
        // receipt built below.
        ensure_operational_collections_writable(&tx, prepared)?;
        prepared.operational_validation_warnings =
            validate_operational_writes_against_live_contracts(&tx, prepared)?;
        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;

        // Monotonic mutation ordering: continue from the current maximum.
        let mut next_mutation_order: i64 = tx.query_row(
            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
            [],
            |row| row.get(0),
        )?;
        let mut ins_mutation = tx.prepare_cached(
            "INSERT INTO operational_mutations \
             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
        )?;
        let mut ins_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut upsert_current = tx.prepare_cached(
            "INSERT INTO operational_current \
             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
                 payload_json = excluded.payload_json, \
                 updated_at = excluded.updated_at, \
                 last_mutation_id = excluded.last_mutation_id",
        )?;
        let mut del_current = tx.prepare_cached(
            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
        )?;
        let mut del_current_secondary_indexes = tx.prepare_cached(
            "DELETE FROM operational_secondary_index_entries \
             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
        )?;
        let mut ins_secondary_index = tx.prepare_cached(
            "INSERT INTO operational_secondary_index_entries \
             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
        )?;
        let mut current_row_stmt = tx.prepare_cached(
            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 AND record_key = ?2",
        )?;

        for write in &prepared.operational_writes {
            let collection = operational_write_collection(write);
            let record_key = operational_write_record_key(write);
            let mutation_id = new_id();
            next_mutation_order += 1;
            let payload_json = operational_write_payload(write);
            ins_mutation.execute(params![
                &mutation_id,
                collection,
                record_key,
                operational_write_kind(write),
                payload_json,
                operational_write_source_ref(write),
                next_mutation_order,
            ])?;
            // Per-mutation secondary-index entries (subject_kind 'mutation').
            if let Some(indexes) = collection_secondary_indexes.get(collection) {
                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
                    ins_secondary_index.execute(params![
                        collection,
                        entry.index_name,
                        "mutation",
                        &mutation_id,
                        record_key,
                        entry.sort_timestamp,
                        entry.slot1_text,
                        entry.slot1_integer,
                        entry.slot2_text,
                        entry.slot2_integer,
                        entry.slot3_text,
                        entry.slot3_integer,
                    ])?;
                }
            }
            // Declared filter fields are denormalized per mutation for querying.
            if let Some(filter_fields) = prepared
                .operational_collection_filter_fields
                .get(collection)
            {
                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
                    ins_filter_value.execute(params![
                        &mutation_id,
                        collection,
                        filter_value.field_name,
                        filter_value.string_value,
                        filter_value.integer_value,
                    ])?;
                }
            }

            // Latest-state collections additionally maintain an
            // operational_current row plus its 'current' index entries.
            if prepared.operational_collection_kinds.get(collection)
                == Some(&OperationalCollectionKind::LatestState)
            {
                // Stale 'current' index entries are removed for puts and
                // deletes alike before the new state is written.
                del_current_secondary_indexes.execute(params![collection, record_key])?;
                match write {
                    OperationalWrite::Put { payload_json, .. } => {
                        upsert_current.execute(params![
                            collection,
                            record_key,
                            payload_json,
                            &mutation_id,
                        ])?;
                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
                            // Re-read the row just upserted so the index
                            // entries use the stored updated_at/mutation id.
                            let (current_payload_json, updated_at, last_mutation_id): (
                                String,
                                i64,
                                String,
                            ) = current_row_stmt
                                .query_row(params![collection, record_key], |row| {
                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
                                })?;
                            for entry in extract_secondary_index_entries_for_current(
                                indexes,
                                &current_payload_json,
                                updated_at,
                            ) {
                                ins_secondary_index.execute(params![
                                    collection,
                                    entry.index_name,
                                    "current",
                                    last_mutation_id.as_str(),
                                    record_key,
                                    entry.sort_timestamp,
                                    entry.slot1_text,
                                    entry.slot1_integer,
                                    entry.slot2_text,
                                    entry.slot2_integer,
                                    entry.slot3_text,
                                    entry.slot3_integer,
                                ])?;
                            }
                        }
                    }
                    OperationalWrite::Delete { .. } => {
                        del_current.execute(params![collection, record_key])?;
                    }
                    OperationalWrite::Append { .. } => {}
                }
            }
        }
    }

    // FTS row inserts.
    {
        let mut ins_fts = tx.prepare_cached(
            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3, ?4)",
        )?;
        for fts_row in &prepared.required_fts_rows {
            ins_fts.execute(params![
                fts_row.chunk_id,
                fts_row.node_logical_id,
                fts_row.kind,
                fts_row.text_content,
            ])?;
        }
    }

    // Property FTS row inserts.
    if !prepared.required_property_fts_rows.is_empty() {
        let mut ins_prop_fts = tx.prepare_cached(
            "INSERT INTO fts_node_properties (node_logical_id, kind, text_content) \
             VALUES (?1, ?2, ?3)",
        )?;
        let mut ins_positions = tx.prepare_cached(
            "INSERT INTO fts_node_property_positions \
             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        // Delete any stale position rows for these nodes; writer already
        // deleted the `fts_node_properties` rows above.
        let mut del_positions = tx
            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
        for row in &prepared.required_property_fts_rows {
            del_positions.execute(params![row.node_logical_id])?;
            ins_prop_fts.execute(params![row.node_logical_id, row.kind, row.text_content,])?;
            for pos in &row.positions {
                // Offsets are usize in memory; clamp to i64::MAX rather
                // than fail on (pathological) overflow.
                ins_positions.execute(params![
                    row.node_logical_id,
                    row.kind,
                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
                    pos.leaf_path,
                ])?;
            }
        }
    }

    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
    #[cfg(feature = "sqlite-vec")]
    {
        match tx
            .prepare_cached("INSERT INTO vec_nodes_active (chunk_id, embedding) VALUES (?1, ?2)")
        {
            Ok(mut ins_vec) => {
                for vi in &prepared.vec_inserts {
                    // Embeddings are stored as little-endian f32 bytes.
                    let bytes: Vec<u8> =
                        vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
                    ins_vec.execute(params![vi.chunk_id, bytes])?;
                }
            }
            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {
                // vec profile absent: vec inserts are silently skipped.
            }
            Err(e) => return Err(e.into()),
        }
    }

    tx.commit()?;

    // Collect a warning for every artifact written without a source_ref;
    // in ProvenanceMode::Warn these surface on the receipt rather than
    // failing the write.
    let provenance_warnings: Vec<String> = prepared
        .nodes
        .iter()
        .filter(|node| node.source_ref.is_none())
        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
        .chain(
            prepared
                .node_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .edges
                .iter()
                .filter(|e| e.source_ref.is_none())
                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
        )
        .chain(
            prepared
                .edge_retires
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
        )
        .chain(
            prepared
                .runs
                .iter()
                .filter(|r| r.source_ref.is_none())
                .map(|r| format!("run '{}' has no source_ref", r.id)),
        )
        .chain(
            prepared
                .steps
                .iter()
                .filter(|s| s.source_ref.is_none())
                .map(|s| format!("step '{}' has no source_ref", s.id)),
        )
        .chain(
            prepared
                .actions
                .iter()
                .filter(|a| a.source_ref.is_none())
                .map(|a| format!("action '{}' has no source_ref", a.id)),
        )
        .chain(
            prepared
                .operational_writes
                .iter()
                .filter(|write| operational_write_source_ref(write).is_none())
                .map(|write| {
                    format!(
                        "operational {} '{}:{}' has no source_ref",
                        operational_write_kind(write),
                        operational_write_collection(write),
                        operational_write_record_key(write)
                    )
                }),
        )
        .collect();

    // Receipt warnings = provenance warnings + the live-contract
    // validation warnings captured during the operational block above.
    let mut warnings = provenance_warnings.clone();
    warnings.extend(prepared.operational_validation_warnings.clone());

    Ok(WriteReceipt {
        label: prepared.label.clone(),
        optional_backfill_count: prepared.optional_backfills.len(),
        warnings,
        provenance_warnings,
    })
}
2277
2278fn operational_write_collection(write: &OperationalWrite) -> &str {
2279    match write {
2280        OperationalWrite::Append { collection, .. }
2281        | OperationalWrite::Put { collection, .. }
2282        | OperationalWrite::Delete { collection, .. } => collection,
2283    }
2284}
2285
2286fn operational_write_record_key(write: &OperationalWrite) -> &str {
2287    match write {
2288        OperationalWrite::Append { record_key, .. }
2289        | OperationalWrite::Put { record_key, .. }
2290        | OperationalWrite::Delete { record_key, .. } => record_key,
2291    }
2292}
2293
2294fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2295    match write {
2296        OperationalWrite::Append { .. } => "append",
2297        OperationalWrite::Put { .. } => "put",
2298        OperationalWrite::Delete { .. } => "delete",
2299    }
2300}
2301
2302fn operational_write_payload(write: &OperationalWrite) -> &str {
2303    match write {
2304        OperationalWrite::Append { payload_json, .. }
2305        | OperationalWrite::Put { payload_json, .. } => payload_json,
2306        OperationalWrite::Delete { .. } => "null",
2307    }
2308}
2309
2310fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2311    match write {
2312        OperationalWrite::Append { source_ref, .. }
2313        | OperationalWrite::Put { source_ref, .. }
2314        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2315    }
2316}
2317
2318#[cfg(test)]
2319#[allow(clippy::expect_used)]
2320mod tests {
2321    use std::sync::Arc;
2322
2323    use fathomdb_schema::SchemaManager;
2324    use tempfile::NamedTempFile;
2325
2326    use super::{apply_write, prepare_write, resolve_operational_writes};
2327    use crate::{
2328        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2329        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2330        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2331        projection::ProjectionTarget,
2332    };
2333
2334    #[test]
2335    fn writer_executes_runtime_table_rows() {
2336        let db = NamedTempFile::new().expect("temporary db");
2337        let writer = WriterActor::start(
2338            db.path(),
2339            Arc::new(SchemaManager::new()),
2340            ProvenanceMode::Warn,
2341            Arc::new(TelemetryCounters::default()),
2342        )
2343        .expect("writer");
2344
2345        let receipt = writer
2346            .submit(WriteRequest {
2347                label: "runtime".to_owned(),
2348                nodes: vec![],
2349                node_retires: vec![],
2350                edges: vec![],
2351                edge_retires: vec![],
2352                chunks: vec![],
2353                runs: vec![RunInsert {
2354                    id: "run-1".to_owned(),
2355                    kind: "session".to_owned(),
2356                    status: "completed".to_owned(),
2357                    properties: "{}".to_owned(),
2358                    source_ref: Some("src-1".to_owned()),
2359                    upsert: false,
2360                    supersedes_id: None,
2361                }],
2362                steps: vec![StepInsert {
2363                    id: "step-1".to_owned(),
2364                    run_id: "run-1".to_owned(),
2365                    kind: "llm".to_owned(),
2366                    status: "completed".to_owned(),
2367                    properties: "{}".to_owned(),
2368                    source_ref: Some("src-1".to_owned()),
2369                    upsert: false,
2370                    supersedes_id: None,
2371                }],
2372                actions: vec![ActionInsert {
2373                    id: "action-1".to_owned(),
2374                    step_id: "step-1".to_owned(),
2375                    kind: "emit".to_owned(),
2376                    status: "completed".to_owned(),
2377                    properties: "{}".to_owned(),
2378                    source_ref: Some("src-1".to_owned()),
2379                    upsert: false,
2380                    supersedes_id: None,
2381                }],
2382                optional_backfills: vec![],
2383                vec_inserts: vec![],
2384                operational_writes: vec![],
2385            })
2386            .expect("write receipt");
2387
2388        assert_eq!(receipt.label, "runtime");
2389    }
2390
    /// A single request mixing a canonical node insert with an operational
    /// `Put` must commit both sides: the node appears in `nodes`, the Put is
    /// journaled once in `operational_mutations`, and `operational_current`
    /// holds the latest payload for the record key.
    #[test]
    fn writer_put_operational_write_updates_current_and_mutations() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed the target collection; operational writes are rejected for
        // collections that are not registered.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        // Release the seeding connection before the writer opens its own.
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "node-and-operational".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "lg-1".to_owned(),
                    kind: "Meeting".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("write receipt");

        // Canonical side: exactly one node row for the logical id.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let node_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
                [],
                |row| row.get(0),
            )
            .expect("node count");
        assert_eq!(node_count, 1);
        // Operational side: the Put is journaled exactly once...
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
                 AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 1);
        // ...and the current-state table reflects the submitted payload.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"ok"}"#);
    }
2470
2471    #[test]
2472    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
2473        let db = NamedTempFile::new().expect("temporary db");
2474        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2475        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2476        conn.execute(
2477            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2478             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2479            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2480        )
2481        .expect("seed collection");
2482        drop(conn);
2483        let writer = WriterActor::start(
2484            db.path(),
2485            Arc::new(SchemaManager::new()),
2486            ProvenanceMode::Warn,
2487            Arc::new(TelemetryCounters::default()),
2488        )
2489        .expect("writer");
2490
2491        writer
2492            .submit(WriteRequest {
2493                label: "disabled-validation".to_owned(),
2494                nodes: vec![],
2495                node_retires: vec![],
2496                edges: vec![],
2497                edge_retires: vec![],
2498                chunks: vec![],
2499                runs: vec![],
2500                steps: vec![],
2501                actions: vec![],
2502                optional_backfills: vec![],
2503                vec_inserts: vec![],
2504                operational_writes: vec![OperationalWrite::Put {
2505                    collection: "connector_health".to_owned(),
2506                    record_key: "gmail".to_owned(),
2507                    payload_json: r#"{"bogus":true}"#.to_owned(),
2508                    source_ref: Some("src-1".to_owned()),
2509                }],
2510            })
2511            .expect("write receipt");
2512
2513        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2514        let payload: String = conn
2515            .query_row(
2516                "SELECT payload_json FROM operational_current \
2517                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2518                [],
2519                |row| row.get(0),
2520            )
2521            .expect("current payload");
2522        assert_eq!(payload, r#"{"bogus":true}"#);
2523    }
2524
    /// `report_only` validation must let an invalid payload through while
    /// surfacing exactly one warning on the receipt (and no provenance
    /// warnings, since a `source_ref` is supplied).
    #[test]
    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Contract requires status in {"ok","failed"}; mode is report_only.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the contract's enum, so this is invalid.
        let receipt = writer
            .submit(WriteRequest {
                label: "report-only-validation".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("report_only write should succeed");

        // Validation warnings are separate from provenance warnings.
        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
        assert_eq!(receipt.warnings.len(), 1);
        assert!(
            receipt.warnings[0].contains("connector_health"),
            "warning should identify collection"
        );
        assert!(
            receipt.warnings[0].contains("must be one of"),
            "warning should explain validation failure"
        );

        // The write still persisted despite failing validation.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"bogus"}"#);
    }
2589
2590    #[test]
2591    fn writer_rejects_operational_write_for_missing_collection() {
2592        let db = NamedTempFile::new().expect("temporary db");
2593        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2594        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2595        drop(conn);
2596        let writer = WriterActor::start(
2597            db.path(),
2598            Arc::new(SchemaManager::new()),
2599            ProvenanceMode::Warn,
2600            Arc::new(TelemetryCounters::default()),
2601        )
2602        .expect("writer");
2603
2604        let result = writer.submit(WriteRequest {
2605            label: "missing-operational-collection".to_owned(),
2606            nodes: vec![],
2607            node_retires: vec![],
2608            edges: vec![],
2609            edge_retires: vec![],
2610            chunks: vec![],
2611            runs: vec![],
2612            steps: vec![],
2613            actions: vec![],
2614            optional_backfills: vec![],
2615            vec_inserts: vec![],
2616            operational_writes: vec![OperationalWrite::Put {
2617                collection: "connector_health".to_owned(),
2618                record_key: "gmail".to_owned(),
2619                payload_json: r#"{"status":"ok"}"#.to_owned(),
2620                source_ref: Some("src-1".to_owned()),
2621            }],
2622        });
2623
2624        assert!(
2625            matches!(result, Err(EngineError::InvalidWrite(_))),
2626            "missing operational collection must return InvalidWrite"
2627        );
2628    }
2629
2630    #[test]
2631    fn writer_append_operational_write_records_history_without_current_row() {
2632        let db = NamedTempFile::new().expect("temporary db");
2633        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2634        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2635        conn.execute(
2636            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2637             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2638            [],
2639        )
2640        .expect("seed collection");
2641        drop(conn);
2642        let writer = WriterActor::start(
2643            db.path(),
2644            Arc::new(SchemaManager::new()),
2645            ProvenanceMode::Warn,
2646            Arc::new(TelemetryCounters::default()),
2647        )
2648        .expect("writer");
2649
2650        writer
2651            .submit(WriteRequest {
2652                label: "append-operational".to_owned(),
2653                nodes: vec![],
2654                node_retires: vec![],
2655                edges: vec![],
2656                edge_retires: vec![],
2657                chunks: vec![],
2658                runs: vec![],
2659                steps: vec![],
2660                actions: vec![],
2661                optional_backfills: vec![],
2662                vec_inserts: vec![],
2663                operational_writes: vec![OperationalWrite::Append {
2664                    collection: "audit_log".to_owned(),
2665                    record_key: "evt-1".to_owned(),
2666                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2667                    source_ref: Some("src-1".to_owned()),
2668                }],
2669            })
2670            .expect("write receipt");
2671
2672        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2673        let mutation: (String, String) = conn
2674            .query_row(
2675                "SELECT op_kind, payload_json FROM operational_mutations \
2676                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2677                [],
2678                |row| Ok((row.get(0)?, row.get(1)?)),
2679            )
2680            .expect("mutation row");
2681        assert_eq!(mutation.0, "append");
2682        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2683        let current_count: i64 = conn
2684            .query_row(
2685                "SELECT count(*) FROM operational_current \
2686                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2687                [],
2688                |row| row.get(0),
2689            )
2690            .expect("current count");
2691        assert_eq!(current_count, 0);
2692    }
2693
    /// An `enforce`-mode contract must reject an invalid append with
    /// `InvalidWrite` and leave the database untouched: neither a mutation
    /// journal entry nor derived filter values may be written.
    #[test]
    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Seed both a filter-field definition (to detect stray side effects)
        // and an enforce-mode contract requiring status in {"ok","failed"}.
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // "bogus" is outside the enum, so enforce mode must hard-fail.
        let error = writer
            .submit(WriteRequest {
                label: "invalid-append".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Append {
                    collection: "audit_log".to_owned(),
                    record_key: "evt-1".to_owned(),
                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect_err("invalid append must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("must be one of"));

        // Rejection must be side-effect free: no journal row...
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);
        // ...and no derived filter values either.
        let filter_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
                [],
                |row| row.get(0),
            )
            .expect("filter count");
        assert_eq!(filter_count, 0);
    }
2758
    /// A `Delete` after a `Put` must remove the `operational_current` row
    /// while leaving both mutations in the journal, in submission order.
    #[test]
    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // First request: establish a current row for 'gmail'.
        writer
            .submit(WriteRequest {
                label: "put-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // Second request: delete the same record key.
        writer
            .submit(WriteRequest {
                label: "delete-operational".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // History preserves both operations in mutation order.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let mutation_kinds: Vec<String> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| row.get(0))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
        // The current-state row is gone after the delete.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2847
    /// Deletes carry no payload, so an enforce-mode validation contract must
    /// not block them: a valid put followed by a delete succeeds and clears
    /// the current row.
    #[test]
    fn writer_delete_bypasses_validation_contract() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Enforce-mode contract with a closed field set.
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // A contract-conforming put establishes the current row.
        writer
            .submit(WriteRequest {
                label: "valid-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"ok"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");
        // The delete must succeed even though it has no validatable payload.
        writer
            .submit(WriteRequest {
                label: "delete-after-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // The current row is gone after the delete.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
2921
    /// Secondary index entries for `latest_state` collections must track the
    /// current row's lifecycle: a put materializes one entry per configured
    /// index, and a delete removes them all.
    #[test]
    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        // Two indexes: a single-field index on `status` and a composite
        // index over (`tenant`, `category`).
        conn.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, secondary_indexes_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
        )
        .expect("seed collection");
        drop(conn);
        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // The payload carries values for both indexed field sets.
        writer
            .submit(WriteRequest {
                label: "secondary-index-put".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Put {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
                        .to_owned(),
                    source_ref: Some("src-1".to_owned()),
                }],
            })
            .expect("put receipt");

        // One entry per index for the current row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 2);
        // Close the inspection connection before the next write.
        drop(conn);

        writer
            .submit(WriteRequest {
                label: "secondary-index-delete".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![OperationalWrite::Delete {
                    collection: "connector_health".to_owned(),
                    record_key: "gmail".to_owned(),
                    source_ref: Some("src-2".to_owned()),
                }],
            })
            .expect("delete receipt");

        // Deleting the record must purge its current-index entries.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let current_entry_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_secondary_index_entries \
                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
                [],
                |row| row.get(0),
            )
            .expect("current secondary index count");
        assert_eq!(current_entry_count, 0);
    }
3010
    /// A batch of operational writes against the same record key must be
    /// journaled with monotonically increasing `mutation_order` values, and
    /// the last write in the batch wins in `operational_current`.
    #[test]
    fn writer_latest_state_operational_writes_persist_mutation_order() {
        let db = NamedTempFile::new().expect("temporary db");
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");
        drop(conn);

        let writer = WriterActor::start(
            db.path(),
            Arc::new(SchemaManager::new()),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Put -> Delete -> Put in a single request; order must be preserved.
        writer
            .submit(WriteRequest {
                label: "ordered-operational-batch".to_owned(),
                nodes: vec![],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"old"}"#.to_owned(),
                        source_ref: Some("src-1".to_owned()),
                    },
                    OperationalWrite::Delete {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        source_ref: Some("src-2".to_owned()),
                    },
                    OperationalWrite::Put {
                        collection: "connector_health".to_owned(),
                        record_key: "gmail".to_owned(),
                        payload_json: r#"{"status":"new"}"#.to_owned(),
                        source_ref: Some("src-3".to_owned()),
                    },
                ],
            })
            .expect("write receipt");

        // The journal records all three writes with 1-based ordering.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let rows: Vec<(String, i64)> = {
            let mut stmt = conn
                .prepare(
                    "SELECT op_kind, mutation_order FROM operational_mutations \
                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
                     ORDER BY mutation_order ASC",
                )
                .expect("stmt");
            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
                .expect("rows")
                .collect::<Result<_, _>>()
                .expect("collect")
        };
        assert_eq!(
            rows,
            vec![
                ("put".to_owned(), 1),
                ("delete".to_owned(), 2),
                ("put".to_owned(), 3),
            ]
        );
        // The final put's payload is the surviving current state.
        let payload: String = conn
            .query_row(
                "SELECT payload_json FROM operational_current \
                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
                [],
                |row| row.get(0),
            )
            .expect("current payload");
        assert_eq!(payload, r#"{"status":"new"}"#);
    }
3099
3100    #[test]
3101    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
3102        let db = NamedTempFile::new().expect("temporary db");
3103        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
3104        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3105        conn.execute(
3106            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3107             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3108            [],
3109        )
3110        .expect("seed collection");
3111
3112        let request = WriteRequest {
3113            label: "disabled-race".to_owned(),
3114            nodes: vec![],
3115            node_retires: vec![],
3116            edges: vec![],
3117            edge_retires: vec![],
3118            chunks: vec![],
3119            runs: vec![],
3120            steps: vec![],
3121            actions: vec![],
3122            optional_backfills: vec![],
3123            vec_inserts: vec![],
3124            operational_writes: vec![OperationalWrite::Put {
3125                collection: "connector_health".to_owned(),
3126                record_key: "gmail".to_owned(),
3127                payload_json: r#"{"status":"ok"}"#.to_owned(),
3128                source_ref: Some("src-1".to_owned()),
3129            }],
3130        };
3131        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
3132        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");
3133
3134        conn.execute(
3135            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
3136            [],
3137        )
3138        .expect("disable collection after preflight");
3139
3140        let error =
3141            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
3142        assert!(matches!(error, EngineError::InvalidWrite(_)));
3143        assert!(error.to_string().contains("is disabled"));
3144
3145        let mutation_count: i64 = conn
3146            .query_row(
3147                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3148                [],
3149                |row| row.get(0),
3150            )
3151            .expect("mutation count");
3152        assert_eq!(mutation_count, 0);
3153
3154        let current_count: i64 = conn
3155            .query_row(
3156                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3157                [],
3158                |row| row.get(0),
3159            )
3160            .expect("current count");
3161        assert_eq!(current_count, 0);
3162    }
3163
3164    #[test]
3165    fn writer_enforce_validation_rejects_invalid_put_atomically() {
3166        let db = NamedTempFile::new().expect("temporary db");
3167        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3168        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3169        conn.execute(
3170            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3171             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3172            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3173        )
3174        .expect("seed collection");
3175        drop(conn);
3176        let writer = WriterActor::start(
3177            db.path(),
3178            Arc::new(SchemaManager::new()),
3179            ProvenanceMode::Warn,
3180            Arc::new(TelemetryCounters::default()),
3181        )
3182        .expect("writer");
3183
3184        let error = writer
3185            .submit(WriteRequest {
3186                label: "invalid-put".to_owned(),
3187                nodes: vec![NodeInsert {
3188                    row_id: "row-1".to_owned(),
3189                    logical_id: "lg-1".to_owned(),
3190                    kind: "Meeting".to_owned(),
3191                    properties: "{}".to_owned(),
3192                    source_ref: Some("src-1".to_owned()),
3193                    upsert: false,
3194                    chunk_policy: ChunkPolicy::Preserve,
3195                    content_ref: None,
3196                }],
3197                node_retires: vec![],
3198                edges: vec![],
3199                edge_retires: vec![],
3200                chunks: vec![],
3201                runs: vec![],
3202                steps: vec![],
3203                actions: vec![],
3204                optional_backfills: vec![],
3205                vec_inserts: vec![],
3206                operational_writes: vec![OperationalWrite::Put {
3207                    collection: "connector_health".to_owned(),
3208                    record_key: "gmail".to_owned(),
3209                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
3210                    source_ref: Some("src-1".to_owned()),
3211                }],
3212            })
3213            .expect_err("invalid put must reject");
3214        assert!(matches!(error, EngineError::InvalidWrite(_)));
3215        assert!(error.to_string().contains("must be one of"));
3216
3217        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3218        let node_count: i64 = conn
3219            .query_row(
3220                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3221                [],
3222                |row| row.get(0),
3223            )
3224            .expect("node count");
3225        assert_eq!(node_count, 0);
3226        let mutation_count: i64 = conn
3227            .query_row(
3228                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3229                [],
3230                |row| row.get(0),
3231            )
3232            .expect("mutation count");
3233        assert_eq!(mutation_count, 0);
3234        let current_count: i64 = conn
3235            .query_row(
3236                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3237                [],
3238                |row| row.get(0),
3239            )
3240            .expect("current count");
3241        assert_eq!(current_count, 0);
3242    }
3243
3244    #[test]
3245    fn writer_rejects_append_against_latest_state_collection() {
3246        let db = NamedTempFile::new().expect("temporary db");
3247        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3248        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3249        conn.execute(
3250            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3251             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3252            [],
3253        )
3254        .expect("seed collection");
3255        drop(conn);
3256        let writer = WriterActor::start(
3257            db.path(),
3258            Arc::new(SchemaManager::new()),
3259            ProvenanceMode::Warn,
3260            Arc::new(TelemetryCounters::default()),
3261        )
3262        .expect("writer");
3263
3264        let result = writer.submit(WriteRequest {
3265            label: "bad-append".to_owned(),
3266            nodes: vec![],
3267            node_retires: vec![],
3268            edges: vec![],
3269            edge_retires: vec![],
3270            chunks: vec![],
3271            runs: vec![],
3272            steps: vec![],
3273            actions: vec![],
3274            optional_backfills: vec![],
3275            vec_inserts: vec![],
3276            operational_writes: vec![OperationalWrite::Append {
3277                collection: "connector_health".to_owned(),
3278                record_key: "gmail".to_owned(),
3279                payload_json: r#"{"status":"ok"}"#.to_owned(),
3280                source_ref: Some("src-1".to_owned()),
3281            }],
3282        });
3283
3284        assert!(
3285            matches!(result, Err(EngineError::InvalidWrite(_))),
3286            "latest_state collection must reject Append"
3287        );
3288    }
3289
3290    #[test]
3291    fn writer_upsert_supersedes_prior_active_node() {
3292        let db = NamedTempFile::new().expect("temporary db");
3293        let writer = WriterActor::start(
3294            db.path(),
3295            Arc::new(SchemaManager::new()),
3296            ProvenanceMode::Warn,
3297            Arc::new(TelemetryCounters::default()),
3298        )
3299        .expect("writer");
3300
3301        writer
3302            .submit(WriteRequest {
3303                label: "v1".to_owned(),
3304                nodes: vec![NodeInsert {
3305                    row_id: "row-1".to_owned(),
3306                    logical_id: "lg-1".to_owned(),
3307                    kind: "Meeting".to_owned(),
3308                    properties: r#"{"version":1}"#.to_owned(),
3309                    source_ref: Some("src-1".to_owned()),
3310                    upsert: false,
3311                    chunk_policy: ChunkPolicy::Preserve,
3312                    content_ref: None,
3313                }],
3314                node_retires: vec![],
3315                edges: vec![],
3316                edge_retires: vec![],
3317                chunks: vec![],
3318                runs: vec![],
3319                steps: vec![],
3320                actions: vec![],
3321                optional_backfills: vec![],
3322                vec_inserts: vec![],
3323                operational_writes: vec![],
3324            })
3325            .expect("v1 write");
3326
3327        writer
3328            .submit(WriteRequest {
3329                label: "v2".to_owned(),
3330                nodes: vec![NodeInsert {
3331                    row_id: "row-2".to_owned(),
3332                    logical_id: "lg-1".to_owned(),
3333                    kind: "Meeting".to_owned(),
3334                    properties: r#"{"version":2}"#.to_owned(),
3335                    source_ref: Some("src-2".to_owned()),
3336                    upsert: true,
3337                    chunk_policy: ChunkPolicy::Preserve,
3338                    content_ref: None,
3339                }],
3340                node_retires: vec![],
3341                edges: vec![],
3342                edge_retires: vec![],
3343                chunks: vec![],
3344                runs: vec![],
3345                steps: vec![],
3346                actions: vec![],
3347                optional_backfills: vec![],
3348                vec_inserts: vec![],
3349                operational_writes: vec![],
3350            })
3351            .expect("v2 upsert write");
3352
3353        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3354        let (active_row_id, props): (String, String) = conn
3355            .query_row(
3356                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3357                [],
3358                |row| Ok((row.get(0)?, row.get(1)?)),
3359            )
3360            .expect("active row");
3361        assert_eq!(active_row_id, "row-2");
3362        assert!(props.contains("\"version\":2"));
3363
3364        let superseded: i64 = conn
3365            .query_row(
3366                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3367                [],
3368                |row| row.get(0),
3369            )
3370            .expect("superseded count");
3371        assert_eq!(superseded, 1);
3372    }
3373
3374    #[test]
3375    fn writer_inserts_edge_between_two_nodes() {
3376        let db = NamedTempFile::new().expect("temporary db");
3377        let writer = WriterActor::start(
3378            db.path(),
3379            Arc::new(SchemaManager::new()),
3380            ProvenanceMode::Warn,
3381            Arc::new(TelemetryCounters::default()),
3382        )
3383        .expect("writer");
3384
3385        writer
3386            .submit(WriteRequest {
3387                label: "nodes-and-edge".to_owned(),
3388                nodes: vec![
3389                    NodeInsert {
3390                        row_id: "row-meeting".to_owned(),
3391                        logical_id: "meeting-1".to_owned(),
3392                        kind: "Meeting".to_owned(),
3393                        properties: "{}".to_owned(),
3394                        source_ref: Some("src-1".to_owned()),
3395                        upsert: false,
3396                        chunk_policy: ChunkPolicy::Preserve,
3397                        content_ref: None,
3398                    },
3399                    NodeInsert {
3400                        row_id: "row-task".to_owned(),
3401                        logical_id: "task-1".to_owned(),
3402                        kind: "Task".to_owned(),
3403                        properties: "{}".to_owned(),
3404                        source_ref: Some("src-1".to_owned()),
3405                        upsert: false,
3406                        chunk_policy: ChunkPolicy::Preserve,
3407                        content_ref: None,
3408                    },
3409                ],
3410                node_retires: vec![],
3411                edges: vec![EdgeInsert {
3412                    row_id: "edge-1".to_owned(),
3413                    logical_id: "edge-lg-1".to_owned(),
3414                    source_logical_id: "meeting-1".to_owned(),
3415                    target_logical_id: "task-1".to_owned(),
3416                    kind: "HAS_TASK".to_owned(),
3417                    properties: "{}".to_owned(),
3418                    source_ref: Some("src-1".to_owned()),
3419                    upsert: false,
3420                }],
3421                edge_retires: vec![],
3422                chunks: vec![],
3423                runs: vec![],
3424                steps: vec![],
3425                actions: vec![],
3426                optional_backfills: vec![],
3427                vec_inserts: vec![],
3428                operational_writes: vec![],
3429            })
3430            .expect("write receipt");
3431
3432        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3433        let (src, tgt, kind): (String, String, String) = conn
3434            .query_row(
3435                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3436                [],
3437                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3438            )
3439            .expect("edge row");
3440        assert_eq!(src, "meeting-1");
3441        assert_eq!(tgt, "task-1");
3442        assert_eq!(kind, "HAS_TASK");
3443    }
3444
3445    #[test]
3446    #[allow(clippy::too_many_lines)]
3447    fn writer_upsert_supersedes_prior_active_edge() {
3448        let db = NamedTempFile::new().expect("temporary db");
3449        let writer = WriterActor::start(
3450            db.path(),
3451            Arc::new(SchemaManager::new()),
3452            ProvenanceMode::Warn,
3453            Arc::new(TelemetryCounters::default()),
3454        )
3455        .expect("writer");
3456
3457        // Write two nodes
3458        writer
3459            .submit(WriteRequest {
3460                label: "nodes".to_owned(),
3461                nodes: vec![
3462                    NodeInsert {
3463                        row_id: "row-a".to_owned(),
3464                        logical_id: "node-a".to_owned(),
3465                        kind: "Meeting".to_owned(),
3466                        properties: "{}".to_owned(),
3467                        source_ref: Some("src-1".to_owned()),
3468                        upsert: false,
3469                        chunk_policy: ChunkPolicy::Preserve,
3470                        content_ref: None,
3471                    },
3472                    NodeInsert {
3473                        row_id: "row-b".to_owned(),
3474                        logical_id: "node-b".to_owned(),
3475                        kind: "Task".to_owned(),
3476                        properties: "{}".to_owned(),
3477                        source_ref: Some("src-1".to_owned()),
3478                        upsert: false,
3479                        chunk_policy: ChunkPolicy::Preserve,
3480                        content_ref: None,
3481                    },
3482                ],
3483                node_retires: vec![],
3484                edges: vec![],
3485                edge_retires: vec![],
3486                chunks: vec![],
3487                runs: vec![],
3488                steps: vec![],
3489                actions: vec![],
3490                optional_backfills: vec![],
3491                vec_inserts: vec![],
3492                operational_writes: vec![],
3493            })
3494            .expect("nodes write");
3495
3496        // Write v1 edge
3497        writer
3498            .submit(WriteRequest {
3499                label: "edge-v1".to_owned(),
3500                nodes: vec![],
3501                node_retires: vec![],
3502                edges: vec![EdgeInsert {
3503                    row_id: "edge-row-1".to_owned(),
3504                    logical_id: "edge-lg-1".to_owned(),
3505                    source_logical_id: "node-a".to_owned(),
3506                    target_logical_id: "node-b".to_owned(),
3507                    kind: "HAS_TASK".to_owned(),
3508                    properties: r#"{"weight":1}"#.to_owned(),
3509                    source_ref: Some("src-1".to_owned()),
3510                    upsert: false,
3511                }],
3512                edge_retires: vec![],
3513                chunks: vec![],
3514                runs: vec![],
3515                steps: vec![],
3516                actions: vec![],
3517                optional_backfills: vec![],
3518                vec_inserts: vec![],
3519                operational_writes: vec![],
3520            })
3521            .expect("edge v1 write");
3522
3523        // Upsert v2 edge
3524        writer
3525            .submit(WriteRequest {
3526                label: "edge-v2".to_owned(),
3527                nodes: vec![],
3528                node_retires: vec![],
3529                edges: vec![EdgeInsert {
3530                    row_id: "edge-row-2".to_owned(),
3531                    logical_id: "edge-lg-1".to_owned(),
3532                    source_logical_id: "node-a".to_owned(),
3533                    target_logical_id: "node-b".to_owned(),
3534                    kind: "HAS_TASK".to_owned(),
3535                    properties: r#"{"weight":2}"#.to_owned(),
3536                    source_ref: Some("src-2".to_owned()),
3537                    upsert: true,
3538                }],
3539                edge_retires: vec![],
3540                chunks: vec![],
3541                runs: vec![],
3542                steps: vec![],
3543                actions: vec![],
3544                optional_backfills: vec![],
3545                vec_inserts: vec![],
3546                operational_writes: vec![],
3547            })
3548            .expect("edge v2 upsert");
3549
3550        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3551        let (active_row_id, props): (String, String) = conn
3552            .query_row(
3553                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3554                [],
3555                |row| Ok((row.get(0)?, row.get(1)?)),
3556            )
3557            .expect("active edge");
3558        assert_eq!(active_row_id, "edge-row-2");
3559        assert!(props.contains("\"weight\":2"));
3560
3561        let superseded: i64 = conn
3562            .query_row(
3563                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3564                [],
3565                |row| row.get(0),
3566            )
3567            .expect("superseded count");
3568        assert_eq!(superseded, 1);
3569    }
3570
3571    #[test]
3572    fn writer_fts_rows_are_written_to_database() {
3573        let db = NamedTempFile::new().expect("temporary db");
3574        let writer = WriterActor::start(
3575            db.path(),
3576            Arc::new(SchemaManager::new()),
3577            ProvenanceMode::Warn,
3578            Arc::new(TelemetryCounters::default()),
3579        )
3580        .expect("writer");
3581
3582        writer
3583            .submit(WriteRequest {
3584                label: "seed".to_owned(),
3585                nodes: vec![NodeInsert {
3586                    row_id: "row-1".to_owned(),
3587                    logical_id: "logical-1".to_owned(),
3588                    kind: "Meeting".to_owned(),
3589                    properties: "{}".to_owned(),
3590                    source_ref: Some("src-1".to_owned()),
3591                    upsert: false,
3592                    chunk_policy: ChunkPolicy::Preserve,
3593                    content_ref: None,
3594                }],
3595                node_retires: vec![],
3596                edges: vec![],
3597                edge_retires: vec![],
3598                chunks: vec![ChunkInsert {
3599                    id: "chunk-1".to_owned(),
3600                    node_logical_id: "logical-1".to_owned(),
3601                    text_content: "budget discussion".to_owned(),
3602                    byte_start: None,
3603                    byte_end: None,
3604                    content_hash: None,
3605                }],
3606                runs: vec![],
3607                steps: vec![],
3608                actions: vec![],
3609                optional_backfills: vec![],
3610                vec_inserts: vec![],
3611                operational_writes: vec![],
3612            })
3613            .expect("write receipt");
3614
3615        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3616        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3617            conn.query_row(
3618                "SELECT chunk_id, node_logical_id, kind, text_content \
3619                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3620                [],
3621                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3622            )
3623            .expect("fts row");
3624        assert_eq!(chunk_id, "chunk-1");
3625        assert_eq!(node_logical_id, "logical-1");
3626        assert_eq!(kind, "Meeting");
3627        assert_eq!(text_content, "budget discussion");
3628    }
3629
3630    #[test]
3631    fn writer_receipt_warns_on_nodes_without_source_ref() {
3632        let db = NamedTempFile::new().expect("temporary db");
3633        let writer = WriterActor::start(
3634            db.path(),
3635            Arc::new(SchemaManager::new()),
3636            ProvenanceMode::Warn,
3637            Arc::new(TelemetryCounters::default()),
3638        )
3639        .expect("writer");
3640
3641        let receipt = writer
3642            .submit(WriteRequest {
3643                label: "no-source".to_owned(),
3644                nodes: vec![NodeInsert {
3645                    row_id: "row-1".to_owned(),
3646                    logical_id: "logical-1".to_owned(),
3647                    kind: "Meeting".to_owned(),
3648                    properties: "{}".to_owned(),
3649                    source_ref: None,
3650                    upsert: false,
3651                    chunk_policy: ChunkPolicy::Preserve,
3652                    content_ref: None,
3653                }],
3654                node_retires: vec![],
3655                edges: vec![],
3656                edge_retires: vec![],
3657                chunks: vec![],
3658                runs: vec![],
3659                steps: vec![],
3660                actions: vec![],
3661                optional_backfills: vec![],
3662                vec_inserts: vec![],
3663                operational_writes: vec![],
3664            })
3665            .expect("write receipt");
3666
3667        assert_eq!(receipt.provenance_warnings.len(), 1);
3668        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3669    }
3670
3671    #[test]
3672    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3673        let db = NamedTempFile::new().expect("temporary db");
3674        let writer = WriterActor::start(
3675            db.path(),
3676            Arc::new(SchemaManager::new()),
3677            ProvenanceMode::Warn,
3678            Arc::new(TelemetryCounters::default()),
3679        )
3680        .expect("writer");
3681
3682        let receipt = writer
3683            .submit(WriteRequest {
3684                label: "with-source".to_owned(),
3685                nodes: vec![NodeInsert {
3686                    row_id: "row-1".to_owned(),
3687                    logical_id: "logical-1".to_owned(),
3688                    kind: "Meeting".to_owned(),
3689                    properties: "{}".to_owned(),
3690                    source_ref: Some("src-1".to_owned()),
3691                    upsert: false,
3692                    chunk_policy: ChunkPolicy::Preserve,
3693                    content_ref: None,
3694                }],
3695                node_retires: vec![],
3696                edges: vec![],
3697                edge_retires: vec![],
3698                chunks: vec![],
3699                runs: vec![],
3700                steps: vec![],
3701                actions: vec![],
3702                optional_backfills: vec![],
3703                vec_inserts: vec![],
3704                operational_writes: vec![],
3705            })
3706            .expect("write receipt");
3707
3708        assert!(receipt.provenance_warnings.is_empty());
3709    }
3710
3711    #[test]
3712    fn writer_accepts_chunk_for_pre_existing_node() {
3713        let db = NamedTempFile::new().expect("temporary db");
3714        let writer = WriterActor::start(
3715            db.path(),
3716            Arc::new(SchemaManager::new()),
3717            ProvenanceMode::Warn,
3718            Arc::new(TelemetryCounters::default()),
3719        )
3720        .expect("writer");
3721
3722        // Request 1: submit node only
3723        writer
3724            .submit(WriteRequest {
3725                label: "r1".to_owned(),
3726                nodes: vec![NodeInsert {
3727                    row_id: "row-1".to_owned(),
3728                    logical_id: "logical-1".to_owned(),
3729                    kind: "Meeting".to_owned(),
3730                    properties: "{}".to_owned(),
3731                    source_ref: Some("src-1".to_owned()),
3732                    upsert: false,
3733                    chunk_policy: ChunkPolicy::Preserve,
3734                    content_ref: None,
3735                }],
3736                node_retires: vec![],
3737                edges: vec![],
3738                edge_retires: vec![],
3739                chunks: vec![],
3740                runs: vec![],
3741                steps: vec![],
3742                actions: vec![],
3743                optional_backfills: vec![],
3744                vec_inserts: vec![],
3745                operational_writes: vec![],
3746            })
3747            .expect("r1 write");
3748
3749        // Request 2: submit chunk for pre-existing node
3750        writer
3751            .submit(WriteRequest {
3752                label: "r2".to_owned(),
3753                nodes: vec![],
3754                node_retires: vec![],
3755                edges: vec![],
3756                edge_retires: vec![],
3757                chunks: vec![ChunkInsert {
3758                    id: "chunk-1".to_owned(),
3759                    node_logical_id: "logical-1".to_owned(),
3760                    text_content: "budget discussion".to_owned(),
3761                    byte_start: None,
3762                    byte_end: None,
3763                    content_hash: None,
3764                }],
3765                runs: vec![],
3766                steps: vec![],
3767                actions: vec![],
3768                optional_backfills: vec![],
3769                vec_inserts: vec![],
3770                operational_writes: vec![],
3771            })
3772            .expect("r2 write — chunk for pre-existing node");
3773
3774        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3775        let count: i64 = conn
3776            .query_row(
3777                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3778                [],
3779                |row| row.get(0),
3780            )
3781            .expect("fts count");
3782        assert_eq!(
3783            count, 1,
3784            "FTS row must exist for chunk attached to pre-existing node"
3785        );
3786    }
3787
3788    #[test]
3789    fn writer_rejects_chunk_for_completely_unknown_node() {
3790        let db = NamedTempFile::new().expect("temporary db");
3791        let writer = WriterActor::start(
3792            db.path(),
3793            Arc::new(SchemaManager::new()),
3794            ProvenanceMode::Warn,
3795            Arc::new(TelemetryCounters::default()),
3796        )
3797        .expect("writer");
3798
3799        let result = writer.submit(WriteRequest {
3800            label: "bad".to_owned(),
3801            nodes: vec![],
3802            node_retires: vec![],
3803            edges: vec![],
3804            edge_retires: vec![],
3805            chunks: vec![ChunkInsert {
3806                id: "chunk-1".to_owned(),
3807                node_logical_id: "nonexistent".to_owned(),
3808                text_content: "some text".to_owned(),
3809                byte_start: None,
3810                byte_end: None,
3811                content_hash: None,
3812            }],
3813            runs: vec![],
3814            steps: vec![],
3815            actions: vec![],
3816            optional_backfills: vec![],
3817            vec_inserts: vec![],
3818            operational_writes: vec![],
3819        });
3820
3821        assert!(
3822            matches!(result, Err(EngineError::InvalidWrite(_))),
3823            "completely unknown node must return InvalidWrite"
3824        );
3825    }
3826
3827    #[test]
3828    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
3829        let db = NamedTempFile::new().expect("temporary db");
3830        let writer = WriterActor::start(
3831            db.path(),
3832            Arc::new(SchemaManager::new()),
3833            ProvenanceMode::Warn,
3834            Arc::new(TelemetryCounters::default()),
3835        )
3836        .expect("writer");
3837
3838        let receipt = writer
3839            .submit(WriteRequest {
3840                label: "seed".to_owned(),
3841                nodes: vec![NodeInsert {
3842                    row_id: "row-1".to_owned(),
3843                    logical_id: "logical-1".to_owned(),
3844                    kind: "Meeting".to_owned(),
3845                    properties: "{}".to_owned(),
3846                    source_ref: None,
3847                    upsert: false,
3848                    chunk_policy: ChunkPolicy::Preserve,
3849                    content_ref: None,
3850                }],
3851                node_retires: vec![],
3852                edges: vec![],
3853                edge_retires: vec![],
3854                chunks: vec![ChunkInsert {
3855                    id: "chunk-1".to_owned(),
3856                    node_logical_id: "logical-1".to_owned(),
3857                    text_content: "budget discussion".to_owned(),
3858                    byte_start: None,
3859                    byte_end: None,
3860                    content_hash: None,
3861                }],
3862                runs: vec![],
3863                steps: vec![],
3864                actions: vec![],
3865                optional_backfills: vec![],
3866                vec_inserts: vec![],
3867                operational_writes: vec![],
3868            })
3869            .expect("write receipt");
3870
3871        assert_eq!(receipt.label, "seed");
3872    }
3873
3874    #[test]
3875    fn writer_node_retire_supersedes_active_node() {
3876        let db = NamedTempFile::new().expect("temporary db");
3877        let writer = WriterActor::start(
3878            db.path(),
3879            Arc::new(SchemaManager::new()),
3880            ProvenanceMode::Warn,
3881            Arc::new(TelemetryCounters::default()),
3882        )
3883        .expect("writer");
3884
3885        writer
3886            .submit(WriteRequest {
3887                label: "seed".to_owned(),
3888                nodes: vec![NodeInsert {
3889                    row_id: "row-1".to_owned(),
3890                    logical_id: "meeting-1".to_owned(),
3891                    kind: "Meeting".to_owned(),
3892                    properties: "{}".to_owned(),
3893                    source_ref: Some("src-1".to_owned()),
3894                    upsert: false,
3895                    chunk_policy: ChunkPolicy::Preserve,
3896                    content_ref: None,
3897                }],
3898                node_retires: vec![],
3899                edges: vec![],
3900                edge_retires: vec![],
3901                chunks: vec![],
3902                runs: vec![],
3903                steps: vec![],
3904                actions: vec![],
3905                optional_backfills: vec![],
3906                vec_inserts: vec![],
3907                operational_writes: vec![],
3908            })
3909            .expect("seed write");
3910
3911        writer
3912            .submit(WriteRequest {
3913                label: "retire".to_owned(),
3914                nodes: vec![],
3915                node_retires: vec![NodeRetire {
3916                    logical_id: "meeting-1".to_owned(),
3917                    source_ref: Some("src-2".to_owned()),
3918                }],
3919                edges: vec![],
3920                edge_retires: vec![],
3921                chunks: vec![],
3922                runs: vec![],
3923                steps: vec![],
3924                actions: vec![],
3925                optional_backfills: vec![],
3926                vec_inserts: vec![],
3927                operational_writes: vec![],
3928            })
3929            .expect("retire write");
3930
3931        let conn = rusqlite::Connection::open(db.path()).expect("open");
3932        let active: i64 = conn
3933            .query_row(
3934                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
3935                [],
3936                |r| r.get(0),
3937            )
3938            .expect("count active");
3939        let historical: i64 = conn
3940            .query_row(
3941                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
3942                [],
3943                |r| r.get(0),
3944            )
3945            .expect("count historical");
3946
3947        assert_eq!(active, 0, "active count must be 0 after retire");
3948        assert_eq!(historical, 1, "historical count must be 1 after retire");
3949    }
3950
3951    #[test]
3952    fn writer_node_retire_preserves_chunks_and_clears_fts() {
3953        let db = NamedTempFile::new().expect("temporary db");
3954        let writer = WriterActor::start(
3955            db.path(),
3956            Arc::new(SchemaManager::new()),
3957            ProvenanceMode::Warn,
3958            Arc::new(TelemetryCounters::default()),
3959        )
3960        .expect("writer");
3961
3962        writer
3963            .submit(WriteRequest {
3964                label: "seed".to_owned(),
3965                nodes: vec![NodeInsert {
3966                    row_id: "row-1".to_owned(),
3967                    logical_id: "meeting-1".to_owned(),
3968                    kind: "Meeting".to_owned(),
3969                    properties: "{}".to_owned(),
3970                    source_ref: Some("src-1".to_owned()),
3971                    upsert: false,
3972                    chunk_policy: ChunkPolicy::Preserve,
3973                    content_ref: None,
3974                }],
3975                node_retires: vec![],
3976                edges: vec![],
3977                edge_retires: vec![],
3978                chunks: vec![ChunkInsert {
3979                    id: "chunk-1".to_owned(),
3980                    node_logical_id: "meeting-1".to_owned(),
3981                    text_content: "budget discussion".to_owned(),
3982                    byte_start: None,
3983                    byte_end: None,
3984                    content_hash: None,
3985                }],
3986                runs: vec![],
3987                steps: vec![],
3988                actions: vec![],
3989                optional_backfills: vec![],
3990                vec_inserts: vec![],
3991                operational_writes: vec![],
3992            })
3993            .expect("seed write");
3994
3995        writer
3996            .submit(WriteRequest {
3997                label: "retire".to_owned(),
3998                nodes: vec![],
3999                node_retires: vec![NodeRetire {
4000                    logical_id: "meeting-1".to_owned(),
4001                    source_ref: Some("src-2".to_owned()),
4002                }],
4003                edges: vec![],
4004                edge_retires: vec![],
4005                chunks: vec![],
4006                runs: vec![],
4007                steps: vec![],
4008                actions: vec![],
4009                optional_backfills: vec![],
4010                vec_inserts: vec![],
4011                operational_writes: vec![],
4012            })
4013            .expect("retire write");
4014
4015        let conn = rusqlite::Connection::open(db.path()).expect("open");
4016        let chunk_count: i64 = conn
4017            .query_row(
4018                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4019                [],
4020                |r| r.get(0),
4021            )
4022            .expect("chunk count");
4023        let fts_count: i64 = conn
4024            .query_row(
4025                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4026                [],
4027                |r| r.get(0),
4028            )
4029            .expect("fts count");
4030
4031        assert_eq!(
4032            chunk_count, 1,
4033            "chunks must remain after node retire so restore can re-establish content"
4034        );
4035        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4036    }
4037
4038    #[test]
4039    fn writer_edge_retire_supersedes_active_edge() {
4040        let db = NamedTempFile::new().expect("temporary db");
4041        let writer = WriterActor::start(
4042            db.path(),
4043            Arc::new(SchemaManager::new()),
4044            ProvenanceMode::Warn,
4045            Arc::new(TelemetryCounters::default()),
4046        )
4047        .expect("writer");
4048
4049        writer
4050            .submit(WriteRequest {
4051                label: "seed".to_owned(),
4052                nodes: vec![
4053                    NodeInsert {
4054                        row_id: "row-a".to_owned(),
4055                        logical_id: "node-a".to_owned(),
4056                        kind: "Meeting".to_owned(),
4057                        properties: "{}".to_owned(),
4058                        source_ref: Some("src-1".to_owned()),
4059                        upsert: false,
4060                        chunk_policy: ChunkPolicy::Preserve,
4061                        content_ref: None,
4062                    },
4063                    NodeInsert {
4064                        row_id: "row-b".to_owned(),
4065                        logical_id: "node-b".to_owned(),
4066                        kind: "Task".to_owned(),
4067                        properties: "{}".to_owned(),
4068                        source_ref: Some("src-1".to_owned()),
4069                        upsert: false,
4070                        chunk_policy: ChunkPolicy::Preserve,
4071                        content_ref: None,
4072                    },
4073                ],
4074                node_retires: vec![],
4075                edges: vec![EdgeInsert {
4076                    row_id: "edge-1".to_owned(),
4077                    logical_id: "edge-lg-1".to_owned(),
4078                    source_logical_id: "node-a".to_owned(),
4079                    target_logical_id: "node-b".to_owned(),
4080                    kind: "HAS_TASK".to_owned(),
4081                    properties: "{}".to_owned(),
4082                    source_ref: Some("src-1".to_owned()),
4083                    upsert: false,
4084                }],
4085                edge_retires: vec![],
4086                chunks: vec![],
4087                runs: vec![],
4088                steps: vec![],
4089                actions: vec![],
4090                optional_backfills: vec![],
4091                vec_inserts: vec![],
4092                operational_writes: vec![],
4093            })
4094            .expect("seed write");
4095
4096        writer
4097            .submit(WriteRequest {
4098                label: "retire-edge".to_owned(),
4099                nodes: vec![],
4100                node_retires: vec![],
4101                edges: vec![],
4102                edge_retires: vec![EdgeRetire {
4103                    logical_id: "edge-lg-1".to_owned(),
4104                    source_ref: Some("src-2".to_owned()),
4105                }],
4106                chunks: vec![],
4107                runs: vec![],
4108                steps: vec![],
4109                actions: vec![],
4110                optional_backfills: vec![],
4111                vec_inserts: vec![],
4112                operational_writes: vec![],
4113            })
4114            .expect("retire edge write");
4115
4116        let conn = rusqlite::Connection::open(db.path()).expect("open");
4117        let active: i64 = conn
4118            .query_row(
4119                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4120                [],
4121                |r| r.get(0),
4122            )
4123            .expect("active edge count");
4124
4125        assert_eq!(active, 0, "active edge count must be 0 after retire");
4126    }
4127
4128    #[test]
4129    fn writer_retire_without_source_ref_emits_provenance_warning() {
4130        let db = NamedTempFile::new().expect("temporary db");
4131        let writer = WriterActor::start(
4132            db.path(),
4133            Arc::new(SchemaManager::new()),
4134            ProvenanceMode::Warn,
4135            Arc::new(TelemetryCounters::default()),
4136        )
4137        .expect("writer");
4138
4139        writer
4140            .submit(WriteRequest {
4141                label: "seed".to_owned(),
4142                nodes: vec![NodeInsert {
4143                    row_id: "row-1".to_owned(),
4144                    logical_id: "meeting-1".to_owned(),
4145                    kind: "Meeting".to_owned(),
4146                    properties: "{}".to_owned(),
4147                    source_ref: Some("src-1".to_owned()),
4148                    upsert: false,
4149                    chunk_policy: ChunkPolicy::Preserve,
4150                    content_ref: None,
4151                }],
4152                node_retires: vec![],
4153                edges: vec![],
4154                edge_retires: vec![],
4155                chunks: vec![],
4156                runs: vec![],
4157                steps: vec![],
4158                actions: vec![],
4159                optional_backfills: vec![],
4160                vec_inserts: vec![],
4161                operational_writes: vec![],
4162            })
4163            .expect("seed write");
4164
4165        let receipt = writer
4166            .submit(WriteRequest {
4167                label: "retire-no-src".to_owned(),
4168                nodes: vec![],
4169                node_retires: vec![NodeRetire {
4170                    logical_id: "meeting-1".to_owned(),
4171                    source_ref: None,
4172                }],
4173                edges: vec![],
4174                edge_retires: vec![],
4175                chunks: vec![],
4176                runs: vec![],
4177                steps: vec![],
4178                actions: vec![],
4179                optional_backfills: vec![],
4180                vec_inserts: vec![],
4181                operational_writes: vec![],
4182            })
4183            .expect("retire write");
4184
4185        assert!(
4186            !receipt.provenance_warnings.is_empty(),
4187            "retire without source_ref must emit a provenance warning"
4188        );
4189    }
4190
4191    #[test]
4192    #[allow(clippy::too_many_lines)]
4193    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4194        let db = NamedTempFile::new().expect("temporary db");
4195        let writer = WriterActor::start(
4196            db.path(),
4197            Arc::new(SchemaManager::new()),
4198            ProvenanceMode::Warn,
4199            Arc::new(TelemetryCounters::default()),
4200        )
4201        .expect("writer");
4202
4203        writer
4204            .submit(WriteRequest {
4205                label: "v1".to_owned(),
4206                nodes: vec![NodeInsert {
4207                    row_id: "row-1".to_owned(),
4208                    logical_id: "meeting-1".to_owned(),
4209                    kind: "Meeting".to_owned(),
4210                    properties: "{}".to_owned(),
4211                    source_ref: Some("src-1".to_owned()),
4212                    upsert: false,
4213                    chunk_policy: ChunkPolicy::Preserve,
4214                    content_ref: None,
4215                }],
4216                node_retires: vec![],
4217                edges: vec![],
4218                edge_retires: vec![],
4219                chunks: vec![ChunkInsert {
4220                    id: "chunk-old".to_owned(),
4221                    node_logical_id: "meeting-1".to_owned(),
4222                    text_content: "old text".to_owned(),
4223                    byte_start: None,
4224                    byte_end: None,
4225                    content_hash: None,
4226                }],
4227                runs: vec![],
4228                steps: vec![],
4229                actions: vec![],
4230                optional_backfills: vec![],
4231                vec_inserts: vec![],
4232                operational_writes: vec![],
4233            })
4234            .expect("v1 write");
4235
4236        writer
4237            .submit(WriteRequest {
4238                label: "v2".to_owned(),
4239                nodes: vec![NodeInsert {
4240                    row_id: "row-2".to_owned(),
4241                    logical_id: "meeting-1".to_owned(),
4242                    kind: "Meeting".to_owned(),
4243                    properties: "{}".to_owned(),
4244                    source_ref: Some("src-2".to_owned()),
4245                    upsert: true,
4246                    chunk_policy: ChunkPolicy::Replace,
4247                    content_ref: None,
4248                }],
4249                node_retires: vec![],
4250                edges: vec![],
4251                edge_retires: vec![],
4252                chunks: vec![ChunkInsert {
4253                    id: "chunk-new".to_owned(),
4254                    node_logical_id: "meeting-1".to_owned(),
4255                    text_content: "new text".to_owned(),
4256                    byte_start: None,
4257                    byte_end: None,
4258                    content_hash: None,
4259                }],
4260                runs: vec![],
4261                steps: vec![],
4262                actions: vec![],
4263                optional_backfills: vec![],
4264                vec_inserts: vec![],
4265                operational_writes: vec![],
4266            })
4267            .expect("v2 write");
4268
4269        let conn = rusqlite::Connection::open(db.path()).expect("open");
4270        let old_chunk: i64 = conn
4271            .query_row(
4272                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4273                [],
4274                |r| r.get(0),
4275            )
4276            .expect("old chunk count");
4277        let new_chunk: i64 = conn
4278            .query_row(
4279                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4280                [],
4281                |r| r.get(0),
4282            )
4283            .expect("new chunk count");
4284        let fts_old: i64 = conn
4285            .query_row(
4286                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4287                [],
4288                |r| r.get(0),
4289            )
4290            .expect("old fts count");
4291
4292        assert_eq!(
4293            old_chunk, 0,
4294            "old chunk must be deleted by ChunkPolicy::Replace"
4295        );
4296        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4297        assert_eq!(
4298            fts_old, 0,
4299            "old FTS row must be deleted by ChunkPolicy::Replace"
4300        );
4301    }
4302
4303    #[test]
4304    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4305        let db = NamedTempFile::new().expect("temporary db");
4306        let writer = WriterActor::start(
4307            db.path(),
4308            Arc::new(SchemaManager::new()),
4309            ProvenanceMode::Warn,
4310            Arc::new(TelemetryCounters::default()),
4311        )
4312        .expect("writer");
4313
4314        writer
4315            .submit(WriteRequest {
4316                label: "v1".to_owned(),
4317                nodes: vec![NodeInsert {
4318                    row_id: "row-1".to_owned(),
4319                    logical_id: "meeting-1".to_owned(),
4320                    kind: "Meeting".to_owned(),
4321                    properties: "{}".to_owned(),
4322                    source_ref: Some("src-1".to_owned()),
4323                    upsert: false,
4324                    chunk_policy: ChunkPolicy::Preserve,
4325                    content_ref: None,
4326                }],
4327                node_retires: vec![],
4328                edges: vec![],
4329                edge_retires: vec![],
4330                chunks: vec![ChunkInsert {
4331                    id: "chunk-old".to_owned(),
4332                    node_logical_id: "meeting-1".to_owned(),
4333                    text_content: "old text".to_owned(),
4334                    byte_start: None,
4335                    byte_end: None,
4336                    content_hash: None,
4337                }],
4338                runs: vec![],
4339                steps: vec![],
4340                actions: vec![],
4341                optional_backfills: vec![],
4342                vec_inserts: vec![],
4343                operational_writes: vec![],
4344            })
4345            .expect("v1 write");
4346
4347        writer
4348            .submit(WriteRequest {
4349                label: "v2-props-only".to_owned(),
4350                nodes: vec![NodeInsert {
4351                    row_id: "row-2".to_owned(),
4352                    logical_id: "meeting-1".to_owned(),
4353                    kind: "Meeting".to_owned(),
4354                    properties: r#"{"status":"updated"}"#.to_owned(),
4355                    source_ref: Some("src-2".to_owned()),
4356                    upsert: true,
4357                    chunk_policy: ChunkPolicy::Preserve,
4358                    content_ref: None,
4359                }],
4360                node_retires: vec![],
4361                edges: vec![],
4362                edge_retires: vec![],
4363                chunks: vec![],
4364                runs: vec![],
4365                steps: vec![],
4366                actions: vec![],
4367                optional_backfills: vec![],
4368                vec_inserts: vec![],
4369                operational_writes: vec![],
4370            })
4371            .expect("v2 preserve write");
4372
4373        let conn = rusqlite::Connection::open(db.path()).expect("open");
4374        let old_chunk: i64 = conn
4375            .query_row(
4376                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4377                [],
4378                |r| r.get(0),
4379            )
4380            .expect("old chunk count");
4381
4382        assert_eq!(
4383            old_chunk, 1,
4384            "old chunk must be preserved by ChunkPolicy::Preserve"
4385        );
4386    }
4387
4388    #[test]
4389    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4390        let db = NamedTempFile::new().expect("temporary db");
4391        let writer = WriterActor::start(
4392            db.path(),
4393            Arc::new(SchemaManager::new()),
4394            ProvenanceMode::Warn,
4395            Arc::new(TelemetryCounters::default()),
4396        )
4397        .expect("writer");
4398
4399        writer
4400            .submit(WriteRequest {
4401                label: "v1".to_owned(),
4402                nodes: vec![NodeInsert {
4403                    row_id: "row-1".to_owned(),
4404                    logical_id: "meeting-1".to_owned(),
4405                    kind: "Meeting".to_owned(),
4406                    properties: "{}".to_owned(),
4407                    source_ref: Some("src-1".to_owned()),
4408                    upsert: false,
4409                    chunk_policy: ChunkPolicy::Preserve,
4410                    content_ref: None,
4411                }],
4412                node_retires: vec![],
4413                edges: vec![],
4414                edge_retires: vec![],
4415                chunks: vec![ChunkInsert {
4416                    id: "chunk-existing".to_owned(),
4417                    node_logical_id: "meeting-1".to_owned(),
4418                    text_content: "existing text".to_owned(),
4419                    byte_start: None,
4420                    byte_end: None,
4421                    content_hash: None,
4422                }],
4423                runs: vec![],
4424                steps: vec![],
4425                actions: vec![],
4426                optional_backfills: vec![],
4427                vec_inserts: vec![],
4428                operational_writes: vec![],
4429            })
4430            .expect("v1 write");
4431
4432        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4433        writer
4434            .submit(WriteRequest {
4435                label: "insert-no-upsert".to_owned(),
4436                nodes: vec![NodeInsert {
4437                    row_id: "row-2".to_owned(),
4438                    logical_id: "meeting-2".to_owned(),
4439                    kind: "Meeting".to_owned(),
4440                    properties: "{}".to_owned(),
4441                    source_ref: Some("src-2".to_owned()),
4442                    upsert: false,
4443                    chunk_policy: ChunkPolicy::Replace,
4444                    content_ref: None,
4445                }],
4446                node_retires: vec![],
4447                edges: vec![],
4448                edge_retires: vec![],
4449                chunks: vec![],
4450                runs: vec![],
4451                steps: vec![],
4452                actions: vec![],
4453                optional_backfills: vec![],
4454                vec_inserts: vec![],
4455                operational_writes: vec![],
4456            })
4457            .expect("insert no-upsert write");
4458
4459        let conn = rusqlite::Connection::open(db.path()).expect("open");
4460        let existing_chunk: i64 = conn
4461            .query_row(
4462                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4463                [],
4464                |r| r.get(0),
4465            )
4466            .expect("chunk count");
4467
4468        assert_eq!(
4469            existing_chunk, 1,
4470            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4471        );
4472    }
4473
4474    #[test]
4475    fn writer_run_upsert_supersedes_prior_active_run() {
4476        let db = NamedTempFile::new().expect("temporary db");
4477        let writer = WriterActor::start(
4478            db.path(),
4479            Arc::new(SchemaManager::new()),
4480            ProvenanceMode::Warn,
4481            Arc::new(TelemetryCounters::default()),
4482        )
4483        .expect("writer");
4484
4485        writer
4486            .submit(WriteRequest {
4487                label: "v1".to_owned(),
4488                nodes: vec![],
4489                node_retires: vec![],
4490                edges: vec![],
4491                edge_retires: vec![],
4492                chunks: vec![],
4493                runs: vec![RunInsert {
4494                    id: "run-v1".to_owned(),
4495                    kind: "session".to_owned(),
4496                    status: "completed".to_owned(),
4497                    properties: "{}".to_owned(),
4498                    source_ref: Some("src-1".to_owned()),
4499                    upsert: false,
4500                    supersedes_id: None,
4501                }],
4502                steps: vec![],
4503                actions: vec![],
4504                optional_backfills: vec![],
4505                vec_inserts: vec![],
4506                operational_writes: vec![],
4507            })
4508            .expect("v1 run write");
4509
4510        writer
4511            .submit(WriteRequest {
4512                label: "v2".to_owned(),
4513                nodes: vec![],
4514                node_retires: vec![],
4515                edges: vec![],
4516                edge_retires: vec![],
4517                chunks: vec![],
4518                runs: vec![RunInsert {
4519                    id: "run-v2".to_owned(),
4520                    kind: "session".to_owned(),
4521                    status: "completed".to_owned(),
4522                    properties: "{}".to_owned(),
4523                    source_ref: Some("src-2".to_owned()),
4524                    upsert: true,
4525                    supersedes_id: Some("run-v1".to_owned()),
4526                }],
4527                steps: vec![],
4528                actions: vec![],
4529                optional_backfills: vec![],
4530                vec_inserts: vec![],
4531                operational_writes: vec![],
4532            })
4533            .expect("v2 run write");
4534
4535        let conn = rusqlite::Connection::open(db.path()).expect("open");
4536        let v1_historical: i64 = conn
4537            .query_row(
4538                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4539                [],
4540                |r| r.get(0),
4541            )
4542            .expect("v1 historical count");
4543        let v2_active: i64 = conn
4544            .query_row(
4545                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4546                [],
4547                |r| r.get(0),
4548            )
4549            .expect("v2 active count");
4550
4551        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4552        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4553    }
4554
4555    #[test]
4556    fn writer_step_upsert_supersedes_prior_active_step() {
4557        let db = NamedTempFile::new().expect("temporary db");
4558        let writer = WriterActor::start(
4559            db.path(),
4560            Arc::new(SchemaManager::new()),
4561            ProvenanceMode::Warn,
4562            Arc::new(TelemetryCounters::default()),
4563        )
4564        .expect("writer");
4565
4566        writer
4567            .submit(WriteRequest {
4568                label: "v1".to_owned(),
4569                nodes: vec![],
4570                node_retires: vec![],
4571                edges: vec![],
4572                edge_retires: vec![],
4573                chunks: vec![],
4574                runs: vec![RunInsert {
4575                    id: "run-1".to_owned(),
4576                    kind: "session".to_owned(),
4577                    status: "completed".to_owned(),
4578                    properties: "{}".to_owned(),
4579                    source_ref: Some("src-1".to_owned()),
4580                    upsert: false,
4581                    supersedes_id: None,
4582                }],
4583                steps: vec![StepInsert {
4584                    id: "step-v1".to_owned(),
4585                    run_id: "run-1".to_owned(),
4586                    kind: "llm".to_owned(),
4587                    status: "completed".to_owned(),
4588                    properties: "{}".to_owned(),
4589                    source_ref: Some("src-1".to_owned()),
4590                    upsert: false,
4591                    supersedes_id: None,
4592                }],
4593                actions: vec![],
4594                optional_backfills: vec![],
4595                vec_inserts: vec![],
4596                operational_writes: vec![],
4597            })
4598            .expect("v1 step write");
4599
4600        writer
4601            .submit(WriteRequest {
4602                label: "v2".to_owned(),
4603                nodes: vec![],
4604                node_retires: vec![],
4605                edges: vec![],
4606                edge_retires: vec![],
4607                chunks: vec![],
4608                runs: vec![],
4609                steps: vec![StepInsert {
4610                    id: "step-v2".to_owned(),
4611                    run_id: "run-1".to_owned(),
4612                    kind: "llm".to_owned(),
4613                    status: "completed".to_owned(),
4614                    properties: "{}".to_owned(),
4615                    source_ref: Some("src-2".to_owned()),
4616                    upsert: true,
4617                    supersedes_id: Some("step-v1".to_owned()),
4618                }],
4619                actions: vec![],
4620                optional_backfills: vec![],
4621                vec_inserts: vec![],
4622                operational_writes: vec![],
4623            })
4624            .expect("v2 step write");
4625
4626        let conn = rusqlite::Connection::open(db.path()).expect("open");
4627        let v1_historical: i64 = conn
4628            .query_row(
4629                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4630                [],
4631                |r| r.get(0),
4632            )
4633            .expect("v1 historical count");
4634        let v2_active: i64 = conn
4635            .query_row(
4636                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4637                [],
4638                |r| r.get(0),
4639            )
4640            .expect("v2 active count");
4641
4642        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4643        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4644    }
4645
4646    #[test]
4647    fn writer_action_upsert_supersedes_prior_active_action() {
4648        let db = NamedTempFile::new().expect("temporary db");
4649        let writer = WriterActor::start(
4650            db.path(),
4651            Arc::new(SchemaManager::new()),
4652            ProvenanceMode::Warn,
4653            Arc::new(TelemetryCounters::default()),
4654        )
4655        .expect("writer");
4656
4657        writer
4658            .submit(WriteRequest {
4659                label: "v1".to_owned(),
4660                nodes: vec![],
4661                node_retires: vec![],
4662                edges: vec![],
4663                edge_retires: vec![],
4664                chunks: vec![],
4665                runs: vec![RunInsert {
4666                    id: "run-1".to_owned(),
4667                    kind: "session".to_owned(),
4668                    status: "completed".to_owned(),
4669                    properties: "{}".to_owned(),
4670                    source_ref: Some("src-1".to_owned()),
4671                    upsert: false,
4672                    supersedes_id: None,
4673                }],
4674                steps: vec![StepInsert {
4675                    id: "step-1".to_owned(),
4676                    run_id: "run-1".to_owned(),
4677                    kind: "llm".to_owned(),
4678                    status: "completed".to_owned(),
4679                    properties: "{}".to_owned(),
4680                    source_ref: Some("src-1".to_owned()),
4681                    upsert: false,
4682                    supersedes_id: None,
4683                }],
4684                actions: vec![ActionInsert {
4685                    id: "action-v1".to_owned(),
4686                    step_id: "step-1".to_owned(),
4687                    kind: "emit".to_owned(),
4688                    status: "completed".to_owned(),
4689                    properties: "{}".to_owned(),
4690                    source_ref: Some("src-1".to_owned()),
4691                    upsert: false,
4692                    supersedes_id: None,
4693                }],
4694                optional_backfills: vec![],
4695                vec_inserts: vec![],
4696                operational_writes: vec![],
4697            })
4698            .expect("v1 action write");
4699
4700        writer
4701            .submit(WriteRequest {
4702                label: "v2".to_owned(),
4703                nodes: vec![],
4704                node_retires: vec![],
4705                edges: vec![],
4706                edge_retires: vec![],
4707                chunks: vec![],
4708                runs: vec![],
4709                steps: vec![],
4710                actions: vec![ActionInsert {
4711                    id: "action-v2".to_owned(),
4712                    step_id: "step-1".to_owned(),
4713                    kind: "emit".to_owned(),
4714                    status: "completed".to_owned(),
4715                    properties: "{}".to_owned(),
4716                    source_ref: Some("src-2".to_owned()),
4717                    upsert: true,
4718                    supersedes_id: Some("action-v1".to_owned()),
4719                }],
4720                optional_backfills: vec![],
4721                vec_inserts: vec![],
4722                operational_writes: vec![],
4723            })
4724            .expect("v2 action write");
4725
4726        let conn = rusqlite::Connection::open(db.path()).expect("open");
4727        let v1_historical: i64 = conn
4728            .query_row(
4729                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4730                [],
4731                |r| r.get(0),
4732            )
4733            .expect("v1 historical count");
4734        let v2_active: i64 = conn
4735            .query_row(
4736                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4737                [],
4738                |r| r.get(0),
4739            )
4740            .expect("v2 active count");
4741
4742        assert_eq!(
4743            v1_historical, 1,
4744            "action-v1 must be historical after upsert"
4745        );
4746        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4747    }
4748
4749    // P0: runtime upsert without supersedes_id must be rejected
4750
4751    #[test]
4752    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4753        let db = NamedTempFile::new().expect("temporary db");
4754        let writer = WriterActor::start(
4755            db.path(),
4756            Arc::new(SchemaManager::new()),
4757            ProvenanceMode::Warn,
4758            Arc::new(TelemetryCounters::default()),
4759        )
4760        .expect("writer");
4761
4762        let result = writer.submit(WriteRequest {
4763            label: "bad".to_owned(),
4764            nodes: vec![],
4765            node_retires: vec![],
4766            edges: vec![],
4767            edge_retires: vec![],
4768            chunks: vec![],
4769            runs: vec![RunInsert {
4770                id: "run-1".to_owned(),
4771                kind: "session".to_owned(),
4772                status: "completed".to_owned(),
4773                properties: "{}".to_owned(),
4774                source_ref: None,
4775                upsert: true,
4776                supersedes_id: None,
4777            }],
4778            steps: vec![],
4779            actions: vec![],
4780            optional_backfills: vec![],
4781            vec_inserts: vec![],
4782            operational_writes: vec![],
4783        });
4784
4785        assert!(
4786            matches!(result, Err(EngineError::InvalidWrite(_))),
4787            "run upsert=true without supersedes_id must return InvalidWrite"
4788        );
4789    }
4790
4791    #[test]
4792    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
4793        let db = NamedTempFile::new().expect("temporary db");
4794        let writer = WriterActor::start(
4795            db.path(),
4796            Arc::new(SchemaManager::new()),
4797            ProvenanceMode::Warn,
4798            Arc::new(TelemetryCounters::default()),
4799        )
4800        .expect("writer");
4801
4802        let result = writer.submit(WriteRequest {
4803            label: "bad".to_owned(),
4804            nodes: vec![],
4805            node_retires: vec![],
4806            edges: vec![],
4807            edge_retires: vec![],
4808            chunks: vec![],
4809            runs: vec![],
4810            steps: vec![StepInsert {
4811                id: "step-1".to_owned(),
4812                run_id: "run-1".to_owned(),
4813                kind: "llm".to_owned(),
4814                status: "completed".to_owned(),
4815                properties: "{}".to_owned(),
4816                source_ref: None,
4817                upsert: true,
4818                supersedes_id: None,
4819            }],
4820            actions: vec![],
4821            optional_backfills: vec![],
4822            vec_inserts: vec![],
4823            operational_writes: vec![],
4824        });
4825
4826        assert!(
4827            matches!(result, Err(EngineError::InvalidWrite(_))),
4828            "step upsert=true without supersedes_id must return InvalidWrite"
4829        );
4830    }
4831
4832    #[test]
4833    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
4834        let db = NamedTempFile::new().expect("temporary db");
4835        let writer = WriterActor::start(
4836            db.path(),
4837            Arc::new(SchemaManager::new()),
4838            ProvenanceMode::Warn,
4839            Arc::new(TelemetryCounters::default()),
4840        )
4841        .expect("writer");
4842
4843        let result = writer.submit(WriteRequest {
4844            label: "bad".to_owned(),
4845            nodes: vec![],
4846            node_retires: vec![],
4847            edges: vec![],
4848            edge_retires: vec![],
4849            chunks: vec![],
4850            runs: vec![],
4851            steps: vec![],
4852            actions: vec![ActionInsert {
4853                id: "action-1".to_owned(),
4854                step_id: "step-1".to_owned(),
4855                kind: "emit".to_owned(),
4856                status: "completed".to_owned(),
4857                properties: "{}".to_owned(),
4858                source_ref: None,
4859                upsert: true,
4860                supersedes_id: None,
4861            }],
4862            optional_backfills: vec![],
4863            vec_inserts: vec![],
4864            operational_writes: vec![],
4865        });
4866
4867        assert!(
4868            matches!(result, Err(EngineError::InvalidWrite(_))),
4869            "action upsert=true without supersedes_id must return InvalidWrite"
4870        );
4871    }
4872
4873    // P1a/b: provenance warnings for edge inserts and runtime table inserts
4874
4875    #[test]
4876    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
4877        let db = NamedTempFile::new().expect("temporary db");
4878        let writer = WriterActor::start(
4879            db.path(),
4880            Arc::new(SchemaManager::new()),
4881            ProvenanceMode::Warn,
4882            Arc::new(TelemetryCounters::default()),
4883        )
4884        .expect("writer");
4885
4886        let receipt = writer
4887            .submit(WriteRequest {
4888                label: "test".to_owned(),
4889                nodes: vec![
4890                    NodeInsert {
4891                        row_id: "row-a".to_owned(),
4892                        logical_id: "node-a".to_owned(),
4893                        kind: "Meeting".to_owned(),
4894                        properties: "{}".to_owned(),
4895                        source_ref: Some("src-1".to_owned()),
4896                        upsert: false,
4897                        chunk_policy: ChunkPolicy::Preserve,
4898                        content_ref: None,
4899                    },
4900                    NodeInsert {
4901                        row_id: "row-b".to_owned(),
4902                        logical_id: "node-b".to_owned(),
4903                        kind: "Task".to_owned(),
4904                        properties: "{}".to_owned(),
4905                        source_ref: Some("src-1".to_owned()),
4906                        upsert: false,
4907                        chunk_policy: ChunkPolicy::Preserve,
4908                        content_ref: None,
4909                    },
4910                ],
4911                node_retires: vec![],
4912                edges: vec![EdgeInsert {
4913                    row_id: "edge-1".to_owned(),
4914                    logical_id: "edge-lg-1".to_owned(),
4915                    source_logical_id: "node-a".to_owned(),
4916                    target_logical_id: "node-b".to_owned(),
4917                    kind: "HAS_TASK".to_owned(),
4918                    properties: "{}".to_owned(),
4919                    source_ref: None,
4920                    upsert: false,
4921                }],
4922                edge_retires: vec![],
4923                chunks: vec![],
4924                runs: vec![],
4925                steps: vec![],
4926                actions: vec![],
4927                optional_backfills: vec![],
4928                vec_inserts: vec![],
4929                operational_writes: vec![],
4930            })
4931            .expect("write");
4932
4933        assert!(
4934            !receipt.provenance_warnings.is_empty(),
4935            "edge insert without source_ref must emit a provenance warning"
4936        );
4937    }
4938
4939    #[test]
4940    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
4941        let db = NamedTempFile::new().expect("temporary db");
4942        let writer = WriterActor::start(
4943            db.path(),
4944            Arc::new(SchemaManager::new()),
4945            ProvenanceMode::Warn,
4946            Arc::new(TelemetryCounters::default()),
4947        )
4948        .expect("writer");
4949
4950        let receipt = writer
4951            .submit(WriteRequest {
4952                label: "test".to_owned(),
4953                nodes: vec![],
4954                node_retires: vec![],
4955                edges: vec![],
4956                edge_retires: vec![],
4957                chunks: vec![],
4958                runs: vec![RunInsert {
4959                    id: "run-1".to_owned(),
4960                    kind: "session".to_owned(),
4961                    status: "completed".to_owned(),
4962                    properties: "{}".to_owned(),
4963                    source_ref: None,
4964                    upsert: false,
4965                    supersedes_id: None,
4966                }],
4967                steps: vec![],
4968                actions: vec![],
4969                optional_backfills: vec![],
4970                vec_inserts: vec![],
4971                operational_writes: vec![],
4972            })
4973            .expect("write");
4974
4975        assert!(
4976            !receipt.provenance_warnings.is_empty(),
4977            "run insert without source_ref must emit a provenance warning"
4978        );
4979    }
4980
4981    // P1c: retire a node AND submit chunks for the same logical_id in one request
4982
4983    #[test]
4984    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
4985        let db = NamedTempFile::new().expect("temporary db");
4986        let writer = WriterActor::start(
4987            db.path(),
4988            Arc::new(SchemaManager::new()),
4989            ProvenanceMode::Warn,
4990            Arc::new(TelemetryCounters::default()),
4991        )
4992        .expect("writer");
4993
4994        // First seed the node so it exists
4995        writer
4996            .submit(WriteRequest {
4997                label: "seed".to_owned(),
4998                nodes: vec![NodeInsert {
4999                    row_id: "row-1".to_owned(),
5000                    logical_id: "meeting-1".to_owned(),
5001                    kind: "Meeting".to_owned(),
5002                    properties: "{}".to_owned(),
5003                    source_ref: Some("src-1".to_owned()),
5004                    upsert: false,
5005                    chunk_policy: ChunkPolicy::Preserve,
5006                    content_ref: None,
5007                }],
5008                node_retires: vec![],
5009                edges: vec![],
5010                edge_retires: vec![],
5011                chunks: vec![],
5012                runs: vec![],
5013                steps: vec![],
5014                actions: vec![],
5015                optional_backfills: vec![],
5016                vec_inserts: vec![],
5017                operational_writes: vec![],
5018            })
5019            .expect("seed write");
5020
5021        // Now try to retire it AND add a chunk for it in the same request
5022        let result = writer.submit(WriteRequest {
5023            label: "bad".to_owned(),
5024            nodes: vec![],
5025            node_retires: vec![NodeRetire {
5026                logical_id: "meeting-1".to_owned(),
5027                source_ref: Some("src-2".to_owned()),
5028            }],
5029            edges: vec![],
5030            edge_retires: vec![],
5031            chunks: vec![ChunkInsert {
5032                id: "chunk-bad".to_owned(),
5033                node_logical_id: "meeting-1".to_owned(),
5034                text_content: "some text".to_owned(),
5035                byte_start: None,
5036                byte_end: None,
5037                content_hash: None,
5038            }],
5039            runs: vec![],
5040            steps: vec![],
5041            actions: vec![],
5042            optional_backfills: vec![],
5043            vec_inserts: vec![],
5044            operational_writes: vec![],
5045        });
5046
5047        assert!(
5048            matches!(result, Err(EngineError::InvalidWrite(_))),
5049            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5050        );
5051    }
5052
5053    // --- Item 1: prepare_cached batch insert ---
5054
5055    #[test]
5056    fn writer_batch_insert_multiple_nodes() {
5057        let db = NamedTempFile::new().expect("temporary db");
5058        let writer = WriterActor::start(
5059            db.path(),
5060            Arc::new(SchemaManager::new()),
5061            ProvenanceMode::Warn,
5062            Arc::new(TelemetryCounters::default()),
5063        )
5064        .expect("writer");
5065
5066        let nodes: Vec<NodeInsert> = (0..100)
5067            .map(|i| NodeInsert {
5068                row_id: format!("row-{i}"),
5069                logical_id: format!("lg-{i}"),
5070                kind: "Note".to_owned(),
5071                properties: "{}".to_owned(),
5072                source_ref: Some("batch-src".to_owned()),
5073                upsert: false,
5074                chunk_policy: ChunkPolicy::Preserve,
5075                content_ref: None,
5076            })
5077            .collect();
5078
5079        writer
5080            .submit(WriteRequest {
5081                label: "batch".to_owned(),
5082                nodes,
5083                node_retires: vec![],
5084                edges: vec![],
5085                edge_retires: vec![],
5086                chunks: vec![],
5087                runs: vec![],
5088                steps: vec![],
5089                actions: vec![],
5090                optional_backfills: vec![],
5091                vec_inserts: vec![],
5092                operational_writes: vec![],
5093            })
5094            .expect("batch write");
5095
5096        let conn = rusqlite::Connection::open(db.path()).expect("open");
5097        let count: i64 = conn
5098            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5099            .expect("count nodes");
5100        assert_eq!(
5101            count, 100,
5102            "all 100 nodes must be present after batch insert"
5103        );
5104    }
5105
5106    // --- Item 2: ID validation ---
5107
5108    #[test]
5109    fn prepare_write_rejects_empty_node_row_id() {
5110        let db = NamedTempFile::new().expect("temporary db");
5111        let writer = WriterActor::start(
5112            db.path(),
5113            Arc::new(SchemaManager::new()),
5114            ProvenanceMode::Warn,
5115            Arc::new(TelemetryCounters::default()),
5116        )
5117        .expect("writer");
5118
5119        let result = writer.submit(WriteRequest {
5120            label: "test".to_owned(),
5121            nodes: vec![NodeInsert {
5122                row_id: String::new(),
5123                logical_id: "lg-1".to_owned(),
5124                kind: "Note".to_owned(),
5125                properties: "{}".to_owned(),
5126                source_ref: None,
5127                upsert: false,
5128                chunk_policy: ChunkPolicy::Preserve,
5129                content_ref: None,
5130            }],
5131            node_retires: vec![],
5132            edges: vec![],
5133            edge_retires: vec![],
5134            chunks: vec![],
5135            runs: vec![],
5136            steps: vec![],
5137            actions: vec![],
5138            optional_backfills: vec![],
5139            vec_inserts: vec![],
5140            operational_writes: vec![],
5141        });
5142
5143        assert!(
5144            matches!(result, Err(EngineError::InvalidWrite(_))),
5145            "empty row_id must be rejected"
5146        );
5147    }
5148
5149    #[test]
5150    fn prepare_write_rejects_empty_node_logical_id() {
5151        let db = NamedTempFile::new().expect("temporary db");
5152        let writer = WriterActor::start(
5153            db.path(),
5154            Arc::new(SchemaManager::new()),
5155            ProvenanceMode::Warn,
5156            Arc::new(TelemetryCounters::default()),
5157        )
5158        .expect("writer");
5159
5160        let result = writer.submit(WriteRequest {
5161            label: "test".to_owned(),
5162            nodes: vec![NodeInsert {
5163                row_id: "row-1".to_owned(),
5164                logical_id: String::new(),
5165                kind: "Note".to_owned(),
5166                properties: "{}".to_owned(),
5167                source_ref: None,
5168                upsert: false,
5169                chunk_policy: ChunkPolicy::Preserve,
5170                content_ref: None,
5171            }],
5172            node_retires: vec![],
5173            edges: vec![],
5174            edge_retires: vec![],
5175            chunks: vec![],
5176            runs: vec![],
5177            steps: vec![],
5178            actions: vec![],
5179            optional_backfills: vec![],
5180            vec_inserts: vec![],
5181            operational_writes: vec![],
5182        });
5183
5184        assert!(
5185            matches!(result, Err(EngineError::InvalidWrite(_))),
5186            "empty logical_id must be rejected"
5187        );
5188    }
5189
5190    #[test]
5191    fn prepare_write_rejects_duplicate_row_ids_in_request() {
5192        let db = NamedTempFile::new().expect("temporary db");
5193        let writer = WriterActor::start(
5194            db.path(),
5195            Arc::new(SchemaManager::new()),
5196            ProvenanceMode::Warn,
5197            Arc::new(TelemetryCounters::default()),
5198        )
5199        .expect("writer");
5200
5201        let result = writer.submit(WriteRequest {
5202            label: "test".to_owned(),
5203            nodes: vec![
5204                NodeInsert {
5205                    row_id: "row-1".to_owned(),
5206                    logical_id: "lg-1".to_owned(),
5207                    kind: "Note".to_owned(),
5208                    properties: "{}".to_owned(),
5209                    source_ref: None,
5210                    upsert: false,
5211                    chunk_policy: ChunkPolicy::Preserve,
5212                    content_ref: None,
5213                },
5214                NodeInsert {
5215                    row_id: "row-1".to_owned(), // duplicate
5216                    logical_id: "lg-2".to_owned(),
5217                    kind: "Note".to_owned(),
5218                    properties: "{}".to_owned(),
5219                    source_ref: None,
5220                    upsert: false,
5221                    chunk_policy: ChunkPolicy::Preserve,
5222                    content_ref: None,
5223                },
5224            ],
5225            node_retires: vec![],
5226            edges: vec![],
5227            edge_retires: vec![],
5228            chunks: vec![],
5229            runs: vec![],
5230            steps: vec![],
5231            actions: vec![],
5232            optional_backfills: vec![],
5233            vec_inserts: vec![],
5234            operational_writes: vec![],
5235        });
5236
5237        assert!(
5238            matches!(result, Err(EngineError::InvalidWrite(_))),
5239            "duplicate row_id within request must be rejected"
5240        );
5241    }
5242
5243    #[test]
5244    fn prepare_write_rejects_empty_chunk_id() {
5245        let db = NamedTempFile::new().expect("temporary db");
5246        let writer = WriterActor::start(
5247            db.path(),
5248            Arc::new(SchemaManager::new()),
5249            ProvenanceMode::Warn,
5250            Arc::new(TelemetryCounters::default()),
5251        )
5252        .expect("writer");
5253
5254        let result = writer.submit(WriteRequest {
5255            label: "test".to_owned(),
5256            nodes: vec![NodeInsert {
5257                row_id: "row-1".to_owned(),
5258                logical_id: "lg-1".to_owned(),
5259                kind: "Note".to_owned(),
5260                properties: "{}".to_owned(),
5261                source_ref: None,
5262                upsert: false,
5263                chunk_policy: ChunkPolicy::Preserve,
5264                content_ref: None,
5265            }],
5266            node_retires: vec![],
5267            edges: vec![],
5268            edge_retires: vec![],
5269            chunks: vec![ChunkInsert {
5270                id: String::new(),
5271                node_logical_id: "lg-1".to_owned(),
5272                text_content: "some text".to_owned(),
5273                byte_start: None,
5274                byte_end: None,
5275                content_hash: None,
5276            }],
5277            runs: vec![],
5278            steps: vec![],
5279            actions: vec![],
5280            optional_backfills: vec![],
5281            vec_inserts: vec![],
5282            operational_writes: vec![],
5283        });
5284
5285        assert!(
5286            matches!(result, Err(EngineError::InvalidWrite(_))),
5287            "empty chunk id must be rejected"
5288        );
5289    }
5290
5291    // --- Item 4: provenance warning coverage tests ---
5292
5293    #[test]
5294    fn writer_receipt_warns_on_step_without_source_ref() {
5295        let db = NamedTempFile::new().expect("temporary db");
5296        let writer = WriterActor::start(
5297            db.path(),
5298            Arc::new(SchemaManager::new()),
5299            ProvenanceMode::Warn,
5300            Arc::new(TelemetryCounters::default()),
5301        )
5302        .expect("writer");
5303
5304        // seed a run first so step FK is satisfied
5305        writer
5306            .submit(WriteRequest {
5307                label: "seed-run".to_owned(),
5308                nodes: vec![],
5309                node_retires: vec![],
5310                edges: vec![],
5311                edge_retires: vec![],
5312                chunks: vec![],
5313                runs: vec![RunInsert {
5314                    id: "run-1".to_owned(),
5315                    kind: "session".to_owned(),
5316                    status: "active".to_owned(),
5317                    properties: "{}".to_owned(),
5318                    source_ref: Some("src-1".to_owned()),
5319                    upsert: false,
5320                    supersedes_id: None,
5321                }],
5322                steps: vec![],
5323                actions: vec![],
5324                optional_backfills: vec![],
5325                vec_inserts: vec![],
5326                operational_writes: vec![],
5327            })
5328            .expect("seed run");
5329
5330        let receipt = writer
5331            .submit(WriteRequest {
5332                label: "test".to_owned(),
5333                nodes: vec![],
5334                node_retires: vec![],
5335                edges: vec![],
5336                edge_retires: vec![],
5337                chunks: vec![],
5338                runs: vec![],
5339                steps: vec![StepInsert {
5340                    id: "step-1".to_owned(),
5341                    run_id: "run-1".to_owned(),
5342                    kind: "llm_call".to_owned(),
5343                    status: "completed".to_owned(),
5344                    properties: "{}".to_owned(),
5345                    source_ref: None,
5346                    upsert: false,
5347                    supersedes_id: None,
5348                }],
5349                actions: vec![],
5350                optional_backfills: vec![],
5351                vec_inserts: vec![],
5352                operational_writes: vec![],
5353            })
5354            .expect("write");
5355
5356        assert!(
5357            !receipt.provenance_warnings.is_empty(),
5358            "step insert without source_ref must emit a provenance warning"
5359        );
5360    }
5361
5362    #[test]
5363    fn writer_receipt_warns_on_action_without_source_ref() {
5364        let db = NamedTempFile::new().expect("temporary db");
5365        let writer = WriterActor::start(
5366            db.path(),
5367            Arc::new(SchemaManager::new()),
5368            ProvenanceMode::Warn,
5369            Arc::new(TelemetryCounters::default()),
5370        )
5371        .expect("writer");
5372
5373        // seed run and step so action FK is satisfied
5374        writer
5375            .submit(WriteRequest {
5376                label: "seed".to_owned(),
5377                nodes: vec![],
5378                node_retires: vec![],
5379                edges: vec![],
5380                edge_retires: vec![],
5381                chunks: vec![],
5382                runs: vec![RunInsert {
5383                    id: "run-1".to_owned(),
5384                    kind: "session".to_owned(),
5385                    status: "active".to_owned(),
5386                    properties: "{}".to_owned(),
5387                    source_ref: Some("src-1".to_owned()),
5388                    upsert: false,
5389                    supersedes_id: None,
5390                }],
5391                steps: vec![StepInsert {
5392                    id: "step-1".to_owned(),
5393                    run_id: "run-1".to_owned(),
5394                    kind: "llm_call".to_owned(),
5395                    status: "completed".to_owned(),
5396                    properties: "{}".to_owned(),
5397                    source_ref: Some("src-1".to_owned()),
5398                    upsert: false,
5399                    supersedes_id: None,
5400                }],
5401                actions: vec![],
5402                optional_backfills: vec![],
5403                vec_inserts: vec![],
5404                operational_writes: vec![],
5405            })
5406            .expect("seed");
5407
5408        let receipt = writer
5409            .submit(WriteRequest {
5410                label: "test".to_owned(),
5411                nodes: vec![],
5412                node_retires: vec![],
5413                edges: vec![],
5414                edge_retires: vec![],
5415                chunks: vec![],
5416                runs: vec![],
5417                steps: vec![],
5418                actions: vec![ActionInsert {
5419                    id: "action-1".to_owned(),
5420                    step_id: "step-1".to_owned(),
5421                    kind: "tool_call".to_owned(),
5422                    status: "completed".to_owned(),
5423                    properties: "{}".to_owned(),
5424                    source_ref: None,
5425                    upsert: false,
5426                    supersedes_id: None,
5427                }],
5428                optional_backfills: vec![],
5429                vec_inserts: vec![],
5430                operational_writes: vec![],
5431            })
5432            .expect("write");
5433
5434        assert!(
5435            !receipt.provenance_warnings.is_empty(),
5436            "action insert without source_ref must emit a provenance warning"
5437        );
5438    }
5439
5440    #[test]
5441    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5442        let db = NamedTempFile::new().expect("temporary db");
5443        let writer = WriterActor::start(
5444            db.path(),
5445            Arc::new(SchemaManager::new()),
5446            ProvenanceMode::Warn,
5447            Arc::new(TelemetryCounters::default()),
5448        )
5449        .expect("writer");
5450
5451        let receipt = writer
5452            .submit(WriteRequest {
5453                label: "test".to_owned(),
5454                nodes: vec![NodeInsert {
5455                    row_id: "row-1".to_owned(),
5456                    logical_id: "node-1".to_owned(),
5457                    kind: "Note".to_owned(),
5458                    properties: "{}".to_owned(),
5459                    source_ref: Some("src-1".to_owned()),
5460                    upsert: false,
5461                    chunk_policy: ChunkPolicy::Preserve,
5462                    content_ref: None,
5463                }],
5464                node_retires: vec![],
5465                edges: vec![],
5466                edge_retires: vec![],
5467                chunks: vec![],
5468                runs: vec![RunInsert {
5469                    id: "run-1".to_owned(),
5470                    kind: "session".to_owned(),
5471                    status: "active".to_owned(),
5472                    properties: "{}".to_owned(),
5473                    source_ref: Some("src-1".to_owned()),
5474                    upsert: false,
5475                    supersedes_id: None,
5476                }],
5477                steps: vec![StepInsert {
5478                    id: "step-1".to_owned(),
5479                    run_id: "run-1".to_owned(),
5480                    kind: "llm_call".to_owned(),
5481                    status: "completed".to_owned(),
5482                    properties: "{}".to_owned(),
5483                    source_ref: Some("src-1".to_owned()),
5484                    upsert: false,
5485                    supersedes_id: None,
5486                }],
5487                actions: vec![ActionInsert {
5488                    id: "action-1".to_owned(),
5489                    step_id: "step-1".to_owned(),
5490                    kind: "tool_call".to_owned(),
5491                    status: "completed".to_owned(),
5492                    properties: "{}".to_owned(),
5493                    source_ref: Some("src-1".to_owned()),
5494                    upsert: false,
5495                    supersedes_id: None,
5496                }],
5497                optional_backfills: vec![],
5498                vec_inserts: vec![],
5499                operational_writes: vec![],
5500            })
5501            .expect("write");
5502
5503        assert!(
5504            receipt.provenance_warnings.is_empty(),
5505            "no warnings expected when all types have source_ref; got: {:?}",
5506            receipt.provenance_warnings
5507        );
5508    }
5509
5510    // --- Item 4 Task 2: ProvenanceMode tests ---
5511
5512    #[test]
5513    fn default_provenance_mode_is_warn() {
5514        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5515        let db = NamedTempFile::new().expect("temporary db");
5516        let writer = WriterActor::start(
5517            db.path(),
5518            Arc::new(SchemaManager::new()),
5519            ProvenanceMode::default(),
5520            Arc::new(TelemetryCounters::default()),
5521        )
5522        .expect("writer");
5523
5524        let receipt = writer
5525            .submit(WriteRequest {
5526                label: "test".to_owned(),
5527                nodes: vec![NodeInsert {
5528                    row_id: "row-1".to_owned(),
5529                    logical_id: "node-1".to_owned(),
5530                    kind: "Note".to_owned(),
5531                    properties: "{}".to_owned(),
5532                    source_ref: None,
5533                    upsert: false,
5534                    chunk_policy: ChunkPolicy::Preserve,
5535                    content_ref: None,
5536                }],
5537                node_retires: vec![],
5538                edges: vec![],
5539                edge_retires: vec![],
5540                chunks: vec![],
5541                runs: vec![],
5542                steps: vec![],
5543                actions: vec![],
5544                optional_backfills: vec![],
5545                vec_inserts: vec![],
5546                operational_writes: vec![],
5547            })
5548            .expect("Warn mode must not reject missing source_ref");
5549
5550        assert!(
5551            !receipt.provenance_warnings.is_empty(),
5552            "Warn mode must emit a warning instead of rejecting"
5553        );
5554    }
5555
5556    #[test]
5557    fn require_mode_rejects_node_without_source_ref() {
5558        let db = NamedTempFile::new().expect("temporary db");
5559        let writer = WriterActor::start(
5560            db.path(),
5561            Arc::new(SchemaManager::new()),
5562            ProvenanceMode::Require,
5563            Arc::new(TelemetryCounters::default()),
5564        )
5565        .expect("writer");
5566
5567        let result = writer.submit(WriteRequest {
5568            label: "test".to_owned(),
5569            nodes: vec![NodeInsert {
5570                row_id: "row-1".to_owned(),
5571                logical_id: "node-1".to_owned(),
5572                kind: "Note".to_owned(),
5573                properties: "{}".to_owned(),
5574                source_ref: None,
5575                upsert: false,
5576                chunk_policy: ChunkPolicy::Preserve,
5577                content_ref: None,
5578            }],
5579            node_retires: vec![],
5580            edges: vec![],
5581            edge_retires: vec![],
5582            chunks: vec![],
5583            runs: vec![],
5584            steps: vec![],
5585            actions: vec![],
5586            optional_backfills: vec![],
5587            vec_inserts: vec![],
5588            operational_writes: vec![],
5589        });
5590
5591        assert!(
5592            matches!(result, Err(EngineError::InvalidWrite(_))),
5593            "Require mode must reject node without source_ref"
5594        );
5595    }
5596
5597    #[test]
5598    fn require_mode_accepts_node_with_source_ref() {
5599        let db = NamedTempFile::new().expect("temporary db");
5600        let writer = WriterActor::start(
5601            db.path(),
5602            Arc::new(SchemaManager::new()),
5603            ProvenanceMode::Require,
5604            Arc::new(TelemetryCounters::default()),
5605        )
5606        .expect("writer");
5607
5608        let result = writer.submit(WriteRequest {
5609            label: "test".to_owned(),
5610            nodes: vec![NodeInsert {
5611                row_id: "row-1".to_owned(),
5612                logical_id: "node-1".to_owned(),
5613                kind: "Note".to_owned(),
5614                properties: "{}".to_owned(),
5615                source_ref: Some("src-1".to_owned()),
5616                upsert: false,
5617                chunk_policy: ChunkPolicy::Preserve,
5618                content_ref: None,
5619            }],
5620            node_retires: vec![],
5621            edges: vec![],
5622            edge_retires: vec![],
5623            chunks: vec![],
5624            runs: vec![],
5625            steps: vec![],
5626            actions: vec![],
5627            optional_backfills: vec![],
5628            vec_inserts: vec![],
5629            operational_writes: vec![],
5630        });
5631
5632        assert!(
5633            result.is_ok(),
5634            "Require mode must accept node with source_ref"
5635        );
5636    }
5637
5638    #[test]
5639    fn require_mode_rejects_edge_without_source_ref() {
5640        let db = NamedTempFile::new().expect("temporary db");
5641        let writer = WriterActor::start(
5642            db.path(),
5643            Arc::new(SchemaManager::new()),
5644            ProvenanceMode::Require,
5645            Arc::new(TelemetryCounters::default()),
5646        )
5647        .expect("writer");
5648
5649        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5650        let result = writer.submit(WriteRequest {
5651            label: "test".to_owned(),
5652            nodes: vec![
5653                NodeInsert {
5654                    row_id: "row-a".to_owned(),
5655                    logical_id: "node-a".to_owned(),
5656                    kind: "Note".to_owned(),
5657                    properties: "{}".to_owned(),
5658                    source_ref: Some("src-1".to_owned()),
5659                    upsert: false,
5660                    chunk_policy: ChunkPolicy::Preserve,
5661                    content_ref: None,
5662                },
5663                NodeInsert {
5664                    row_id: "row-b".to_owned(),
5665                    logical_id: "node-b".to_owned(),
5666                    kind: "Note".to_owned(),
5667                    properties: "{}".to_owned(),
5668                    source_ref: Some("src-1".to_owned()),
5669                    upsert: false,
5670                    chunk_policy: ChunkPolicy::Preserve,
5671                    content_ref: None,
5672                },
5673            ],
5674            node_retires: vec![],
5675            edges: vec![EdgeInsert {
5676                row_id: "edge-row-1".to_owned(),
5677                logical_id: "edge-1".to_owned(),
5678                source_logical_id: "node-a".to_owned(),
5679                target_logical_id: "node-b".to_owned(),
5680                kind: "LINKS_TO".to_owned(),
5681                properties: "{}".to_owned(),
5682                source_ref: None,
5683                upsert: false,
5684            }],
5685            edge_retires: vec![],
5686            chunks: vec![],
5687            runs: vec![],
5688            steps: vec![],
5689            actions: vec![],
5690            optional_backfills: vec![],
5691            vec_inserts: vec![],
5692            operational_writes: vec![],
5693        });
5694
5695        assert!(
5696            matches!(result, Err(EngineError::InvalidWrite(_))),
5697            "Require mode must reject edge without source_ref"
5698        );
5699    }
5700
5701    // --- Item 5: FTS projection coverage tests ---
5702
5703    #[test]
5704    fn fts_row_has_correct_kind_from_co_submitted_node() {
5705        let db = NamedTempFile::new().expect("temporary db");
5706        let writer = WriterActor::start(
5707            db.path(),
5708            Arc::new(SchemaManager::new()),
5709            ProvenanceMode::Warn,
5710            Arc::new(TelemetryCounters::default()),
5711        )
5712        .expect("writer");
5713
5714        writer
5715            .submit(WriteRequest {
5716                label: "test".to_owned(),
5717                nodes: vec![NodeInsert {
5718                    row_id: "row-1".to_owned(),
5719                    logical_id: "node-1".to_owned(),
5720                    kind: "Meeting".to_owned(),
5721                    properties: "{}".to_owned(),
5722                    source_ref: Some("src-1".to_owned()),
5723                    upsert: false,
5724                    chunk_policy: ChunkPolicy::Preserve,
5725                    content_ref: None,
5726                }],
5727                node_retires: vec![],
5728                edges: vec![],
5729                edge_retires: vec![],
5730                chunks: vec![ChunkInsert {
5731                    id: "chunk-1".to_owned(),
5732                    node_logical_id: "node-1".to_owned(),
5733                    text_content: "some text".to_owned(),
5734                    byte_start: None,
5735                    byte_end: None,
5736                    content_hash: None,
5737                }],
5738                runs: vec![],
5739                steps: vec![],
5740                actions: vec![],
5741                optional_backfills: vec![],
5742                vec_inserts: vec![],
5743                operational_writes: vec![],
5744            })
5745            .expect("write");
5746
5747        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5748        let kind: String = conn
5749            .query_row(
5750                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5751                [],
5752                |row| row.get(0),
5753            )
5754            .expect("fts row");
5755
5756        assert_eq!(kind, "Meeting");
5757    }
5758
5759    #[test]
5760    fn fts_row_has_correct_text_content() {
5761        let db = NamedTempFile::new().expect("temporary db");
5762        let writer = WriterActor::start(
5763            db.path(),
5764            Arc::new(SchemaManager::new()),
5765            ProvenanceMode::Warn,
5766            Arc::new(TelemetryCounters::default()),
5767        )
5768        .expect("writer");
5769
5770        writer
5771            .submit(WriteRequest {
5772                label: "test".to_owned(),
5773                nodes: vec![NodeInsert {
5774                    row_id: "row-1".to_owned(),
5775                    logical_id: "node-1".to_owned(),
5776                    kind: "Note".to_owned(),
5777                    properties: "{}".to_owned(),
5778                    source_ref: Some("src-1".to_owned()),
5779                    upsert: false,
5780                    chunk_policy: ChunkPolicy::Preserve,
5781                    content_ref: None,
5782                }],
5783                node_retires: vec![],
5784                edges: vec![],
5785                edge_retires: vec![],
5786                chunks: vec![ChunkInsert {
5787                    id: "chunk-1".to_owned(),
5788                    node_logical_id: "node-1".to_owned(),
5789                    text_content: "exactly this text".to_owned(),
5790                    byte_start: None,
5791                    byte_end: None,
5792                    content_hash: None,
5793                }],
5794                runs: vec![],
5795                steps: vec![],
5796                actions: vec![],
5797                optional_backfills: vec![],
5798                vec_inserts: vec![],
5799                operational_writes: vec![],
5800            })
5801            .expect("write");
5802
5803        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5804        let text: String = conn
5805            .query_row(
5806                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5807                [],
5808                |row| row.get(0),
5809            )
5810            .expect("fts row");
5811
5812        assert_eq!(text, "exactly this text");
5813    }
5814
5815    #[test]
5816    fn fts_row_has_correct_kind_from_pre_existing_node() {
5817        let db = NamedTempFile::new().expect("temporary db");
5818        let writer = WriterActor::start(
5819            db.path(),
5820            Arc::new(SchemaManager::new()),
5821            ProvenanceMode::Warn,
5822            Arc::new(TelemetryCounters::default()),
5823        )
5824        .expect("writer");
5825
5826        // Request 1: node only
5827        writer
5828            .submit(WriteRequest {
5829                label: "r1".to_owned(),
5830                nodes: vec![NodeInsert {
5831                    row_id: "row-1".to_owned(),
5832                    logical_id: "node-1".to_owned(),
5833                    kind: "Document".to_owned(),
5834                    properties: "{}".to_owned(),
5835                    source_ref: Some("src-1".to_owned()),
5836                    upsert: false,
5837                    chunk_policy: ChunkPolicy::Preserve,
5838                    content_ref: None,
5839                }],
5840                node_retires: vec![],
5841                edges: vec![],
5842                edge_retires: vec![],
5843                chunks: vec![],
5844                runs: vec![],
5845                steps: vec![],
5846                actions: vec![],
5847                optional_backfills: vec![],
5848                vec_inserts: vec![],
5849                operational_writes: vec![],
5850            })
5851            .expect("r1 write");
5852
5853        // Request 2: chunk for pre-existing node
5854        writer
5855            .submit(WriteRequest {
5856                label: "r2".to_owned(),
5857                nodes: vec![],
5858                node_retires: vec![],
5859                edges: vec![],
5860                edge_retires: vec![],
5861                chunks: vec![ChunkInsert {
5862                    id: "chunk-1".to_owned(),
5863                    node_logical_id: "node-1".to_owned(),
5864                    text_content: "some text".to_owned(),
5865                    byte_start: None,
5866                    byte_end: None,
5867                    content_hash: None,
5868                }],
5869                runs: vec![],
5870                steps: vec![],
5871                actions: vec![],
5872                optional_backfills: vec![],
5873                vec_inserts: vec![],
5874                operational_writes: vec![],
5875            })
5876            .expect("r2 write");
5877
5878        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5879        let kind: String = conn
5880            .query_row(
5881                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5882                [],
5883                |row| row.get(0),
5884            )
5885            .expect("fts row");
5886
5887        assert_eq!(kind, "Document");
5888    }
5889
5890    #[test]
5891    fn fts_derives_rows_for_multiple_chunks_per_node() {
5892        let db = NamedTempFile::new().expect("temporary db");
5893        let writer = WriterActor::start(
5894            db.path(),
5895            Arc::new(SchemaManager::new()),
5896            ProvenanceMode::Warn,
5897            Arc::new(TelemetryCounters::default()),
5898        )
5899        .expect("writer");
5900
5901        writer
5902            .submit(WriteRequest {
5903                label: "test".to_owned(),
5904                nodes: vec![NodeInsert {
5905                    row_id: "row-1".to_owned(),
5906                    logical_id: "node-1".to_owned(),
5907                    kind: "Meeting".to_owned(),
5908                    properties: "{}".to_owned(),
5909                    source_ref: Some("src-1".to_owned()),
5910                    upsert: false,
5911                    chunk_policy: ChunkPolicy::Preserve,
5912                    content_ref: None,
5913                }],
5914                node_retires: vec![],
5915                edges: vec![],
5916                edge_retires: vec![],
5917                chunks: vec![
5918                    ChunkInsert {
5919                        id: "chunk-a".to_owned(),
5920                        node_logical_id: "node-1".to_owned(),
5921                        text_content: "intro".to_owned(),
5922                        byte_start: None,
5923                        byte_end: None,
5924                        content_hash: None,
5925                    },
5926                    ChunkInsert {
5927                        id: "chunk-b".to_owned(),
5928                        node_logical_id: "node-1".to_owned(),
5929                        text_content: "body".to_owned(),
5930                        byte_start: None,
5931                        byte_end: None,
5932                        content_hash: None,
5933                    },
5934                    ChunkInsert {
5935                        id: "chunk-c".to_owned(),
5936                        node_logical_id: "node-1".to_owned(),
5937                        text_content: "conclusion".to_owned(),
5938                        byte_start: None,
5939                        byte_end: None,
5940                        content_hash: None,
5941                    },
5942                ],
5943                runs: vec![],
5944                steps: vec![],
5945                actions: vec![],
5946                optional_backfills: vec![],
5947                vec_inserts: vec![],
5948                operational_writes: vec![],
5949            })
5950            .expect("write");
5951
5952        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5953        let count: i64 = conn
5954            .query_row(
5955                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
5956                [],
5957                |row| row.get(0),
5958            )
5959            .expect("fts count");
5960
5961        assert_eq!(count, 3, "three chunks must produce three FTS rows");
5962    }
5963
5964    #[test]
5965    fn fts_resolves_mixed_fast_and_db_paths() {
5966        let db = NamedTempFile::new().expect("temporary db");
5967        let writer = WriterActor::start(
5968            db.path(),
5969            Arc::new(SchemaManager::new()),
5970            ProvenanceMode::Warn,
5971            Arc::new(TelemetryCounters::default()),
5972        )
5973        .expect("writer");
5974
5975        // Seed pre-existing node
5976        writer
5977            .submit(WriteRequest {
5978                label: "seed".to_owned(),
5979                nodes: vec![NodeInsert {
5980                    row_id: "row-existing".to_owned(),
5981                    logical_id: "existing-node".to_owned(),
5982                    kind: "Archive".to_owned(),
5983                    properties: "{}".to_owned(),
5984                    source_ref: Some("src-1".to_owned()),
5985                    upsert: false,
5986                    chunk_policy: ChunkPolicy::Preserve,
5987                    content_ref: None,
5988                }],
5989                node_retires: vec![],
5990                edges: vec![],
5991                edge_retires: vec![],
5992                chunks: vec![],
5993                runs: vec![],
5994                steps: vec![],
5995                actions: vec![],
5996                optional_backfills: vec![],
5997                vec_inserts: vec![],
5998                operational_writes: vec![],
5999            })
6000            .expect("seed");
6001
6002        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
6003        writer
6004            .submit(WriteRequest {
6005                label: "mixed".to_owned(),
6006                nodes: vec![NodeInsert {
6007                    row_id: "row-new".to_owned(),
6008                    logical_id: "new-node".to_owned(),
6009                    kind: "Inbox".to_owned(),
6010                    properties: "{}".to_owned(),
6011                    source_ref: Some("src-2".to_owned()),
6012                    upsert: false,
6013                    chunk_policy: ChunkPolicy::Preserve,
6014                    content_ref: None,
6015                }],
6016                node_retires: vec![],
6017                edges: vec![],
6018                edge_retires: vec![],
6019                chunks: vec![
6020                    ChunkInsert {
6021                        id: "chunk-fast".to_owned(),
6022                        node_logical_id: "new-node".to_owned(),
6023                        text_content: "new content".to_owned(),
6024                        byte_start: None,
6025                        byte_end: None,
6026                        content_hash: None,
6027                    },
6028                    ChunkInsert {
6029                        id: "chunk-db".to_owned(),
6030                        node_logical_id: "existing-node".to_owned(),
6031                        text_content: "archive content".to_owned(),
6032                        byte_start: None,
6033                        byte_end: None,
6034                        content_hash: None,
6035                    },
6036                ],
6037                runs: vec![],
6038                steps: vec![],
6039                actions: vec![],
6040                optional_backfills: vec![],
6041                vec_inserts: vec![],
6042                operational_writes: vec![],
6043            })
6044            .expect("mixed write");
6045
6046        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6047        let fast_kind: String = conn
6048            .query_row(
6049                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6050                [],
6051                |row| row.get(0),
6052            )
6053            .expect("fast path fts row");
6054        let db_kind: String = conn
6055            .query_row(
6056                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6057                [],
6058                |row| row.get(0),
6059            )
6060            .expect("db path fts row");
6061
6062        assert_eq!(fast_kind, "Inbox");
6063        assert_eq!(db_kind, "Archive");
6064    }
6065
6066    #[test]
6067    fn prepare_write_rejects_empty_chunk_text() {
6068        let db = NamedTempFile::new().expect("temporary db");
6069        let writer = WriterActor::start(
6070            db.path(),
6071            Arc::new(SchemaManager::new()),
6072            ProvenanceMode::Warn,
6073            Arc::new(TelemetryCounters::default()),
6074        )
6075        .expect("writer");
6076
6077        let result = writer.submit(WriteRequest {
6078            label: "test".to_owned(),
6079            nodes: vec![NodeInsert {
6080                row_id: "row-1".to_owned(),
6081                logical_id: "node-1".to_owned(),
6082                kind: "Note".to_owned(),
6083                properties: "{}".to_owned(),
6084                source_ref: None,
6085                upsert: false,
6086                chunk_policy: ChunkPolicy::Preserve,
6087                content_ref: None,
6088            }],
6089            node_retires: vec![],
6090            edges: vec![],
6091            edge_retires: vec![],
6092            chunks: vec![ChunkInsert {
6093                id: "chunk-1".to_owned(),
6094                node_logical_id: "node-1".to_owned(),
6095                text_content: String::new(),
6096                byte_start: None,
6097                byte_end: None,
6098                content_hash: None,
6099            }],
6100            runs: vec![],
6101            steps: vec![],
6102            actions: vec![],
6103            optional_backfills: vec![],
6104            vec_inserts: vec![],
6105            operational_writes: vec![],
6106        });
6107
6108        assert!(
6109            matches!(result, Err(EngineError::InvalidWrite(_))),
6110            "empty text_content must be rejected"
6111        );
6112    }
6113
6114    #[test]
6115    fn receipt_reports_zero_backfills_when_none_submitted() {
6116        let db = NamedTempFile::new().expect("temporary db");
6117        let writer = WriterActor::start(
6118            db.path(),
6119            Arc::new(SchemaManager::new()),
6120            ProvenanceMode::Warn,
6121            Arc::new(TelemetryCounters::default()),
6122        )
6123        .expect("writer");
6124
6125        let receipt = writer
6126            .submit(WriteRequest {
6127                label: "test".to_owned(),
6128                nodes: vec![NodeInsert {
6129                    row_id: "row-1".to_owned(),
6130                    logical_id: "node-1".to_owned(),
6131                    kind: "Note".to_owned(),
6132                    properties: "{}".to_owned(),
6133                    source_ref: Some("src-1".to_owned()),
6134                    upsert: false,
6135                    chunk_policy: ChunkPolicy::Preserve,
6136                    content_ref: None,
6137                }],
6138                node_retires: vec![],
6139                edges: vec![],
6140                edge_retires: vec![],
6141                chunks: vec![],
6142                runs: vec![],
6143                steps: vec![],
6144                actions: vec![],
6145                optional_backfills: vec![],
6146                vec_inserts: vec![],
6147                operational_writes: vec![],
6148            })
6149            .expect("write");
6150
6151        assert_eq!(receipt.optional_backfill_count, 0);
6152    }
6153
6154    #[test]
6155    fn receipt_reports_correct_backfill_count() {
6156        let db = NamedTempFile::new().expect("temporary db");
6157        let writer = WriterActor::start(
6158            db.path(),
6159            Arc::new(SchemaManager::new()),
6160            ProvenanceMode::Warn,
6161            Arc::new(TelemetryCounters::default()),
6162        )
6163        .expect("writer");
6164
6165        let receipt = writer
6166            .submit(WriteRequest {
6167                label: "test".to_owned(),
6168                nodes: vec![NodeInsert {
6169                    row_id: "row-1".to_owned(),
6170                    logical_id: "node-1".to_owned(),
6171                    kind: "Note".to_owned(),
6172                    properties: "{}".to_owned(),
6173                    source_ref: Some("src-1".to_owned()),
6174                    upsert: false,
6175                    chunk_policy: ChunkPolicy::Preserve,
6176                    content_ref: None,
6177                }],
6178                node_retires: vec![],
6179                edges: vec![],
6180                edge_retires: vec![],
6181                chunks: vec![],
6182                runs: vec![],
6183                steps: vec![],
6184                actions: vec![],
6185                optional_backfills: vec![
6186                    OptionalProjectionTask {
6187                        target: ProjectionTarget::Fts,
6188                        payload: "p1".to_owned(),
6189                    },
6190                    OptionalProjectionTask {
6191                        target: ProjectionTarget::Vec,
6192                        payload: "p2".to_owned(),
6193                    },
6194                    OptionalProjectionTask {
6195                        target: ProjectionTarget::All,
6196                        payload: "p3".to_owned(),
6197                    },
6198                ],
6199                vec_inserts: vec![],
6200                operational_writes: vec![],
6201            })
6202            .expect("write");
6203
6204        assert_eq!(receipt.optional_backfill_count, 3);
6205    }
6206
6207    #[test]
6208    fn backfill_tasks_are_not_executed_during_write() {
6209        let db = NamedTempFile::new().expect("temporary db");
6210        let writer = WriterActor::start(
6211            db.path(),
6212            Arc::new(SchemaManager::new()),
6213            ProvenanceMode::Warn,
6214            Arc::new(TelemetryCounters::default()),
6215        )
6216        .expect("writer");
6217
6218        // Write a node + chunk. Submit a backfill task targeting FTS.
6219        // The write path must not create any extra FTS rows beyond the required one.
6220        writer
6221            .submit(WriteRequest {
6222                label: "test".to_owned(),
6223                nodes: vec![NodeInsert {
6224                    row_id: "row-1".to_owned(),
6225                    logical_id: "node-1".to_owned(),
6226                    kind: "Note".to_owned(),
6227                    properties: "{}".to_owned(),
6228                    source_ref: Some("src-1".to_owned()),
6229                    upsert: false,
6230                    chunk_policy: ChunkPolicy::Preserve,
6231                    content_ref: None,
6232                }],
6233                node_retires: vec![],
6234                edges: vec![],
6235                edge_retires: vec![],
6236                chunks: vec![ChunkInsert {
6237                    id: "chunk-1".to_owned(),
6238                    node_logical_id: "node-1".to_owned(),
6239                    text_content: "required text".to_owned(),
6240                    byte_start: None,
6241                    byte_end: None,
6242                    content_hash: None,
6243                }],
6244                runs: vec![],
6245                steps: vec![],
6246                actions: vec![],
6247                optional_backfills: vec![OptionalProjectionTask {
6248                    target: ProjectionTarget::Fts,
6249                    payload: "backfill-payload".to_owned(),
6250                }],
6251                vec_inserts: vec![],
6252                operational_writes: vec![],
6253            })
6254            .expect("write");
6255
6256        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6257        let count: i64 = conn
6258            .query_row(
6259                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6260                [],
6261                |row| row.get(0),
6262            )
6263            .expect("fts count");
6264
6265        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6266    }
6267
6268    #[test]
6269    fn fts_row_uses_new_kind_after_node_replace() {
6270        let db = NamedTempFile::new().expect("temporary db");
6271        let writer = WriterActor::start(
6272            db.path(),
6273            Arc::new(SchemaManager::new()),
6274            ProvenanceMode::Warn,
6275            Arc::new(TelemetryCounters::default()),
6276        )
6277        .expect("writer");
6278
6279        // Write original node as "Note"
6280        writer
6281            .submit(WriteRequest {
6282                label: "v1".to_owned(),
6283                nodes: vec![NodeInsert {
6284                    row_id: "row-1".to_owned(),
6285                    logical_id: "node-1".to_owned(),
6286                    kind: "Note".to_owned(),
6287                    properties: "{}".to_owned(),
6288                    source_ref: Some("src-1".to_owned()),
6289                    upsert: false,
6290                    chunk_policy: ChunkPolicy::Preserve,
6291                    content_ref: None,
6292                }],
6293                node_retires: vec![],
6294                edges: vec![],
6295                edge_retires: vec![],
6296                chunks: vec![ChunkInsert {
6297                    id: "chunk-v1".to_owned(),
6298                    node_logical_id: "node-1".to_owned(),
6299                    text_content: "original".to_owned(),
6300                    byte_start: None,
6301                    byte_end: None,
6302                    content_hash: None,
6303                }],
6304                runs: vec![],
6305                steps: vec![],
6306                actions: vec![],
6307                optional_backfills: vec![],
6308                vec_inserts: vec![],
6309                operational_writes: vec![],
6310            })
6311            .expect("v1 write");
6312
6313        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
6314        writer
6315            .submit(WriteRequest {
6316                label: "v2".to_owned(),
6317                nodes: vec![NodeInsert {
6318                    row_id: "row-2".to_owned(),
6319                    logical_id: "node-1".to_owned(),
6320                    kind: "Meeting".to_owned(),
6321                    properties: "{}".to_owned(),
6322                    source_ref: Some("src-2".to_owned()),
6323                    upsert: true,
6324                    chunk_policy: ChunkPolicy::Replace,
6325                    content_ref: None,
6326                }],
6327                node_retires: vec![],
6328                edges: vec![],
6329                edge_retires: vec![],
6330                chunks: vec![ChunkInsert {
6331                    id: "chunk-v2".to_owned(),
6332                    node_logical_id: "node-1".to_owned(),
6333                    text_content: "updated".to_owned(),
6334                    byte_start: None,
6335                    byte_end: None,
6336                    content_hash: None,
6337                }],
6338                runs: vec![],
6339                steps: vec![],
6340                actions: vec![],
6341                optional_backfills: vec![],
6342                vec_inserts: vec![],
6343                operational_writes: vec![],
6344            })
6345            .expect("v2 write");
6346
6347        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6348
6349        // Old FTS row must be gone
6350        let old_count: i64 = conn
6351            .query_row(
6352                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6353                [],
6354                |row| row.get(0),
6355            )
6356            .expect("old fts count");
6357        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6358
6359        // New FTS row must use the new kind
6360        let new_kind: String = conn
6361            .query_row(
6362                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6363                [],
6364                |row| row.get(0),
6365            )
6366            .expect("new fts row");
6367        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6368    }
6369
6370    // --- Item 3: VecInsert tests ---
6371
6372    #[test]
6373    fn vec_insert_empty_chunk_id_is_rejected() {
6374        let db = NamedTempFile::new().expect("temporary db");
6375        let writer = WriterActor::start(
6376            db.path(),
6377            Arc::new(SchemaManager::new()),
6378            ProvenanceMode::Warn,
6379            Arc::new(TelemetryCounters::default()),
6380        )
6381        .expect("writer");
6382        let result = writer.submit(WriteRequest {
6383            label: "vec-test".to_owned(),
6384            nodes: vec![],
6385            node_retires: vec![],
6386            edges: vec![],
6387            edge_retires: vec![],
6388            chunks: vec![],
6389            runs: vec![],
6390            steps: vec![],
6391            actions: vec![],
6392            optional_backfills: vec![],
6393            vec_inserts: vec![VecInsert {
6394                chunk_id: String::new(),
6395                embedding: vec![0.1, 0.2, 0.3],
6396            }],
6397            operational_writes: vec![],
6398        });
6399        assert!(
6400            matches!(result, Err(EngineError::InvalidWrite(_))),
6401            "empty chunk_id in VecInsert must be rejected"
6402        );
6403    }
6404
6405    #[test]
6406    fn vec_insert_empty_embedding_is_rejected() {
6407        let db = NamedTempFile::new().expect("temporary db");
6408        let writer = WriterActor::start(
6409            db.path(),
6410            Arc::new(SchemaManager::new()),
6411            ProvenanceMode::Warn,
6412            Arc::new(TelemetryCounters::default()),
6413        )
6414        .expect("writer");
6415        let result = writer.submit(WriteRequest {
6416            label: "vec-test".to_owned(),
6417            nodes: vec![],
6418            node_retires: vec![],
6419            edges: vec![],
6420            edge_retires: vec![],
6421            chunks: vec![],
6422            runs: vec![],
6423            steps: vec![],
6424            actions: vec![],
6425            optional_backfills: vec![],
6426            vec_inserts: vec![VecInsert {
6427                chunk_id: "chunk-1".to_owned(),
6428                embedding: vec![],
6429            }],
6430            operational_writes: vec![],
6431        });
6432        assert!(
6433            matches!(result, Err(EngineError::InvalidWrite(_))),
6434            "empty embedding in VecInsert must be rejected"
6435        );
6436    }
6437
6438    #[test]
6439    fn vec_insert_noop_without_feature() {
6440        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6441        // (no error) but not write to any vec table.
6442        let db = NamedTempFile::new().expect("temporary db");
6443        let writer = WriterActor::start(
6444            db.path(),
6445            Arc::new(SchemaManager::new()),
6446            ProvenanceMode::Warn,
6447            Arc::new(TelemetryCounters::default()),
6448        )
6449        .expect("writer");
6450        let result = writer.submit(WriteRequest {
6451            label: "vec-noop".to_owned(),
6452            nodes: vec![],
6453            node_retires: vec![],
6454            edges: vec![],
6455            edge_retires: vec![],
6456            chunks: vec![],
6457            runs: vec![],
6458            steps: vec![],
6459            actions: vec![],
6460            optional_backfills: vec![],
6461            vec_inserts: vec![VecInsert {
6462                chunk_id: "chunk-noop".to_owned(),
6463                embedding: vec![1.0, 2.0, 3.0],
6464            }],
6465            operational_writes: vec![],
6466        });
6467        #[cfg(not(feature = "sqlite-vec"))]
6468        result.expect("noop VecInsert without feature must succeed");
6469        // The result variable is used above; silence unused warning for cfg-on path.
6470        #[cfg(feature = "sqlite-vec")]
6471        let _ = result;
6472    }
6473
6474    #[cfg(feature = "sqlite-vec")]
6475    #[test]
6476    fn node_retire_preserves_vec_rows_for_later_restore() {
6477        use crate::sqlite::open_connection_with_vec;
6478
6479        let db = NamedTempFile::new().expect("temporary db");
6480        let schema_manager = Arc::new(SchemaManager::new());
6481
6482        {
6483            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6484            schema_manager.bootstrap(&conn).expect("bootstrap");
6485            schema_manager
6486                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6487                .expect("ensure profile");
6488        }
6489
6490        let writer = WriterActor::start(
6491            db.path(),
6492            Arc::clone(&schema_manager),
6493            ProvenanceMode::Warn,
6494            Arc::new(TelemetryCounters::default()),
6495        )
6496        .expect("writer");
6497
6498        // Insert node + chunk + vec row
6499        writer
6500            .submit(WriteRequest {
6501                label: "setup".to_owned(),
6502                nodes: vec![NodeInsert {
6503                    row_id: "row-retire-vec".to_owned(),
6504                    logical_id: "node-retire-vec".to_owned(),
6505                    kind: "Doc".to_owned(),
6506                    properties: "{}".to_owned(),
6507                    source_ref: Some("src".to_owned()),
6508                    upsert: false,
6509                    chunk_policy: ChunkPolicy::Preserve,
6510                    content_ref: None,
6511                }],
6512                node_retires: vec![],
6513                edges: vec![],
6514                edge_retires: vec![],
6515                chunks: vec![ChunkInsert {
6516                    id: "chunk-retire-vec".to_owned(),
6517                    node_logical_id: "node-retire-vec".to_owned(),
6518                    text_content: "text".to_owned(),
6519                    byte_start: None,
6520                    byte_end: None,
6521                    content_hash: None,
6522                }],
6523                runs: vec![],
6524                steps: vec![],
6525                actions: vec![],
6526                optional_backfills: vec![],
6527                vec_inserts: vec![VecInsert {
6528                    chunk_id: "chunk-retire-vec".to_owned(),
6529                    embedding: vec![0.1, 0.2, 0.3],
6530                }],
6531                operational_writes: vec![],
6532            })
6533            .expect("setup write");
6534
6535        // Retire the node
6536        writer
6537            .submit(WriteRequest {
6538                label: "retire".to_owned(),
6539                nodes: vec![],
6540                node_retires: vec![NodeRetire {
6541                    logical_id: "node-retire-vec".to_owned(),
6542                    source_ref: Some("src".to_owned()),
6543                }],
6544                edges: vec![],
6545                edge_retires: vec![],
6546                chunks: vec![],
6547                runs: vec![],
6548                steps: vec![],
6549                actions: vec![],
6550                optional_backfills: vec![],
6551                vec_inserts: vec![],
6552                operational_writes: vec![],
6553            })
6554            .expect("retire write");
6555
6556        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6557        let count: i64 = conn
6558            .query_row(
6559                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-retire-vec'",
6560                [],
6561                |row| row.get(0),
6562            )
6563            .expect("count");
6564        assert_eq!(
6565            count, 1,
6566            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6567        );
6568    }
6569
6570    #[cfg(feature = "sqlite-vec")]
6571    #[test]
6572    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6573        use crate::sqlite::open_connection_with_vec;
6574
6575        let db = NamedTempFile::new().expect("temporary db");
6576        let schema_manager = Arc::new(SchemaManager::new());
6577
6578        {
6579            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6580            schema_manager.bootstrap(&conn).expect("bootstrap");
6581            schema_manager
6582                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6583                .expect("ensure profile");
6584        }
6585
6586        let writer = WriterActor::start(
6587            db.path(),
6588            Arc::clone(&schema_manager),
6589            ProvenanceMode::Warn,
6590            Arc::new(TelemetryCounters::default()),
6591        )
6592        .expect("writer");
6593
6594        // Insert node + chunk-A + vec-A
6595        writer
6596            .submit(WriteRequest {
6597                label: "v1".to_owned(),
6598                nodes: vec![NodeInsert {
6599                    row_id: "row-replace-v1".to_owned(),
6600                    logical_id: "node-replace-vec".to_owned(),
6601                    kind: "Doc".to_owned(),
6602                    properties: "{}".to_owned(),
6603                    source_ref: Some("src".to_owned()),
6604                    upsert: false,
6605                    chunk_policy: ChunkPolicy::Preserve,
6606                    content_ref: None,
6607                }],
6608                node_retires: vec![],
6609                edges: vec![],
6610                edge_retires: vec![],
6611                chunks: vec![ChunkInsert {
6612                    id: "chunk-replace-A".to_owned(),
6613                    node_logical_id: "node-replace-vec".to_owned(),
6614                    text_content: "version one".to_owned(),
6615                    byte_start: None,
6616                    byte_end: None,
6617                    content_hash: None,
6618                }],
6619                runs: vec![],
6620                steps: vec![],
6621                actions: vec![],
6622                optional_backfills: vec![],
6623                vec_inserts: vec![VecInsert {
6624                    chunk_id: "chunk-replace-A".to_owned(),
6625                    embedding: vec![0.1, 0.2, 0.3],
6626                }],
6627                operational_writes: vec![],
6628            })
6629            .expect("v1 write");
6630
6631        // Upsert with Replace + chunk-B + vec-B
6632        writer
6633            .submit(WriteRequest {
6634                label: "v2".to_owned(),
6635                nodes: vec![NodeInsert {
6636                    row_id: "row-replace-v2".to_owned(),
6637                    logical_id: "node-replace-vec".to_owned(),
6638                    kind: "Doc".to_owned(),
6639                    properties: "{}".to_owned(),
6640                    source_ref: Some("src".to_owned()),
6641                    upsert: true,
6642                    chunk_policy: ChunkPolicy::Replace,
6643                    content_ref: None,
6644                }],
6645                node_retires: vec![],
6646                edges: vec![],
6647                edge_retires: vec![],
6648                chunks: vec![ChunkInsert {
6649                    id: "chunk-replace-B".to_owned(),
6650                    node_logical_id: "node-replace-vec".to_owned(),
6651                    text_content: "version two".to_owned(),
6652                    byte_start: None,
6653                    byte_end: None,
6654                    content_hash: None,
6655                }],
6656                runs: vec![],
6657                steps: vec![],
6658                actions: vec![],
6659                optional_backfills: vec![],
6660                vec_inserts: vec![VecInsert {
6661                    chunk_id: "chunk-replace-B".to_owned(),
6662                    embedding: vec![0.4, 0.5, 0.6],
6663                }],
6664                operational_writes: vec![],
6665            })
6666            .expect("v2 write");
6667
6668        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6669        let count_a: i64 = conn
6670            .query_row(
6671                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-A'",
6672                [],
6673                |row| row.get(0),
6674            )
6675            .expect("count A");
6676        let count_b: i64 = conn
6677            .query_row(
6678                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-replace-B'",
6679                [],
6680                |row| row.get(0),
6681            )
6682            .expect("count B");
6683        assert_eq!(
6684            count_a, 0,
6685            "old vec row (chunk-A) must be deleted on Replace"
6686        );
6687        assert_eq!(
6688            count_b, 1,
6689            "new vec row (chunk-B) must be present after Replace"
6690        );
6691    }
6692
6693    #[cfg(feature = "sqlite-vec")]
6694    #[test]
6695    fn vec_insert_is_persisted_when_feature_enabled() {
6696        use crate::sqlite::open_connection_with_vec;
6697
6698        let db = NamedTempFile::new().expect("temporary db");
6699        let schema_manager = Arc::new(SchemaManager::new());
6700
6701        // Open a vec-capable connection and bootstrap + ensure profile
6702        {
6703            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6704            schema_manager.bootstrap(&conn).expect("bootstrap");
6705            schema_manager
6706                .ensure_vector_profile(&conn, "default", "vec_nodes_active", 3)
6707                .expect("ensure profile");
6708        }
6709
6710        let writer = WriterActor::start(
6711            db.path(),
6712            Arc::clone(&schema_manager),
6713            ProvenanceMode::Warn,
6714            Arc::new(TelemetryCounters::default()),
6715        )
6716        .expect("writer");
6717
6718        writer
6719            .submit(WriteRequest {
6720                label: "vec-insert".to_owned(),
6721                nodes: vec![],
6722                node_retires: vec![],
6723                edges: vec![],
6724                edge_retires: vec![],
6725                chunks: vec![],
6726                runs: vec![],
6727                steps: vec![],
6728                actions: vec![],
6729                optional_backfills: vec![],
6730                vec_inserts: vec![VecInsert {
6731                    chunk_id: "chunk-vec".to_owned(),
6732                    embedding: vec![0.1, 0.2, 0.3],
6733                }],
6734                operational_writes: vec![],
6735            })
6736            .expect("vec insert write");
6737
6738        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6739        let count: i64 = conn
6740            .query_row(
6741                "SELECT count(*) FROM vec_nodes_active WHERE chunk_id = 'chunk-vec'",
6742                [],
6743                |row| row.get(0),
6744            )
6745            .expect("count");
6746        assert_eq!(count, 1, "VecInsert must persist a row in vec_nodes_active");
6747    }
6748
6749    // --- WriteRequest size validation tests ---
6750
6751    #[test]
6752    fn write_request_exceeding_node_limit_is_rejected() {
6753        let nodes: Vec<NodeInsert> = (0..10_001)
6754            .map(|i| NodeInsert {
6755                row_id: format!("row-{i}"),
6756                logical_id: format!("lg-{i}"),
6757                kind: "Note".to_owned(),
6758                properties: "{}".to_owned(),
6759                source_ref: None,
6760                upsert: false,
6761                chunk_policy: ChunkPolicy::Preserve,
6762                content_ref: None,
6763            })
6764            .collect();
6765
6766        let request = WriteRequest {
6767            label: "too-many-nodes".to_owned(),
6768            nodes,
6769            node_retires: vec![],
6770            edges: vec![],
6771            edge_retires: vec![],
6772            chunks: vec![],
6773            runs: vec![],
6774            steps: vec![],
6775            actions: vec![],
6776            optional_backfills: vec![],
6777            vec_inserts: vec![],
6778            operational_writes: vec![],
6779        };
6780
6781        let result = prepare_write(request, ProvenanceMode::Warn)
6782            .map(|_| ())
6783            .map_err(|e| format!("{e}"));
6784        assert!(
6785            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
6786            "exceeding node limit must return InvalidWrite: got {result:?}"
6787        );
6788    }
6789
6790    #[test]
6791    fn write_request_exceeding_total_limit_is_rejected() {
6792        // Spread items across fields to exceed 100_000 total
6793        // without exceeding any single per-field limit.
6794        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
6795        let request = WriteRequest {
6796            label: "too-many-total".to_owned(),
6797            nodes: (0..10_000)
6798                .map(|i| NodeInsert {
6799                    row_id: format!("row-{i}"),
6800                    logical_id: format!("lg-{i}"),
6801                    kind: "Note".to_owned(),
6802                    properties: "{}".to_owned(),
6803                    source_ref: None,
6804                    upsert: false,
6805                    chunk_policy: ChunkPolicy::Preserve,
6806                    content_ref: None,
6807                })
6808                .collect(),
6809            node_retires: vec![],
6810            edges: (0..10_000)
6811                .map(|i| EdgeInsert {
6812                    row_id: format!("edge-row-{i}"),
6813                    logical_id: format!("edge-lg-{i}"),
6814                    kind: "link".to_owned(),
6815                    source_logical_id: format!("lg-{i}"),
6816                    target_logical_id: format!("lg-{}", i + 1),
6817                    properties: "{}".to_owned(),
6818                    source_ref: None,
6819                    upsert: false,
6820                })
6821                .collect(),
6822            edge_retires: vec![],
6823            chunks: (0..50_000)
6824                .map(|i| ChunkInsert {
6825                    id: format!("chunk-{i}"),
6826                    node_logical_id: "lg-0".to_owned(),
6827                    text_content: "text".to_owned(),
6828                    byte_start: None,
6829                    byte_end: None,
6830                    content_hash: None,
6831                })
6832                .collect(),
6833            runs: vec![],
6834            steps: vec![],
6835            actions: vec![],
6836            optional_backfills: vec![],
6837            vec_inserts: (0..20_001)
6838                .map(|i| VecInsert {
6839                    chunk_id: format!("vec-chunk-{i}"),
6840                    embedding: vec![0.1],
6841                })
6842                .collect(),
6843            operational_writes: (0..10_000)
6844                .map(|i| OperationalWrite::Append {
6845                    collection: format!("col-{i}"),
6846                    record_key: format!("key-{i}"),
6847                    payload_json: "{}".to_owned(),
6848                    source_ref: None,
6849                })
6850                .collect(),
6851        };
6852
6853        let result = prepare_write(request, ProvenanceMode::Warn)
6854            .map(|_| ())
6855            .map_err(|e| format!("{e}"));
6856        assert!(
6857            matches!(result, Err(ref msg) if msg.contains("too many total items")),
6858            "exceeding total item limit must return InvalidWrite: got {result:?}"
6859        );
6860    }
6861
6862    #[test]
6863    fn write_request_within_limits_succeeds() {
6864        let db = NamedTempFile::new().expect("temporary db");
6865        let writer = WriterActor::start(
6866            db.path(),
6867            Arc::new(SchemaManager::new()),
6868            ProvenanceMode::Warn,
6869            Arc::new(TelemetryCounters::default()),
6870        )
6871        .expect("writer");
6872
6873        let result = writer.submit(WriteRequest {
6874            label: "within-limits".to_owned(),
6875            nodes: vec![NodeInsert {
6876                row_id: "row-1".to_owned(),
6877                logical_id: "lg-1".to_owned(),
6878                kind: "Note".to_owned(),
6879                properties: "{}".to_owned(),
6880                source_ref: None,
6881                upsert: false,
6882                chunk_policy: ChunkPolicy::Preserve,
6883                content_ref: None,
6884            }],
6885            node_retires: vec![],
6886            edges: vec![],
6887            edge_retires: vec![],
6888            chunks: vec![],
6889            runs: vec![],
6890            steps: vec![],
6891            actions: vec![],
6892            optional_backfills: vec![],
6893            vec_inserts: vec![],
6894            operational_writes: vec![],
6895        });
6896
6897        assert!(
6898            result.is_ok(),
6899            "write request within limits must succeed: got {result:?}"
6900        );
6901    }
6902
6903    #[test]
6904    fn property_fts_rows_created_on_node_insert() {
6905        let db = NamedTempFile::new().expect("temporary db");
6906        // Bootstrap and register a property schema before starting the writer.
6907        let schema = Arc::new(SchemaManager::new());
6908        {
6909            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6910            schema.bootstrap(&conn).expect("bootstrap");
6911            conn.execute(
6912                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6913                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
6914                [],
6915            )
6916            .expect("register schema");
6917        }
6918        let writer = WriterActor::start(
6919            db.path(),
6920            Arc::clone(&schema),
6921            ProvenanceMode::Warn,
6922            Arc::new(TelemetryCounters::default()),
6923        )
6924        .expect("writer");
6925
6926        writer
6927            .submit(WriteRequest {
6928                label: "goal-insert".to_owned(),
6929                nodes: vec![NodeInsert {
6930                    row_id: "row-1".to_owned(),
6931                    logical_id: "goal-1".to_owned(),
6932                    kind: "Goal".to_owned(),
6933                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
6934                        .to_owned(),
6935                    source_ref: Some("src-1".to_owned()),
6936                    upsert: false,
6937                    chunk_policy: ChunkPolicy::Preserve,
6938                    content_ref: None,
6939                }],
6940                node_retires: vec![],
6941                edges: vec![],
6942                edge_retires: vec![],
6943                chunks: vec![],
6944                runs: vec![],
6945                steps: vec![],
6946                actions: vec![],
6947                optional_backfills: vec![],
6948                vec_inserts: vec![],
6949                operational_writes: vec![],
6950            })
6951            .expect("write");
6952
6953        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6954        let text: String = conn
6955            .query_row(
6956                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
6957                [],
6958                |row| row.get(0),
6959            )
6960            .expect("property FTS row must exist");
6961        assert_eq!(text, "Ship v2 Launch the redesign");
6962    }
6963
6964    #[test]
6965    fn property_fts_rows_replaced_on_upsert() {
6966        let db = NamedTempFile::new().expect("temporary db");
6967        let schema = Arc::new(SchemaManager::new());
6968        {
6969            let conn = rusqlite::Connection::open(db.path()).expect("conn");
6970            schema.bootstrap(&conn).expect("bootstrap");
6971            conn.execute(
6972                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
6973                 VALUES ('Goal', '[\"$.name\"]', ' ')",
6974                [],
6975            )
6976            .expect("register schema");
6977        }
6978        let writer = WriterActor::start(
6979            db.path(),
6980            Arc::clone(&schema),
6981            ProvenanceMode::Warn,
6982            Arc::new(TelemetryCounters::default()),
6983        )
6984        .expect("writer");
6985
6986        // Initial insert.
6987        writer
6988            .submit(WriteRequest {
6989                label: "insert".to_owned(),
6990                nodes: vec![NodeInsert {
6991                    row_id: "row-1".to_owned(),
6992                    logical_id: "goal-1".to_owned(),
6993                    kind: "Goal".to_owned(),
6994                    properties: r#"{"name":"Alpha"}"#.to_owned(),
6995                    source_ref: Some("src-1".to_owned()),
6996                    upsert: false,
6997                    chunk_policy: ChunkPolicy::Preserve,
6998                    content_ref: None,
6999                }],
7000                node_retires: vec![],
7001                edges: vec![],
7002                edge_retires: vec![],
7003                chunks: vec![],
7004                runs: vec![],
7005                steps: vec![],
7006                actions: vec![],
7007                optional_backfills: vec![],
7008                vec_inserts: vec![],
7009                operational_writes: vec![],
7010            })
7011            .expect("insert");
7012
7013        // Upsert with changed property.
7014        writer
7015            .submit(WriteRequest {
7016                label: "upsert".to_owned(),
7017                nodes: vec![NodeInsert {
7018                    row_id: "row-2".to_owned(),
7019                    logical_id: "goal-1".to_owned(),
7020                    kind: "Goal".to_owned(),
7021                    properties: r#"{"name":"Beta"}"#.to_owned(),
7022                    source_ref: Some("src-2".to_owned()),
7023                    upsert: true,
7024                    chunk_policy: ChunkPolicy::Preserve,
7025                    content_ref: None,
7026                }],
7027                node_retires: vec![],
7028                edges: vec![],
7029                edge_retires: vec![],
7030                chunks: vec![],
7031                runs: vec![],
7032                steps: vec![],
7033                actions: vec![],
7034                optional_backfills: vec![],
7035                vec_inserts: vec![],
7036                operational_writes: vec![],
7037            })
7038            .expect("upsert");
7039
7040        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7041        let count: i64 = conn
7042            .query_row(
7043                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7044                [],
7045                |row| row.get(0),
7046            )
7047            .expect("count");
7048        assert_eq!(
7049            count, 1,
7050            "must have exactly one property FTS row after upsert"
7051        );
7052
7053        let text: String = conn
7054            .query_row(
7055                "SELECT text_content FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7056                [],
7057                |row| row.get(0),
7058            )
7059            .expect("text");
7060        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
7061    }
7062
7063    #[test]
7064    fn property_fts_rows_deleted_on_retire() {
7065        let db = NamedTempFile::new().expect("temporary db");
7066        let schema = Arc::new(SchemaManager::new());
7067        {
7068            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7069            schema.bootstrap(&conn).expect("bootstrap");
7070            conn.execute(
7071                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7072                 VALUES ('Goal', '[\"$.name\"]', ' ')",
7073                [],
7074            )
7075            .expect("register schema");
7076        }
7077        let writer = WriterActor::start(
7078            db.path(),
7079            Arc::clone(&schema),
7080            ProvenanceMode::Warn,
7081            Arc::new(TelemetryCounters::default()),
7082        )
7083        .expect("writer");
7084
7085        // Insert a node.
7086        writer
7087            .submit(WriteRequest {
7088                label: "insert".to_owned(),
7089                nodes: vec![NodeInsert {
7090                    row_id: "row-1".to_owned(),
7091                    logical_id: "goal-1".to_owned(),
7092                    kind: "Goal".to_owned(),
7093                    properties: r#"{"name":"Alpha"}"#.to_owned(),
7094                    source_ref: Some("src-1".to_owned()),
7095                    upsert: false,
7096                    chunk_policy: ChunkPolicy::Preserve,
7097                    content_ref: None,
7098                }],
7099                node_retires: vec![],
7100                edges: vec![],
7101                edge_retires: vec![],
7102                chunks: vec![],
7103                runs: vec![],
7104                steps: vec![],
7105                actions: vec![],
7106                optional_backfills: vec![],
7107                vec_inserts: vec![],
7108                operational_writes: vec![],
7109            })
7110            .expect("insert");
7111
7112        // Retire the node.
7113        writer
7114            .submit(WriteRequest {
7115                label: "retire".to_owned(),
7116                nodes: vec![],
7117                node_retires: vec![NodeRetire {
7118                    logical_id: "goal-1".to_owned(),
7119                    source_ref: Some("forget-1".to_owned()),
7120                }],
7121                edges: vec![],
7122                edge_retires: vec![],
7123                chunks: vec![],
7124                runs: vec![],
7125                steps: vec![],
7126                actions: vec![],
7127                optional_backfills: vec![],
7128                vec_inserts: vec![],
7129                operational_writes: vec![],
7130            })
7131            .expect("retire");
7132
7133        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7134        let count: i64 = conn
7135            .query_row(
7136                "SELECT count(*) FROM fts_node_properties WHERE node_logical_id = 'goal-1'",
7137                [],
7138                |row| row.get(0),
7139            )
7140            .expect("count");
7141        assert_eq!(count, 0, "property FTS row must be deleted on retire");
7142    }
7143
7144    #[test]
7145    fn no_property_fts_row_for_unregistered_kind() {
7146        let db = NamedTempFile::new().expect("temporary db");
7147        let schema = Arc::new(SchemaManager::new());
7148        {
7149            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7150            schema.bootstrap(&conn).expect("bootstrap");
7151            // No schema registered for "Note" kind.
7152        }
7153        let writer = WriterActor::start(
7154            db.path(),
7155            Arc::clone(&schema),
7156            ProvenanceMode::Warn,
7157            Arc::new(TelemetryCounters::default()),
7158        )
7159        .expect("writer");
7160
7161        writer
7162            .submit(WriteRequest {
7163                label: "insert".to_owned(),
7164                nodes: vec![NodeInsert {
7165                    row_id: "row-1".to_owned(),
7166                    logical_id: "note-1".to_owned(),
7167                    kind: "Note".to_owned(),
7168                    properties: r#"{"title":"hello"}"#.to_owned(),
7169                    source_ref: Some("src-1".to_owned()),
7170                    upsert: false,
7171                    chunk_policy: ChunkPolicy::Preserve,
7172                    content_ref: None,
7173                }],
7174                node_retires: vec![],
7175                edges: vec![],
7176                edge_retires: vec![],
7177                chunks: vec![],
7178                runs: vec![],
7179                steps: vec![],
7180                actions: vec![],
7181                optional_backfills: vec![],
7182                vec_inserts: vec![],
7183                operational_writes: vec![],
7184            })
7185            .expect("insert");
7186
7187        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7188        let count: i64 = conn
7189            .query_row("SELECT count(*) FROM fts_node_properties", [], |row| {
7190                row.get(0)
7191            })
7192            .expect("count");
7193        assert_eq!(count, 0, "no property FTS rows for unregistered kind");
7194    }
7195
7196    mod extract_json_path_tests {
7197        use super::super::extract_json_path;
7198        use serde_json::json;
7199
7200        #[test]
7201        fn string_value() {
7202            let v = json!({"name": "alice"});
7203            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7204        }
7205
7206        #[test]
7207        fn number_value() {
7208            let v = json!({"age": 42});
7209            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7210        }
7211
7212        #[test]
7213        fn bool_value() {
7214            let v = json!({"active": true});
7215            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7216        }
7217
7218        #[test]
7219        fn null_value() {
7220            let v = json!({"x": null});
7221            assert!(extract_json_path(&v, "$.x").is_empty());
7222        }
7223
7224        #[test]
7225        fn missing_path() {
7226            let v = json!({"name": "a"});
7227            assert!(extract_json_path(&v, "$.missing").is_empty());
7228        }
7229
7230        #[test]
7231        fn nested_path() {
7232            let v = json!({"address": {"city": "NYC"}});
7233            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7234        }
7235
7236        #[test]
7237        fn array_of_strings() {
7238            let v = json!({"tags": ["a", "b", "c"]});
7239            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7240        }
7241
7242        #[test]
7243        fn array_mixed_scalars() {
7244            let v = json!({"vals": ["x", 1, true]});
7245            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7246        }
7247
7248        #[test]
7249        fn array_only_objects_returns_empty() {
7250            let v = json!({"data": [{"k": "v"}]});
7251            assert!(extract_json_path(&v, "$.data").is_empty());
7252        }
7253
7254        #[test]
7255        fn array_mixed_objects_and_scalars() {
7256            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7257            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7258        }
7259
7260        #[test]
7261        fn object_returns_empty() {
7262            let v = json!({"meta": {"k": "v"}});
7263            assert!(extract_json_path(&v, "$.meta").is_empty());
7264        }
7265
7266        #[test]
7267        fn no_prefix_returns_empty() {
7268            let v = json!({"name": "a"});
7269            assert!(extract_json_path(&v, "name").is_empty());
7270        }
7271    }
7272
7273    mod recursive_extraction_tests {
7274        use super::super::{
7275            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7276            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7277        };
7278        use serde_json::json;
7279
7280        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7281            PropertyFtsSchema {
7282                paths,
7283                separator: " ".to_owned(),
7284                exclude_paths: Vec::new(),
7285            }
7286        }
7287
7288        fn schema_with_excludes(
7289            paths: Vec<PropertyPathEntry>,
7290            excludes: Vec<String>,
7291        ) -> PropertyFtsSchema {
7292            PropertyFtsSchema {
7293                paths,
7294                separator: " ".to_owned(),
7295                exclude_paths: excludes,
7296            }
7297        }
7298
7299        #[test]
7300        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7301            let props = json!({"payload": {"b": "two", "a": "one"}});
7302            let (blob, positions, _stats) = extract_property_fts(
7303                &props,
7304                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7305            );
7306            let blob = blob.expect("blob emitted");
7307            let idx_one = blob.find("one").expect("contains 'one'");
7308            let idx_two = blob.find("two").expect("contains 'two'");
7309            assert!(
7310                idx_one < idx_two,
7311                "lex order: 'one' (key a) before 'two' (key b)"
7312            );
7313            assert_eq!(positions.len(), 2);
7314            assert_eq!(positions[0].leaf_path, "$.payload.a");
7315            assert_eq!(positions[1].leaf_path, "$.payload.b");
7316        }
7317
7318        #[test]
7319        fn recursive_extraction_walks_arrays_of_scalars() {
7320            let props = json!({"tags": ["red", "blue"]});
7321            let (_blob, positions, _stats) = extract_property_fts(
7322                &props,
7323                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7324            );
7325            assert_eq!(positions.len(), 2);
7326            assert_eq!(positions[0].leaf_path, "$.tags[0]");
7327            assert_eq!(positions[1].leaf_path, "$.tags[1]");
7328        }
7329
7330        #[test]
7331        fn recursive_extraction_walks_arrays_of_objects() {
7332            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7333            let (_blob, positions, _stats) = extract_property_fts(
7334                &props,
7335                &schema(vec![PropertyPathEntry::recursive("$.items")]),
7336            );
7337            assert_eq!(positions.len(), 2);
7338            assert_eq!(positions[0].leaf_path, "$.items[0].name");
7339            assert_eq!(positions[1].leaf_path, "$.items[1].name");
7340        }
7341
7342        #[test]
7343        fn recursive_extraction_stringifies_numbers_and_bools() {
7344            let props = json!({"root": {"n": 42, "ok": true}});
7345            let (blob, _positions, _stats) = extract_property_fts(
7346                &props,
7347                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7348            );
7349            let blob = blob.expect("blob emitted");
7350            assert!(blob.contains("42"));
7351            assert!(blob.contains("true"));
7352        }
7353
7354        #[test]
7355        fn recursive_extraction_skips_nulls_and_missing() {
7356            let props = json!({"root": {"x": null, "y": "present"}});
7357            let (blob, positions, _stats) = extract_property_fts(
7358                &props,
7359                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7360            );
7361            let blob = blob.expect("blob emitted");
7362            assert!(!blob.contains("null"));
7363            assert_eq!(positions.len(), 1);
7364            assert_eq!(positions[0].leaf_path, "$.root.y");
7365        }
7366
7367        #[test]
7368        fn recursive_extraction_respects_max_depth_guardrail() {
7369            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
7370            let mut inner = json!("leaf-value");
7371            for _ in 0..10 {
7372                inner = json!({ "k": inner });
7373            }
7374            let props = json!({ "root": inner });
7375            let (blob, positions, stats) = extract_property_fts(
7376                &props,
7377                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7378            );
7379            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
7380            // The walk should stop before reaching the leaf (depth 8).
7381            assert!(
7382                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
7383                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
7384            );
7385            // Row is still indexed with whatever was emitted — even if
7386            // that's nothing. The contract is only that the walk clamped.
7387            let _ = positions;
7388            let _ = MAX_RECURSIVE_DEPTH;
7389        }
7390
7391        #[test]
7392        fn recursive_extraction_respects_max_bytes_guardrail() {
7393            // Build an array of 4KB strings; total blob > 64KB.
7394            let leaves: Vec<String> = (0..40)
7395                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7396                .collect();
7397            let props = json!({ "root": leaves });
7398            let (blob, positions, stats) = extract_property_fts(
7399                &props,
7400                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7401            );
7402            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7403            let blob = blob.expect("blob must still be emitted");
7404            assert!(
7405                blob.len() <= MAX_EXTRACTED_BYTES,
7406                "blob must not exceed MAX_EXTRACTED_BYTES"
7407            );
7408            // No partial leaves: every position end must fall inside blob length.
7409            for pos in &positions {
7410                assert!(pos.end_offset <= blob.len());
7411                let slice = &blob[pos.start_offset..pos.end_offset];
7412                // Slice must equal some original leaf value; we only
7413                // verify it's not empty and doesn't straddle a separator.
7414                assert!(!slice.is_empty());
7415                assert!(!slice.contains(LEAF_SEPARATOR));
7416            }
7417            // Blob cannot end in a dangling separator.
7418            assert!(!blob.ends_with(LEAF_SEPARATOR));
7419        }
7420
7421        #[test]
7422        fn recursive_extraction_respects_exclude_paths() {
7423            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7424            let (blob, positions, stats) = extract_property_fts(
7425                &props,
7426                &schema_with_excludes(
7427                    vec![PropertyPathEntry::recursive("$.payload")],
7428                    vec!["$.payload.priv".to_owned()],
7429                ),
7430            );
7431            let blob = blob.expect("blob emitted");
7432            assert!(blob.contains("yes"));
7433            assert!(!blob.contains("no"));
7434            assert_eq!(positions.len(), 1);
7435            assert_eq!(positions[0].leaf_path, "$.payload.pub");
7436            assert!(stats.excluded_subtree > 0);
7437        }
7438
7439        #[test]
7440        fn position_map_entries_match_emitted_leaves_in_order() {
7441            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7442            let (blob, positions, _stats) = extract_property_fts(
7443                &props,
7444                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7445            );
7446            let blob = blob.expect("blob emitted");
7447            assert_eq!(positions.len(), 3);
7448            // Leaf paths in lexicographic order.
7449            assert_eq!(positions[0].leaf_path, "$.root.a");
7450            assert_eq!(positions[1].leaf_path, "$.root.b");
7451            assert_eq!(positions[2].leaf_path, "$.root.c");
7452            // Monotonic, non-overlapping offsets.
7453            let mut prev_end: usize = 0;
7454            for (i, pos) in positions.iter().enumerate() {
7455                assert!(pos.start_offset >= prev_end);
7456                assert!(pos.end_offset > pos.start_offset);
7457                assert!(pos.end_offset <= blob.len());
7458                let slice = &blob[pos.start_offset..pos.end_offset];
7459                match i {
7460                    0 => assert_eq!(slice, "alpha"),
7461                    1 => assert_eq!(slice, "bravo"),
7462                    2 => assert_eq!(slice, "charlie"),
7463                    _ => unreachable!(),
7464                }
7465                prev_end = pos.end_offset;
7466            }
7467        }
7468
7469        #[test]
7470        fn scalar_only_schema_produces_empty_position_map() {
7471            let props = json!({"name": "alpha", "title": "beta"});
7472            let (blob, positions, _stats) = extract_property_fts(
7473                &props,
7474                &schema(vec![
7475                    PropertyPathEntry::scalar("$.name"),
7476                    PropertyPathEntry::scalar("$.title"),
7477                ]),
7478            );
7479            assert_eq!(blob.as_deref(), Some("alpha beta"));
7480            assert!(
7481                positions.is_empty(),
7482                "scalar-only schema must emit no position entries"
7483            );
7484            // Sanity-check the type so the test fails loudly if the
7485            // signature changes.
7486            let _: Vec<PositionEntry> = positions;
7487        }
7488    }
7489}