//! fathomdb_engine/writer.rs — single-threaded writer actor and write-request types.
1use std::collections::HashMap;
2use std::mem::ManuallyDrop;
3use std::panic::AssertUnwindSafe;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::mpsc::{self, Sender, SyncSender};
7use std::thread;
8use std::time::Duration;
9
10use fathomdb_schema::{DEFAULT_FTS_TOKENIZER, SchemaManager};
11use rusqlite::{OptionalExtension, TransactionBehavior, params};
12
13use crate::operational::{
14    OperationalCollectionKind, OperationalFilterField, OperationalFilterFieldType,
15    OperationalFilterMode, OperationalSecondaryIndexDefinition, OperationalValidationContract,
16    OperationalValidationMode, extract_secondary_index_entries_for_current,
17    extract_secondary_index_entries_for_mutation, parse_operational_secondary_indexes_json,
18    parse_operational_validation_contract, validate_operational_payload_against_contract,
19};
20use crate::telemetry::TelemetryCounters;
21use crate::{EngineError, ids::new_id, projection::ProjectionTarget, sqlite};
22
/// A deferred projection backfill task submitted alongside a write.
///
/// Carried through [`WriteRequest::optional_backfills`] and tallied in
/// [`WriteReceipt::optional_backfill_count`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OptionalProjectionTask {
    /// Which projection to backfill (FTS, vec, etc.).
    pub target: ProjectionTarget,
    /// JSON payload describing the backfill work.
    pub payload: String,
}
31
/// Policy for handling existing chunks when upserting a node.
///
/// Only consulted when [`NodeInsert::upsert`] is true; plain inserts never
/// touch pre-existing chunks.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ChunkPolicy {
    /// Keep existing chunks and FTS rows.
    #[default]
    Preserve,
    /// Delete existing chunks and FTS rows before inserting new ones.
    Replace,
}
41
/// Controls how missing `source_ref` values are handled at write time.
///
/// Enforced during request preparation for both `submit` and
/// `touch_last_accessed` (see `check_require_provenance` and
/// `prepare_touch_last_accessed`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum ProvenanceMode {
    /// Emit a warning in the [`WriteReceipt`] but allow the write to proceed.
    #[default]
    Warn,
    /// Reject the write with [`EngineError::InvalidWrite`] if any canonical
    /// insert or retire is missing `source_ref`.
    Require,
}
52
/// A node to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeInsert {
    /// Id of this row version; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable identity shared by all versions of this node; must be non-empty.
    pub logical_id: String,
    /// Node kind/type label.
    pub kind: String,
    /// Node properties as a JSON document (the root walked by property-FTS extraction).
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active row for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
    /// Controls whether existing chunks and FTS rows are deleted when upsert=true.
    pub chunk_policy: ChunkPolicy,
    /// Optional URI referencing external content (e.g. a PDF, web page, or dataset).
    pub content_ref: Option<String>,
}
69
/// An edge to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeInsert {
    /// Id of this row version; must be non-empty and unique within the request.
    pub row_id: String,
    /// Stable identity shared by all versions of this edge; must be non-empty.
    pub logical_id: String,
    /// `logical_id` of the source node.
    pub source_logical_id: String,
    /// `logical_id` of the target node.
    pub target_logical_id: String,
    /// Edge kind/type label.
    pub kind: String,
    /// Edge properties — presumably a JSON document like node properties; confirm.
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true the writer supersedes the current active edge for this `logical_id`
    /// before inserting this new version. The supersession and insert are atomic.
    pub upsert: bool,
}
84
/// A node to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeRetire {
    /// Stable identity of the node to retire.
    pub logical_id: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
91
/// An edge to be retired (soft-deleted) in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EdgeRetire {
    /// Stable identity of the edge to retire.
    pub logical_id: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
98
/// A text chunk to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkInsert {
    /// Chunk id; must be non-empty.
    pub id: String,
    /// `logical_id` of the node this chunk belongs to.
    pub node_logical_id: String,
    /// Chunk text; must be non-empty (empty chunks are rejected at prepare time).
    pub text_content: String,
    /// Optional start offset — presumably a byte offset into the external content; confirm.
    pub byte_start: Option<i64>,
    /// Optional end offset — presumably a byte offset into the external content; confirm.
    pub byte_end: Option<i64>,
    /// Optional hash of the external content this chunk was derived from.
    pub content_hash: Option<String>,
}
110
/// A vector embedding to attach to an existing chunk.
///
/// The `chunk_id` must reference a chunk already present in the database or
/// co-submitted in the same [`WriteRequest`].  The embedding is stored in the
/// `vec_nodes_active` virtual table when the `sqlite-vec` feature is enabled;
/// without the feature the insert is silently skipped.
#[derive(Clone, Debug, PartialEq)]
pub struct VecInsert {
    /// Target chunk id; must be non-empty.
    pub chunk_id: String,
    /// Embedding vector; must be non-empty (validated at prepare time).
    pub embedding: Vec<f32>,
}
122
/// A mutation to an operational collection submitted as part of a write request.
///
/// `Append` and `Put` must carry a non-empty `payload_json`; `Delete` has no
/// payload. The apply-time semantics distinguishing `Append` from `Put` live
/// in the writer loop (not visible in this module section).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OperationalWrite {
    /// Add a record to `collection` under `record_key`.
    Append {
        collection: String,
        record_key: String,
        payload_json: String,
        /// Optional provenance; required under [`ProvenanceMode::Require`].
        source_ref: Option<String>,
    },
    /// Write the record at `record_key` in `collection`.
    Put {
        collection: String,
        record_key: String,
        payload_json: String,
        /// Optional provenance; required under [`ProvenanceMode::Require`].
        source_ref: Option<String>,
    },
    /// Remove the record at `record_key` in `collection`.
    Delete {
        collection: String,
        record_key: String,
        /// Optional provenance; required under [`ProvenanceMode::Require`].
        source_ref: Option<String>,
    },
}
144
/// A run to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RunInsert {
    /// Run id; must be non-empty.
    pub id: String,
    /// Run kind/type label.
    pub kind: String,
    /// Run status label.
    pub status: String,
    /// Run properties payload.
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// When true, `supersedes_id` must be set (validated in `prepare_write`).
    pub upsert: bool,
    /// Id of the run this one supersedes when `upsert` is true.
    pub supersedes_id: Option<String>,
}
156
/// A step to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StepInsert {
    /// Step id; must be non-empty.
    pub id: String,
    /// Id of the parent run.
    pub run_id: String,
    /// Step kind/type label.
    pub kind: String,
    /// Step status label.
    pub status: String,
    /// Step properties payload.
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// NOTE(review): presumably requires `supersedes_id` when true, mirroring
    /// [`RunInsert::upsert`] — confirm against `prepare_write`.
    pub upsert: bool,
    /// Id of the step this one supersedes.
    pub supersedes_id: Option<String>,
}
169
/// An action to be inserted in a [`WriteRequest`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActionInsert {
    /// Action id; must be non-empty.
    pub id: String,
    /// Id of the parent step.
    pub step_id: String,
    /// Action kind/type label.
    pub kind: String,
    /// Action status label.
    pub status: String,
    /// Action properties payload.
    pub properties: String,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
    /// NOTE(review): presumably requires `supersedes_id` when true, mirroring
    /// [`RunInsert::upsert`] — confirm against `prepare_write`.
    pub upsert: bool,
    /// Id of the action this one supersedes.
    pub supersedes_id: Option<String>,
}
182
// --- Write-request size limits ---
// Enforced by `validate_request_size`; exceeding any limit rejects the whole
// request with `EngineError::InvalidWrite` before it reaches the writer thread.

/// Maximum `nodes` entries in a single [`WriteRequest`].
const MAX_NODES: usize = 10_000;
/// Maximum `edges` entries in a single [`WriteRequest`].
const MAX_EDGES: usize = 10_000;
/// Maximum `chunks` entries in a single [`WriteRequest`].
const MAX_CHUNKS: usize = 50_000;
/// Maximum combined node + edge retires in a single [`WriteRequest`].
const MAX_RETIRES: usize = 10_000;
/// Maximum combined runs + steps + actions in a single [`WriteRequest`].
const MAX_RUNTIME_ITEMS: usize = 10_000;
/// Maximum operational writes in a single [`WriteRequest`].
const MAX_OPERATIONAL: usize = 10_000;
/// Maximum total items (all categories above plus vec inserts) per request.
const MAX_TOTAL_ITEMS: usize = 100_000;

/// How long `submit` / `touch_last_accessed` wait for the writer thread to reply.
///
/// After this timeout the caller receives [`EngineError::WriterTimedOut`] — the
/// writer thread is **not** cancelled, so the write may still commit.  This is
/// intentionally distinct from [`EngineError::WriterRejected`], which signals
/// that the writer thread has disconnected and the write will **not** commit.
const WRITER_REPLY_TIMEOUT: Duration = Duration::from_secs(30);
199
/// A batch of graph mutations to be applied atomically in a single `SQLite` transaction.
#[derive(Clone, Debug, PartialEq)]
pub struct WriteRequest {
    /// Caller-supplied label, echoed back in [`WriteReceipt::label`].
    pub label: String,
    /// Nodes to insert; see [`NodeInsert`].
    pub nodes: Vec<NodeInsert>,
    /// Nodes to retire (soft-delete).
    pub node_retires: Vec<NodeRetire>,
    /// Edges to insert.
    pub edges: Vec<EdgeInsert>,
    /// Edges to retire (soft-delete).
    pub edge_retires: Vec<EdgeRetire>,
    /// Text chunks to insert.
    pub chunks: Vec<ChunkInsert>,
    /// Runtime runs to insert.
    pub runs: Vec<RunInsert>,
    /// Runtime steps to insert.
    pub steps: Vec<StepInsert>,
    /// Runtime actions to insert.
    pub actions: Vec<ActionInsert>,
    /// Deferred projection backfill tasks accepted alongside the write.
    pub optional_backfills: Vec<OptionalProjectionTask>,
    /// Vector embeddings to persist alongside chunks.  Silently skipped when the
    /// `sqlite-vec` feature is absent.
    pub vec_inserts: Vec<VecInsert>,
    /// Operational-collection mutations applied in the same transaction.
    pub operational_writes: Vec<OperationalWrite>,
}
218
/// Receipt returned after a successful write transaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteReceipt {
    /// Label copied from the originating [`WriteRequest`].
    pub label: String,
    /// Number of optional projection backfill tasks accepted with the write.
    pub optional_backfill_count: usize,
    /// Non-fatal warnings produced while applying the write.
    pub warnings: Vec<String>,
    /// Missing-`source_ref` warnings emitted under [`ProvenanceMode::Warn`].
    pub provenance_warnings: Vec<String>,
}
227
/// Request to update `last_accessed_at` timestamps for a batch of nodes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchRequest {
    /// Nodes to touch; must be non-empty and contain no blank ids.
    pub logical_ids: Vec<String>,
    /// Timestamp to record — unit (seconds vs millis) not visible here; confirm at caller.
    pub touched_at: i64,
    /// Optional provenance reference; required under [`ProvenanceMode::Require`].
    pub source_ref: Option<String>,
}
235
/// Report from a last-access touch operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LastAccessTouchReport {
    /// Count of logical ids touched.
    pub touched_logical_ids: usize,
    /// The timestamp that was applied.
    pub touched_at: i64,
}
242
/// A chunk row destined for the chunk FTS index.
///
/// Resolved by `resolve_fts_rows` on the writer thread before BEGIN IMMEDIATE
/// (see [`PreparedWrite::required_fts_rows`]).
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsProjectionRow {
    /// Id of the chunk being indexed.
    chunk_id: String,
    /// `logical_id` of the chunk's owning node.
    node_logical_id: String,
    /// Owning node's kind, resolved from `PreparedWrite::node_kinds` or a DB lookup.
    kind: String,
    /// Text to index.
    text_content: String,
}
250
/// A property-derived FTS row: one node's extracted text blob plus its
/// position map. Resolved by `resolve_property_fts_rows` on the writer
/// thread before BEGIN IMMEDIATE.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FtsPropertyProjectionRow {
    /// `logical_id` of the node whose properties were extracted.
    node_logical_id: String,
    /// Node kind (selects the per-kind [`PropertyFtsSchema`]).
    kind: String,
    /// Concatenated extracted blob.
    text_content: String,
    /// Byte-range → leaf-path map for `text_content`; see [`PositionEntry`].
    positions: Vec<PositionEntry>,
    /// Per-column text extracted for weighted schemas. Empty for non-weighted schemas.
    /// When non-empty, the writer uses per-column INSERT instead of single `text_content`.
    columns: Vec<(String, String)>,
}
261
/// Maximum depth the recursive property-FTS extraction walk descends before
/// clamping. A walk that reaches this depth will not emit leaves below it.
/// Occurrences are counted in [`ExtractStats::depth_cap_hit`].
pub(crate) const MAX_RECURSIVE_DEPTH: usize = 8;

/// Maximum blob size the recursive property-FTS extraction walk will emit.
/// When the next complete leaf would push the blob past this cap, the walk
/// stops on the preceding leaf boundary — the existing blob is indexed, the
/// truncated leaf is not partially emitted.
///
/// This budget applies only to the recursive-walk portion of the blob.
/// Scalar-prefix emissions and the scalar↔recursive [`LEAF_SEPARATOR`] token
/// are not counted against it; the cap governs how much nested-subtree
/// content is serialized, not the final blob length.
///
/// Truncation is flagged via [`ExtractStats::byte_cap_reached`].
pub(crate) const MAX_EXTRACTED_BYTES: usize = 65_536;

/// Hard phrase-break separator inserted between two adjacent recursive
/// leaves in the concatenated blob.
///
/// FTS5 phrase queries match on consecutive token positions, so simply
/// inserting non-token whitespace or control characters does NOT prevent a
/// phrase like `"foo bar"` from matching when `foo` ends one leaf and
/// `bar` starts the next — the tokenizer would discard the control
/// character and the two content tokens would still be adjacent.
///
/// To create a real position gap we embed a sentinel *token*
/// `fathomdbphrasebreaksentinel` between leaves. Under `porter
/// unicode61 remove_diacritics 2` this survives tokenization as its own
/// position, so the content tokens on either side are separated by at
/// least one intervening position and phrase queries spanning the gap
/// cannot match. The sentinel is long, lowercase, and obviously
/// synthetic to minimize the chance of accidental collisions with real
/// content. It remains searchable as a word — callers should avoid
/// passing it to `text_search` directly.
///
/// Validated by the integration test
/// `leaf_separator_is_hard_phrase_break_under_unicode61_porter`.
///
/// Note: because the sentinel is a real token it may appear in FTS5
/// `snippet()` output when the snippet window spans a leaf boundary.
/// Phase 5 presentation-layer snippet post-processing is responsible for
/// stripping it before the snippet is surfaced to callers.
pub(crate) const LEAF_SEPARATOR: &str = " fathomdbphrasebreaksentinel ";
304
/// Whether a registered property-FTS path extracts a single scalar value
/// or recursively walks the subtree rooted at the path.
///
/// Defaults to [`PropertyPathMode::Scalar`], the legacy behavior.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum PropertyPathMode {
    /// Legacy scalar extraction — resolve the path, append the scalar
    /// (flattening arrays of scalars as before Phase 4).
    #[default]
    Scalar,
    /// Recursively walk scalar leaves rooted at the path, emitting one
    /// leaf per scalar with position-map tracking.
    Recursive,
}
317
/// A single registered property-FTS path with its extraction mode.
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct PropertyPathEntry {
    /// JSON path to extract, matched exactly from the node root.
    pub path: String,
    /// Scalar vs recursive extraction; see [`PropertyPathMode`].
    pub mode: PropertyPathMode,
    /// Optional BM25 weight multiplier parsed from the rich JSON format.
    pub weight: Option<f32>,
}

// f32 does not implement Eq (due to NaN), but weights are always finite in practice.
// NOTE(review): a NaN `weight` would make this `Eq` impl violate reflexivity —
// this relies on parsing never producing NaN; confirm upstream validation.
impl Eq for PropertyPathEntry {}
329
330impl PropertyPathEntry {
331    pub(crate) fn scalar(path: impl Into<String>) -> Self {
332        Self {
333            path: path.into(),
334            mode: PropertyPathMode::Scalar,
335            weight: None,
336        }
337    }
338
339    #[cfg(test)]
340    pub(crate) fn recursive(path: impl Into<String>) -> Self {
341        Self {
342            path: path.into(),
343            mode: PropertyPathMode::Recursive,
344            weight: None,
345        }
346    }
347}
348
/// Parsed property-FTS schema definition for a single kind.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PropertyFtsSchema {
    /// Property paths to extract. Each entry is matched exactly against
    /// the JSON tree from the node's root; see [`PropertyPathMode`].
    pub paths: Vec<PropertyPathEntry>,
    /// Separator inserted between adjacent leaf values in the extracted
    /// blob. Acts as a hard phrase break under FTS5.
    /// (Compare [`LEAF_SEPARATOR`], the writer's built-in hard-break token.)
    pub separator: String,
    /// JSON paths to skip during recursive extraction. Matched as an
    /// *exact* path equality — the walk visits each object/array before
    /// descending into it, so listing `$.x.y` suppresses the entire
    /// subtree rooted at `$.x.y`. Prefix matching is NOT supported: an
    /// exclude of `$.x` does not implicitly exclude `$.x.y`, and
    /// `$.payload.priv` does not implicitly exclude `$.payload.priv.inner`
    /// via prefix — the walker will still descend into any subtree that
    /// is not itself listed exactly in `exclude_paths`.
    pub exclude_paths: Vec<String>,
}
368
/// One position-map entry: a half-open `[start, end)` byte range within the
/// extracted blob plus the `JSONPath` of the scalar leaf whose value occupies
/// that range.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PositionEntry {
    /// Inclusive start byte offset within the extracted blob.
    pub start_offset: usize,
    /// Exclusive end byte offset within the extracted blob.
    pub end_offset: usize,
    /// `JSONPath` of the scalar leaf occupying this range.
    pub leaf_path: String,
}
378
/// Per-kind extraction stats accumulated by a recursive walk. Surface-level
/// Phase 4 logging only; Phase 5 may expose these to callers.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ExtractStats {
    /// Times the walk hit [`MAX_RECURSIVE_DEPTH`] and stopped descending.
    pub depth_cap_hit: usize,
    /// `true` once the extracted blob has reached `MAX_EXTRACTED_BYTES` and
    /// the walker has stopped accepting additional leaves. Unlike the
    /// depth counter this is a boolean: the walker's `stopped` guard
    /// blocks further `emit_leaf` calls after the first truncation.
    pub byte_cap_reached: bool,
    /// Subtrees skipped because their path is listed in `exclude_paths`.
    pub excluded_subtree: usize,
}
391
392impl ExtractStats {
393    fn merge(&mut self, other: ExtractStats) {
394        self.depth_cap_hit += other.depth_cap_hit;
395        self.byte_cap_reached |= other.byte_cap_reached;
396        self.excluded_subtree += other.excluded_subtree;
397    }
398}
399
/// A [`WriteRequest`] after validation in `prepare_write`, boxed onto the
/// channel and applied by the writer thread (see [`WriteMessage::Submit`]).
struct PreparedWrite {
    /// Label carried through to the [`WriteReceipt`].
    label: String,
    nodes: Vec<NodeInsert>,
    node_retires: Vec<NodeRetire>,
    edges: Vec<EdgeInsert>,
    edge_retires: Vec<EdgeRetire>,
    chunks: Vec<ChunkInsert>,
    runs: Vec<RunInsert>,
    steps: Vec<StepInsert>,
    actions: Vec<ActionInsert>,
    /// Suppressed when sqlite-vec feature is absent: field is set by `prepare_write` but only
    /// consumed by the cfg-gated apply block.
    #[cfg_attr(not(feature = "sqlite-vec"), allow(dead_code))]
    vec_inserts: Vec<VecInsert>,
    operational_writes: Vec<OperationalWrite>,
    /// Collection name → kind for the operational collections touched here.
    operational_collection_kinds: HashMap<String, OperationalCollectionKind>,
    /// Collection name → filter-field definitions for secondary indexing.
    operational_collection_filter_fields: HashMap<String, Vec<OperationalFilterField>>,
    /// Validation warnings to surface in the receipt rather than fail the write.
    operational_validation_warnings: Vec<String>,
    /// `node_logical_id` → kind for nodes co-submitted in this request.
    /// Used by `resolve_fts_rows` to avoid a DB round-trip for the common case.
    node_kinds: HashMap<String, String>,
    /// Filled in by `resolve_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_fts_rows: Vec<FtsProjectionRow>,
    /// Filled in by `resolve_property_fts_rows` in the writer thread before BEGIN IMMEDIATE.
    required_property_fts_rows: Vec<FtsPropertyProjectionRow>,
    /// Deferred backfill tasks; only their count affects the receipt here.
    optional_backfills: Vec<OptionalProjectionTask>,
}
427
/// Commands handled by the writer thread's event loop.
enum WriteMessage {
    /// Apply a validated write batch; the outcome is sent back on `reply`.
    Submit {
        // Boxed to keep the message enum small on the channel.
        prepared: Box<PreparedWrite>,
        reply: Sender<Result<WriteReceipt, EngineError>>,
    },
    /// Update `last_accessed_at` timestamps; the outcome is sent back on `reply`.
    TouchLastAccessed {
        request: LastAccessTouchRequest,
        reply: Sender<Result<LastAccessTouchReport, EngineError>>,
    },
}
438
/// Single-threaded writer that serializes all mutations through one `SQLite` connection.
///
/// On drop, the channel is closed and the writer thread is joined, ensuring all
/// in-flight writes complete and the `SQLite` connection closes cleanly.
#[derive(Debug)]
pub struct WriterActor {
    /// Wrapped in `ManuallyDrop` so the `Drop` impl can close the channel
    /// *before* joining the thread (see `Drop for WriterActor`).
    sender: ManuallyDrop<SyncSender<WriteMessage>>,
    /// `Some` until `Drop` takes and joins the handle.
    thread_handle: Option<thread::JoinHandle<()>>,
    /// Provenance enforcement applied while preparing requests.
    provenance_mode: ProvenanceMode,
    // Retained for Phase 2 (statement-level telemetry from WriterActor methods).
    _telemetry: Arc<TelemetryCounters>,
}
451
452impl WriterActor {
453    /// # Errors
454    /// Returns [`EngineError`] if the writer thread cannot be spawned.
455    pub fn start(
456        path: impl AsRef<Path>,
457        schema_manager: Arc<SchemaManager>,
458        provenance_mode: ProvenanceMode,
459        telemetry: Arc<TelemetryCounters>,
460    ) -> Result<Self, EngineError> {
461        let database_path = path.as_ref().to_path_buf();
462        let (sender, receiver) = mpsc::sync_channel::<WriteMessage>(256);
463
464        let writer_telemetry = Arc::clone(&telemetry);
465        let handle = thread::Builder::new()
466            .name("fathomdb-writer".to_owned())
467            .spawn(move || {
468                writer_loop(&database_path, &schema_manager, receiver, &writer_telemetry);
469            })
470            .map_err(EngineError::Io)?;
471
472        Ok(Self {
473            sender: ManuallyDrop::new(sender),
474            thread_handle: Some(handle),
475            provenance_mode,
476            _telemetry: telemetry,
477        })
478    }
479
480    /// Returns `true` if the writer thread is still running.
481    fn is_thread_alive(&self) -> bool {
482        self.thread_handle
483            .as_ref()
484            .is_some_and(|h| !h.is_finished())
485    }
486
487    /// Returns `WriterRejected` if the writer thread has exited.
488    fn check_thread_alive(&self) -> Result<(), EngineError> {
489        if self.is_thread_alive() {
490            Ok(())
491        } else {
492            Err(EngineError::WriterRejected(
493                "writer thread has exited".to_owned(),
494            ))
495        }
496    }
497
498    /// # Errors
499    /// Returns [`EngineError`] if the write request validation fails, the writer actor has shut
500    /// down, or the underlying `SQLite` transaction fails.
501    pub fn submit(&self, request: WriteRequest) -> Result<WriteReceipt, EngineError> {
502        self.check_thread_alive()?;
503        let prepared = prepare_write(request, self.provenance_mode)?;
504        let (reply_tx, reply_rx) = mpsc::channel();
505        self.sender
506            .send(WriteMessage::Submit {
507                prepared: Box::new(prepared),
508                reply: reply_tx,
509            })
510            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
511
512        recv_with_timeout(&reply_rx)
513    }
514
515    /// # Errors
516    /// Returns [`EngineError`] if validation fails, the writer actor has shut down,
517    /// or the underlying `SQLite` transaction fails.
518    pub fn touch_last_accessed(
519        &self,
520        request: LastAccessTouchRequest,
521    ) -> Result<LastAccessTouchReport, EngineError> {
522        self.check_thread_alive()?;
523        prepare_touch_last_accessed(&request, self.provenance_mode)?;
524        let (reply_tx, reply_rx) = mpsc::channel();
525        self.sender
526            .send(WriteMessage::TouchLastAccessed {
527                request,
528                reply: reply_tx,
529            })
530            .map_err(|error| EngineError::WriterRejected(error.to_string()))?;
531
532        recv_with_timeout(&reply_rx)
533    }
534}
535
/// Last-resort notice for builds without the `tracing` feature: the `Drop`
/// shutdown path has nowhere else to report a suppressed writer-thread panic,
/// so it writes directly to stderr.
#[cfg(not(feature = "tracing"))]
#[allow(clippy::print_stderr)] // deliberate: stderr is the only channel without tracing
fn stderr_panic_notice() {
    eprintln!("fathomdb-writer panicked during shutdown (suppressed: already panicking)");
}
541
impl Drop for WriterActor {
    /// Two-phase shutdown: close the command channel, then join the thread.
    fn drop(&mut self) {
        // Phase 1: close the channel so the writer thread's `for msg in receiver`
        // loop exits after finishing any in-progress message.
        // Must happen BEFORE join to avoid deadlock.
        //
        // SAFETY: drop is called exactly once, and no method accesses the sender
        // after drop begins.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        // Phase 2: join the writer thread to ensure the SQLite connection closes.
        if let Some(handle) = self.thread_handle.take() {
            match handle.join() {
                Ok(()) => {}
                Err(payload) => {
                    if std::thread::panicking() {
                        // Re-raising while already unwinding would abort the
                        // process (double panic), so only log it.
                        trace_warn!(
                            "writer thread panicked during shutdown (suppressed: already panicking)"
                        );
                        #[cfg(not(feature = "tracing"))]
                        stderr_panic_notice();
                    } else {
                        // Surface the writer thread's panic to the dropping thread.
                        std::panic::resume_unwind(payload);
                    }
                }
            }
        }
    }
}
571
572/// Wait for a reply from the writer thread, with a timeout.
573fn recv_with_timeout<T>(rx: &mpsc::Receiver<Result<T, EngineError>>) -> Result<T, EngineError> {
574    rx.recv_timeout(WRITER_REPLY_TIMEOUT)
575        .map_err(|error| match error {
576            mpsc::RecvTimeoutError::Timeout => EngineError::WriterTimedOut(
577                "write timed out waiting for writer thread reply — the write may still commit"
578                    .to_owned(),
579            ),
580            mpsc::RecvTimeoutError::Disconnected => EngineError::WriterRejected(error.to_string()),
581        })
582        .and_then(|result| result)
583}
584
585fn prepare_touch_last_accessed(
586    request: &LastAccessTouchRequest,
587    mode: ProvenanceMode,
588) -> Result<(), EngineError> {
589    if request.logical_ids.is_empty() {
590        return Err(EngineError::InvalidWrite(
591            "touch_last_accessed requires at least one logical_id".to_owned(),
592        ));
593    }
594    for logical_id in &request.logical_ids {
595        if logical_id.trim().is_empty() {
596            return Err(EngineError::InvalidWrite(
597                "touch_last_accessed requires non-empty logical_ids".to_owned(),
598            ));
599        }
600    }
601    if mode == ProvenanceMode::Require && request.source_ref.is_none() {
602        return Err(EngineError::InvalidWrite(
603            "touch_last_accessed requires source_ref when ProvenanceMode::Require is active"
604                .to_owned(),
605        ));
606    }
607    Ok(())
608}
609
610fn check_require_provenance(request: &WriteRequest) -> Result<(), EngineError> {
611    let missing: Vec<String> = request
612        .nodes
613        .iter()
614        .filter(|n| n.source_ref.is_none())
615        .map(|n| format!("node '{}'", n.logical_id))
616        .chain(
617            request
618                .node_retires
619                .iter()
620                .filter(|r| r.source_ref.is_none())
621                .map(|r| format!("node retire '{}'", r.logical_id)),
622        )
623        .chain(
624            request
625                .edges
626                .iter()
627                .filter(|e| e.source_ref.is_none())
628                .map(|e| format!("edge '{}'", e.logical_id)),
629        )
630        .chain(
631            request
632                .edge_retires
633                .iter()
634                .filter(|r| r.source_ref.is_none())
635                .map(|r| format!("edge retire '{}'", r.logical_id)),
636        )
637        .chain(
638            request
639                .runs
640                .iter()
641                .filter(|r| r.source_ref.is_none())
642                .map(|r| format!("run '{}'", r.id)),
643        )
644        .chain(
645            request
646                .steps
647                .iter()
648                .filter(|s| s.source_ref.is_none())
649                .map(|s| format!("step '{}'", s.id)),
650        )
651        .chain(
652            request
653                .actions
654                .iter()
655                .filter(|a| a.source_ref.is_none())
656                .map(|a| format!("action '{}'", a.id)),
657        )
658        .chain(
659            request
660                .operational_writes
661                .iter()
662                .filter(|write| operational_write_source_ref(write).is_none())
663                .map(|write| {
664                    format!(
665                        "operational {} '{}:{}'",
666                        operational_write_kind(write),
667                        operational_write_collection(write),
668                        operational_write_record_key(write)
669                    )
670                }),
671        )
672        .collect();
673
674    if missing.is_empty() {
675        Ok(())
676    } else {
677        Err(EngineError::InvalidWrite(format!(
678            "ProvenanceMode::Require: missing source_ref on: {}",
679            missing.join(", ")
680        )))
681    }
682}
683
684fn validate_request_size(request: &WriteRequest) -> Result<(), EngineError> {
685    if request.nodes.len() > MAX_NODES {
686        return Err(EngineError::InvalidWrite(format!(
687            "too many nodes: {} exceeds limit of {MAX_NODES}",
688            request.nodes.len()
689        )));
690    }
691    if request.edges.len() > MAX_EDGES {
692        return Err(EngineError::InvalidWrite(format!(
693            "too many edges: {} exceeds limit of {MAX_EDGES}",
694            request.edges.len()
695        )));
696    }
697    if request.chunks.len() > MAX_CHUNKS {
698        return Err(EngineError::InvalidWrite(format!(
699            "too many chunks: {} exceeds limit of {MAX_CHUNKS}",
700            request.chunks.len()
701        )));
702    }
703    let retires = request.node_retires.len() + request.edge_retires.len();
704    if retires > MAX_RETIRES {
705        return Err(EngineError::InvalidWrite(format!(
706            "too many retires: {retires} exceeds limit of {MAX_RETIRES}"
707        )));
708    }
709    let runtime_items = request.runs.len() + request.steps.len() + request.actions.len();
710    if runtime_items > MAX_RUNTIME_ITEMS {
711        return Err(EngineError::InvalidWrite(format!(
712            "too many runtime items: {runtime_items} exceeds limit of {MAX_RUNTIME_ITEMS}"
713        )));
714    }
715    if request.operational_writes.len() > MAX_OPERATIONAL {
716        return Err(EngineError::InvalidWrite(format!(
717            "too many operational writes: {} exceeds limit of {MAX_OPERATIONAL}",
718            request.operational_writes.len()
719        )));
720    }
721    let total = request.nodes.len()
722        + request.node_retires.len()
723        + request.edges.len()
724        + request.edge_retires.len()
725        + request.chunks.len()
726        + request.runs.len()
727        + request.steps.len()
728        + request.actions.len()
729        + request.vec_inserts.len()
730        + request.operational_writes.len();
731    if total > MAX_TOTAL_ITEMS {
732        return Err(EngineError::InvalidWrite(format!(
733            "too many total items: {total} exceeds limit of {MAX_TOTAL_ITEMS}"
734        )));
735    }
736    Ok(())
737}
738
739#[allow(clippy::too_many_lines)]
740fn prepare_write(
741    request: WriteRequest,
742    mode: ProvenanceMode,
743) -> Result<PreparedWrite, EngineError> {
744    validate_request_size(&request)?;
745
746    // --- ID validation: reject empty IDs ---
747    for node in &request.nodes {
748        if node.row_id.is_empty() {
749            return Err(EngineError::InvalidWrite(
750                "NodeInsert has empty row_id".to_owned(),
751            ));
752        }
753        if node.logical_id.is_empty() {
754            return Err(EngineError::InvalidWrite(
755                "NodeInsert has empty logical_id".to_owned(),
756            ));
757        }
758    }
759    for edge in &request.edges {
760        if edge.row_id.is_empty() {
761            return Err(EngineError::InvalidWrite(
762                "EdgeInsert has empty row_id".to_owned(),
763            ));
764        }
765        if edge.logical_id.is_empty() {
766            return Err(EngineError::InvalidWrite(
767                "EdgeInsert has empty logical_id".to_owned(),
768            ));
769        }
770    }
771    for chunk in &request.chunks {
772        if chunk.id.is_empty() {
773            return Err(EngineError::InvalidWrite(
774                "ChunkInsert has empty id".to_owned(),
775            ));
776        }
777        if chunk.text_content.is_empty() {
778            return Err(EngineError::InvalidWrite(format!(
779                "chunk '{}' has empty text_content; empty chunks are not allowed",
780                chunk.id
781            )));
782        }
783    }
784    for run in &request.runs {
785        if run.id.is_empty() {
786            return Err(EngineError::InvalidWrite(
787                "RunInsert has empty id".to_owned(),
788            ));
789        }
790    }
791    for step in &request.steps {
792        if step.id.is_empty() {
793            return Err(EngineError::InvalidWrite(
794                "StepInsert has empty id".to_owned(),
795            ));
796        }
797    }
798    for action in &request.actions {
799        if action.id.is_empty() {
800            return Err(EngineError::InvalidWrite(
801                "ActionInsert has empty id".to_owned(),
802            ));
803        }
804    }
805    for vi in &request.vec_inserts {
806        if vi.chunk_id.is_empty() {
807            return Err(EngineError::InvalidWrite(
808                "VecInsert has empty chunk_id".to_owned(),
809            ));
810        }
811        if vi.embedding.is_empty() {
812            return Err(EngineError::InvalidWrite(format!(
813                "VecInsert for chunk '{}' has empty embedding",
814                vi.chunk_id
815            )));
816        }
817    }
818    for operational in &request.operational_writes {
819        if operational_write_collection(operational).is_empty() {
820            return Err(EngineError::InvalidWrite(
821                "OperationalWrite has empty collection".to_owned(),
822            ));
823        }
824        if operational_write_record_key(operational).is_empty() {
825            return Err(EngineError::InvalidWrite(format!(
826                "OperationalWrite for collection '{}' has empty record_key",
827                operational_write_collection(operational)
828            )));
829        }
830        match operational {
831            OperationalWrite::Append { payload_json, .. }
832            | OperationalWrite::Put { payload_json, .. } => {
833                if payload_json.is_empty() {
834                    return Err(EngineError::InvalidWrite(format!(
835                        "OperationalWrite {} '{}:{}' has empty payload_json",
836                        operational_write_kind(operational),
837                        operational_write_collection(operational),
838                        operational_write_record_key(operational)
839                    )));
840                }
841            }
842            OperationalWrite::Delete { .. } => {}
843        }
844    }
845
846    // --- ID validation: reject duplicate row_ids within the request ---
847    {
848        let mut seen = std::collections::HashSet::new();
849        for node in &request.nodes {
850            if !seen.insert(node.row_id.as_str()) {
851                return Err(EngineError::InvalidWrite(format!(
852                    "duplicate row_id '{}' within the same WriteRequest",
853                    node.row_id
854                )));
855            }
856        }
857        for edge in &request.edges {
858            if !seen.insert(edge.row_id.as_str()) {
859                return Err(EngineError::InvalidWrite(format!(
860                    "duplicate row_id '{}' within the same WriteRequest",
861                    edge.row_id
862                )));
863            }
864        }
865    }
866
867    // --- ProvenanceMode::Require enforcement ---
868    if mode == ProvenanceMode::Require {
869        check_require_provenance(&request)?;
870    }
871
872    // --- Runtime table upsert validation ---
873    for run in &request.runs {
874        if run.upsert && run.supersedes_id.is_none() {
875            return Err(EngineError::InvalidWrite(format!(
876                "run '{}': upsert=true requires supersedes_id to be set",
877                run.id
878            )));
879        }
880    }
881    for step in &request.steps {
882        if step.upsert && step.supersedes_id.is_none() {
883            return Err(EngineError::InvalidWrite(format!(
884                "step '{}': upsert=true requires supersedes_id to be set",
885                step.id
886            )));
887        }
888    }
889    for action in &request.actions {
890        if action.upsert && action.supersedes_id.is_none() {
891            return Err(EngineError::InvalidWrite(format!(
892                "action '{}': upsert=true requires supersedes_id to be set",
893                action.id
894            )));
895        }
896    }
897
898    let node_kinds = request
899        .nodes
900        .iter()
901        .map(|node| (node.logical_id.clone(), node.kind.clone()))
902        .collect::<HashMap<_, _>>();
903
904    Ok(PreparedWrite {
905        label: request.label,
906        nodes: request.nodes,
907        node_retires: request.node_retires,
908        edges: request.edges,
909        edge_retires: request.edge_retires,
910        chunks: request.chunks,
911        runs: request.runs,
912        steps: request.steps,
913        actions: request.actions,
914        vec_inserts: request.vec_inserts,
915        operational_writes: request.operational_writes,
916        operational_collection_kinds: HashMap::new(),
917        operational_collection_filter_fields: HashMap::new(),
918        operational_validation_warnings: Vec::new(),
919        node_kinds,
920        required_fts_rows: Vec::new(),
921        required_property_fts_rows: Vec::new(),
922        optional_backfills: request.optional_backfills,
923    })
924}
925
/// Main loop of the dedicated writer thread.
///
/// Owns the single write connection: opens it, bootstraps the schema, then
/// serves [`WriteMessage`]s until every sender is dropped and the channel
/// closes. If connection or bootstrap fails, all queued and future messages
/// are rejected via [`reject_all`] so callers are never left hanging.
fn writer_loop(
    database_path: &Path,
    schema_manager: &Arc<SchemaManager>,
    receiver: mpsc::Receiver<WriteMessage>,
    telemetry: &TelemetryCounters,
) {
    trace_info!("writer thread started");

    let mut conn = match sqlite::open_connection(database_path) {
        Ok(conn) => conn,
        Err(error) => {
            trace_error!(error = %error, "writer thread: database connection failed");
            // Without a connection no write can ever succeed; drain the
            // channel, rejecting everything, then exit the thread.
            reject_all(receiver, &error.to_string());
            return;
        }
    };

    if let Err(error) = schema_manager.bootstrap(&conn) {
        trace_error!(error = %error, "writer thread: schema bootstrap failed");
        reject_all(receiver, &error.to_string());
        return;
    }

    // Blocks until a message arrives; ends when all senders are dropped.
    for message in receiver {
        match message {
            WriteMessage::Submit {
                mut prepared,
                reply,
            } => {
                // `start` only exists when tracing is compiled in; the
                // trace_info! below must compile away with it.
                #[cfg(feature = "tracing")]
                let start = std::time::Instant::now();
                // catch_unwind so a panic inside the apply logic does not
                // kill the writer thread: the caller gets an error reply
                // instead of a forever-pending channel.
                let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    resolve_and_apply(&mut conn, &mut prepared)
                }));
                if let Ok(inner) = result {
                    #[allow(unused_variables)]
                    if let Err(error) = &inner {
                        trace_error!(
                            label = %prepared.label,
                            error = %error,
                            "write failed"
                        );
                        telemetry.increment_errors();
                    } else {
                        // Rows counted for telemetry: nodes + edges + chunks.
                        let row_count = (prepared.nodes.len()
                            + prepared.edges.len()
                            + prepared.chunks.len()) as u64;
                        telemetry.increment_writes(row_count);
                        trace_info!(
                            label = %prepared.label,
                            nodes = prepared.nodes.len(),
                            edges = prepared.edges.len(),
                            chunks = prepared.chunks.len(),
                            duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
                            "write committed"
                        );
                    }
                    // Receiver may have given up; a failed send is not an error.
                    let _ = reply.send(inner);
                } else {
                    trace_error!(label = %prepared.label, "writer thread: panic during resolve_and_apply");
                    telemetry.increment_errors();
                    // Attempt to clean up any open transaction after a panic.
                    let _ = conn.execute_batch("ROLLBACK");
                    let _ = reply.send(Err(EngineError::WriterRejected(
                        "writer thread panic during resolve_and_apply".to_owned(),
                    )));
                }
            }
            WriteMessage::TouchLastAccessed { request, reply } => {
                let result = apply_touch_last_accessed(&mut conn, &request);
                if result.is_ok() {
                    // Records a successful write event with a zero row count.
                    telemetry.increment_writes(0);
                } else {
                    telemetry.increment_errors();
                }
                let _ = reply.send(result);
            }
        }
    }

    trace_info!("writer thread shutting down");
}
1008
1009fn reject_all(receiver: mpsc::Receiver<WriteMessage>, error: &str) {
1010    for message in receiver {
1011        match message {
1012            WriteMessage::Submit { reply, .. } => {
1013                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1014            }
1015            WriteMessage::TouchLastAccessed { reply, .. } => {
1016                let _ = reply.send(Err(EngineError::WriterRejected(error.to_string())));
1017            }
1018        }
1019    }
1020}
1021
1022/// Resolve FTS projection rows before the write transaction begins.
1023///
1024/// For each chunk in the prepared write, determines the node `kind` needed
1025/// to populate the FTS index. If the node was co-submitted in the same
1026/// request its kind is taken from `prepared.node_kinds` without a DB query.
1027/// If the node is pre-existing it is looked up in the database. If the node
1028/// cannot be found in either place, an `InvalidWrite` error is returned.
1029fn resolve_fts_rows(
1030    conn: &rusqlite::Connection,
1031    prepared: &mut PreparedWrite,
1032) -> Result<(), EngineError> {
1033    let retiring_ids: std::collections::HashSet<&str> = prepared
1034        .node_retires
1035        .iter()
1036        .map(|r| r.logical_id.as_str())
1037        .collect();
1038    for chunk in &prepared.chunks {
1039        if retiring_ids.contains(chunk.node_logical_id.as_str()) {
1040            return Err(EngineError::InvalidWrite(format!(
1041                "chunk '{}' references node_logical_id '{}' which is being retired in the same \
1042                 WriteRequest; retire and chunk insertion for the same node must not be combined",
1043                chunk.id, chunk.node_logical_id
1044            )));
1045        }
1046    }
1047    for chunk in &prepared.chunks {
1048        let kind = if let Some(k) = prepared.node_kinds.get(&chunk.node_logical_id) {
1049            k.clone()
1050        } else {
1051            match conn.query_row(
1052                "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1053                params![chunk.node_logical_id],
1054                |row| row.get::<_, String>(0),
1055            ) {
1056                Ok(kind) => kind,
1057                Err(rusqlite::Error::QueryReturnedNoRows) => {
1058                    return Err(EngineError::InvalidWrite(format!(
1059                        "chunk '{}' references node_logical_id '{}' that is not present in this \
1060                         write request or the database \
1061                         (v1 limitation: chunks and their nodes must be submitted together or the \
1062                         node must already exist)",
1063                        chunk.id, chunk.node_logical_id
1064                    )));
1065                }
1066                Err(e) => return Err(EngineError::Sqlite(e)),
1067            }
1068        };
1069        prepared.required_fts_rows.push(FtsProjectionRow {
1070            chunk_id: chunk.id.clone(),
1071            node_logical_id: chunk.node_logical_id.clone(),
1072            kind,
1073            text_content: chunk.text_content.clone(),
1074        });
1075    }
1076    trace_debug!(
1077        fts_rows = prepared.required_fts_rows.len(),
1078        chunks_processed = prepared.chunks.len(),
1079        "fts row resolution completed"
1080    );
1081    Ok(())
1082}
1083
1084/// Load FTS property schemas from the database and extract property values from
1085/// co-submitted nodes, generating `FtsPropertyProjectionRow` entries.
1086fn resolve_property_fts_rows(
1087    conn: &rusqlite::Connection,
1088    prepared: &mut PreparedWrite,
1089) -> Result<(), EngineError> {
1090    if prepared.nodes.is_empty() {
1091        return Ok(());
1092    }
1093
1094    let schemas: HashMap<String, PropertyFtsSchema> =
1095        load_fts_property_schemas(conn)?.into_iter().collect();
1096
1097    if schemas.is_empty() {
1098        return Ok(());
1099    }
1100
1101    let mut combined_stats = ExtractStats::default();
1102    for node in &prepared.nodes {
1103        let Some(schema) = schemas.get(&node.kind) else {
1104            continue;
1105        };
1106        let props: serde_json::Value = serde_json::from_str(&node.properties).unwrap_or_default();
1107        let has_weights = schema.paths.iter().any(|p| p.weight.is_some());
1108        let (text_content, positions, stats) = extract_property_fts(&props, schema);
1109        combined_stats.merge(stats);
1110        if let Some(text_content) = text_content {
1111            let columns = if has_weights {
1112                extract_property_fts_columns(&props, schema)
1113            } else {
1114                Vec::new()
1115            };
1116            prepared
1117                .required_property_fts_rows
1118                .push(FtsPropertyProjectionRow {
1119                    node_logical_id: node.logical_id.clone(),
1120                    kind: node.kind.clone(),
1121                    text_content,
1122                    positions,
1123                    columns,
1124                });
1125        }
1126    }
1127    if combined_stats != ExtractStats::default() {
1128        trace_debug!(
1129            depth_cap_hit = combined_stats.depth_cap_hit,
1130            byte_cap_reached = combined_stats.byte_cap_reached,
1131            excluded_subtree = combined_stats.excluded_subtree,
1132            "property fts recursive extraction guardrails engaged"
1133        );
1134    }
1135    trace_debug!(
1136        property_fts_rows = prepared.required_property_fts_rows.len(),
1137        nodes_processed = prepared.nodes.len(),
1138        "property fts row resolution completed"
1139    );
1140    Ok(())
1141}
1142
1143/// Extract scalar values from a JSON object at a `$.field` path.
1144/// Supports simple dot-notation paths like `$.name`, `$.address.city`.
1145/// Returns individual scalars so callers can join them with the configured separator.
1146pub(crate) fn extract_json_path(value: &serde_json::Value, path: &str) -> Vec<String> {
1147    let Some(path) = path.strip_prefix("$.") else {
1148        return Vec::new();
1149    };
1150    let mut current = value;
1151    for segment in path.split('.') {
1152        match current.get(segment) {
1153            Some(v) => current = v,
1154            None => return Vec::new(),
1155        }
1156    }
1157    match current {
1158        serde_json::Value::String(s) => vec![s.clone()],
1159        serde_json::Value::Number(n) => vec![n.to_string()],
1160        serde_json::Value::Bool(b) => vec![b.to_string()],
1161        serde_json::Value::Null | serde_json::Value::Object(_) => Vec::new(),
1162        serde_json::Value::Array(arr) => arr
1163            .iter()
1164            .filter_map(|v| match v {
1165                serde_json::Value::String(s) => Some(s.clone()),
1166                serde_json::Value::Number(n) => Some(n.to_string()),
1167                serde_json::Value::Bool(b) => Some(b.to_string()),
1168                _ => None,
1169            })
1170            .collect(),
1171    }
1172}
1173
1174/// Core property-FTS extraction. Produces the concatenated blob, the
1175/// position map identifying which byte range in the blob came from which
1176/// JSON leaf path, and the guardrail stats for the walk.
1177///
1178/// Emission rules:
1179/// - Path entries are processed in their registered order.
1180/// - Scalar-mode entries append the scalar value(s) directly (arrays of
1181///   scalars flatten, matching pre-Phase-4 behavior). Scalar emissions do
1182///   not produce position-map entries — the position map is only populated
1183///   when a recursive path contributes a leaf.
1184/// - Recursive-mode entries walk every descendant scalar leaf in stable
1185///   lexicographic key order. Each leaf emits a position-map entry whose
1186///   `[start, end)` spans the leaf value's bytes in the blob (NOT the
1187///   trailing separator).
1188/// - Between any two emitted values (whether from the same or different
1189///   path entries) the walk inserts [`LEAF_SEPARATOR`], a hard phrase
1190///   break under FTS5's `porter unicode61` tokenizer. Scalar-mode emissions
1191///   between each other use the schema's configured `separator` for
1192///   backwards compatibility with Phase 0.
1193pub(crate) fn extract_property_fts(
1194    props: &serde_json::Value,
1195    schema: &PropertyFtsSchema,
1196) -> (Option<String>, Vec<PositionEntry>, ExtractStats) {
1197    let mut walker = RecursiveWalker {
1198        blob: String::new(),
1199        positions: Vec::new(),
1200        stats: ExtractStats::default(),
1201        exclude_paths: schema.exclude_paths.clone(),
1202        stopped: false,
1203    };
1204
1205    let mut scalar_parts: Vec<String> = Vec::new();
1206
1207    for entry in &schema.paths {
1208        match entry.mode {
1209            PropertyPathMode::Scalar => {
1210                scalar_parts.extend(extract_json_path(props, &entry.path));
1211            }
1212            PropertyPathMode::Recursive => {
1213                let root = resolve_path_root(props, &entry.path);
1214                if let Some(root) = root {
1215                    walker.walk(&entry.path, root, 0);
1216                }
1217            }
1218        }
1219    }
1220
1221    // Flush scalar parts into the blob. Scalar emissions go first so that
1222    // their byte offsets are predictable relative to any recursive leaves
1223    // that follow. Scalars use the configured `separator`; the boundary
1224    // between scalars and recursive leaves uses `LEAF_SEPARATOR` (so phrase
1225    // queries cannot cross it).
1226    let scalar_text = if scalar_parts.is_empty() {
1227        None
1228    } else {
1229        Some(scalar_parts.join(&schema.separator))
1230    };
1231
1232    let combined = match (scalar_text, walker.blob.is_empty()) {
1233        (None, true) => None,
1234        (None, false) => Some(walker.blob.clone()),
1235        (Some(s), true) => Some(s),
1236        (Some(mut s), false) => {
1237            // Shift recursive positions by the prefix length so they stay
1238            // correct relative to the combined blob.
1239            let offset = s.len() + LEAF_SEPARATOR.len();
1240            for pos in &mut walker.positions {
1241                pos.start_offset += offset;
1242                pos.end_offset += offset;
1243            }
1244            s.push_str(LEAF_SEPARATOR);
1245            s.push_str(&walker.blob);
1246            Some(s)
1247        }
1248    };
1249
1250    (combined, walker.positions, walker.stats)
1251}
1252
1253/// Extract per-column FTS text for a node's properties, one entry per spec.
1254///
1255/// Returns `Vec<(column_name, text)>` in spec order.
1256pub(crate) fn extract_property_fts_columns(
1257    props: &serde_json::Value,
1258    schema: &PropertyFtsSchema,
1259) -> Vec<(String, String)> {
1260    let mut result = Vec::new();
1261    for entry in &schema.paths {
1262        let is_recursive = matches!(entry.mode, PropertyPathMode::Recursive);
1263        let column_name = fathomdb_schema::fts_column_name(&entry.path, is_recursive);
1264        let text = match entry.mode {
1265            PropertyPathMode::Scalar => {
1266                let parts = extract_json_path(props, &entry.path);
1267                parts.join(&schema.separator)
1268            }
1269            PropertyPathMode::Recursive => {
1270                let mut walker = RecursiveWalker {
1271                    blob: String::new(),
1272                    positions: Vec::new(),
1273                    stats: ExtractStats::default(),
1274                    exclude_paths: schema.exclude_paths.clone(),
1275                    stopped: false,
1276                };
1277                if let Some(root) = resolve_path_root(props, &entry.path) {
1278                    walker.walk(&entry.path, root, 0);
1279                }
1280                walker.blob
1281            }
1282        };
1283        result.push((column_name, text));
1284    }
1285    result
1286}
1287
1288fn resolve_path_root<'a>(
1289    value: &'a serde_json::Value,
1290    path: &str,
1291) -> Option<&'a serde_json::Value> {
1292    let stripped = path.strip_prefix("$.")?;
1293    let mut current = value;
1294    for segment in stripped.split('.') {
1295        current = current.get(segment)?;
1296    }
1297    Some(current)
1298}
1299
/// Stateful depth-first walker that concatenates every scalar leaf under a
/// recursive property path into one FTS blob, recording where each leaf
/// landed and which guardrails fired along the way.
struct RecursiveWalker {
    /// Accumulated text; emitted leaves are joined by `LEAF_SEPARATOR`.
    blob: String,
    /// One `[start, end)` byte span per emitted leaf, in emission order.
    positions: Vec<PositionEntry>,
    /// Guardrail counters (depth cap hits, byte cap, excluded subtrees).
    stats: ExtractStats,
    /// Exact JSON paths whose subtrees are skipped entirely during the walk.
    exclude_paths: Vec<String>,
    /// Once the byte cap is hit, the walker refuses to emit any further
    /// leaves. Subsequent walks still account for depth/exclude stats but
    /// do not append.
    stopped: bool,
}
1310
1311impl RecursiveWalker {
1312    fn walk(&mut self, current_path: &str, value: &serde_json::Value, depth: usize) {
1313        if self.stopped {
1314            return;
1315        }
1316        if self.exclude_paths.iter().any(|p| p == current_path) {
1317            self.stats.excluded_subtree += 1;
1318            return;
1319        }
1320        match value {
1321            serde_json::Value::String(s) => self.emit_leaf(current_path, s),
1322            serde_json::Value::Number(n) => self.emit_leaf(current_path, &n.to_string()),
1323            serde_json::Value::Bool(b) => self.emit_leaf(current_path, &b.to_string()),
1324            serde_json::Value::Null => {}
1325            serde_json::Value::Object(map) => {
1326                if depth >= MAX_RECURSIVE_DEPTH {
1327                    self.stats.depth_cap_hit += 1;
1328                    return;
1329                }
1330                let mut keys: Vec<&String> = map.keys().collect();
1331                keys.sort();
1332                for key in keys {
1333                    if self.stopped {
1334                        return;
1335                    }
1336                    let child_path = format!("{current_path}.{key}");
1337                    if let Some(child) = map.get(key) {
1338                        self.walk(&child_path, child, depth + 1);
1339                    }
1340                }
1341            }
1342            serde_json::Value::Array(items) => {
1343                if depth >= MAX_RECURSIVE_DEPTH {
1344                    self.stats.depth_cap_hit += 1;
1345                    return;
1346                }
1347                for (idx, item) in items.iter().enumerate() {
1348                    if self.stopped {
1349                        return;
1350                    }
1351                    let child_path = format!("{current_path}[{idx}]");
1352                    self.walk(&child_path, item, depth + 1);
1353                }
1354            }
1355        }
1356    }
1357
1358    fn emit_leaf(&mut self, leaf_path: &str, value: &str) {
1359        if self.stopped {
1360            return;
1361        }
1362        if value.is_empty() {
1363            return;
1364        }
1365        // Compute the projected blob size if we accept this leaf.
1366        // If we already emitted at least one leaf, we must account for
1367        // the separator that precedes this one.
1368        let sep_len = if self.blob.is_empty() {
1369            0
1370        } else {
1371            LEAF_SEPARATOR.len()
1372        };
1373        let projected_len = self.blob.len() + sep_len + value.len();
1374        if projected_len > MAX_EXTRACTED_BYTES {
1375            self.stats.byte_cap_reached = true;
1376            self.stopped = true;
1377            return;
1378        }
1379        if !self.blob.is_empty() {
1380            self.blob.push_str(LEAF_SEPARATOR);
1381        }
1382        let start_offset = self.blob.len();
1383        self.blob.push_str(value);
1384        let end_offset = self.blob.len();
1385        self.positions.push(PositionEntry {
1386            start_offset,
1387            end_offset,
1388            leaf_path: leaf_path.to_owned(),
1389        });
1390    }
1391}
1392
1393/// Load all registered FTS property schemas from the database, tolerating
1394/// both the legacy JSON shape (array of bare path strings = scalar mode)
1395/// and the Phase 4 shape (objects carrying `path`, `mode`, optional
1396/// `exclude_paths`, or a top-level object carrying `paths` + global
1397/// `exclude_paths`).
1398pub(crate) fn load_fts_property_schemas(
1399    conn: &rusqlite::Connection,
1400) -> Result<Vec<(String, PropertyFtsSchema)>, rusqlite::Error> {
1401    let mut stmt =
1402        conn.prepare("SELECT kind, property_paths_json, separator FROM fts_property_schemas")?;
1403    stmt.query_map([], |row| {
1404        let kind: String = row.get(0)?;
1405        let paths_json: String = row.get(1)?;
1406        let separator: String = row.get(2)?;
1407        let schema = parse_property_schema_json(&paths_json, &separator);
1408        Ok((kind, schema))
1409    })?
1410    .collect::<Result<Vec<_>, _>>()
1411}
1412
1413pub(crate) fn parse_property_schema_json(paths_json: &str, separator: &str) -> PropertyFtsSchema {
1414    let value: serde_json::Value = serde_json::from_str(paths_json).unwrap_or_default();
1415    let mut paths = Vec::new();
1416    let mut exclude_paths: Vec<String> = Vec::new();
1417
1418    let path_values: Vec<serde_json::Value> = match value {
1419        serde_json::Value::Array(arr) => arr,
1420        serde_json::Value::Object(map) => {
1421            if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1422                exclude_paths = excl
1423                    .iter()
1424                    .filter_map(|v| v.as_str().map(str::to_owned))
1425                    .collect();
1426            }
1427            match map.get("paths") {
1428                Some(serde_json::Value::Array(arr)) => arr.clone(),
1429                _ => Vec::new(),
1430            }
1431        }
1432        _ => Vec::new(),
1433    };
1434
1435    for entry in path_values {
1436        match entry {
1437            serde_json::Value::String(path) => {
1438                paths.push(PropertyPathEntry::scalar(path));
1439            }
1440            serde_json::Value::Object(map) => {
1441                let Some(path) = map.get("path").and_then(|v| v.as_str()) else {
1442                    continue;
1443                };
1444                let mode = map.get("mode").and_then(|v| v.as_str()).map_or(
1445                    PropertyPathMode::Scalar,
1446                    |m| match m {
1447                        "recursive" => PropertyPathMode::Recursive,
1448                        _ => PropertyPathMode::Scalar,
1449                    },
1450                );
1451                #[allow(clippy::cast_possible_truncation)]
1452                let weight = map
1453                    .get("weight")
1454                    .and_then(serde_json::Value::as_f64)
1455                    .map(|w| w as f32);
1456                paths.push(PropertyPathEntry {
1457                    path: path.to_owned(),
1458                    mode,
1459                    weight,
1460                });
1461                if let Some(serde_json::Value::Array(excl)) = map.get("exclude_paths") {
1462                    for p in excl {
1463                        if let Some(s) = p.as_str() {
1464                            exclude_paths.push(s.to_owned());
1465                        }
1466                    }
1467                }
1468            }
1469            _ => {}
1470        }
1471    }
1472
1473    PropertyFtsSchema {
1474        paths,
1475        separator: separator.to_owned(),
1476        exclude_paths,
1477    }
1478}
1479
/// Validate every operational write in `prepared` against its registered
/// collection: the collection must exist, not be disabled, accept the
/// write's verb for its kind (append-only vs latest-state), and pass the
/// collection's validation contract if one is registered. Metadata is
/// loaded once per collection and cached into `prepared` for later use.
fn resolve_operational_writes(
    conn: &rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<(), EngineError> {
    let mut collection_kinds = HashMap::new();
    let mut collection_filter_fields = HashMap::new();
    let mut collection_validation_contracts = HashMap::new();
    for write in &prepared.operational_writes {
        let collection = operational_write_collection(write);
        // First write touching this collection: load and cache its
        // kind, filter fields, and validation contract in one query.
        if !collection_kinds.contains_key(collection) {
            let maybe_row: Option<(String, Option<i64>, String, String)> = conn
                .query_row(
                    "SELECT kind, disabled_at, filter_fields_json, validation_json FROM operational_collections WHERE name = ?1",
                    params![collection],
                    |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
                )
                .optional()
                .map_err(EngineError::Sqlite)?;
            let (kind_text, disabled_at, filter_fields_json, validation_json) = maybe_row
                .ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational collection '{collection}' is not registered"
                    ))
                })?;
            if disabled_at.is_some() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is disabled"
                )));
            }
            let kind = OperationalCollectionKind::try_from(kind_text.as_str())
                .map_err(EngineError::InvalidWrite)?;
            let filter_fields = parse_operational_filter_fields(&filter_fields_json)?;
            let validation_contract = parse_operational_validation_contract(&validation_json)
                .map_err(EngineError::InvalidWrite)?;
            collection_kinds.insert(collection.to_owned(), kind);
            collection_filter_fields.insert(collection.to_owned(), filter_fields);
            collection_validation_contracts.insert(collection.to_owned(), validation_contract);
        }

        // Cache miss here is impossible after the block above; treated as
        // an invalid write rather than a panic out of caution.
        let kind = collection_kinds.get(collection).copied().ok_or_else(|| {
            EngineError::InvalidWrite("missing operational collection kind".to_owned())
        })?;
        // Verb/kind compatibility: append-only logs take only Append;
        // latest-state collections take only Put/Delete.
        match (kind, write) {
            (OperationalCollectionKind::AppendOnlyLog, OperationalWrite::Append { .. })
            | (
                OperationalCollectionKind::LatestState,
                OperationalWrite::Put { .. } | OperationalWrite::Delete { .. },
            ) => {}
            (OperationalCollectionKind::AppendOnlyLog, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is append_only_log and only accepts Append"
                )));
            }
            (OperationalCollectionKind::LatestState, _) => {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{collection}' is latest_state and only accepts Put/Delete"
                )));
            }
        }
        // The contracts map stores Option: None means the collection has
        // no validation contract registered, so the check is skipped.
        // NOTE(review): the Ok value of the contract check is discarded
        // while `prepared.operational_validation_warnings` is never
        // populated here — confirm the success payload carries nothing
        // that should surface as a warning.
        if let Some(Some(contract)) = collection_validation_contracts.get(collection) {
            let _ = check_operational_write_against_contract(write, contract)?;
        }
    }
    prepared.operational_collection_kinds = collection_kinds;
    prepared.operational_collection_filter_fields = collection_filter_fields;
    Ok(())
}
1547
1548fn parse_operational_filter_fields(
1549    filter_fields_json: &str,
1550) -> Result<Vec<OperationalFilterField>, EngineError> {
1551    let fields: Vec<OperationalFilterField> =
1552        serde_json::from_str(filter_fields_json).map_err(|error| {
1553            EngineError::InvalidWrite(format!("invalid filter_fields_json: {error}"))
1554        })?;
1555    let mut seen = std::collections::HashSet::new();
1556    for field in &fields {
1557        if field.name.trim().is_empty() {
1558            return Err(EngineError::InvalidWrite(
1559                "filter_fields_json field names must not be empty".to_owned(),
1560            ));
1561        }
1562        if !seen.insert(field.name.as_str()) {
1563            return Err(EngineError::InvalidWrite(format!(
1564                "filter_fields_json contains duplicate field '{}'",
1565                field.name
1566            )));
1567        }
1568        if field.modes.is_empty() {
1569            return Err(EngineError::InvalidWrite(format!(
1570                "filter_fields_json field '{}' must declare at least one mode",
1571                field.name
1572            )));
1573        }
1574        if field.modes.contains(&OperationalFilterMode::Prefix)
1575            && field.field_type != OperationalFilterFieldType::String
1576        {
1577            return Err(EngineError::InvalidWrite(format!(
1578                "filter field '{}' only supports prefix for string types",
1579                field.name
1580            )));
1581        }
1582    }
1583    Ok(fields)
1584}
1585
/// One extracted filter value destined for the `operational_filter_values`
/// table.
///
/// Exactly one of `string_value` / `integer_value` is populated, depending on
/// the declared [`OperationalFilterFieldType`] of the field (see
/// `extract_operational_filter_values`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct OperationalFilterValueRow {
    // Name of the declared filter field this value belongs to.
    field_name: String,
    // Populated for `String`-typed fields; `None` otherwise.
    string_value: Option<String>,
    // Populated for `Integer`/`Timestamp`-typed fields; `None` otherwise.
    integer_value: Option<i64>,
}
1592
1593fn extract_operational_filter_values(
1594    filter_fields: &[OperationalFilterField],
1595    payload_json: &str,
1596) -> Vec<OperationalFilterValueRow> {
1597    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1598        return Vec::new();
1599    };
1600    let Some(object) = parsed.as_object() else {
1601        return Vec::new();
1602    };
1603
1604    filter_fields
1605        .iter()
1606        .filter_map(|field| {
1607            let value = object.get(&field.name)?;
1608            match field.field_type {
1609                OperationalFilterFieldType::String => {
1610                    value
1611                        .as_str()
1612                        .map(|string_value| OperationalFilterValueRow {
1613                            field_name: field.name.clone(),
1614                            string_value: Some(string_value.to_owned()),
1615                            integer_value: None,
1616                        })
1617                }
1618                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1619                    value
1620                        .as_i64()
1621                        .map(|integer_value| OperationalFilterValueRow {
1622                            field_name: field.name.clone(),
1623                            string_value: None,
1624                            integer_value: Some(integer_value),
1625                        })
1626                }
1627            }
1628        })
1629        .collect()
1630}
1631
/// Resolves deferred rows for a prepared write, then applies the write.
///
/// Chunk FTS rows, property FTS rows, and operational collection metadata are
/// resolved first so that `apply_write` can insert fully materialized rows;
/// `apply_write` itself runs inside its own immediate transaction and returns
/// the [`WriteReceipt`].
fn resolve_and_apply(
    conn: &mut rusqlite::Connection,
    prepared: &mut PreparedWrite,
) -> Result<WriteReceipt, EngineError> {
    resolve_fts_rows(conn, prepared)?;
    resolve_property_fts_rows(conn, prepared)?;
    resolve_operational_writes(conn, prepared)?;
    apply_write(conn, prepared)
}
1641
1642fn apply_touch_last_accessed(
1643    conn: &mut rusqlite::Connection,
1644    request: &LastAccessTouchRequest,
1645) -> Result<LastAccessTouchReport, EngineError> {
1646    let mut seen = std::collections::HashSet::new();
1647    let logical_ids = request
1648        .logical_ids
1649        .iter()
1650        .filter(|logical_id| seen.insert(logical_id.as_str()))
1651        .cloned()
1652        .collect::<Vec<_>>();
1653    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1654
1655    for logical_id in &logical_ids {
1656        let exists = tx
1657            .query_row(
1658                "SELECT 1 FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL LIMIT 1",
1659                params![logical_id],
1660                |row| row.get::<_, i64>(0),
1661            )
1662            .optional()?
1663            .is_some();
1664        if !exists {
1665            return Err(EngineError::InvalidWrite(format!(
1666                "touch_last_accessed requires an active node for logical_id '{logical_id}'"
1667            )));
1668        }
1669    }
1670
1671    {
1672        let mut upsert_metadata = tx.prepare_cached(
1673            "INSERT INTO node_access_metadata (logical_id, last_accessed_at, updated_at) \
1674             VALUES (?1, ?2, ?2) \
1675             ON CONFLICT(logical_id) DO UPDATE SET \
1676                 last_accessed_at = excluded.last_accessed_at, \
1677                 updated_at = excluded.updated_at",
1678        )?;
1679        let mut insert_provenance = tx.prepare_cached(
1680            "INSERT INTO provenance_events (id, event_type, subject, source_ref, metadata_json) \
1681             VALUES (?1, 'node_last_accessed_touched', ?2, ?3, ?4)",
1682        )?;
1683        for logical_id in &logical_ids {
1684            upsert_metadata.execute(params![logical_id, request.touched_at])?;
1685            insert_provenance.execute(params![
1686                new_id(),
1687                logical_id,
1688                request.source_ref.as_deref(),
1689                format!("{{\"touched_at\":{}}}", request.touched_at),
1690            ])?;
1691        }
1692    }
1693
1694    tx.commit()?;
1695    Ok(LastAccessTouchReport {
1696        touched_logical_ids: logical_ids.len(),
1697        touched_at: request.touched_at,
1698    })
1699}
1700
1701fn ensure_operational_collections_writable(
1702    tx: &rusqlite::Transaction<'_>,
1703    prepared: &PreparedWrite,
1704) -> Result<(), EngineError> {
1705    for collection in prepared.operational_collection_kinds.keys() {
1706        let disabled_at: Option<Option<i64>> = tx
1707            .query_row(
1708                "SELECT disabled_at FROM operational_collections WHERE name = ?1",
1709                params![collection],
1710                |row| row.get::<_, Option<i64>>(0),
1711            )
1712            .optional()?;
1713        match disabled_at {
1714            Some(Some(_)) => {
1715                return Err(EngineError::InvalidWrite(format!(
1716                    "operational collection '{collection}' is disabled"
1717                )));
1718            }
1719            Some(None) => {}
1720            None => {
1721                return Err(EngineError::InvalidWrite(format!(
1722                    "operational collection '{collection}' is not registered"
1723                )));
1724            }
1725        }
1726    }
1727    Ok(())
1728}
1729
1730fn validate_operational_writes_against_live_contracts(
1731    tx: &rusqlite::Transaction<'_>,
1732    prepared: &PreparedWrite,
1733) -> Result<Vec<String>, EngineError> {
1734    let mut collection_validation_contracts =
1735        HashMap::<String, Option<OperationalValidationContract>>::new();
1736    for collection in prepared.operational_collection_kinds.keys() {
1737        let validation_json: String = tx
1738            .query_row(
1739                "SELECT validation_json FROM operational_collections WHERE name = ?1",
1740                params![collection],
1741                |row| row.get(0),
1742            )
1743            .map_err(EngineError::Sqlite)?;
1744        let validation_contract = parse_operational_validation_contract(&validation_json)
1745            .map_err(EngineError::InvalidWrite)?;
1746        collection_validation_contracts.insert(collection.clone(), validation_contract);
1747    }
1748
1749    let mut warnings = Vec::new();
1750    for write in &prepared.operational_writes {
1751        if let Some(Some(contract)) =
1752            collection_validation_contracts.get(operational_write_collection(write))
1753            && let Some(warning) = check_operational_write_against_contract(write, contract)?
1754        {
1755            warnings.push(warning);
1756        }
1757    }
1758
1759    Ok(warnings)
1760}
1761
1762fn load_live_operational_secondary_indexes(
1763    tx: &rusqlite::Transaction<'_>,
1764    prepared: &PreparedWrite,
1765) -> Result<HashMap<String, Vec<OperationalSecondaryIndexDefinition>>, EngineError> {
1766    let mut collection_indexes = HashMap::new();
1767    for (collection, collection_kind) in &prepared.operational_collection_kinds {
1768        let secondary_indexes_json: String = tx
1769            .query_row(
1770                "SELECT secondary_indexes_json FROM operational_collections WHERE name = ?1",
1771                params![collection],
1772                |row| row.get(0),
1773            )
1774            .map_err(EngineError::Sqlite)?;
1775        let indexes =
1776            parse_operational_secondary_indexes_json(&secondary_indexes_json, *collection_kind)
1777                .map_err(EngineError::InvalidWrite)?;
1778        collection_indexes.insert(collection.clone(), indexes);
1779    }
1780    Ok(collection_indexes)
1781}
1782
1783fn check_operational_write_against_contract(
1784    write: &OperationalWrite,
1785    contract: &OperationalValidationContract,
1786) -> Result<Option<String>, EngineError> {
1787    if contract.mode == OperationalValidationMode::Disabled {
1788        return Ok(None);
1789    }
1790
1791    let (payload_json, collection, record_key) = match write {
1792        OperationalWrite::Append {
1793            collection,
1794            record_key,
1795            payload_json,
1796            ..
1797        }
1798        | OperationalWrite::Put {
1799            collection,
1800            record_key,
1801            payload_json,
1802            ..
1803        } => (
1804            payload_json.as_str(),
1805            collection.as_str(),
1806            record_key.as_str(),
1807        ),
1808        OperationalWrite::Delete { .. } => return Ok(None),
1809    };
1810
1811    match validate_operational_payload_against_contract(contract, payload_json) {
1812        Ok(()) => Ok(None),
1813        Err(message) => match contract.mode {
1814            OperationalValidationMode::Disabled => Ok(None),
1815            OperationalValidationMode::ReportOnly => Ok(Some(format!(
1816                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1817                kind = operational_write_kind(write)
1818            ))),
1819            OperationalValidationMode::Enforce => Err(EngineError::InvalidWrite(format!(
1820                "invalid operational payload for collection '{collection}' {kind} '{record_key}': {message}",
1821                kind = operational_write_kind(write)
1822            ))),
1823        },
1824    }
1825}
1826
1827#[allow(clippy::too_many_lines)]
1828fn apply_write(
1829    conn: &mut rusqlite::Connection,
1830    prepared: &mut PreparedWrite,
1831) -> Result<WriteReceipt, EngineError> {
1832    let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1833
1834    // Node retires: clear rebuildable FTS rows (chunk-based and property-based),
1835    // preserve chunks/vec for possible restore, mark superseded, record audit event.
1836    {
1837        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1838        let mut del_prop_positions = tx
1839            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1840        // Double-write cleanup: also remove staging rows for this node so that a
1841        // rebuild-in-progress does not re-insert a retired node at swap time.
1842        let mut del_staging = tx.prepare_cached(
1843            "DELETE FROM fts_property_rebuild_staging WHERE node_logical_id = ?1",
1844        )?;
1845        let mut sup_node = tx.prepare_cached(
1846            "UPDATE nodes SET superseded_at = unixepoch() \
1847             WHERE logical_id = ?1 AND superseded_at IS NULL",
1848        )?;
1849        let mut ins_event = tx.prepare_cached(
1850            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1851             VALUES (?1, 'node_retire', ?2, ?3)",
1852        )?;
1853        for retire in &prepared.node_retires {
1854            del_fts.execute(params![retire.logical_id])?;
1855            // Delete from the per-kind FTS table: look up the node's kind first.
1856            let kind_opt: Option<String> = tx
1857                .query_row(
1858                    "SELECT kind FROM nodes WHERE logical_id = ?1 AND superseded_at IS NULL",
1859                    params![retire.logical_id],
1860                    |r| r.get(0),
1861                )
1862                .optional()?;
1863            if let Some(kind) = kind_opt {
1864                let table = fathomdb_schema::fts_kind_table_name(&kind);
1865                // Only delete if the per-kind table exists (may not exist if no schema registered).
1866                let table_exists: bool = tx
1867                    .query_row(
1868                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1869                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1870                        rusqlite::params![table],
1871                        |r| r.get::<_, i64>(0),
1872                    )
1873                    .unwrap_or(0)
1874                    > 0;
1875                if table_exists {
1876                    tx.execute(
1877                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1878                        params![retire.logical_id],
1879                    )?;
1880                }
1881            }
1882            del_prop_positions.execute(params![retire.logical_id])?;
1883            del_staging.execute(params![retire.logical_id])?;
1884            sup_node.execute(params![retire.logical_id])?;
1885            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1886        }
1887    }
1888
1889    // Edge retires: mark superseded, record audit event.
1890    {
1891        let mut sup_edge = tx.prepare_cached(
1892            "UPDATE edges SET superseded_at = unixepoch() \
1893             WHERE logical_id = ?1 AND superseded_at IS NULL",
1894        )?;
1895        let mut ins_event = tx.prepare_cached(
1896            "INSERT INTO provenance_events (id, event_type, subject, source_ref) \
1897             VALUES (?1, 'edge_retire', ?2, ?3)",
1898        )?;
1899        for retire in &prepared.edge_retires {
1900            sup_edge.execute(params![retire.logical_id])?;
1901            ins_event.execute(params![new_id(), retire.logical_id, retire.source_ref])?;
1902        }
1903    }
1904
1905    // Node inserts (with optional upsert + chunk-policy handling).
1906    {
1907        let mut del_fts = tx.prepare_cached("DELETE FROM fts_nodes WHERE node_logical_id = ?1")?;
1908        let mut del_prop_positions = tx
1909            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
1910        let mut del_chunks = tx.prepare_cached("DELETE FROM chunks WHERE node_logical_id = ?1")?;
1911        let mut sup_node = tx.prepare_cached(
1912            "UPDATE nodes SET superseded_at = unixepoch() \
1913             WHERE logical_id = ?1 AND superseded_at IS NULL",
1914        )?;
1915        let mut ins_node = tx.prepare_cached(
1916            "INSERT INTO nodes (row_id, logical_id, kind, properties, created_at, source_ref, content_ref) \
1917             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5, ?6)",
1918        )?;
1919        for node in &prepared.nodes {
1920            if node.upsert {
1921                // Property FTS rows are always replaced on upsert since properties change.
1922                // Delete from the per-kind table (table name is dynamic).
1923                let table = fathomdb_schema::fts_kind_table_name(&node.kind);
1924                // Only delete if the per-kind table exists (may not exist if no schema registered).
1925                let table_exists: bool = tx
1926                    .query_row(
1927                        "SELECT count(*) FROM sqlite_master WHERE type='table' AND name = ?1 \
1928                         AND sql LIKE 'CREATE VIRTUAL TABLE%'",
1929                        rusqlite::params![table],
1930                        |r| r.get::<_, i64>(0),
1931                    )
1932                    .unwrap_or(0)
1933                    > 0;
1934                if table_exists {
1935                    tx.execute(
1936                        &format!("DELETE FROM {table} WHERE node_logical_id = ?1"),
1937                        params![node.logical_id],
1938                    )?;
1939                }
1940                del_prop_positions.execute(params![node.logical_id])?;
1941                if node.chunk_policy == ChunkPolicy::Replace {
1942                    // 0.5.0: delete from the per-kind vec table instead of the global sentinel.
1943                    #[cfg(feature = "sqlite-vec")]
1944                    {
1945                        let vec_table = fathomdb_schema::vec_kind_table_name(&node.kind);
1946                        let del_sql = format!(
1947                            "DELETE FROM {vec_table} WHERE chunk_id IN \
1948                             (SELECT id FROM chunks WHERE node_logical_id = ?1)"
1949                        );
1950                        match tx.execute(&del_sql, params![node.logical_id]) {
1951                            Ok(_) => {}
1952                            Err(ref e) if crate::coordinator::is_vec_table_absent(e) => {}
1953                            Err(e) => return Err(e.into()),
1954                        }
1955                    }
1956                    del_fts.execute(params![node.logical_id])?;
1957                    del_chunks.execute(params![node.logical_id])?;
1958                }
1959                sup_node.execute(params![node.logical_id])?;
1960            }
1961            ins_node.execute(params![
1962                node.row_id,
1963                node.logical_id,
1964                node.kind,
1965                node.properties,
1966                node.source_ref,
1967                node.content_ref,
1968            ])?;
1969        }
1970    }
1971
1972    // Edge inserts (with optional upsert).
1973    {
1974        let mut sup_edge = tx.prepare_cached(
1975            "UPDATE edges SET superseded_at = unixepoch() \
1976             WHERE logical_id = ?1 AND superseded_at IS NULL",
1977        )?;
1978        let mut ins_edge = tx.prepare_cached(
1979            "INSERT INTO edges \
1980             (row_id, logical_id, source_logical_id, target_logical_id, kind, properties, created_at, source_ref) \
1981             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
1982        )?;
1983        for edge in &prepared.edges {
1984            if edge.upsert {
1985                sup_edge.execute(params![edge.logical_id])?;
1986            }
1987            ins_edge.execute(params![
1988                edge.row_id,
1989                edge.logical_id,
1990                edge.source_logical_id,
1991                edge.target_logical_id,
1992                edge.kind,
1993                edge.properties,
1994                edge.source_ref,
1995            ])?;
1996        }
1997    }
1998
1999    // Chunk inserts.
2000    {
2001        let mut ins_chunk = tx.prepare_cached(
2002            "INSERT INTO chunks (id, node_logical_id, text_content, byte_start, byte_end, created_at, content_hash) \
2003             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2004        )?;
2005        for chunk in &prepared.chunks {
2006            ins_chunk.execute(params![
2007                chunk.id,
2008                chunk.node_logical_id,
2009                chunk.text_content,
2010                chunk.byte_start,
2011                chunk.byte_end,
2012                chunk.content_hash,
2013            ])?;
2014        }
2015    }
2016
2017    // Run inserts (with optional upsert).
2018    {
2019        let mut sup_run = tx.prepare_cached(
2020            "UPDATE runs SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2021        )?;
2022        let mut ins_run = tx.prepare_cached(
2023            "INSERT INTO runs (id, kind, status, properties, created_at, source_ref) \
2024             VALUES (?1, ?2, ?3, ?4, unixepoch(), ?5)",
2025        )?;
2026        for run in &prepared.runs {
2027            if run.upsert
2028                && let Some(ref prior_id) = run.supersedes_id
2029            {
2030                sup_run.execute(params![prior_id])?;
2031            }
2032            ins_run.execute(params![
2033                run.id,
2034                run.kind,
2035                run.status,
2036                run.properties,
2037                run.source_ref
2038            ])?;
2039        }
2040    }
2041
2042    // Step inserts (with optional upsert).
2043    {
2044        let mut sup_step = tx.prepare_cached(
2045            "UPDATE steps SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2046        )?;
2047        let mut ins_step = tx.prepare_cached(
2048            "INSERT INTO steps (id, run_id, kind, status, properties, created_at, source_ref) \
2049             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2050        )?;
2051        for step in &prepared.steps {
2052            if step.upsert
2053                && let Some(ref prior_id) = step.supersedes_id
2054            {
2055                sup_step.execute(params![prior_id])?;
2056            }
2057            ins_step.execute(params![
2058                step.id,
2059                step.run_id,
2060                step.kind,
2061                step.status,
2062                step.properties,
2063                step.source_ref,
2064            ])?;
2065        }
2066    }
2067
2068    // Action inserts (with optional upsert).
2069    {
2070        let mut sup_action = tx.prepare_cached(
2071            "UPDATE actions SET superseded_at = unixepoch() WHERE id = ?1 AND superseded_at IS NULL",
2072        )?;
2073        let mut ins_action = tx.prepare_cached(
2074            "INSERT INTO actions (id, step_id, kind, status, properties, created_at, source_ref) \
2075             VALUES (?1, ?2, ?3, ?4, ?5, unixepoch(), ?6)",
2076        )?;
2077        for action in &prepared.actions {
2078            if action.upsert
2079                && let Some(ref prior_id) = action.supersedes_id
2080            {
2081                sup_action.execute(params![prior_id])?;
2082            }
2083            ins_action.execute(params![
2084                action.id,
2085                action.step_id,
2086                action.kind,
2087                action.status,
2088                action.properties,
2089                action.source_ref,
2090            ])?;
2091        }
2092    }
2093
2094    // Operational mutation log writes and latest-state current rows.
2095    {
2096        ensure_operational_collections_writable(&tx, prepared)?;
2097        prepared.operational_validation_warnings =
2098            validate_operational_writes_against_live_contracts(&tx, prepared)?;
2099        let collection_secondary_indexes = load_live_operational_secondary_indexes(&tx, prepared)?;
2100
2101        let mut next_mutation_order: i64 = tx.query_row(
2102            "SELECT COALESCE(MAX(mutation_order), 0) FROM operational_mutations",
2103            [],
2104            |row| row.get(0),
2105        )?;
2106        let mut ins_mutation = tx.prepare_cached(
2107            "INSERT INTO operational_mutations \
2108             (id, collection_name, record_key, op_kind, payload_json, source_ref, created_at, mutation_order) \
2109             VALUES (?1, ?2, ?3, ?4, ?5, ?6, unixepoch(), ?7)",
2110        )?;
2111        let mut ins_filter_value = tx.prepare_cached(
2112            "INSERT INTO operational_filter_values \
2113             (mutation_id, collection_name, field_name, string_value, integer_value) \
2114             VALUES (?1, ?2, ?3, ?4, ?5)",
2115        )?;
2116        let mut upsert_current = tx.prepare_cached(
2117            "INSERT INTO operational_current \
2118             (collection_name, record_key, payload_json, updated_at, last_mutation_id) \
2119             VALUES (?1, ?2, ?3, unixepoch(), ?4) \
2120             ON CONFLICT(collection_name, record_key) DO UPDATE SET \
2121                 payload_json = excluded.payload_json, \
2122                 updated_at = excluded.updated_at, \
2123                 last_mutation_id = excluded.last_mutation_id",
2124        )?;
2125        let mut del_current = tx.prepare_cached(
2126            "DELETE FROM operational_current WHERE collection_name = ?1 AND record_key = ?2",
2127        )?;
2128        let mut del_current_secondary_indexes = tx.prepare_cached(
2129            "DELETE FROM operational_secondary_index_entries \
2130             WHERE collection_name = ?1 AND subject_kind = 'current' AND record_key = ?2",
2131        )?;
2132        let mut ins_secondary_index = tx.prepare_cached(
2133            "INSERT INTO operational_secondary_index_entries \
2134             (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
2135              slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
2136             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
2137        )?;
2138        let mut current_row_stmt = tx.prepare_cached(
2139            "SELECT payload_json, updated_at, last_mutation_id FROM operational_current \
2140             WHERE collection_name = ?1 AND record_key = ?2",
2141        )?;
2142
2143        for write in &prepared.operational_writes {
2144            let collection = operational_write_collection(write);
2145            let record_key = operational_write_record_key(write);
2146            let mutation_id = new_id();
2147            next_mutation_order += 1;
2148            let payload_json = operational_write_payload(write);
2149            ins_mutation.execute(params![
2150                &mutation_id,
2151                collection,
2152                record_key,
2153                operational_write_kind(write),
2154                payload_json,
2155                operational_write_source_ref(write),
2156                next_mutation_order,
2157            ])?;
2158            if let Some(indexes) = collection_secondary_indexes.get(collection) {
2159                for entry in extract_secondary_index_entries_for_mutation(indexes, payload_json) {
2160                    ins_secondary_index.execute(params![
2161                        collection,
2162                        entry.index_name,
2163                        "mutation",
2164                        &mutation_id,
2165                        record_key,
2166                        entry.sort_timestamp,
2167                        entry.slot1_text,
2168                        entry.slot1_integer,
2169                        entry.slot2_text,
2170                        entry.slot2_integer,
2171                        entry.slot3_text,
2172                        entry.slot3_integer,
2173                    ])?;
2174                }
2175            }
2176            if let Some(filter_fields) = prepared
2177                .operational_collection_filter_fields
2178                .get(collection)
2179            {
2180                for filter_value in extract_operational_filter_values(filter_fields, payload_json) {
2181                    ins_filter_value.execute(params![
2182                        &mutation_id,
2183                        collection,
2184                        filter_value.field_name,
2185                        filter_value.string_value,
2186                        filter_value.integer_value,
2187                    ])?;
2188                }
2189            }
2190
2191            if prepared.operational_collection_kinds.get(collection)
2192                == Some(&OperationalCollectionKind::LatestState)
2193            {
2194                del_current_secondary_indexes.execute(params![collection, record_key])?;
2195                match write {
2196                    OperationalWrite::Put { payload_json, .. } => {
2197                        upsert_current.execute(params![
2198                            collection,
2199                            record_key,
2200                            payload_json,
2201                            &mutation_id,
2202                        ])?;
2203                        if let Some(indexes) = collection_secondary_indexes.get(collection) {
2204                            let (current_payload_json, updated_at, last_mutation_id): (
2205                                String,
2206                                i64,
2207                                String,
2208                            ) = current_row_stmt
2209                                .query_row(params![collection, record_key], |row| {
2210                                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
2211                                })?;
2212                            for entry in extract_secondary_index_entries_for_current(
2213                                indexes,
2214                                &current_payload_json,
2215                                updated_at,
2216                            ) {
2217                                ins_secondary_index.execute(params![
2218                                    collection,
2219                                    entry.index_name,
2220                                    "current",
2221                                    last_mutation_id.as_str(),
2222                                    record_key,
2223                                    entry.sort_timestamp,
2224                                    entry.slot1_text,
2225                                    entry.slot1_integer,
2226                                    entry.slot2_text,
2227                                    entry.slot2_integer,
2228                                    entry.slot3_text,
2229                                    entry.slot3_integer,
2230                                ])?;
2231                            }
2232                        }
2233                    }
2234                    OperationalWrite::Delete { .. } => {
2235                        del_current.execute(params![collection, record_key])?;
2236                    }
2237                    OperationalWrite::Append { .. } => {}
2238                }
2239            }
2240        }
2241    }
2242
2243    // FTS row inserts.
2244    {
2245        let mut ins_fts = tx.prepare_cached(
2246            "INSERT INTO fts_nodes (chunk_id, node_logical_id, kind, text_content) \
2247             VALUES (?1, ?2, ?3, ?4)",
2248        )?;
2249        for fts_row in &prepared.required_fts_rows {
2250            ins_fts.execute(params![
2251                fts_row.chunk_id,
2252                fts_row.node_logical_id,
2253                fts_row.kind,
2254                fts_row.text_content,
2255            ])?;
2256        }
2257    }
2258
2259    // Property FTS row inserts.
2260    if !prepared.required_property_fts_rows.is_empty() {
2261        let mut ins_positions = tx.prepare_cached(
2262            "INSERT INTO fts_node_property_positions \
2263             (node_logical_id, kind, start_offset, end_offset, leaf_path) \
2264             VALUES (?1, ?2, ?3, ?4, ?5)",
2265        )?;
2266        // Delete any stale position rows for these nodes; writer already
2267        // deleted the per-kind FTS rows in the upsert block above.
2268        let mut del_positions = tx
2269            .prepare_cached("DELETE FROM fts_node_property_positions WHERE node_logical_id = ?1")?;
2270        for row in &prepared.required_property_fts_rows {
2271            del_positions.execute(params![row.node_logical_id])?;
2272            // Insert into per-kind FTS table (table name is dynamic).
2273            let table = fathomdb_schema::fts_kind_table_name(&row.kind);
2274            if row.columns.is_empty() {
2275                // Non-weighted schema: single text_content column.
2276                // Ensure the per-kind table exists (handles new-kind first write).
2277                tx.execute_batch(&format!(
2278                    "CREATE VIRTUAL TABLE IF NOT EXISTS {table} USING fts5(\
2279                        node_logical_id UNINDEXED, text_content, \
2280                        tokenize = '{DEFAULT_FTS_TOKENIZER}'\
2281                    )"
2282                ))?;
2283                tx.prepare(&format!(
2284                    "INSERT INTO {table} (node_logical_id, text_content) VALUES (?1, ?2)"
2285                ))?
2286                .execute(params![row.node_logical_id, row.text_content])?;
2287            } else {
2288                // Weighted schema: per-column INSERT.
2289                let col_names: Vec<&str> = row.columns.iter().map(|(n, _)| n.as_str()).collect();
2290                let placeholders: Vec<String> = (2..=row.columns.len() + 1)
2291                    .map(|i| format!("?{i}"))
2292                    .collect();
2293                let sql = format!(
2294                    "INSERT INTO {table}(node_logical_id, {cols}) VALUES (?1, {placeholders})",
2295                    cols = col_names.join(", "),
2296                    placeholders = placeholders.join(", "),
2297                );
2298                let mut stmt = tx.prepare(&sql)?;
2299                stmt.execute(rusqlite::params_from_iter(
2300                    std::iter::once(row.node_logical_id.as_str())
2301                        .chain(row.columns.iter().map(|(_, v)| v.as_str())),
2302                ))?;
2303            }
2304            for pos in &row.positions {
2305                ins_positions.execute(params![
2306                    row.node_logical_id,
2307                    row.kind,
2308                    i64::try_from(pos.start_offset).unwrap_or(i64::MAX),
2309                    i64::try_from(pos.end_offset).unwrap_or(i64::MAX),
2310                    pos.leaf_path,
2311                ])?;
2312            }
2313        }
2314    }
2315
2316    // Double-write to staging: for nodes whose kind has an active rebuild
2317    // (state IN ('PENDING','BUILDING','SWAPPING')), upsert the precomputed
2318    // text_content into fts_property_rebuild_staging.  Both this upsert and
2319    // the per-kind FTS insert above happen inside the same transaction.
2320    if !prepared.required_property_fts_rows.is_empty() {
2321        let mut ins_staging_simple = tx.prepare_cached(
2322            "INSERT INTO fts_property_rebuild_staging \
2323             (kind, node_logical_id, text_content) \
2324             VALUES (?1, ?2, ?3) \
2325             ON CONFLICT(kind, node_logical_id) DO UPDATE \
2326             SET text_content = excluded.text_content",
2327        )?;
2328        let mut ins_staging_columns = tx.prepare_cached(
2329            "INSERT INTO fts_property_rebuild_staging \
2330             (kind, node_logical_id, text_content, columns_json) \
2331             VALUES (?1, ?2, ?3, ?4) \
2332             ON CONFLICT(kind, node_logical_id) DO UPDATE \
2333             SET text_content = excluded.text_content, \
2334                 columns_json = excluded.columns_json",
2335        )?;
2336        let mut check_rebuild = tx.prepare_cached(
2337            "SELECT 1 FROM fts_property_rebuild_state \
2338             WHERE kind = ?1 AND state IN ('PENDING','BUILDING','SWAPPING') LIMIT 1",
2339        )?;
2340        for row in &prepared.required_property_fts_rows {
2341            let in_rebuild: bool = check_rebuild
2342                .query_row(params![row.kind], |_| Ok(true))
2343                .optional()?
2344                .unwrap_or(false);
2345            if in_rebuild {
2346                if row.columns.is_empty() {
2347                    ins_staging_simple.execute(params![
2348                        row.kind,
2349                        row.node_logical_id,
2350                        row.text_content
2351                    ])?;
2352                } else {
2353                    let json_map: serde_json::Map<String, serde_json::Value> = row
2354                        .columns
2355                        .iter()
2356                        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
2357                        .collect();
2358                    let columns_json =
2359                        serde_json::to_string(&serde_json::Value::Object(json_map)).ok();
2360                    ins_staging_columns.execute(params![
2361                        row.kind,
2362                        row.node_logical_id,
2363                        row.text_content,
2364                        columns_json
2365                    ])?;
2366                }
2367            }
2368        }
2369    }
2370
2371    // Vec inserts (feature-gated; silently skipped when sqlite-vec is absent or table missing).
2372    // 0.5.0: inserts target per-kind tables (vec_<sanitized_kind>) instead of vec_nodes_active.
2373    #[cfg(feature = "sqlite-vec")]
2374    {
2375        // Build chunk_id → node_logical_id from the current request's chunks.
2376        let chunk_to_node: std::collections::HashMap<&str, &str> = prepared
2377            .chunks
2378            .iter()
2379            .map(|c| (c.id.as_str(), c.node_logical_id.as_str()))
2380            .collect();
2381
2382        for vi in &prepared.vec_inserts {
2383            // Resolve kind: prefer in-memory lookup; fall back to DB for pre-existing chunks.
2384            let kind: Option<String> = chunk_to_node
2385                .get(vi.chunk_id.as_str())
2386                .and_then(|lid| prepared.node_kinds.get(*lid))
2387                .cloned()
2388                .or_else(|| {
2389                    tx.query_row(
2390                        "SELECT n.kind FROM chunks c \
2391                         JOIN nodes n ON n.logical_id = c.node_logical_id \
2392                         WHERE c.id = ?1 LIMIT 1",
2393                        rusqlite::params![vi.chunk_id],
2394                        |row| row.get::<_, String>(0),
2395                    )
2396                    .optional()
2397                    .unwrap_or(None)
2398                });
2399
2400            let Some(kind) = kind else {
2401                // No kind found — chunk not yet inserted; skip silently.
2402                continue;
2403            };
2404
2405            let table_name = fathomdb_schema::vec_kind_table_name(&kind);
2406            let dimension = vi.embedding.len();
2407            let bytes: Vec<u8> = vi.embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
2408            // Auto-create the per-kind vec table if it does not exist yet.
2409            tx.execute_batch(&format!(
2410                "CREATE VIRTUAL TABLE IF NOT EXISTS {table_name} USING vec0(\
2411                    chunk_id TEXT PRIMARY KEY,\
2412                    embedding float[{dimension}]\
2413                )"
2414            ))?;
2415            tx.execute(
2416                &format!(
2417                    "INSERT OR REPLACE INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
2418                ),
2419                rusqlite::params![vi.chunk_id, bytes],
2420            )?;
2421        }
2422    }
2423
2424    tx.commit()?;
2425
2426    let provenance_warnings: Vec<String> = prepared
2427        .nodes
2428        .iter()
2429        .filter(|node| node.source_ref.is_none())
2430        .map(|node| format!("node '{}' has no source_ref", node.logical_id))
2431        .chain(
2432            prepared
2433                .node_retires
2434                .iter()
2435                .filter(|r| r.source_ref.is_none())
2436                .map(|r| format!("node retire '{}' has no source_ref", r.logical_id)),
2437        )
2438        .chain(
2439            prepared
2440                .edges
2441                .iter()
2442                .filter(|e| e.source_ref.is_none())
2443                .map(|e| format!("edge '{}' has no source_ref", e.logical_id)),
2444        )
2445        .chain(
2446            prepared
2447                .edge_retires
2448                .iter()
2449                .filter(|r| r.source_ref.is_none())
2450                .map(|r| format!("edge retire '{}' has no source_ref", r.logical_id)),
2451        )
2452        .chain(
2453            prepared
2454                .runs
2455                .iter()
2456                .filter(|r| r.source_ref.is_none())
2457                .map(|r| format!("run '{}' has no source_ref", r.id)),
2458        )
2459        .chain(
2460            prepared
2461                .steps
2462                .iter()
2463                .filter(|s| s.source_ref.is_none())
2464                .map(|s| format!("step '{}' has no source_ref", s.id)),
2465        )
2466        .chain(
2467            prepared
2468                .actions
2469                .iter()
2470                .filter(|a| a.source_ref.is_none())
2471                .map(|a| format!("action '{}' has no source_ref", a.id)),
2472        )
2473        .chain(
2474            prepared
2475                .operational_writes
2476                .iter()
2477                .filter(|write| operational_write_source_ref(write).is_none())
2478                .map(|write| {
2479                    format!(
2480                        "operational {} '{}:{}' has no source_ref",
2481                        operational_write_kind(write),
2482                        operational_write_collection(write),
2483                        operational_write_record_key(write)
2484                    )
2485                }),
2486        )
2487        .collect();
2488
2489    let mut warnings = provenance_warnings.clone();
2490    warnings.extend(prepared.operational_validation_warnings.clone());
2491
2492    Ok(WriteReceipt {
2493        label: prepared.label.clone(),
2494        optional_backfill_count: prepared.optional_backfills.len(),
2495        warnings,
2496        provenance_warnings,
2497    })
2498}
2499
2500fn operational_write_collection(write: &OperationalWrite) -> &str {
2501    match write {
2502        OperationalWrite::Append { collection, .. }
2503        | OperationalWrite::Put { collection, .. }
2504        | OperationalWrite::Delete { collection, .. } => collection,
2505    }
2506}
2507
2508fn operational_write_record_key(write: &OperationalWrite) -> &str {
2509    match write {
2510        OperationalWrite::Append { record_key, .. }
2511        | OperationalWrite::Put { record_key, .. }
2512        | OperationalWrite::Delete { record_key, .. } => record_key,
2513    }
2514}
2515
2516fn operational_write_kind(write: &OperationalWrite) -> &'static str {
2517    match write {
2518        OperationalWrite::Append { .. } => "append",
2519        OperationalWrite::Put { .. } => "put",
2520        OperationalWrite::Delete { .. } => "delete",
2521    }
2522}
2523
2524fn operational_write_payload(write: &OperationalWrite) -> &str {
2525    match write {
2526        OperationalWrite::Append { payload_json, .. }
2527        | OperationalWrite::Put { payload_json, .. } => payload_json,
2528        OperationalWrite::Delete { .. } => "null",
2529    }
2530}
2531
2532fn operational_write_source_ref(write: &OperationalWrite) -> Option<&str> {
2533    match write {
2534        OperationalWrite::Append { source_ref, .. }
2535        | OperationalWrite::Put { source_ref, .. }
2536        | OperationalWrite::Delete { source_ref, .. } => source_ref.as_deref(),
2537    }
2538}
2539
2540#[cfg(test)]
2541#[allow(clippy::expect_used)]
2542mod tests {
2543    use std::sync::Arc;
2544
2545    use fathomdb_schema::SchemaManager;
2546    use tempfile::NamedTempFile;
2547
2548    use super::{apply_write, prepare_write, resolve_operational_writes};
2549    use crate::{
2550        ActionInsert, ChunkInsert, ChunkPolicy, EdgeInsert, EdgeRetire, EngineError, NodeInsert,
2551        NodeRetire, OperationalWrite, OptionalProjectionTask, ProvenanceMode, RunInsert,
2552        StepInsert, TelemetryCounters, VecInsert, WriteRequest, WriterActor,
2553        projection::ProjectionTarget,
2554    };
2555
2556    #[test]
2557    fn writer_executes_runtime_table_rows() {
2558        let db = NamedTempFile::new().expect("temporary db");
2559        let writer = WriterActor::start(
2560            db.path(),
2561            Arc::new(SchemaManager::new()),
2562            ProvenanceMode::Warn,
2563            Arc::new(TelemetryCounters::default()),
2564        )
2565        .expect("writer");
2566
2567        let receipt = writer
2568            .submit(WriteRequest {
2569                label: "runtime".to_owned(),
2570                nodes: vec![],
2571                node_retires: vec![],
2572                edges: vec![],
2573                edge_retires: vec![],
2574                chunks: vec![],
2575                runs: vec![RunInsert {
2576                    id: "run-1".to_owned(),
2577                    kind: "session".to_owned(),
2578                    status: "completed".to_owned(),
2579                    properties: "{}".to_owned(),
2580                    source_ref: Some("src-1".to_owned()),
2581                    upsert: false,
2582                    supersedes_id: None,
2583                }],
2584                steps: vec![StepInsert {
2585                    id: "step-1".to_owned(),
2586                    run_id: "run-1".to_owned(),
2587                    kind: "llm".to_owned(),
2588                    status: "completed".to_owned(),
2589                    properties: "{}".to_owned(),
2590                    source_ref: Some("src-1".to_owned()),
2591                    upsert: false,
2592                    supersedes_id: None,
2593                }],
2594                actions: vec![ActionInsert {
2595                    id: "action-1".to_owned(),
2596                    step_id: "step-1".to_owned(),
2597                    kind: "emit".to_owned(),
2598                    status: "completed".to_owned(),
2599                    properties: "{}".to_owned(),
2600                    source_ref: Some("src-1".to_owned()),
2601                    upsert: false,
2602                    supersedes_id: None,
2603                }],
2604                optional_backfills: vec![],
2605                vec_inserts: vec![],
2606                operational_writes: vec![],
2607            })
2608            .expect("write receipt");
2609
2610        assert_eq!(receipt.label, "runtime");
2611    }
2612
2613    #[test]
2614    fn writer_put_operational_write_updates_current_and_mutations() {
2615        let db = NamedTempFile::new().expect("temporary db");
2616        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2617        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2618        conn.execute(
2619            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2620             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2621            [],
2622        )
2623        .expect("seed collection");
2624        drop(conn);
2625        let writer = WriterActor::start(
2626            db.path(),
2627            Arc::new(SchemaManager::new()),
2628            ProvenanceMode::Warn,
2629            Arc::new(TelemetryCounters::default()),
2630        )
2631        .expect("writer");
2632
2633        writer
2634            .submit(WriteRequest {
2635                label: "node-and-operational".to_owned(),
2636                nodes: vec![NodeInsert {
2637                    row_id: "row-1".to_owned(),
2638                    logical_id: "lg-1".to_owned(),
2639                    kind: "Meeting".to_owned(),
2640                    properties: "{}".to_owned(),
2641                    source_ref: Some("src-1".to_owned()),
2642                    upsert: false,
2643                    chunk_policy: ChunkPolicy::Preserve,
2644                    content_ref: None,
2645                }],
2646                node_retires: vec![],
2647                edges: vec![],
2648                edge_retires: vec![],
2649                chunks: vec![],
2650                runs: vec![],
2651                steps: vec![],
2652                actions: vec![],
2653                optional_backfills: vec![],
2654                vec_inserts: vec![],
2655                operational_writes: vec![OperationalWrite::Put {
2656                    collection: "connector_health".to_owned(),
2657                    record_key: "gmail".to_owned(),
2658                    payload_json: r#"{"status":"ok"}"#.to_owned(),
2659                    source_ref: Some("src-1".to_owned()),
2660                }],
2661            })
2662            .expect("write receipt");
2663
2664        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2665        let node_count: i64 = conn
2666            .query_row(
2667                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
2668                [],
2669                |row| row.get(0),
2670            )
2671            .expect("node count");
2672        assert_eq!(node_count, 1);
2673        let mutation_count: i64 = conn
2674            .query_row(
2675                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health' \
2676                 AND record_key = 'gmail'",
2677                [],
2678                |row| row.get(0),
2679            )
2680            .expect("mutation count");
2681        assert_eq!(mutation_count, 1);
2682        let payload: String = conn
2683            .query_row(
2684                "SELECT payload_json FROM operational_current \
2685                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2686                [],
2687                |row| row.get(0),
2688            )
2689            .expect("current payload");
2690        assert_eq!(payload, r#"{"status":"ok"}"#);
2691    }
2692
2693    #[test]
2694    fn writer_disabled_validation_mode_allows_invalid_operational_payloads() {
2695        let db = NamedTempFile::new().expect("temporary db");
2696        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2697        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2698        conn.execute(
2699            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2700             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2701            [r#"{"format_version":1,"mode":"disabled","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2702        )
2703        .expect("seed collection");
2704        drop(conn);
2705        let writer = WriterActor::start(
2706            db.path(),
2707            Arc::new(SchemaManager::new()),
2708            ProvenanceMode::Warn,
2709            Arc::new(TelemetryCounters::default()),
2710        )
2711        .expect("writer");
2712
2713        writer
2714            .submit(WriteRequest {
2715                label: "disabled-validation".to_owned(),
2716                nodes: vec![],
2717                node_retires: vec![],
2718                edges: vec![],
2719                edge_retires: vec![],
2720                chunks: vec![],
2721                runs: vec![],
2722                steps: vec![],
2723                actions: vec![],
2724                optional_backfills: vec![],
2725                vec_inserts: vec![],
2726                operational_writes: vec![OperationalWrite::Put {
2727                    collection: "connector_health".to_owned(),
2728                    record_key: "gmail".to_owned(),
2729                    payload_json: r#"{"bogus":true}"#.to_owned(),
2730                    source_ref: Some("src-1".to_owned()),
2731                }],
2732            })
2733            .expect("write receipt");
2734
2735        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2736        let payload: String = conn
2737            .query_row(
2738                "SELECT payload_json FROM operational_current \
2739                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2740                [],
2741                |row| row.get(0),
2742            )
2743            .expect("current payload");
2744        assert_eq!(payload, r#"{"bogus":true}"#);
2745    }
2746
2747    #[test]
2748    fn writer_report_only_validation_allows_invalid_payload_and_emits_warning() {
2749        let db = NamedTempFile::new().expect("temporary db");
2750        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2751        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2752        conn.execute(
2753            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
2754             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
2755            [r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2756        )
2757        .expect("seed collection");
2758        drop(conn);
2759        let writer = WriterActor::start(
2760            db.path(),
2761            Arc::new(SchemaManager::new()),
2762            ProvenanceMode::Warn,
2763            Arc::new(TelemetryCounters::default()),
2764        )
2765        .expect("writer");
2766
2767        let receipt = writer
2768            .submit(WriteRequest {
2769                label: "report-only-validation".to_owned(),
2770                nodes: vec![],
2771                node_retires: vec![],
2772                edges: vec![],
2773                edge_retires: vec![],
2774                chunks: vec![],
2775                runs: vec![],
2776                steps: vec![],
2777                actions: vec![],
2778                optional_backfills: vec![],
2779                vec_inserts: vec![],
2780                operational_writes: vec![OperationalWrite::Put {
2781                    collection: "connector_health".to_owned(),
2782                    record_key: "gmail".to_owned(),
2783                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2784                    source_ref: Some("src-1".to_owned()),
2785                }],
2786            })
2787            .expect("report_only write should succeed");
2788
2789        assert_eq!(receipt.provenance_warnings, Vec::<String>::new());
2790        assert_eq!(receipt.warnings.len(), 1);
2791        assert!(
2792            receipt.warnings[0].contains("connector_health"),
2793            "warning should identify collection"
2794        );
2795        assert!(
2796            receipt.warnings[0].contains("must be one of"),
2797            "warning should explain validation failure"
2798        );
2799
2800        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2801        let payload: String = conn
2802            .query_row(
2803                "SELECT payload_json FROM operational_current \
2804                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
2805                [],
2806                |row| row.get(0),
2807            )
2808            .expect("current payload");
2809        assert_eq!(payload, r#"{"status":"bogus"}"#);
2810    }
2811
2812    #[test]
2813    fn writer_rejects_operational_write_for_missing_collection() {
2814        let db = NamedTempFile::new().expect("temporary db");
2815        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2816        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2817        drop(conn);
2818        let writer = WriterActor::start(
2819            db.path(),
2820            Arc::new(SchemaManager::new()),
2821            ProvenanceMode::Warn,
2822            Arc::new(TelemetryCounters::default()),
2823        )
2824        .expect("writer");
2825
2826        let result = writer.submit(WriteRequest {
2827            label: "missing-operational-collection".to_owned(),
2828            nodes: vec![],
2829            node_retires: vec![],
2830            edges: vec![],
2831            edge_retires: vec![],
2832            chunks: vec![],
2833            runs: vec![],
2834            steps: vec![],
2835            actions: vec![],
2836            optional_backfills: vec![],
2837            vec_inserts: vec![],
2838            operational_writes: vec![OperationalWrite::Put {
2839                collection: "connector_health".to_owned(),
2840                record_key: "gmail".to_owned(),
2841                payload_json: r#"{"status":"ok"}"#.to_owned(),
2842                source_ref: Some("src-1".to_owned()),
2843            }],
2844        });
2845
2846        assert!(
2847            matches!(result, Err(EngineError::InvalidWrite(_))),
2848            "missing operational collection must return InvalidWrite"
2849        );
2850    }
2851
2852    #[test]
2853    fn writer_append_operational_write_records_history_without_current_row() {
2854        let db = NamedTempFile::new().expect("temporary db");
2855        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2856        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2857        conn.execute(
2858            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2859             VALUES ('audit_log', 'append_only_log', '{}', '{}')",
2860            [],
2861        )
2862        .expect("seed collection");
2863        drop(conn);
2864        let writer = WriterActor::start(
2865            db.path(),
2866            Arc::new(SchemaManager::new()),
2867            ProvenanceMode::Warn,
2868            Arc::new(TelemetryCounters::default()),
2869        )
2870        .expect("writer");
2871
2872        writer
2873            .submit(WriteRequest {
2874                label: "append-operational".to_owned(),
2875                nodes: vec![],
2876                node_retires: vec![],
2877                edges: vec![],
2878                edge_retires: vec![],
2879                chunks: vec![],
2880                runs: vec![],
2881                steps: vec![],
2882                actions: vec![],
2883                optional_backfills: vec![],
2884                vec_inserts: vec![],
2885                operational_writes: vec![OperationalWrite::Append {
2886                    collection: "audit_log".to_owned(),
2887                    record_key: "evt-1".to_owned(),
2888                    payload_json: r#"{"type":"sync"}"#.to_owned(),
2889                    source_ref: Some("src-1".to_owned()),
2890                }],
2891            })
2892            .expect("write receipt");
2893
2894        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2895        let mutation: (String, String) = conn
2896            .query_row(
2897                "SELECT op_kind, payload_json FROM operational_mutations \
2898                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2899                [],
2900                |row| Ok((row.get(0)?, row.get(1)?)),
2901            )
2902            .expect("mutation row");
2903        assert_eq!(mutation.0, "append");
2904        assert_eq!(mutation.1, r#"{"type":"sync"}"#);
2905        let current_count: i64 = conn
2906            .query_row(
2907                "SELECT count(*) FROM operational_current \
2908                 WHERE collection_name = 'audit_log' AND record_key = 'evt-1'",
2909                [],
2910                |row| row.get(0),
2911            )
2912            .expect("current count");
2913        assert_eq!(current_count, 0);
2914    }
2915
2916    #[test]
2917    fn writer_enforce_validation_rejects_invalid_append_without_side_effects() {
2918        let db = NamedTempFile::new().expect("temporary db");
2919        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2920        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2921        conn.execute(
2922            "INSERT INTO operational_collections \
2923             (name, kind, schema_json, retention_json, filter_fields_json, validation_json) \
2924             VALUES ('audit_log', 'append_only_log', '{}', '{}', \
2925                     '[{\"name\":\"status\",\"type\":\"string\",\"modes\":[\"exact\"]}]', ?1)",
2926            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
2927        )
2928        .expect("seed collection");
2929        drop(conn);
2930        let writer = WriterActor::start(
2931            db.path(),
2932            Arc::new(SchemaManager::new()),
2933            ProvenanceMode::Warn,
2934            Arc::new(TelemetryCounters::default()),
2935        )
2936        .expect("writer");
2937
2938        let error = writer
2939            .submit(WriteRequest {
2940                label: "invalid-append".to_owned(),
2941                nodes: vec![],
2942                node_retires: vec![],
2943                edges: vec![],
2944                edge_retires: vec![],
2945                chunks: vec![],
2946                runs: vec![],
2947                steps: vec![],
2948                actions: vec![],
2949                optional_backfills: vec![],
2950                vec_inserts: vec![],
2951                operational_writes: vec![OperationalWrite::Append {
2952                    collection: "audit_log".to_owned(),
2953                    record_key: "evt-1".to_owned(),
2954                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
2955                    source_ref: Some("src-1".to_owned()),
2956                }],
2957            })
2958            .expect_err("invalid append must reject");
2959        assert!(matches!(error, EngineError::InvalidWrite(_)));
2960        assert!(error.to_string().contains("must be one of"));
2961
2962        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2963        let mutation_count: i64 = conn
2964            .query_row(
2965                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'audit_log'",
2966                [],
2967                |row| row.get(0),
2968            )
2969            .expect("mutation count");
2970        assert_eq!(mutation_count, 0);
2971        let filter_count: i64 = conn
2972            .query_row(
2973                "SELECT count(*) FROM operational_filter_values WHERE collection_name = 'audit_log'",
2974                [],
2975                |row| row.get(0),
2976            )
2977            .expect("filter count");
2978        assert_eq!(filter_count, 0);
2979    }
2980
2981    #[test]
2982    fn writer_delete_operational_write_removes_current_row_and_keeps_history() {
2983        let db = NamedTempFile::new().expect("temporary db");
2984        let conn = rusqlite::Connection::open(db.path()).expect("conn");
2985        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
2986        conn.execute(
2987            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
2988             VALUES ('connector_health', 'latest_state', '{}', '{}')",
2989            [],
2990        )
2991        .expect("seed collection");
2992        drop(conn);
2993        let writer = WriterActor::start(
2994            db.path(),
2995            Arc::new(SchemaManager::new()),
2996            ProvenanceMode::Warn,
2997            Arc::new(TelemetryCounters::default()),
2998        )
2999        .expect("writer");
3000
3001        writer
3002            .submit(WriteRequest {
3003                label: "put-operational".to_owned(),
3004                nodes: vec![],
3005                node_retires: vec![],
3006                edges: vec![],
3007                edge_retires: vec![],
3008                chunks: vec![],
3009                runs: vec![],
3010                steps: vec![],
3011                actions: vec![],
3012                optional_backfills: vec![],
3013                vec_inserts: vec![],
3014                operational_writes: vec![OperationalWrite::Put {
3015                    collection: "connector_health".to_owned(),
3016                    record_key: "gmail".to_owned(),
3017                    payload_json: r#"{"status":"ok"}"#.to_owned(),
3018                    source_ref: Some("src-1".to_owned()),
3019                }],
3020            })
3021            .expect("put receipt");
3022
3023        writer
3024            .submit(WriteRequest {
3025                label: "delete-operational".to_owned(),
3026                nodes: vec![],
3027                node_retires: vec![],
3028                edges: vec![],
3029                edge_retires: vec![],
3030                chunks: vec![],
3031                runs: vec![],
3032                steps: vec![],
3033                actions: vec![],
3034                optional_backfills: vec![],
3035                vec_inserts: vec![],
3036                operational_writes: vec![OperationalWrite::Delete {
3037                    collection: "connector_health".to_owned(),
3038                    record_key: "gmail".to_owned(),
3039                    source_ref: Some("src-2".to_owned()),
3040                }],
3041            })
3042            .expect("delete receipt");
3043
3044        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3045        let mutation_kinds: Vec<String> = {
3046            let mut stmt = conn
3047                .prepare(
3048                    "SELECT op_kind FROM operational_mutations \
3049                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3050                     ORDER BY mutation_order ASC",
3051                )
3052                .expect("stmt");
3053            stmt.query_map([], |row| row.get(0))
3054                .expect("rows")
3055                .collect::<Result<_, _>>()
3056                .expect("collect")
3057        };
3058        assert_eq!(mutation_kinds, vec!["put".to_owned(), "delete".to_owned()]);
3059        let current_count: i64 = conn
3060            .query_row(
3061                "SELECT count(*) FROM operational_current \
3062                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3063                [],
3064                |row| row.get(0),
3065            )
3066            .expect("current count");
3067        assert_eq!(current_count, 0);
3068    }
3069
3070    #[test]
3071    fn writer_delete_bypasses_validation_contract() {
3072        let db = NamedTempFile::new().expect("temporary db");
3073        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3074        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3075        conn.execute(
3076            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3077             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3078            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3079        )
3080        .expect("seed collection");
3081        drop(conn);
3082        let writer = WriterActor::start(
3083            db.path(),
3084            Arc::new(SchemaManager::new()),
3085            ProvenanceMode::Warn,
3086            Arc::new(TelemetryCounters::default()),
3087        )
3088        .expect("writer");
3089
3090        writer
3091            .submit(WriteRequest {
3092                label: "valid-put".to_owned(),
3093                nodes: vec![],
3094                node_retires: vec![],
3095                edges: vec![],
3096                edge_retires: vec![],
3097                chunks: vec![],
3098                runs: vec![],
3099                steps: vec![],
3100                actions: vec![],
3101                optional_backfills: vec![],
3102                vec_inserts: vec![],
3103                operational_writes: vec![OperationalWrite::Put {
3104                    collection: "connector_health".to_owned(),
3105                    record_key: "gmail".to_owned(),
3106                    payload_json: r#"{"status":"ok"}"#.to_owned(),
3107                    source_ref: Some("src-1".to_owned()),
3108                }],
3109            })
3110            .expect("put receipt");
3111        writer
3112            .submit(WriteRequest {
3113                label: "delete-after-put".to_owned(),
3114                nodes: vec![],
3115                node_retires: vec![],
3116                edges: vec![],
3117                edge_retires: vec![],
3118                chunks: vec![],
3119                runs: vec![],
3120                steps: vec![],
3121                actions: vec![],
3122                optional_backfills: vec![],
3123                vec_inserts: vec![],
3124                operational_writes: vec![OperationalWrite::Delete {
3125                    collection: "connector_health".to_owned(),
3126                    record_key: "gmail".to_owned(),
3127                    source_ref: Some("src-2".to_owned()),
3128                }],
3129            })
3130            .expect("delete receipt");
3131
3132        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3133        let current_count: i64 = conn
3134            .query_row(
3135                "SELECT count(*) FROM operational_current \
3136                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3137                [],
3138                |row| row.get(0),
3139            )
3140            .expect("current count");
3141        assert_eq!(current_count, 0);
3142    }
3143
3144    #[test]
3145    fn writer_latest_state_secondary_indexes_track_put_and_delete() {
3146        let db = NamedTempFile::new().expect("temporary db");
3147        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3148        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3149        conn.execute(
3150            "INSERT INTO operational_collections \
3151             (name, kind, schema_json, retention_json, secondary_indexes_json) \
3152             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3153            [r#"[{"name":"status_current","kind":"latest_state_field","field":"status","value_type":"string"},{"name":"tenant_category","kind":"latest_state_composite","fields":[{"name":"tenant","value_type":"string"},{"name":"category","value_type":"string"}]}]"#],
3154        )
3155        .expect("seed collection");
3156        drop(conn);
3157        let writer = WriterActor::start(
3158            db.path(),
3159            Arc::new(SchemaManager::new()),
3160            ProvenanceMode::Warn,
3161            Arc::new(TelemetryCounters::default()),
3162        )
3163        .expect("writer");
3164
3165        writer
3166            .submit(WriteRequest {
3167                label: "secondary-index-put".to_owned(),
3168                nodes: vec![],
3169                node_retires: vec![],
3170                edges: vec![],
3171                edge_retires: vec![],
3172                chunks: vec![],
3173                runs: vec![],
3174                steps: vec![],
3175                actions: vec![],
3176                optional_backfills: vec![],
3177                vec_inserts: vec![],
3178                operational_writes: vec![OperationalWrite::Put {
3179                    collection: "connector_health".to_owned(),
3180                    record_key: "gmail".to_owned(),
3181                    payload_json: r#"{"status":"degraded","tenant":"acme","category":"mail"}"#
3182                        .to_owned(),
3183                    source_ref: Some("src-1".to_owned()),
3184                }],
3185            })
3186            .expect("put receipt");
3187
3188        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3189        let current_entry_count: i64 = conn
3190            .query_row(
3191                "SELECT count(*) FROM operational_secondary_index_entries \
3192                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3193                [],
3194                |row| row.get(0),
3195            )
3196            .expect("current secondary index count");
3197        assert_eq!(current_entry_count, 2);
3198        drop(conn);
3199
3200        writer
3201            .submit(WriteRequest {
3202                label: "secondary-index-delete".to_owned(),
3203                nodes: vec![],
3204                node_retires: vec![],
3205                edges: vec![],
3206                edge_retires: vec![],
3207                chunks: vec![],
3208                runs: vec![],
3209                steps: vec![],
3210                actions: vec![],
3211                optional_backfills: vec![],
3212                vec_inserts: vec![],
3213                operational_writes: vec![OperationalWrite::Delete {
3214                    collection: "connector_health".to_owned(),
3215                    record_key: "gmail".to_owned(),
3216                    source_ref: Some("src-2".to_owned()),
3217                }],
3218            })
3219            .expect("delete receipt");
3220
3221        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3222        let current_entry_count: i64 = conn
3223            .query_row(
3224                "SELECT count(*) FROM operational_secondary_index_entries \
3225                 WHERE collection_name = 'connector_health' AND subject_kind = 'current'",
3226                [],
3227                |row| row.get(0),
3228            )
3229            .expect("current secondary index count");
3230        assert_eq!(current_entry_count, 0);
3231    }
3232
3233    #[test]
3234    fn writer_latest_state_operational_writes_persist_mutation_order() {
3235        let db = NamedTempFile::new().expect("temporary db");
3236        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3237        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3238        conn.execute(
3239            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3240             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3241            [],
3242        )
3243        .expect("seed collection");
3244        drop(conn);
3245
3246        let writer = WriterActor::start(
3247            db.path(),
3248            Arc::new(SchemaManager::new()),
3249            ProvenanceMode::Warn,
3250            Arc::new(TelemetryCounters::default()),
3251        )
3252        .expect("writer");
3253
3254        writer
3255            .submit(WriteRequest {
3256                label: "ordered-operational-batch".to_owned(),
3257                nodes: vec![],
3258                node_retires: vec![],
3259                edges: vec![],
3260                edge_retires: vec![],
3261                chunks: vec![],
3262                runs: vec![],
3263                steps: vec![],
3264                actions: vec![],
3265                optional_backfills: vec![],
3266                vec_inserts: vec![],
3267                operational_writes: vec![
3268                    OperationalWrite::Put {
3269                        collection: "connector_health".to_owned(),
3270                        record_key: "gmail".to_owned(),
3271                        payload_json: r#"{"status":"old"}"#.to_owned(),
3272                        source_ref: Some("src-1".to_owned()),
3273                    },
3274                    OperationalWrite::Delete {
3275                        collection: "connector_health".to_owned(),
3276                        record_key: "gmail".to_owned(),
3277                        source_ref: Some("src-2".to_owned()),
3278                    },
3279                    OperationalWrite::Put {
3280                        collection: "connector_health".to_owned(),
3281                        record_key: "gmail".to_owned(),
3282                        payload_json: r#"{"status":"new"}"#.to_owned(),
3283                        source_ref: Some("src-3".to_owned()),
3284                    },
3285                ],
3286            })
3287            .expect("write receipt");
3288
3289        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3290        let rows: Vec<(String, i64)> = {
3291            let mut stmt = conn
3292                .prepare(
3293                    "SELECT op_kind, mutation_order FROM operational_mutations \
3294                     WHERE collection_name = 'connector_health' AND record_key = 'gmail' \
3295                     ORDER BY mutation_order ASC",
3296                )
3297                .expect("stmt");
3298            stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
3299                .expect("rows")
3300                .collect::<Result<_, _>>()
3301                .expect("collect")
3302        };
3303        assert_eq!(
3304            rows,
3305            vec![
3306                ("put".to_owned(), 1),
3307                ("delete".to_owned(), 2),
3308                ("put".to_owned(), 3),
3309            ]
3310        );
3311        let payload: String = conn
3312            .query_row(
3313                "SELECT payload_json FROM operational_current \
3314                 WHERE collection_name = 'connector_health' AND record_key = 'gmail'",
3315                [],
3316                |row| row.get(0),
3317            )
3318            .expect("current payload");
3319        assert_eq!(payload, r#"{"status":"new"}"#);
3320    }
3321
    #[test]
    fn apply_write_rechecks_collection_disabled_state_inside_transaction() {
        // Simulates a race between preflight and commit: the collection is
        // enabled when `resolve_operational_writes` runs, then disabled
        // before `apply_write`. Per the test name, `apply_write` must recheck
        // the disabled flag inside its transaction and reject the write
        // without persisting anything.
        let db = NamedTempFile::new().expect("temporary db");
        let mut conn = rusqlite::Connection::open(db.path()).expect("conn");
        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
        conn.execute(
            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
             VALUES ('connector_health', 'latest_state', '{}', '{}')",
            [],
        )
        .expect("seed collection");

        let request = WriteRequest {
            label: "disabled-race".to_owned(),
            nodes: vec![],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: vec![],
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![OperationalWrite::Put {
                collection: "connector_health".to_owned(),
                record_key: "gmail".to_owned(),
                payload_json: r#"{"status":"ok"}"#.to_owned(),
                source_ref: Some("src-1".to_owned()),
            }],
        };
        // Preflight resolve happens while the collection is still enabled.
        let mut prepared = prepare_write(request, ProvenanceMode::Warn).expect("prepare");
        resolve_operational_writes(&conn, &mut prepared).expect("preflight resolve");

        // Disable the collection between preflight and apply to create the race.
        conn.execute(
            "UPDATE operational_collections SET disabled_at = 123 WHERE name = 'connector_health'",
            [],
        )
        .expect("disable collection after preflight");

        let error =
            apply_write(&mut conn, &mut prepared).expect_err("disabled collection must reject");
        assert!(matches!(error, EngineError::InvalidWrite(_)));
        assert!(error.to_string().contains("is disabled"));

        // The rejection must be atomic: no mutation history row was written...
        let mutation_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("mutation count");
        assert_eq!(mutation_count, 0);

        // ...and no current-state row either.
        let current_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
                [],
                |row| row.get(0),
            )
            .expect("current count");
        assert_eq!(current_count, 0);
    }
3385
3386    #[test]
3387    fn writer_enforce_validation_rejects_invalid_put_atomically() {
3388        let db = NamedTempFile::new().expect("temporary db");
3389        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3390        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3391        conn.execute(
3392            "INSERT INTO operational_collections (name, kind, schema_json, retention_json, validation_json) \
3393             VALUES ('connector_health', 'latest_state', '{}', '{}', ?1)",
3394            [r#"{"format_version":1,"mode":"enforce","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#],
3395        )
3396        .expect("seed collection");
3397        drop(conn);
3398        let writer = WriterActor::start(
3399            db.path(),
3400            Arc::new(SchemaManager::new()),
3401            ProvenanceMode::Warn,
3402            Arc::new(TelemetryCounters::default()),
3403        )
3404        .expect("writer");
3405
3406        let error = writer
3407            .submit(WriteRequest {
3408                label: "invalid-put".to_owned(),
3409                nodes: vec![NodeInsert {
3410                    row_id: "row-1".to_owned(),
3411                    logical_id: "lg-1".to_owned(),
3412                    kind: "Meeting".to_owned(),
3413                    properties: "{}".to_owned(),
3414                    source_ref: Some("src-1".to_owned()),
3415                    upsert: false,
3416                    chunk_policy: ChunkPolicy::Preserve,
3417                    content_ref: None,
3418                }],
3419                node_retires: vec![],
3420                edges: vec![],
3421                edge_retires: vec![],
3422                chunks: vec![],
3423                runs: vec![],
3424                steps: vec![],
3425                actions: vec![],
3426                optional_backfills: vec![],
3427                vec_inserts: vec![],
3428                operational_writes: vec![OperationalWrite::Put {
3429                    collection: "connector_health".to_owned(),
3430                    record_key: "gmail".to_owned(),
3431                    payload_json: r#"{"status":"bogus"}"#.to_owned(),
3432                    source_ref: Some("src-1".to_owned()),
3433                }],
3434            })
3435            .expect_err("invalid put must reject");
3436        assert!(matches!(error, EngineError::InvalidWrite(_)));
3437        assert!(error.to_string().contains("must be one of"));
3438
3439        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3440        let node_count: i64 = conn
3441            .query_row(
3442                "SELECT count(*) FROM nodes WHERE logical_id = 'lg-1'",
3443                [],
3444                |row| row.get(0),
3445            )
3446            .expect("node count");
3447        assert_eq!(node_count, 0);
3448        let mutation_count: i64 = conn
3449            .query_row(
3450                "SELECT count(*) FROM operational_mutations WHERE collection_name = 'connector_health'",
3451                [],
3452                |row| row.get(0),
3453            )
3454            .expect("mutation count");
3455        assert_eq!(mutation_count, 0);
3456        let current_count: i64 = conn
3457            .query_row(
3458                "SELECT count(*) FROM operational_current WHERE collection_name = 'connector_health'",
3459                [],
3460                |row| row.get(0),
3461            )
3462            .expect("current count");
3463        assert_eq!(current_count, 0);
3464    }
3465
3466    #[test]
3467    fn writer_rejects_append_against_latest_state_collection() {
3468        let db = NamedTempFile::new().expect("temporary db");
3469        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3470        SchemaManager::new().bootstrap(&conn).expect("bootstrap");
3471        conn.execute(
3472            "INSERT INTO operational_collections (name, kind, schema_json, retention_json) \
3473             VALUES ('connector_health', 'latest_state', '{}', '{}')",
3474            [],
3475        )
3476        .expect("seed collection");
3477        drop(conn);
3478        let writer = WriterActor::start(
3479            db.path(),
3480            Arc::new(SchemaManager::new()),
3481            ProvenanceMode::Warn,
3482            Arc::new(TelemetryCounters::default()),
3483        )
3484        .expect("writer");
3485
3486        let result = writer.submit(WriteRequest {
3487            label: "bad-append".to_owned(),
3488            nodes: vec![],
3489            node_retires: vec![],
3490            edges: vec![],
3491            edge_retires: vec![],
3492            chunks: vec![],
3493            runs: vec![],
3494            steps: vec![],
3495            actions: vec![],
3496            optional_backfills: vec![],
3497            vec_inserts: vec![],
3498            operational_writes: vec![OperationalWrite::Append {
3499                collection: "connector_health".to_owned(),
3500                record_key: "gmail".to_owned(),
3501                payload_json: r#"{"status":"ok"}"#.to_owned(),
3502                source_ref: Some("src-1".to_owned()),
3503            }],
3504        });
3505
3506        assert!(
3507            matches!(result, Err(EngineError::InvalidWrite(_))),
3508            "latest_state collection must reject Append"
3509        );
3510    }
3511
3512    #[test]
3513    fn writer_upsert_supersedes_prior_active_node() {
3514        let db = NamedTempFile::new().expect("temporary db");
3515        let writer = WriterActor::start(
3516            db.path(),
3517            Arc::new(SchemaManager::new()),
3518            ProvenanceMode::Warn,
3519            Arc::new(TelemetryCounters::default()),
3520        )
3521        .expect("writer");
3522
3523        writer
3524            .submit(WriteRequest {
3525                label: "v1".to_owned(),
3526                nodes: vec![NodeInsert {
3527                    row_id: "row-1".to_owned(),
3528                    logical_id: "lg-1".to_owned(),
3529                    kind: "Meeting".to_owned(),
3530                    properties: r#"{"version":1}"#.to_owned(),
3531                    source_ref: Some("src-1".to_owned()),
3532                    upsert: false,
3533                    chunk_policy: ChunkPolicy::Preserve,
3534                    content_ref: None,
3535                }],
3536                node_retires: vec![],
3537                edges: vec![],
3538                edge_retires: vec![],
3539                chunks: vec![],
3540                runs: vec![],
3541                steps: vec![],
3542                actions: vec![],
3543                optional_backfills: vec![],
3544                vec_inserts: vec![],
3545                operational_writes: vec![],
3546            })
3547            .expect("v1 write");
3548
3549        writer
3550            .submit(WriteRequest {
3551                label: "v2".to_owned(),
3552                nodes: vec![NodeInsert {
3553                    row_id: "row-2".to_owned(),
3554                    logical_id: "lg-1".to_owned(),
3555                    kind: "Meeting".to_owned(),
3556                    properties: r#"{"version":2}"#.to_owned(),
3557                    source_ref: Some("src-2".to_owned()),
3558                    upsert: true,
3559                    chunk_policy: ChunkPolicy::Preserve,
3560                    content_ref: None,
3561                }],
3562                node_retires: vec![],
3563                edges: vec![],
3564                edge_retires: vec![],
3565                chunks: vec![],
3566                runs: vec![],
3567                steps: vec![],
3568                actions: vec![],
3569                optional_backfills: vec![],
3570                vec_inserts: vec![],
3571                operational_writes: vec![],
3572            })
3573            .expect("v2 upsert write");
3574
3575        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3576        let (active_row_id, props): (String, String) = conn
3577            .query_row(
3578                "SELECT row_id, properties FROM nodes WHERE logical_id = 'lg-1' AND superseded_at IS NULL",
3579                [],
3580                |row| Ok((row.get(0)?, row.get(1)?)),
3581            )
3582            .expect("active row");
3583        assert_eq!(active_row_id, "row-2");
3584        assert!(props.contains("\"version\":2"));
3585
3586        let superseded: i64 = conn
3587            .query_row(
3588                "SELECT count(*) FROM nodes WHERE row_id = 'row-1' AND superseded_at IS NOT NULL",
3589                [],
3590                |row| row.get(0),
3591            )
3592            .expect("superseded count");
3593        assert_eq!(superseded, 1);
3594    }
3595
3596    #[test]
3597    fn writer_inserts_edge_between_two_nodes() {
3598        let db = NamedTempFile::new().expect("temporary db");
3599        let writer = WriterActor::start(
3600            db.path(),
3601            Arc::new(SchemaManager::new()),
3602            ProvenanceMode::Warn,
3603            Arc::new(TelemetryCounters::default()),
3604        )
3605        .expect("writer");
3606
3607        writer
3608            .submit(WriteRequest {
3609                label: "nodes-and-edge".to_owned(),
3610                nodes: vec![
3611                    NodeInsert {
3612                        row_id: "row-meeting".to_owned(),
3613                        logical_id: "meeting-1".to_owned(),
3614                        kind: "Meeting".to_owned(),
3615                        properties: "{}".to_owned(),
3616                        source_ref: Some("src-1".to_owned()),
3617                        upsert: false,
3618                        chunk_policy: ChunkPolicy::Preserve,
3619                        content_ref: None,
3620                    },
3621                    NodeInsert {
3622                        row_id: "row-task".to_owned(),
3623                        logical_id: "task-1".to_owned(),
3624                        kind: "Task".to_owned(),
3625                        properties: "{}".to_owned(),
3626                        source_ref: Some("src-1".to_owned()),
3627                        upsert: false,
3628                        chunk_policy: ChunkPolicy::Preserve,
3629                        content_ref: None,
3630                    },
3631                ],
3632                node_retires: vec![],
3633                edges: vec![EdgeInsert {
3634                    row_id: "edge-1".to_owned(),
3635                    logical_id: "edge-lg-1".to_owned(),
3636                    source_logical_id: "meeting-1".to_owned(),
3637                    target_logical_id: "task-1".to_owned(),
3638                    kind: "HAS_TASK".to_owned(),
3639                    properties: "{}".to_owned(),
3640                    source_ref: Some("src-1".to_owned()),
3641                    upsert: false,
3642                }],
3643                edge_retires: vec![],
3644                chunks: vec![],
3645                runs: vec![],
3646                steps: vec![],
3647                actions: vec![],
3648                optional_backfills: vec![],
3649                vec_inserts: vec![],
3650                operational_writes: vec![],
3651            })
3652            .expect("write receipt");
3653
3654        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3655        let (src, tgt, kind): (String, String, String) = conn
3656            .query_row(
3657                "SELECT source_logical_id, target_logical_id, kind FROM edges WHERE row_id = 'edge-1'",
3658                [],
3659                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
3660            )
3661            .expect("edge row");
3662        assert_eq!(src, "meeting-1");
3663        assert_eq!(tgt, "task-1");
3664        assert_eq!(kind, "HAS_TASK");
3665    }
3666
3667    #[test]
3668    #[allow(clippy::too_many_lines)]
3669    fn writer_upsert_supersedes_prior_active_edge() {
3670        let db = NamedTempFile::new().expect("temporary db");
3671        let writer = WriterActor::start(
3672            db.path(),
3673            Arc::new(SchemaManager::new()),
3674            ProvenanceMode::Warn,
3675            Arc::new(TelemetryCounters::default()),
3676        )
3677        .expect("writer");
3678
3679        // Write two nodes
3680        writer
3681            .submit(WriteRequest {
3682                label: "nodes".to_owned(),
3683                nodes: vec![
3684                    NodeInsert {
3685                        row_id: "row-a".to_owned(),
3686                        logical_id: "node-a".to_owned(),
3687                        kind: "Meeting".to_owned(),
3688                        properties: "{}".to_owned(),
3689                        source_ref: Some("src-1".to_owned()),
3690                        upsert: false,
3691                        chunk_policy: ChunkPolicy::Preserve,
3692                        content_ref: None,
3693                    },
3694                    NodeInsert {
3695                        row_id: "row-b".to_owned(),
3696                        logical_id: "node-b".to_owned(),
3697                        kind: "Task".to_owned(),
3698                        properties: "{}".to_owned(),
3699                        source_ref: Some("src-1".to_owned()),
3700                        upsert: false,
3701                        chunk_policy: ChunkPolicy::Preserve,
3702                        content_ref: None,
3703                    },
3704                ],
3705                node_retires: vec![],
3706                edges: vec![],
3707                edge_retires: vec![],
3708                chunks: vec![],
3709                runs: vec![],
3710                steps: vec![],
3711                actions: vec![],
3712                optional_backfills: vec![],
3713                vec_inserts: vec![],
3714                operational_writes: vec![],
3715            })
3716            .expect("nodes write");
3717
3718        // Write v1 edge
3719        writer
3720            .submit(WriteRequest {
3721                label: "edge-v1".to_owned(),
3722                nodes: vec![],
3723                node_retires: vec![],
3724                edges: vec![EdgeInsert {
3725                    row_id: "edge-row-1".to_owned(),
3726                    logical_id: "edge-lg-1".to_owned(),
3727                    source_logical_id: "node-a".to_owned(),
3728                    target_logical_id: "node-b".to_owned(),
3729                    kind: "HAS_TASK".to_owned(),
3730                    properties: r#"{"weight":1}"#.to_owned(),
3731                    source_ref: Some("src-1".to_owned()),
3732                    upsert: false,
3733                }],
3734                edge_retires: vec![],
3735                chunks: vec![],
3736                runs: vec![],
3737                steps: vec![],
3738                actions: vec![],
3739                optional_backfills: vec![],
3740                vec_inserts: vec![],
3741                operational_writes: vec![],
3742            })
3743            .expect("edge v1 write");
3744
3745        // Upsert v2 edge
3746        writer
3747            .submit(WriteRequest {
3748                label: "edge-v2".to_owned(),
3749                nodes: vec![],
3750                node_retires: vec![],
3751                edges: vec![EdgeInsert {
3752                    row_id: "edge-row-2".to_owned(),
3753                    logical_id: "edge-lg-1".to_owned(),
3754                    source_logical_id: "node-a".to_owned(),
3755                    target_logical_id: "node-b".to_owned(),
3756                    kind: "HAS_TASK".to_owned(),
3757                    properties: r#"{"weight":2}"#.to_owned(),
3758                    source_ref: Some("src-2".to_owned()),
3759                    upsert: true,
3760                }],
3761                edge_retires: vec![],
3762                chunks: vec![],
3763                runs: vec![],
3764                steps: vec![],
3765                actions: vec![],
3766                optional_backfills: vec![],
3767                vec_inserts: vec![],
3768                operational_writes: vec![],
3769            })
3770            .expect("edge v2 upsert");
3771
3772        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3773        let (active_row_id, props): (String, String) = conn
3774            .query_row(
3775                "SELECT row_id, properties FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
3776                [],
3777                |row| Ok((row.get(0)?, row.get(1)?)),
3778            )
3779            .expect("active edge");
3780        assert_eq!(active_row_id, "edge-row-2");
3781        assert!(props.contains("\"weight\":2"));
3782
3783        let superseded: i64 = conn
3784            .query_row(
3785                "SELECT count(*) FROM edges WHERE row_id = 'edge-row-1' AND superseded_at IS NOT NULL",
3786                [],
3787                |row| row.get(0),
3788            )
3789            .expect("superseded count");
3790        assert_eq!(superseded, 1);
3791    }
3792
3793    #[test]
3794    fn writer_fts_rows_are_written_to_database() {
3795        let db = NamedTempFile::new().expect("temporary db");
3796        let writer = WriterActor::start(
3797            db.path(),
3798            Arc::new(SchemaManager::new()),
3799            ProvenanceMode::Warn,
3800            Arc::new(TelemetryCounters::default()),
3801        )
3802        .expect("writer");
3803
3804        writer
3805            .submit(WriteRequest {
3806                label: "seed".to_owned(),
3807                nodes: vec![NodeInsert {
3808                    row_id: "row-1".to_owned(),
3809                    logical_id: "logical-1".to_owned(),
3810                    kind: "Meeting".to_owned(),
3811                    properties: "{}".to_owned(),
3812                    source_ref: Some("src-1".to_owned()),
3813                    upsert: false,
3814                    chunk_policy: ChunkPolicy::Preserve,
3815                    content_ref: None,
3816                }],
3817                node_retires: vec![],
3818                edges: vec![],
3819                edge_retires: vec![],
3820                chunks: vec![ChunkInsert {
3821                    id: "chunk-1".to_owned(),
3822                    node_logical_id: "logical-1".to_owned(),
3823                    text_content: "budget discussion".to_owned(),
3824                    byte_start: None,
3825                    byte_end: None,
3826                    content_hash: None,
3827                }],
3828                runs: vec![],
3829                steps: vec![],
3830                actions: vec![],
3831                optional_backfills: vec![],
3832                vec_inserts: vec![],
3833                operational_writes: vec![],
3834            })
3835            .expect("write receipt");
3836
3837        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3838        let (chunk_id, node_logical_id, kind, text_content): (String, String, String, String) =
3839            conn.query_row(
3840                "SELECT chunk_id, node_logical_id, kind, text_content \
3841                 FROM fts_nodes WHERE chunk_id = 'chunk-1'",
3842                [],
3843                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
3844            )
3845            .expect("fts row");
3846        assert_eq!(chunk_id, "chunk-1");
3847        assert_eq!(node_logical_id, "logical-1");
3848        assert_eq!(kind, "Meeting");
3849        assert_eq!(text_content, "budget discussion");
3850    }
3851
3852    #[test]
3853    fn writer_receipt_warns_on_nodes_without_source_ref() {
3854        let db = NamedTempFile::new().expect("temporary db");
3855        let writer = WriterActor::start(
3856            db.path(),
3857            Arc::new(SchemaManager::new()),
3858            ProvenanceMode::Warn,
3859            Arc::new(TelemetryCounters::default()),
3860        )
3861        .expect("writer");
3862
3863        let receipt = writer
3864            .submit(WriteRequest {
3865                label: "no-source".to_owned(),
3866                nodes: vec![NodeInsert {
3867                    row_id: "row-1".to_owned(),
3868                    logical_id: "logical-1".to_owned(),
3869                    kind: "Meeting".to_owned(),
3870                    properties: "{}".to_owned(),
3871                    source_ref: None,
3872                    upsert: false,
3873                    chunk_policy: ChunkPolicy::Preserve,
3874                    content_ref: None,
3875                }],
3876                node_retires: vec![],
3877                edges: vec![],
3878                edge_retires: vec![],
3879                chunks: vec![],
3880                runs: vec![],
3881                steps: vec![],
3882                actions: vec![],
3883                optional_backfills: vec![],
3884                vec_inserts: vec![],
3885                operational_writes: vec![],
3886            })
3887            .expect("write receipt");
3888
3889        assert_eq!(receipt.provenance_warnings.len(), 1);
3890        assert!(receipt.provenance_warnings[0].contains("logical-1"));
3891    }
3892
3893    #[test]
3894    fn writer_receipt_no_warnings_when_all_nodes_have_source_ref() {
3895        let db = NamedTempFile::new().expect("temporary db");
3896        let writer = WriterActor::start(
3897            db.path(),
3898            Arc::new(SchemaManager::new()),
3899            ProvenanceMode::Warn,
3900            Arc::new(TelemetryCounters::default()),
3901        )
3902        .expect("writer");
3903
3904        let receipt = writer
3905            .submit(WriteRequest {
3906                label: "with-source".to_owned(),
3907                nodes: vec![NodeInsert {
3908                    row_id: "row-1".to_owned(),
3909                    logical_id: "logical-1".to_owned(),
3910                    kind: "Meeting".to_owned(),
3911                    properties: "{}".to_owned(),
3912                    source_ref: Some("src-1".to_owned()),
3913                    upsert: false,
3914                    chunk_policy: ChunkPolicy::Preserve,
3915                    content_ref: None,
3916                }],
3917                node_retires: vec![],
3918                edges: vec![],
3919                edge_retires: vec![],
3920                chunks: vec![],
3921                runs: vec![],
3922                steps: vec![],
3923                actions: vec![],
3924                optional_backfills: vec![],
3925                vec_inserts: vec![],
3926                operational_writes: vec![],
3927            })
3928            .expect("write receipt");
3929
3930        assert!(receipt.provenance_warnings.is_empty());
3931    }
3932
3933    #[test]
3934    fn writer_accepts_chunk_for_pre_existing_node() {
3935        let db = NamedTempFile::new().expect("temporary db");
3936        let writer = WriterActor::start(
3937            db.path(),
3938            Arc::new(SchemaManager::new()),
3939            ProvenanceMode::Warn,
3940            Arc::new(TelemetryCounters::default()),
3941        )
3942        .expect("writer");
3943
3944        // Request 1: submit node only
3945        writer
3946            .submit(WriteRequest {
3947                label: "r1".to_owned(),
3948                nodes: vec![NodeInsert {
3949                    row_id: "row-1".to_owned(),
3950                    logical_id: "logical-1".to_owned(),
3951                    kind: "Meeting".to_owned(),
3952                    properties: "{}".to_owned(),
3953                    source_ref: Some("src-1".to_owned()),
3954                    upsert: false,
3955                    chunk_policy: ChunkPolicy::Preserve,
3956                    content_ref: None,
3957                }],
3958                node_retires: vec![],
3959                edges: vec![],
3960                edge_retires: vec![],
3961                chunks: vec![],
3962                runs: vec![],
3963                steps: vec![],
3964                actions: vec![],
3965                optional_backfills: vec![],
3966                vec_inserts: vec![],
3967                operational_writes: vec![],
3968            })
3969            .expect("r1 write");
3970
3971        // Request 2: submit chunk for pre-existing node
3972        writer
3973            .submit(WriteRequest {
3974                label: "r2".to_owned(),
3975                nodes: vec![],
3976                node_retires: vec![],
3977                edges: vec![],
3978                edge_retires: vec![],
3979                chunks: vec![ChunkInsert {
3980                    id: "chunk-1".to_owned(),
3981                    node_logical_id: "logical-1".to_owned(),
3982                    text_content: "budget discussion".to_owned(),
3983                    byte_start: None,
3984                    byte_end: None,
3985                    content_hash: None,
3986                }],
3987                runs: vec![],
3988                steps: vec![],
3989                actions: vec![],
3990                optional_backfills: vec![],
3991                vec_inserts: vec![],
3992                operational_writes: vec![],
3993            })
3994            .expect("r2 write — chunk for pre-existing node");
3995
3996        let conn = rusqlite::Connection::open(db.path()).expect("conn");
3997        let count: i64 = conn
3998            .query_row(
3999                "SELECT count(*) FROM fts_nodes WHERE chunk_id = 'chunk-1'",
4000                [],
4001                |row| row.get(0),
4002            )
4003            .expect("fts count");
4004        assert_eq!(
4005            count, 1,
4006            "FTS row must exist for chunk attached to pre-existing node"
4007        );
4008    }
4009
4010    #[test]
4011    fn writer_rejects_chunk_for_completely_unknown_node() {
4012        let db = NamedTempFile::new().expect("temporary db");
4013        let writer = WriterActor::start(
4014            db.path(),
4015            Arc::new(SchemaManager::new()),
4016            ProvenanceMode::Warn,
4017            Arc::new(TelemetryCounters::default()),
4018        )
4019        .expect("writer");
4020
4021        let result = writer.submit(WriteRequest {
4022            label: "bad".to_owned(),
4023            nodes: vec![],
4024            node_retires: vec![],
4025            edges: vec![],
4026            edge_retires: vec![],
4027            chunks: vec![ChunkInsert {
4028                id: "chunk-1".to_owned(),
4029                node_logical_id: "nonexistent".to_owned(),
4030                text_content: "some text".to_owned(),
4031                byte_start: None,
4032                byte_end: None,
4033                content_hash: None,
4034            }],
4035            runs: vec![],
4036            steps: vec![],
4037            actions: vec![],
4038            optional_backfills: vec![],
4039            vec_inserts: vec![],
4040            operational_writes: vec![],
4041        });
4042
4043        assert!(
4044            matches!(result, Err(EngineError::InvalidWrite(_))),
4045            "completely unknown node must return InvalidWrite"
4046        );
4047    }
4048
4049    #[test]
4050    fn writer_executes_typed_nodes_chunks_and_derived_projections() {
4051        let db = NamedTempFile::new().expect("temporary db");
4052        let writer = WriterActor::start(
4053            db.path(),
4054            Arc::new(SchemaManager::new()),
4055            ProvenanceMode::Warn,
4056            Arc::new(TelemetryCounters::default()),
4057        )
4058        .expect("writer");
4059
4060        let receipt = writer
4061            .submit(WriteRequest {
4062                label: "seed".to_owned(),
4063                nodes: vec![NodeInsert {
4064                    row_id: "row-1".to_owned(),
4065                    logical_id: "logical-1".to_owned(),
4066                    kind: "Meeting".to_owned(),
4067                    properties: "{}".to_owned(),
4068                    source_ref: None,
4069                    upsert: false,
4070                    chunk_policy: ChunkPolicy::Preserve,
4071                    content_ref: None,
4072                }],
4073                node_retires: vec![],
4074                edges: vec![],
4075                edge_retires: vec![],
4076                chunks: vec![ChunkInsert {
4077                    id: "chunk-1".to_owned(),
4078                    node_logical_id: "logical-1".to_owned(),
4079                    text_content: "budget discussion".to_owned(),
4080                    byte_start: None,
4081                    byte_end: None,
4082                    content_hash: None,
4083                }],
4084                runs: vec![],
4085                steps: vec![],
4086                actions: vec![],
4087                optional_backfills: vec![],
4088                vec_inserts: vec![],
4089                operational_writes: vec![],
4090            })
4091            .expect("write receipt");
4092
4093        assert_eq!(receipt.label, "seed");
4094    }
4095
4096    #[test]
4097    fn writer_node_retire_supersedes_active_node() {
4098        let db = NamedTempFile::new().expect("temporary db");
4099        let writer = WriterActor::start(
4100            db.path(),
4101            Arc::new(SchemaManager::new()),
4102            ProvenanceMode::Warn,
4103            Arc::new(TelemetryCounters::default()),
4104        )
4105        .expect("writer");
4106
4107        writer
4108            .submit(WriteRequest {
4109                label: "seed".to_owned(),
4110                nodes: vec![NodeInsert {
4111                    row_id: "row-1".to_owned(),
4112                    logical_id: "meeting-1".to_owned(),
4113                    kind: "Meeting".to_owned(),
4114                    properties: "{}".to_owned(),
4115                    source_ref: Some("src-1".to_owned()),
4116                    upsert: false,
4117                    chunk_policy: ChunkPolicy::Preserve,
4118                    content_ref: None,
4119                }],
4120                node_retires: vec![],
4121                edges: vec![],
4122                edge_retires: vec![],
4123                chunks: vec![],
4124                runs: vec![],
4125                steps: vec![],
4126                actions: vec![],
4127                optional_backfills: vec![],
4128                vec_inserts: vec![],
4129                operational_writes: vec![],
4130            })
4131            .expect("seed write");
4132
4133        writer
4134            .submit(WriteRequest {
4135                label: "retire".to_owned(),
4136                nodes: vec![],
4137                node_retires: vec![NodeRetire {
4138                    logical_id: "meeting-1".to_owned(),
4139                    source_ref: Some("src-2".to_owned()),
4140                }],
4141                edges: vec![],
4142                edge_retires: vec![],
4143                chunks: vec![],
4144                runs: vec![],
4145                steps: vec![],
4146                actions: vec![],
4147                optional_backfills: vec![],
4148                vec_inserts: vec![],
4149                operational_writes: vec![],
4150            })
4151            .expect("retire write");
4152
4153        let conn = rusqlite::Connection::open(db.path()).expect("open");
4154        let active: i64 = conn
4155            .query_row(
4156                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NULL",
4157                [],
4158                |r| r.get(0),
4159            )
4160            .expect("count active");
4161        let historical: i64 = conn
4162            .query_row(
4163                "SELECT COUNT(*) FROM nodes WHERE logical_id = 'meeting-1' AND superseded_at IS NOT NULL",
4164                [],
4165                |r| r.get(0),
4166            )
4167            .expect("count historical");
4168
4169        assert_eq!(active, 0, "active count must be 0 after retire");
4170        assert_eq!(historical, 1, "historical count must be 1 after retire");
4171    }
4172
4173    #[test]
4174    fn writer_node_retire_preserves_chunks_and_clears_fts() {
4175        let db = NamedTempFile::new().expect("temporary db");
4176        let writer = WriterActor::start(
4177            db.path(),
4178            Arc::new(SchemaManager::new()),
4179            ProvenanceMode::Warn,
4180            Arc::new(TelemetryCounters::default()),
4181        )
4182        .expect("writer");
4183
4184        writer
4185            .submit(WriteRequest {
4186                label: "seed".to_owned(),
4187                nodes: vec![NodeInsert {
4188                    row_id: "row-1".to_owned(),
4189                    logical_id: "meeting-1".to_owned(),
4190                    kind: "Meeting".to_owned(),
4191                    properties: "{}".to_owned(),
4192                    source_ref: Some("src-1".to_owned()),
4193                    upsert: false,
4194                    chunk_policy: ChunkPolicy::Preserve,
4195                    content_ref: None,
4196                }],
4197                node_retires: vec![],
4198                edges: vec![],
4199                edge_retires: vec![],
4200                chunks: vec![ChunkInsert {
4201                    id: "chunk-1".to_owned(),
4202                    node_logical_id: "meeting-1".to_owned(),
4203                    text_content: "budget discussion".to_owned(),
4204                    byte_start: None,
4205                    byte_end: None,
4206                    content_hash: None,
4207                }],
4208                runs: vec![],
4209                steps: vec![],
4210                actions: vec![],
4211                optional_backfills: vec![],
4212                vec_inserts: vec![],
4213                operational_writes: vec![],
4214            })
4215            .expect("seed write");
4216
4217        writer
4218            .submit(WriteRequest {
4219                label: "retire".to_owned(),
4220                nodes: vec![],
4221                node_retires: vec![NodeRetire {
4222                    logical_id: "meeting-1".to_owned(),
4223                    source_ref: Some("src-2".to_owned()),
4224                }],
4225                edges: vec![],
4226                edge_retires: vec![],
4227                chunks: vec![],
4228                runs: vec![],
4229                steps: vec![],
4230                actions: vec![],
4231                optional_backfills: vec![],
4232                vec_inserts: vec![],
4233                operational_writes: vec![],
4234            })
4235            .expect("retire write");
4236
4237        let conn = rusqlite::Connection::open(db.path()).expect("open");
4238        let chunk_count: i64 = conn
4239            .query_row(
4240                "SELECT COUNT(*) FROM chunks WHERE node_logical_id = 'meeting-1'",
4241                [],
4242                |r| r.get(0),
4243            )
4244            .expect("chunk count");
4245        let fts_count: i64 = conn
4246            .query_row(
4247                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1'",
4248                [],
4249                |r| r.get(0),
4250            )
4251            .expect("fts count");
4252
4253        assert_eq!(
4254            chunk_count, 1,
4255            "chunks must remain after node retire so restore can re-establish content"
4256        );
4257        assert_eq!(fts_count, 0, "fts_nodes must be deleted after node retire");
4258    }
4259
4260    #[test]
4261    fn writer_edge_retire_supersedes_active_edge() {
4262        let db = NamedTempFile::new().expect("temporary db");
4263        let writer = WriterActor::start(
4264            db.path(),
4265            Arc::new(SchemaManager::new()),
4266            ProvenanceMode::Warn,
4267            Arc::new(TelemetryCounters::default()),
4268        )
4269        .expect("writer");
4270
4271        writer
4272            .submit(WriteRequest {
4273                label: "seed".to_owned(),
4274                nodes: vec![
4275                    NodeInsert {
4276                        row_id: "row-a".to_owned(),
4277                        logical_id: "node-a".to_owned(),
4278                        kind: "Meeting".to_owned(),
4279                        properties: "{}".to_owned(),
4280                        source_ref: Some("src-1".to_owned()),
4281                        upsert: false,
4282                        chunk_policy: ChunkPolicy::Preserve,
4283                        content_ref: None,
4284                    },
4285                    NodeInsert {
4286                        row_id: "row-b".to_owned(),
4287                        logical_id: "node-b".to_owned(),
4288                        kind: "Task".to_owned(),
4289                        properties: "{}".to_owned(),
4290                        source_ref: Some("src-1".to_owned()),
4291                        upsert: false,
4292                        chunk_policy: ChunkPolicy::Preserve,
4293                        content_ref: None,
4294                    },
4295                ],
4296                node_retires: vec![],
4297                edges: vec![EdgeInsert {
4298                    row_id: "edge-1".to_owned(),
4299                    logical_id: "edge-lg-1".to_owned(),
4300                    source_logical_id: "node-a".to_owned(),
4301                    target_logical_id: "node-b".to_owned(),
4302                    kind: "HAS_TASK".to_owned(),
4303                    properties: "{}".to_owned(),
4304                    source_ref: Some("src-1".to_owned()),
4305                    upsert: false,
4306                }],
4307                edge_retires: vec![],
4308                chunks: vec![],
4309                runs: vec![],
4310                steps: vec![],
4311                actions: vec![],
4312                optional_backfills: vec![],
4313                vec_inserts: vec![],
4314                operational_writes: vec![],
4315            })
4316            .expect("seed write");
4317
4318        writer
4319            .submit(WriteRequest {
4320                label: "retire-edge".to_owned(),
4321                nodes: vec![],
4322                node_retires: vec![],
4323                edges: vec![],
4324                edge_retires: vec![EdgeRetire {
4325                    logical_id: "edge-lg-1".to_owned(),
4326                    source_ref: Some("src-2".to_owned()),
4327                }],
4328                chunks: vec![],
4329                runs: vec![],
4330                steps: vec![],
4331                actions: vec![],
4332                optional_backfills: vec![],
4333                vec_inserts: vec![],
4334                operational_writes: vec![],
4335            })
4336            .expect("retire edge write");
4337
4338        let conn = rusqlite::Connection::open(db.path()).expect("open");
4339        let active: i64 = conn
4340            .query_row(
4341                "SELECT COUNT(*) FROM edges WHERE logical_id = 'edge-lg-1' AND superseded_at IS NULL",
4342                [],
4343                |r| r.get(0),
4344            )
4345            .expect("active edge count");
4346
4347        assert_eq!(active, 0, "active edge count must be 0 after retire");
4348    }
4349
4350    #[test]
4351    fn writer_retire_without_source_ref_emits_provenance_warning() {
4352        let db = NamedTempFile::new().expect("temporary db");
4353        let writer = WriterActor::start(
4354            db.path(),
4355            Arc::new(SchemaManager::new()),
4356            ProvenanceMode::Warn,
4357            Arc::new(TelemetryCounters::default()),
4358        )
4359        .expect("writer");
4360
4361        writer
4362            .submit(WriteRequest {
4363                label: "seed".to_owned(),
4364                nodes: vec![NodeInsert {
4365                    row_id: "row-1".to_owned(),
4366                    logical_id: "meeting-1".to_owned(),
4367                    kind: "Meeting".to_owned(),
4368                    properties: "{}".to_owned(),
4369                    source_ref: Some("src-1".to_owned()),
4370                    upsert: false,
4371                    chunk_policy: ChunkPolicy::Preserve,
4372                    content_ref: None,
4373                }],
4374                node_retires: vec![],
4375                edges: vec![],
4376                edge_retires: vec![],
4377                chunks: vec![],
4378                runs: vec![],
4379                steps: vec![],
4380                actions: vec![],
4381                optional_backfills: vec![],
4382                vec_inserts: vec![],
4383                operational_writes: vec![],
4384            })
4385            .expect("seed write");
4386
4387        let receipt = writer
4388            .submit(WriteRequest {
4389                label: "retire-no-src".to_owned(),
4390                nodes: vec![],
4391                node_retires: vec![NodeRetire {
4392                    logical_id: "meeting-1".to_owned(),
4393                    source_ref: None,
4394                }],
4395                edges: vec![],
4396                edge_retires: vec![],
4397                chunks: vec![],
4398                runs: vec![],
4399                steps: vec![],
4400                actions: vec![],
4401                optional_backfills: vec![],
4402                vec_inserts: vec![],
4403                operational_writes: vec![],
4404            })
4405            .expect("retire write");
4406
4407        assert!(
4408            !receipt.provenance_warnings.is_empty(),
4409            "retire without source_ref must emit a provenance warning"
4410        );
4411    }
4412
4413    #[test]
4414    #[allow(clippy::too_many_lines)]
4415    fn writer_upsert_with_chunk_policy_replace_clears_old_chunks() {
4416        let db = NamedTempFile::new().expect("temporary db");
4417        let writer = WriterActor::start(
4418            db.path(),
4419            Arc::new(SchemaManager::new()),
4420            ProvenanceMode::Warn,
4421            Arc::new(TelemetryCounters::default()),
4422        )
4423        .expect("writer");
4424
4425        writer
4426            .submit(WriteRequest {
4427                label: "v1".to_owned(),
4428                nodes: vec![NodeInsert {
4429                    row_id: "row-1".to_owned(),
4430                    logical_id: "meeting-1".to_owned(),
4431                    kind: "Meeting".to_owned(),
4432                    properties: "{}".to_owned(),
4433                    source_ref: Some("src-1".to_owned()),
4434                    upsert: false,
4435                    chunk_policy: ChunkPolicy::Preserve,
4436                    content_ref: None,
4437                }],
4438                node_retires: vec![],
4439                edges: vec![],
4440                edge_retires: vec![],
4441                chunks: vec![ChunkInsert {
4442                    id: "chunk-old".to_owned(),
4443                    node_logical_id: "meeting-1".to_owned(),
4444                    text_content: "old text".to_owned(),
4445                    byte_start: None,
4446                    byte_end: None,
4447                    content_hash: None,
4448                }],
4449                runs: vec![],
4450                steps: vec![],
4451                actions: vec![],
4452                optional_backfills: vec![],
4453                vec_inserts: vec![],
4454                operational_writes: vec![],
4455            })
4456            .expect("v1 write");
4457
4458        writer
4459            .submit(WriteRequest {
4460                label: "v2".to_owned(),
4461                nodes: vec![NodeInsert {
4462                    row_id: "row-2".to_owned(),
4463                    logical_id: "meeting-1".to_owned(),
4464                    kind: "Meeting".to_owned(),
4465                    properties: "{}".to_owned(),
4466                    source_ref: Some("src-2".to_owned()),
4467                    upsert: true,
4468                    chunk_policy: ChunkPolicy::Replace,
4469                    content_ref: None,
4470                }],
4471                node_retires: vec![],
4472                edges: vec![],
4473                edge_retires: vec![],
4474                chunks: vec![ChunkInsert {
4475                    id: "chunk-new".to_owned(),
4476                    node_logical_id: "meeting-1".to_owned(),
4477                    text_content: "new text".to_owned(),
4478                    byte_start: None,
4479                    byte_end: None,
4480                    content_hash: None,
4481                }],
4482                runs: vec![],
4483                steps: vec![],
4484                actions: vec![],
4485                optional_backfills: vec![],
4486                vec_inserts: vec![],
4487                operational_writes: vec![],
4488            })
4489            .expect("v2 write");
4490
4491        let conn = rusqlite::Connection::open(db.path()).expect("open");
4492        let old_chunk: i64 = conn
4493            .query_row(
4494                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4495                [],
4496                |r| r.get(0),
4497            )
4498            .expect("old chunk count");
4499        let new_chunk: i64 = conn
4500            .query_row(
4501                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-new'",
4502                [],
4503                |r| r.get(0),
4504            )
4505            .expect("new chunk count");
4506        let fts_old: i64 = conn
4507            .query_row(
4508                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'meeting-1' AND text_content = 'old text'",
4509                [],
4510                |r| r.get(0),
4511            )
4512            .expect("old fts count");
4513
4514        assert_eq!(
4515            old_chunk, 0,
4516            "old chunk must be deleted by ChunkPolicy::Replace"
4517        );
4518        assert_eq!(new_chunk, 1, "new chunk must exist after replace");
4519        assert_eq!(
4520            fts_old, 0,
4521            "old FTS row must be deleted by ChunkPolicy::Replace"
4522        );
4523    }
4524
4525    #[test]
4526    fn writer_upsert_with_chunk_policy_preserve_keeps_old_chunks() {
4527        let db = NamedTempFile::new().expect("temporary db");
4528        let writer = WriterActor::start(
4529            db.path(),
4530            Arc::new(SchemaManager::new()),
4531            ProvenanceMode::Warn,
4532            Arc::new(TelemetryCounters::default()),
4533        )
4534        .expect("writer");
4535
4536        writer
4537            .submit(WriteRequest {
4538                label: "v1".to_owned(),
4539                nodes: vec![NodeInsert {
4540                    row_id: "row-1".to_owned(),
4541                    logical_id: "meeting-1".to_owned(),
4542                    kind: "Meeting".to_owned(),
4543                    properties: "{}".to_owned(),
4544                    source_ref: Some("src-1".to_owned()),
4545                    upsert: false,
4546                    chunk_policy: ChunkPolicy::Preserve,
4547                    content_ref: None,
4548                }],
4549                node_retires: vec![],
4550                edges: vec![],
4551                edge_retires: vec![],
4552                chunks: vec![ChunkInsert {
4553                    id: "chunk-old".to_owned(),
4554                    node_logical_id: "meeting-1".to_owned(),
4555                    text_content: "old text".to_owned(),
4556                    byte_start: None,
4557                    byte_end: None,
4558                    content_hash: None,
4559                }],
4560                runs: vec![],
4561                steps: vec![],
4562                actions: vec![],
4563                optional_backfills: vec![],
4564                vec_inserts: vec![],
4565                operational_writes: vec![],
4566            })
4567            .expect("v1 write");
4568
4569        writer
4570            .submit(WriteRequest {
4571                label: "v2-props-only".to_owned(),
4572                nodes: vec![NodeInsert {
4573                    row_id: "row-2".to_owned(),
4574                    logical_id: "meeting-1".to_owned(),
4575                    kind: "Meeting".to_owned(),
4576                    properties: r#"{"status":"updated"}"#.to_owned(),
4577                    source_ref: Some("src-2".to_owned()),
4578                    upsert: true,
4579                    chunk_policy: ChunkPolicy::Preserve,
4580                    content_ref: None,
4581                }],
4582                node_retires: vec![],
4583                edges: vec![],
4584                edge_retires: vec![],
4585                chunks: vec![],
4586                runs: vec![],
4587                steps: vec![],
4588                actions: vec![],
4589                optional_backfills: vec![],
4590                vec_inserts: vec![],
4591                operational_writes: vec![],
4592            })
4593            .expect("v2 preserve write");
4594
4595        let conn = rusqlite::Connection::open(db.path()).expect("open");
4596        let old_chunk: i64 = conn
4597            .query_row(
4598                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-old'",
4599                [],
4600                |r| r.get(0),
4601            )
4602            .expect("old chunk count");
4603
4604        assert_eq!(
4605            old_chunk, 1,
4606            "old chunk must be preserved by ChunkPolicy::Preserve"
4607        );
4608    }
4609
4610    #[test]
4611    fn writer_chunk_policy_replace_without_upsert_is_a_no_op() {
4612        let db = NamedTempFile::new().expect("temporary db");
4613        let writer = WriterActor::start(
4614            db.path(),
4615            Arc::new(SchemaManager::new()),
4616            ProvenanceMode::Warn,
4617            Arc::new(TelemetryCounters::default()),
4618        )
4619        .expect("writer");
4620
4621        writer
4622            .submit(WriteRequest {
4623                label: "v1".to_owned(),
4624                nodes: vec![NodeInsert {
4625                    row_id: "row-1".to_owned(),
4626                    logical_id: "meeting-1".to_owned(),
4627                    kind: "Meeting".to_owned(),
4628                    properties: "{}".to_owned(),
4629                    source_ref: Some("src-1".to_owned()),
4630                    upsert: false,
4631                    chunk_policy: ChunkPolicy::Preserve,
4632                    content_ref: None,
4633                }],
4634                node_retires: vec![],
4635                edges: vec![],
4636                edge_retires: vec![],
4637                chunks: vec![ChunkInsert {
4638                    id: "chunk-existing".to_owned(),
4639                    node_logical_id: "meeting-1".to_owned(),
4640                    text_content: "existing text".to_owned(),
4641                    byte_start: None,
4642                    byte_end: None,
4643                    content_hash: None,
4644                }],
4645                runs: vec![],
4646                steps: vec![],
4647                actions: vec![],
4648                optional_backfills: vec![],
4649                vec_inserts: vec![],
4650                operational_writes: vec![],
4651            })
4652            .expect("v1 write");
4653
4654        // Insert a second node (not upsert) with ChunkPolicy::Replace — should NOT delete prior chunks
4655        writer
4656            .submit(WriteRequest {
4657                label: "insert-no-upsert".to_owned(),
4658                nodes: vec![NodeInsert {
4659                    row_id: "row-2".to_owned(),
4660                    logical_id: "meeting-2".to_owned(),
4661                    kind: "Meeting".to_owned(),
4662                    properties: "{}".to_owned(),
4663                    source_ref: Some("src-2".to_owned()),
4664                    upsert: false,
4665                    chunk_policy: ChunkPolicy::Replace,
4666                    content_ref: None,
4667                }],
4668                node_retires: vec![],
4669                edges: vec![],
4670                edge_retires: vec![],
4671                chunks: vec![],
4672                runs: vec![],
4673                steps: vec![],
4674                actions: vec![],
4675                optional_backfills: vec![],
4676                vec_inserts: vec![],
4677                operational_writes: vec![],
4678            })
4679            .expect("insert no-upsert write");
4680
4681        let conn = rusqlite::Connection::open(db.path()).expect("open");
4682        let existing_chunk: i64 = conn
4683            .query_row(
4684                "SELECT COUNT(*) FROM chunks WHERE id = 'chunk-existing'",
4685                [],
4686                |r| r.get(0),
4687            )
4688            .expect("chunk count");
4689
4690        assert_eq!(
4691            existing_chunk, 1,
4692            "ChunkPolicy::Replace without upsert must not delete existing chunks"
4693        );
4694    }
4695
4696    #[test]
4697    fn writer_run_upsert_supersedes_prior_active_run() {
4698        let db = NamedTempFile::new().expect("temporary db");
4699        let writer = WriterActor::start(
4700            db.path(),
4701            Arc::new(SchemaManager::new()),
4702            ProvenanceMode::Warn,
4703            Arc::new(TelemetryCounters::default()),
4704        )
4705        .expect("writer");
4706
4707        writer
4708            .submit(WriteRequest {
4709                label: "v1".to_owned(),
4710                nodes: vec![],
4711                node_retires: vec![],
4712                edges: vec![],
4713                edge_retires: vec![],
4714                chunks: vec![],
4715                runs: vec![RunInsert {
4716                    id: "run-v1".to_owned(),
4717                    kind: "session".to_owned(),
4718                    status: "completed".to_owned(),
4719                    properties: "{}".to_owned(),
4720                    source_ref: Some("src-1".to_owned()),
4721                    upsert: false,
4722                    supersedes_id: None,
4723                }],
4724                steps: vec![],
4725                actions: vec![],
4726                optional_backfills: vec![],
4727                vec_inserts: vec![],
4728                operational_writes: vec![],
4729            })
4730            .expect("v1 run write");
4731
4732        writer
4733            .submit(WriteRequest {
4734                label: "v2".to_owned(),
4735                nodes: vec![],
4736                node_retires: vec![],
4737                edges: vec![],
4738                edge_retires: vec![],
4739                chunks: vec![],
4740                runs: vec![RunInsert {
4741                    id: "run-v2".to_owned(),
4742                    kind: "session".to_owned(),
4743                    status: "completed".to_owned(),
4744                    properties: "{}".to_owned(),
4745                    source_ref: Some("src-2".to_owned()),
4746                    upsert: true,
4747                    supersedes_id: Some("run-v1".to_owned()),
4748                }],
4749                steps: vec![],
4750                actions: vec![],
4751                optional_backfills: vec![],
4752                vec_inserts: vec![],
4753                operational_writes: vec![],
4754            })
4755            .expect("v2 run write");
4756
4757        let conn = rusqlite::Connection::open(db.path()).expect("open");
4758        let v1_historical: i64 = conn
4759            .query_row(
4760                "SELECT COUNT(*) FROM runs WHERE id = 'run-v1' AND superseded_at IS NOT NULL",
4761                [],
4762                |r| r.get(0),
4763            )
4764            .expect("v1 historical count");
4765        let v2_active: i64 = conn
4766            .query_row(
4767                "SELECT COUNT(*) FROM runs WHERE id = 'run-v2' AND superseded_at IS NULL",
4768                [],
4769                |r| r.get(0),
4770            )
4771            .expect("v2 active count");
4772
4773        assert_eq!(v1_historical, 1, "run-v1 must be historical after upsert");
4774        assert_eq!(v2_active, 1, "run-v2 must be active after upsert");
4775    }
4776
4777    #[test]
4778    fn writer_step_upsert_supersedes_prior_active_step() {
4779        let db = NamedTempFile::new().expect("temporary db");
4780        let writer = WriterActor::start(
4781            db.path(),
4782            Arc::new(SchemaManager::new()),
4783            ProvenanceMode::Warn,
4784            Arc::new(TelemetryCounters::default()),
4785        )
4786        .expect("writer");
4787
4788        writer
4789            .submit(WriteRequest {
4790                label: "v1".to_owned(),
4791                nodes: vec![],
4792                node_retires: vec![],
4793                edges: vec![],
4794                edge_retires: vec![],
4795                chunks: vec![],
4796                runs: vec![RunInsert {
4797                    id: "run-1".to_owned(),
4798                    kind: "session".to_owned(),
4799                    status: "completed".to_owned(),
4800                    properties: "{}".to_owned(),
4801                    source_ref: Some("src-1".to_owned()),
4802                    upsert: false,
4803                    supersedes_id: None,
4804                }],
4805                steps: vec![StepInsert {
4806                    id: "step-v1".to_owned(),
4807                    run_id: "run-1".to_owned(),
4808                    kind: "llm".to_owned(),
4809                    status: "completed".to_owned(),
4810                    properties: "{}".to_owned(),
4811                    source_ref: Some("src-1".to_owned()),
4812                    upsert: false,
4813                    supersedes_id: None,
4814                }],
4815                actions: vec![],
4816                optional_backfills: vec![],
4817                vec_inserts: vec![],
4818                operational_writes: vec![],
4819            })
4820            .expect("v1 step write");
4821
4822        writer
4823            .submit(WriteRequest {
4824                label: "v2".to_owned(),
4825                nodes: vec![],
4826                node_retires: vec![],
4827                edges: vec![],
4828                edge_retires: vec![],
4829                chunks: vec![],
4830                runs: vec![],
4831                steps: vec![StepInsert {
4832                    id: "step-v2".to_owned(),
4833                    run_id: "run-1".to_owned(),
4834                    kind: "llm".to_owned(),
4835                    status: "completed".to_owned(),
4836                    properties: "{}".to_owned(),
4837                    source_ref: Some("src-2".to_owned()),
4838                    upsert: true,
4839                    supersedes_id: Some("step-v1".to_owned()),
4840                }],
4841                actions: vec![],
4842                optional_backfills: vec![],
4843                vec_inserts: vec![],
4844                operational_writes: vec![],
4845            })
4846            .expect("v2 step write");
4847
4848        let conn = rusqlite::Connection::open(db.path()).expect("open");
4849        let v1_historical: i64 = conn
4850            .query_row(
4851                "SELECT COUNT(*) FROM steps WHERE id = 'step-v1' AND superseded_at IS NOT NULL",
4852                [],
4853                |r| r.get(0),
4854            )
4855            .expect("v1 historical count");
4856        let v2_active: i64 = conn
4857            .query_row(
4858                "SELECT COUNT(*) FROM steps WHERE id = 'step-v2' AND superseded_at IS NULL",
4859                [],
4860                |r| r.get(0),
4861            )
4862            .expect("v2 active count");
4863
4864        assert_eq!(v1_historical, 1, "step-v1 must be historical after upsert");
4865        assert_eq!(v2_active, 1, "step-v2 must be active after upsert");
4866    }
4867
4868    #[test]
4869    fn writer_action_upsert_supersedes_prior_active_action() {
4870        let db = NamedTempFile::new().expect("temporary db");
4871        let writer = WriterActor::start(
4872            db.path(),
4873            Arc::new(SchemaManager::new()),
4874            ProvenanceMode::Warn,
4875            Arc::new(TelemetryCounters::default()),
4876        )
4877        .expect("writer");
4878
4879        writer
4880            .submit(WriteRequest {
4881                label: "v1".to_owned(),
4882                nodes: vec![],
4883                node_retires: vec![],
4884                edges: vec![],
4885                edge_retires: vec![],
4886                chunks: vec![],
4887                runs: vec![RunInsert {
4888                    id: "run-1".to_owned(),
4889                    kind: "session".to_owned(),
4890                    status: "completed".to_owned(),
4891                    properties: "{}".to_owned(),
4892                    source_ref: Some("src-1".to_owned()),
4893                    upsert: false,
4894                    supersedes_id: None,
4895                }],
4896                steps: vec![StepInsert {
4897                    id: "step-1".to_owned(),
4898                    run_id: "run-1".to_owned(),
4899                    kind: "llm".to_owned(),
4900                    status: "completed".to_owned(),
4901                    properties: "{}".to_owned(),
4902                    source_ref: Some("src-1".to_owned()),
4903                    upsert: false,
4904                    supersedes_id: None,
4905                }],
4906                actions: vec![ActionInsert {
4907                    id: "action-v1".to_owned(),
4908                    step_id: "step-1".to_owned(),
4909                    kind: "emit".to_owned(),
4910                    status: "completed".to_owned(),
4911                    properties: "{}".to_owned(),
4912                    source_ref: Some("src-1".to_owned()),
4913                    upsert: false,
4914                    supersedes_id: None,
4915                }],
4916                optional_backfills: vec![],
4917                vec_inserts: vec![],
4918                operational_writes: vec![],
4919            })
4920            .expect("v1 action write");
4921
4922        writer
4923            .submit(WriteRequest {
4924                label: "v2".to_owned(),
4925                nodes: vec![],
4926                node_retires: vec![],
4927                edges: vec![],
4928                edge_retires: vec![],
4929                chunks: vec![],
4930                runs: vec![],
4931                steps: vec![],
4932                actions: vec![ActionInsert {
4933                    id: "action-v2".to_owned(),
4934                    step_id: "step-1".to_owned(),
4935                    kind: "emit".to_owned(),
4936                    status: "completed".to_owned(),
4937                    properties: "{}".to_owned(),
4938                    source_ref: Some("src-2".to_owned()),
4939                    upsert: true,
4940                    supersedes_id: Some("action-v1".to_owned()),
4941                }],
4942                optional_backfills: vec![],
4943                vec_inserts: vec![],
4944                operational_writes: vec![],
4945            })
4946            .expect("v2 action write");
4947
4948        let conn = rusqlite::Connection::open(db.path()).expect("open");
4949        let v1_historical: i64 = conn
4950            .query_row(
4951                "SELECT COUNT(*) FROM actions WHERE id = 'action-v1' AND superseded_at IS NOT NULL",
4952                [],
4953                |r| r.get(0),
4954            )
4955            .expect("v1 historical count");
4956        let v2_active: i64 = conn
4957            .query_row(
4958                "SELECT COUNT(*) FROM actions WHERE id = 'action-v2' AND superseded_at IS NULL",
4959                [],
4960                |r| r.get(0),
4961            )
4962            .expect("v2 active count");
4963
4964        assert_eq!(
4965            v1_historical, 1,
4966            "action-v1 must be historical after upsert"
4967        );
4968        assert_eq!(v2_active, 1, "action-v2 must be active after upsert");
4969    }
4970
4971    // P0: runtime upsert without supersedes_id must be rejected
4972
4973    #[test]
4974    fn writer_run_upsert_without_supersedes_id_returns_invalid_write() {
4975        let db = NamedTempFile::new().expect("temporary db");
4976        let writer = WriterActor::start(
4977            db.path(),
4978            Arc::new(SchemaManager::new()),
4979            ProvenanceMode::Warn,
4980            Arc::new(TelemetryCounters::default()),
4981        )
4982        .expect("writer");
4983
4984        let result = writer.submit(WriteRequest {
4985            label: "bad".to_owned(),
4986            nodes: vec![],
4987            node_retires: vec![],
4988            edges: vec![],
4989            edge_retires: vec![],
4990            chunks: vec![],
4991            runs: vec![RunInsert {
4992                id: "run-1".to_owned(),
4993                kind: "session".to_owned(),
4994                status: "completed".to_owned(),
4995                properties: "{}".to_owned(),
4996                source_ref: None,
4997                upsert: true,
4998                supersedes_id: None,
4999            }],
5000            steps: vec![],
5001            actions: vec![],
5002            optional_backfills: vec![],
5003            vec_inserts: vec![],
5004            operational_writes: vec![],
5005        });
5006
5007        assert!(
5008            matches!(result, Err(EngineError::InvalidWrite(_))),
5009            "run upsert=true without supersedes_id must return InvalidWrite"
5010        );
5011    }
5012
5013    #[test]
5014    fn writer_step_upsert_without_supersedes_id_returns_invalid_write() {
5015        let db = NamedTempFile::new().expect("temporary db");
5016        let writer = WriterActor::start(
5017            db.path(),
5018            Arc::new(SchemaManager::new()),
5019            ProvenanceMode::Warn,
5020            Arc::new(TelemetryCounters::default()),
5021        )
5022        .expect("writer");
5023
5024        let result = writer.submit(WriteRequest {
5025            label: "bad".to_owned(),
5026            nodes: vec![],
5027            node_retires: vec![],
5028            edges: vec![],
5029            edge_retires: vec![],
5030            chunks: vec![],
5031            runs: vec![],
5032            steps: vec![StepInsert {
5033                id: "step-1".to_owned(),
5034                run_id: "run-1".to_owned(),
5035                kind: "llm".to_owned(),
5036                status: "completed".to_owned(),
5037                properties: "{}".to_owned(),
5038                source_ref: None,
5039                upsert: true,
5040                supersedes_id: None,
5041            }],
5042            actions: vec![],
5043            optional_backfills: vec![],
5044            vec_inserts: vec![],
5045            operational_writes: vec![],
5046        });
5047
5048        assert!(
5049            matches!(result, Err(EngineError::InvalidWrite(_))),
5050            "step upsert=true without supersedes_id must return InvalidWrite"
5051        );
5052    }
5053
5054    #[test]
5055    fn writer_action_upsert_without_supersedes_id_returns_invalid_write() {
5056        let db = NamedTempFile::new().expect("temporary db");
5057        let writer = WriterActor::start(
5058            db.path(),
5059            Arc::new(SchemaManager::new()),
5060            ProvenanceMode::Warn,
5061            Arc::new(TelemetryCounters::default()),
5062        )
5063        .expect("writer");
5064
5065        let result = writer.submit(WriteRequest {
5066            label: "bad".to_owned(),
5067            nodes: vec![],
5068            node_retires: vec![],
5069            edges: vec![],
5070            edge_retires: vec![],
5071            chunks: vec![],
5072            runs: vec![],
5073            steps: vec![],
5074            actions: vec![ActionInsert {
5075                id: "action-1".to_owned(),
5076                step_id: "step-1".to_owned(),
5077                kind: "emit".to_owned(),
5078                status: "completed".to_owned(),
5079                properties: "{}".to_owned(),
5080                source_ref: None,
5081                upsert: true,
5082                supersedes_id: None,
5083            }],
5084            optional_backfills: vec![],
5085            vec_inserts: vec![],
5086            operational_writes: vec![],
5087        });
5088
5089        assert!(
5090            matches!(result, Err(EngineError::InvalidWrite(_))),
5091            "action upsert=true without supersedes_id must return InvalidWrite"
5092        );
5093    }
5094
5095    // P1a/b: provenance warnings for edge inserts and runtime table inserts
5096
5097    #[test]
5098    fn writer_edge_insert_without_source_ref_emits_provenance_warning() {
5099        let db = NamedTempFile::new().expect("temporary db");
5100        let writer = WriterActor::start(
5101            db.path(),
5102            Arc::new(SchemaManager::new()),
5103            ProvenanceMode::Warn,
5104            Arc::new(TelemetryCounters::default()),
5105        )
5106        .expect("writer");
5107
5108        let receipt = writer
5109            .submit(WriteRequest {
5110                label: "test".to_owned(),
5111                nodes: vec![
5112                    NodeInsert {
5113                        row_id: "row-a".to_owned(),
5114                        logical_id: "node-a".to_owned(),
5115                        kind: "Meeting".to_owned(),
5116                        properties: "{}".to_owned(),
5117                        source_ref: Some("src-1".to_owned()),
5118                        upsert: false,
5119                        chunk_policy: ChunkPolicy::Preserve,
5120                        content_ref: None,
5121                    },
5122                    NodeInsert {
5123                        row_id: "row-b".to_owned(),
5124                        logical_id: "node-b".to_owned(),
5125                        kind: "Task".to_owned(),
5126                        properties: "{}".to_owned(),
5127                        source_ref: Some("src-1".to_owned()),
5128                        upsert: false,
5129                        chunk_policy: ChunkPolicy::Preserve,
5130                        content_ref: None,
5131                    },
5132                ],
5133                node_retires: vec![],
5134                edges: vec![EdgeInsert {
5135                    row_id: "edge-1".to_owned(),
5136                    logical_id: "edge-lg-1".to_owned(),
5137                    source_logical_id: "node-a".to_owned(),
5138                    target_logical_id: "node-b".to_owned(),
5139                    kind: "HAS_TASK".to_owned(),
5140                    properties: "{}".to_owned(),
5141                    source_ref: None,
5142                    upsert: false,
5143                }],
5144                edge_retires: vec![],
5145                chunks: vec![],
5146                runs: vec![],
5147                steps: vec![],
5148                actions: vec![],
5149                optional_backfills: vec![],
5150                vec_inserts: vec![],
5151                operational_writes: vec![],
5152            })
5153            .expect("write");
5154
5155        assert!(
5156            !receipt.provenance_warnings.is_empty(),
5157            "edge insert without source_ref must emit a provenance warning"
5158        );
5159    }
5160
5161    #[test]
5162    fn writer_run_insert_without_source_ref_emits_provenance_warning() {
5163        let db = NamedTempFile::new().expect("temporary db");
5164        let writer = WriterActor::start(
5165            db.path(),
5166            Arc::new(SchemaManager::new()),
5167            ProvenanceMode::Warn,
5168            Arc::new(TelemetryCounters::default()),
5169        )
5170        .expect("writer");
5171
5172        let receipt = writer
5173            .submit(WriteRequest {
5174                label: "test".to_owned(),
5175                nodes: vec![],
5176                node_retires: vec![],
5177                edges: vec![],
5178                edge_retires: vec![],
5179                chunks: vec![],
5180                runs: vec![RunInsert {
5181                    id: "run-1".to_owned(),
5182                    kind: "session".to_owned(),
5183                    status: "completed".to_owned(),
5184                    properties: "{}".to_owned(),
5185                    source_ref: None,
5186                    upsert: false,
5187                    supersedes_id: None,
5188                }],
5189                steps: vec![],
5190                actions: vec![],
5191                optional_backfills: vec![],
5192                vec_inserts: vec![],
5193                operational_writes: vec![],
5194            })
5195            .expect("write");
5196
5197        assert!(
5198            !receipt.provenance_warnings.is_empty(),
5199            "run insert without source_ref must emit a provenance warning"
5200        );
5201    }
5202
5203    // P1c: retire a node AND submit chunks for the same logical_id in one request
5204
5205    #[test]
5206    fn writer_retire_node_with_chunk_in_same_request_returns_invalid_write() {
5207        let db = NamedTempFile::new().expect("temporary db");
5208        let writer = WriterActor::start(
5209            db.path(),
5210            Arc::new(SchemaManager::new()),
5211            ProvenanceMode::Warn,
5212            Arc::new(TelemetryCounters::default()),
5213        )
5214        .expect("writer");
5215
5216        // First seed the node so it exists
5217        writer
5218            .submit(WriteRequest {
5219                label: "seed".to_owned(),
5220                nodes: vec![NodeInsert {
5221                    row_id: "row-1".to_owned(),
5222                    logical_id: "meeting-1".to_owned(),
5223                    kind: "Meeting".to_owned(),
5224                    properties: "{}".to_owned(),
5225                    source_ref: Some("src-1".to_owned()),
5226                    upsert: false,
5227                    chunk_policy: ChunkPolicy::Preserve,
5228                    content_ref: None,
5229                }],
5230                node_retires: vec![],
5231                edges: vec![],
5232                edge_retires: vec![],
5233                chunks: vec![],
5234                runs: vec![],
5235                steps: vec![],
5236                actions: vec![],
5237                optional_backfills: vec![],
5238                vec_inserts: vec![],
5239                operational_writes: vec![],
5240            })
5241            .expect("seed write");
5242
5243        // Now try to retire it AND add a chunk for it in the same request
5244        let result = writer.submit(WriteRequest {
5245            label: "bad".to_owned(),
5246            nodes: vec![],
5247            node_retires: vec![NodeRetire {
5248                logical_id: "meeting-1".to_owned(),
5249                source_ref: Some("src-2".to_owned()),
5250            }],
5251            edges: vec![],
5252            edge_retires: vec![],
5253            chunks: vec![ChunkInsert {
5254                id: "chunk-bad".to_owned(),
5255                node_logical_id: "meeting-1".to_owned(),
5256                text_content: "some text".to_owned(),
5257                byte_start: None,
5258                byte_end: None,
5259                content_hash: None,
5260            }],
5261            runs: vec![],
5262            steps: vec![],
5263            actions: vec![],
5264            optional_backfills: vec![],
5265            vec_inserts: vec![],
5266            operational_writes: vec![],
5267        });
5268
5269        assert!(
5270            matches!(result, Err(EngineError::InvalidWrite(_))),
5271            "retiring a node AND adding chunks for it in the same request must return InvalidWrite"
5272        );
5273    }
5274
5275    // --- Item 1: prepare_cached batch insert ---
5276
5277    #[test]
5278    fn writer_batch_insert_multiple_nodes() {
5279        let db = NamedTempFile::new().expect("temporary db");
5280        let writer = WriterActor::start(
5281            db.path(),
5282            Arc::new(SchemaManager::new()),
5283            ProvenanceMode::Warn,
5284            Arc::new(TelemetryCounters::default()),
5285        )
5286        .expect("writer");
5287
5288        let nodes: Vec<NodeInsert> = (0..100)
5289            .map(|i| NodeInsert {
5290                row_id: format!("row-{i}"),
5291                logical_id: format!("lg-{i}"),
5292                kind: "Note".to_owned(),
5293                properties: "{}".to_owned(),
5294                source_ref: Some("batch-src".to_owned()),
5295                upsert: false,
5296                chunk_policy: ChunkPolicy::Preserve,
5297                content_ref: None,
5298            })
5299            .collect();
5300
5301        writer
5302            .submit(WriteRequest {
5303                label: "batch".to_owned(),
5304                nodes,
5305                node_retires: vec![],
5306                edges: vec![],
5307                edge_retires: vec![],
5308                chunks: vec![],
5309                runs: vec![],
5310                steps: vec![],
5311                actions: vec![],
5312                optional_backfills: vec![],
5313                vec_inserts: vec![],
5314                operational_writes: vec![],
5315            })
5316            .expect("batch write");
5317
5318        let conn = rusqlite::Connection::open(db.path()).expect("open");
5319        let count: i64 = conn
5320            .query_row("SELECT COUNT(*) FROM nodes", [], |r| r.get(0))
5321            .expect("count nodes");
5322        assert_eq!(
5323            count, 100,
5324            "all 100 nodes must be present after batch insert"
5325        );
5326    }
5327
5328    // --- Item 2: ID validation ---
5329
5330    #[test]
5331    fn prepare_write_rejects_empty_node_row_id() {
5332        let db = NamedTempFile::new().expect("temporary db");
5333        let writer = WriterActor::start(
5334            db.path(),
5335            Arc::new(SchemaManager::new()),
5336            ProvenanceMode::Warn,
5337            Arc::new(TelemetryCounters::default()),
5338        )
5339        .expect("writer");
5340
5341        let result = writer.submit(WriteRequest {
5342            label: "test".to_owned(),
5343            nodes: vec![NodeInsert {
5344                row_id: String::new(),
5345                logical_id: "lg-1".to_owned(),
5346                kind: "Note".to_owned(),
5347                properties: "{}".to_owned(),
5348                source_ref: None,
5349                upsert: false,
5350                chunk_policy: ChunkPolicy::Preserve,
5351                content_ref: None,
5352            }],
5353            node_retires: vec![],
5354            edges: vec![],
5355            edge_retires: vec![],
5356            chunks: vec![],
5357            runs: vec![],
5358            steps: vec![],
5359            actions: vec![],
5360            optional_backfills: vec![],
5361            vec_inserts: vec![],
5362            operational_writes: vec![],
5363        });
5364
5365        assert!(
5366            matches!(result, Err(EngineError::InvalidWrite(_))),
5367            "empty row_id must be rejected"
5368        );
5369    }
5370
5371    #[test]
5372    fn prepare_write_rejects_empty_node_logical_id() {
5373        let db = NamedTempFile::new().expect("temporary db");
5374        let writer = WriterActor::start(
5375            db.path(),
5376            Arc::new(SchemaManager::new()),
5377            ProvenanceMode::Warn,
5378            Arc::new(TelemetryCounters::default()),
5379        )
5380        .expect("writer");
5381
5382        let result = writer.submit(WriteRequest {
5383            label: "test".to_owned(),
5384            nodes: vec![NodeInsert {
5385                row_id: "row-1".to_owned(),
5386                logical_id: String::new(),
5387                kind: "Note".to_owned(),
5388                properties: "{}".to_owned(),
5389                source_ref: None,
5390                upsert: false,
5391                chunk_policy: ChunkPolicy::Preserve,
5392                content_ref: None,
5393            }],
5394            node_retires: vec![],
5395            edges: vec![],
5396            edge_retires: vec![],
5397            chunks: vec![],
5398            runs: vec![],
5399            steps: vec![],
5400            actions: vec![],
5401            optional_backfills: vec![],
5402            vec_inserts: vec![],
5403            operational_writes: vec![],
5404        });
5405
5406        assert!(
5407            matches!(result, Err(EngineError::InvalidWrite(_))),
5408            "empty logical_id must be rejected"
5409        );
5410    }
5411
5412    #[test]
5413    fn prepare_write_rejects_duplicate_row_ids_in_request() {
5414        let db = NamedTempFile::new().expect("temporary db");
5415        let writer = WriterActor::start(
5416            db.path(),
5417            Arc::new(SchemaManager::new()),
5418            ProvenanceMode::Warn,
5419            Arc::new(TelemetryCounters::default()),
5420        )
5421        .expect("writer");
5422
5423        let result = writer.submit(WriteRequest {
5424            label: "test".to_owned(),
5425            nodes: vec![
5426                NodeInsert {
5427                    row_id: "row-1".to_owned(),
5428                    logical_id: "lg-1".to_owned(),
5429                    kind: "Note".to_owned(),
5430                    properties: "{}".to_owned(),
5431                    source_ref: None,
5432                    upsert: false,
5433                    chunk_policy: ChunkPolicy::Preserve,
5434                    content_ref: None,
5435                },
5436                NodeInsert {
5437                    row_id: "row-1".to_owned(), // duplicate
5438                    logical_id: "lg-2".to_owned(),
5439                    kind: "Note".to_owned(),
5440                    properties: "{}".to_owned(),
5441                    source_ref: None,
5442                    upsert: false,
5443                    chunk_policy: ChunkPolicy::Preserve,
5444                    content_ref: None,
5445                },
5446            ],
5447            node_retires: vec![],
5448            edges: vec![],
5449            edge_retires: vec![],
5450            chunks: vec![],
5451            runs: vec![],
5452            steps: vec![],
5453            actions: vec![],
5454            optional_backfills: vec![],
5455            vec_inserts: vec![],
5456            operational_writes: vec![],
5457        });
5458
5459        assert!(
5460            matches!(result, Err(EngineError::InvalidWrite(_))),
5461            "duplicate row_id within request must be rejected"
5462        );
5463    }
5464
5465    #[test]
5466    fn prepare_write_rejects_empty_chunk_id() {
5467        let db = NamedTempFile::new().expect("temporary db");
5468        let writer = WriterActor::start(
5469            db.path(),
5470            Arc::new(SchemaManager::new()),
5471            ProvenanceMode::Warn,
5472            Arc::new(TelemetryCounters::default()),
5473        )
5474        .expect("writer");
5475
5476        let result = writer.submit(WriteRequest {
5477            label: "test".to_owned(),
5478            nodes: vec![NodeInsert {
5479                row_id: "row-1".to_owned(),
5480                logical_id: "lg-1".to_owned(),
5481                kind: "Note".to_owned(),
5482                properties: "{}".to_owned(),
5483                source_ref: None,
5484                upsert: false,
5485                chunk_policy: ChunkPolicy::Preserve,
5486                content_ref: None,
5487            }],
5488            node_retires: vec![],
5489            edges: vec![],
5490            edge_retires: vec![],
5491            chunks: vec![ChunkInsert {
5492                id: String::new(),
5493                node_logical_id: "lg-1".to_owned(),
5494                text_content: "some text".to_owned(),
5495                byte_start: None,
5496                byte_end: None,
5497                content_hash: None,
5498            }],
5499            runs: vec![],
5500            steps: vec![],
5501            actions: vec![],
5502            optional_backfills: vec![],
5503            vec_inserts: vec![],
5504            operational_writes: vec![],
5505        });
5506
5507        assert!(
5508            matches!(result, Err(EngineError::InvalidWrite(_))),
5509            "empty chunk id must be rejected"
5510        );
5511    }
5512
5513    // --- Item 4: provenance warning coverage tests ---
5514
5515    #[test]
5516    fn writer_receipt_warns_on_step_without_source_ref() {
5517        let db = NamedTempFile::new().expect("temporary db");
5518        let writer = WriterActor::start(
5519            db.path(),
5520            Arc::new(SchemaManager::new()),
5521            ProvenanceMode::Warn,
5522            Arc::new(TelemetryCounters::default()),
5523        )
5524        .expect("writer");
5525
5526        // seed a run first so step FK is satisfied
5527        writer
5528            .submit(WriteRequest {
5529                label: "seed-run".to_owned(),
5530                nodes: vec![],
5531                node_retires: vec![],
5532                edges: vec![],
5533                edge_retires: vec![],
5534                chunks: vec![],
5535                runs: vec![RunInsert {
5536                    id: "run-1".to_owned(),
5537                    kind: "session".to_owned(),
5538                    status: "active".to_owned(),
5539                    properties: "{}".to_owned(),
5540                    source_ref: Some("src-1".to_owned()),
5541                    upsert: false,
5542                    supersedes_id: None,
5543                }],
5544                steps: vec![],
5545                actions: vec![],
5546                optional_backfills: vec![],
5547                vec_inserts: vec![],
5548                operational_writes: vec![],
5549            })
5550            .expect("seed run");
5551
5552        let receipt = writer
5553            .submit(WriteRequest {
5554                label: "test".to_owned(),
5555                nodes: vec![],
5556                node_retires: vec![],
5557                edges: vec![],
5558                edge_retires: vec![],
5559                chunks: vec![],
5560                runs: vec![],
5561                steps: vec![StepInsert {
5562                    id: "step-1".to_owned(),
5563                    run_id: "run-1".to_owned(),
5564                    kind: "llm_call".to_owned(),
5565                    status: "completed".to_owned(),
5566                    properties: "{}".to_owned(),
5567                    source_ref: None,
5568                    upsert: false,
5569                    supersedes_id: None,
5570                }],
5571                actions: vec![],
5572                optional_backfills: vec![],
5573                vec_inserts: vec![],
5574                operational_writes: vec![],
5575            })
5576            .expect("write");
5577
5578        assert!(
5579            !receipt.provenance_warnings.is_empty(),
5580            "step insert without source_ref must emit a provenance warning"
5581        );
5582    }
5583
5584    #[test]
5585    fn writer_receipt_warns_on_action_without_source_ref() {
5586        let db = NamedTempFile::new().expect("temporary db");
5587        let writer = WriterActor::start(
5588            db.path(),
5589            Arc::new(SchemaManager::new()),
5590            ProvenanceMode::Warn,
5591            Arc::new(TelemetryCounters::default()),
5592        )
5593        .expect("writer");
5594
5595        // seed run and step so action FK is satisfied
5596        writer
5597            .submit(WriteRequest {
5598                label: "seed".to_owned(),
5599                nodes: vec![],
5600                node_retires: vec![],
5601                edges: vec![],
5602                edge_retires: vec![],
5603                chunks: vec![],
5604                runs: vec![RunInsert {
5605                    id: "run-1".to_owned(),
5606                    kind: "session".to_owned(),
5607                    status: "active".to_owned(),
5608                    properties: "{}".to_owned(),
5609                    source_ref: Some("src-1".to_owned()),
5610                    upsert: false,
5611                    supersedes_id: None,
5612                }],
5613                steps: vec![StepInsert {
5614                    id: "step-1".to_owned(),
5615                    run_id: "run-1".to_owned(),
5616                    kind: "llm_call".to_owned(),
5617                    status: "completed".to_owned(),
5618                    properties: "{}".to_owned(),
5619                    source_ref: Some("src-1".to_owned()),
5620                    upsert: false,
5621                    supersedes_id: None,
5622                }],
5623                actions: vec![],
5624                optional_backfills: vec![],
5625                vec_inserts: vec![],
5626                operational_writes: vec![],
5627            })
5628            .expect("seed");
5629
5630        let receipt = writer
5631            .submit(WriteRequest {
5632                label: "test".to_owned(),
5633                nodes: vec![],
5634                node_retires: vec![],
5635                edges: vec![],
5636                edge_retires: vec![],
5637                chunks: vec![],
5638                runs: vec![],
5639                steps: vec![],
5640                actions: vec![ActionInsert {
5641                    id: "action-1".to_owned(),
5642                    step_id: "step-1".to_owned(),
5643                    kind: "tool_call".to_owned(),
5644                    status: "completed".to_owned(),
5645                    properties: "{}".to_owned(),
5646                    source_ref: None,
5647                    upsert: false,
5648                    supersedes_id: None,
5649                }],
5650                optional_backfills: vec![],
5651                vec_inserts: vec![],
5652                operational_writes: vec![],
5653            })
5654            .expect("write");
5655
5656        assert!(
5657            !receipt.provenance_warnings.is_empty(),
5658            "action insert without source_ref must emit a provenance warning"
5659        );
5660    }
5661
5662    #[test]
5663    fn writer_receipt_no_warnings_when_all_types_have_source_ref() {
5664        let db = NamedTempFile::new().expect("temporary db");
5665        let writer = WriterActor::start(
5666            db.path(),
5667            Arc::new(SchemaManager::new()),
5668            ProvenanceMode::Warn,
5669            Arc::new(TelemetryCounters::default()),
5670        )
5671        .expect("writer");
5672
5673        let receipt = writer
5674            .submit(WriteRequest {
5675                label: "test".to_owned(),
5676                nodes: vec![NodeInsert {
5677                    row_id: "row-1".to_owned(),
5678                    logical_id: "node-1".to_owned(),
5679                    kind: "Note".to_owned(),
5680                    properties: "{}".to_owned(),
5681                    source_ref: Some("src-1".to_owned()),
5682                    upsert: false,
5683                    chunk_policy: ChunkPolicy::Preserve,
5684                    content_ref: None,
5685                }],
5686                node_retires: vec![],
5687                edges: vec![],
5688                edge_retires: vec![],
5689                chunks: vec![],
5690                runs: vec![RunInsert {
5691                    id: "run-1".to_owned(),
5692                    kind: "session".to_owned(),
5693                    status: "active".to_owned(),
5694                    properties: "{}".to_owned(),
5695                    source_ref: Some("src-1".to_owned()),
5696                    upsert: false,
5697                    supersedes_id: None,
5698                }],
5699                steps: vec![StepInsert {
5700                    id: "step-1".to_owned(),
5701                    run_id: "run-1".to_owned(),
5702                    kind: "llm_call".to_owned(),
5703                    status: "completed".to_owned(),
5704                    properties: "{}".to_owned(),
5705                    source_ref: Some("src-1".to_owned()),
5706                    upsert: false,
5707                    supersedes_id: None,
5708                }],
5709                actions: vec![ActionInsert {
5710                    id: "action-1".to_owned(),
5711                    step_id: "step-1".to_owned(),
5712                    kind: "tool_call".to_owned(),
5713                    status: "completed".to_owned(),
5714                    properties: "{}".to_owned(),
5715                    source_ref: Some("src-1".to_owned()),
5716                    upsert: false,
5717                    supersedes_id: None,
5718                }],
5719                optional_backfills: vec![],
5720                vec_inserts: vec![],
5721                operational_writes: vec![],
5722            })
5723            .expect("write");
5724
5725        assert!(
5726            receipt.provenance_warnings.is_empty(),
5727            "no warnings expected when all types have source_ref; got: {:?}",
5728            receipt.provenance_warnings
5729        );
5730    }
5731
5732    // --- Item 4 Task 2: ProvenanceMode tests ---
5733
5734    #[test]
5735    fn default_provenance_mode_is_warn() {
5736        // ProvenanceMode::Warn is the Default; a node without source_ref must warn, not error
5737        let db = NamedTempFile::new().expect("temporary db");
5738        let writer = WriterActor::start(
5739            db.path(),
5740            Arc::new(SchemaManager::new()),
5741            ProvenanceMode::default(),
5742            Arc::new(TelemetryCounters::default()),
5743        )
5744        .expect("writer");
5745
5746        let receipt = writer
5747            .submit(WriteRequest {
5748                label: "test".to_owned(),
5749                nodes: vec![NodeInsert {
5750                    row_id: "row-1".to_owned(),
5751                    logical_id: "node-1".to_owned(),
5752                    kind: "Note".to_owned(),
5753                    properties: "{}".to_owned(),
5754                    source_ref: None,
5755                    upsert: false,
5756                    chunk_policy: ChunkPolicy::Preserve,
5757                    content_ref: None,
5758                }],
5759                node_retires: vec![],
5760                edges: vec![],
5761                edge_retires: vec![],
5762                chunks: vec![],
5763                runs: vec![],
5764                steps: vec![],
5765                actions: vec![],
5766                optional_backfills: vec![],
5767                vec_inserts: vec![],
5768                operational_writes: vec![],
5769            })
5770            .expect("Warn mode must not reject missing source_ref");
5771
5772        assert!(
5773            !receipt.provenance_warnings.is_empty(),
5774            "Warn mode must emit a warning instead of rejecting"
5775        );
5776    }
5777
5778    #[test]
5779    fn require_mode_rejects_node_without_source_ref() {
5780        let db = NamedTempFile::new().expect("temporary db");
5781        let writer = WriterActor::start(
5782            db.path(),
5783            Arc::new(SchemaManager::new()),
5784            ProvenanceMode::Require,
5785            Arc::new(TelemetryCounters::default()),
5786        )
5787        .expect("writer");
5788
5789        let result = writer.submit(WriteRequest {
5790            label: "test".to_owned(),
5791            nodes: vec![NodeInsert {
5792                row_id: "row-1".to_owned(),
5793                logical_id: "node-1".to_owned(),
5794                kind: "Note".to_owned(),
5795                properties: "{}".to_owned(),
5796                source_ref: None,
5797                upsert: false,
5798                chunk_policy: ChunkPolicy::Preserve,
5799                content_ref: None,
5800            }],
5801            node_retires: vec![],
5802            edges: vec![],
5803            edge_retires: vec![],
5804            chunks: vec![],
5805            runs: vec![],
5806            steps: vec![],
5807            actions: vec![],
5808            optional_backfills: vec![],
5809            vec_inserts: vec![],
5810            operational_writes: vec![],
5811        });
5812
5813        assert!(
5814            matches!(result, Err(EngineError::InvalidWrite(_))),
5815            "Require mode must reject node without source_ref"
5816        );
5817    }
5818
5819    #[test]
5820    fn require_mode_accepts_node_with_source_ref() {
5821        let db = NamedTempFile::new().expect("temporary db");
5822        let writer = WriterActor::start(
5823            db.path(),
5824            Arc::new(SchemaManager::new()),
5825            ProvenanceMode::Require,
5826            Arc::new(TelemetryCounters::default()),
5827        )
5828        .expect("writer");
5829
5830        let result = writer.submit(WriteRequest {
5831            label: "test".to_owned(),
5832            nodes: vec![NodeInsert {
5833                row_id: "row-1".to_owned(),
5834                logical_id: "node-1".to_owned(),
5835                kind: "Note".to_owned(),
5836                properties: "{}".to_owned(),
5837                source_ref: Some("src-1".to_owned()),
5838                upsert: false,
5839                chunk_policy: ChunkPolicy::Preserve,
5840                content_ref: None,
5841            }],
5842            node_retires: vec![],
5843            edges: vec![],
5844            edge_retires: vec![],
5845            chunks: vec![],
5846            runs: vec![],
5847            steps: vec![],
5848            actions: vec![],
5849            optional_backfills: vec![],
5850            vec_inserts: vec![],
5851            operational_writes: vec![],
5852        });
5853
5854        assert!(
5855            result.is_ok(),
5856            "Require mode must accept node with source_ref"
5857        );
5858    }
5859
5860    #[test]
5861    fn require_mode_rejects_edge_without_source_ref() {
5862        let db = NamedTempFile::new().expect("temporary db");
5863        let writer = WriterActor::start(
5864            db.path(),
5865            Arc::new(SchemaManager::new()),
5866            ProvenanceMode::Require,
5867            Arc::new(TelemetryCounters::default()),
5868        )
5869        .expect("writer");
5870
5871        // seed nodes first so FK check doesn't interfere (Require mode rejects before DB touch)
5872        let result = writer.submit(WriteRequest {
5873            label: "test".to_owned(),
5874            nodes: vec![
5875                NodeInsert {
5876                    row_id: "row-a".to_owned(),
5877                    logical_id: "node-a".to_owned(),
5878                    kind: "Note".to_owned(),
5879                    properties: "{}".to_owned(),
5880                    source_ref: Some("src-1".to_owned()),
5881                    upsert: false,
5882                    chunk_policy: ChunkPolicy::Preserve,
5883                    content_ref: None,
5884                },
5885                NodeInsert {
5886                    row_id: "row-b".to_owned(),
5887                    logical_id: "node-b".to_owned(),
5888                    kind: "Note".to_owned(),
5889                    properties: "{}".to_owned(),
5890                    source_ref: Some("src-1".to_owned()),
5891                    upsert: false,
5892                    chunk_policy: ChunkPolicy::Preserve,
5893                    content_ref: None,
5894                },
5895            ],
5896            node_retires: vec![],
5897            edges: vec![EdgeInsert {
5898                row_id: "edge-row-1".to_owned(),
5899                logical_id: "edge-1".to_owned(),
5900                source_logical_id: "node-a".to_owned(),
5901                target_logical_id: "node-b".to_owned(),
5902                kind: "LINKS_TO".to_owned(),
5903                properties: "{}".to_owned(),
5904                source_ref: None,
5905                upsert: false,
5906            }],
5907            edge_retires: vec![],
5908            chunks: vec![],
5909            runs: vec![],
5910            steps: vec![],
5911            actions: vec![],
5912            optional_backfills: vec![],
5913            vec_inserts: vec![],
5914            operational_writes: vec![],
5915        });
5916
5917        assert!(
5918            matches!(result, Err(EngineError::InvalidWrite(_))),
5919            "Require mode must reject edge without source_ref"
5920        );
5921    }
5922
5923    // --- Item 5: FTS projection coverage tests ---
5924
5925    #[test]
5926    fn fts_row_has_correct_kind_from_co_submitted_node() {
5927        let db = NamedTempFile::new().expect("temporary db");
5928        let writer = WriterActor::start(
5929            db.path(),
5930            Arc::new(SchemaManager::new()),
5931            ProvenanceMode::Warn,
5932            Arc::new(TelemetryCounters::default()),
5933        )
5934        .expect("writer");
5935
5936        writer
5937            .submit(WriteRequest {
5938                label: "test".to_owned(),
5939                nodes: vec![NodeInsert {
5940                    row_id: "row-1".to_owned(),
5941                    logical_id: "node-1".to_owned(),
5942                    kind: "Meeting".to_owned(),
5943                    properties: "{}".to_owned(),
5944                    source_ref: Some("src-1".to_owned()),
5945                    upsert: false,
5946                    chunk_policy: ChunkPolicy::Preserve,
5947                    content_ref: None,
5948                }],
5949                node_retires: vec![],
5950                edges: vec![],
5951                edge_retires: vec![],
5952                chunks: vec![ChunkInsert {
5953                    id: "chunk-1".to_owned(),
5954                    node_logical_id: "node-1".to_owned(),
5955                    text_content: "some text".to_owned(),
5956                    byte_start: None,
5957                    byte_end: None,
5958                    content_hash: None,
5959                }],
5960                runs: vec![],
5961                steps: vec![],
5962                actions: vec![],
5963                optional_backfills: vec![],
5964                vec_inserts: vec![],
5965                operational_writes: vec![],
5966            })
5967            .expect("write");
5968
5969        let conn = rusqlite::Connection::open(db.path()).expect("conn");
5970        let kind: String = conn
5971            .query_row(
5972                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
5973                [],
5974                |row| row.get(0),
5975            )
5976            .expect("fts row");
5977
5978        assert_eq!(kind, "Meeting");
5979    }
5980
5981    #[test]
5982    fn fts_row_has_correct_text_content() {
5983        let db = NamedTempFile::new().expect("temporary db");
5984        let writer = WriterActor::start(
5985            db.path(),
5986            Arc::new(SchemaManager::new()),
5987            ProvenanceMode::Warn,
5988            Arc::new(TelemetryCounters::default()),
5989        )
5990        .expect("writer");
5991
5992        writer
5993            .submit(WriteRequest {
5994                label: "test".to_owned(),
5995                nodes: vec![NodeInsert {
5996                    row_id: "row-1".to_owned(),
5997                    logical_id: "node-1".to_owned(),
5998                    kind: "Note".to_owned(),
5999                    properties: "{}".to_owned(),
6000                    source_ref: Some("src-1".to_owned()),
6001                    upsert: false,
6002                    chunk_policy: ChunkPolicy::Preserve,
6003                    content_ref: None,
6004                }],
6005                node_retires: vec![],
6006                edges: vec![],
6007                edge_retires: vec![],
6008                chunks: vec![ChunkInsert {
6009                    id: "chunk-1".to_owned(),
6010                    node_logical_id: "node-1".to_owned(),
6011                    text_content: "exactly this text".to_owned(),
6012                    byte_start: None,
6013                    byte_end: None,
6014                    content_hash: None,
6015                }],
6016                runs: vec![],
6017                steps: vec![],
6018                actions: vec![],
6019                optional_backfills: vec![],
6020                vec_inserts: vec![],
6021                operational_writes: vec![],
6022            })
6023            .expect("write");
6024
6025        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6026        let text: String = conn
6027            .query_row(
6028                "SELECT text_content FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6029                [],
6030                |row| row.get(0),
6031            )
6032            .expect("fts row");
6033
6034        assert_eq!(text, "exactly this text");
6035    }
6036
6037    #[test]
6038    fn fts_row_has_correct_kind_from_pre_existing_node() {
6039        let db = NamedTempFile::new().expect("temporary db");
6040        let writer = WriterActor::start(
6041            db.path(),
6042            Arc::new(SchemaManager::new()),
6043            ProvenanceMode::Warn,
6044            Arc::new(TelemetryCounters::default()),
6045        )
6046        .expect("writer");
6047
6048        // Request 1: node only
6049        writer
6050            .submit(WriteRequest {
6051                label: "r1".to_owned(),
6052                nodes: vec![NodeInsert {
6053                    row_id: "row-1".to_owned(),
6054                    logical_id: "node-1".to_owned(),
6055                    kind: "Document".to_owned(),
6056                    properties: "{}".to_owned(),
6057                    source_ref: Some("src-1".to_owned()),
6058                    upsert: false,
6059                    chunk_policy: ChunkPolicy::Preserve,
6060                    content_ref: None,
6061                }],
6062                node_retires: vec![],
6063                edges: vec![],
6064                edge_retires: vec![],
6065                chunks: vec![],
6066                runs: vec![],
6067                steps: vec![],
6068                actions: vec![],
6069                optional_backfills: vec![],
6070                vec_inserts: vec![],
6071                operational_writes: vec![],
6072            })
6073            .expect("r1 write");
6074
6075        // Request 2: chunk for pre-existing node
6076        writer
6077            .submit(WriteRequest {
6078                label: "r2".to_owned(),
6079                nodes: vec![],
6080                node_retires: vec![],
6081                edges: vec![],
6082                edge_retires: vec![],
6083                chunks: vec![ChunkInsert {
6084                    id: "chunk-1".to_owned(),
6085                    node_logical_id: "node-1".to_owned(),
6086                    text_content: "some text".to_owned(),
6087                    byte_start: None,
6088                    byte_end: None,
6089                    content_hash: None,
6090                }],
6091                runs: vec![],
6092                steps: vec![],
6093                actions: vec![],
6094                optional_backfills: vec![],
6095                vec_inserts: vec![],
6096                operational_writes: vec![],
6097            })
6098            .expect("r2 write");
6099
6100        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6101        let kind: String = conn
6102            .query_row(
6103                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-1'",
6104                [],
6105                |row| row.get(0),
6106            )
6107            .expect("fts row");
6108
6109        assert_eq!(kind, "Document");
6110    }
6111
6112    #[test]
6113    fn fts_derives_rows_for_multiple_chunks_per_node() {
6114        let db = NamedTempFile::new().expect("temporary db");
6115        let writer = WriterActor::start(
6116            db.path(),
6117            Arc::new(SchemaManager::new()),
6118            ProvenanceMode::Warn,
6119            Arc::new(TelemetryCounters::default()),
6120        )
6121        .expect("writer");
6122
6123        writer
6124            .submit(WriteRequest {
6125                label: "test".to_owned(),
6126                nodes: vec![NodeInsert {
6127                    row_id: "row-1".to_owned(),
6128                    logical_id: "node-1".to_owned(),
6129                    kind: "Meeting".to_owned(),
6130                    properties: "{}".to_owned(),
6131                    source_ref: Some("src-1".to_owned()),
6132                    upsert: false,
6133                    chunk_policy: ChunkPolicy::Preserve,
6134                    content_ref: None,
6135                }],
6136                node_retires: vec![],
6137                edges: vec![],
6138                edge_retires: vec![],
6139                chunks: vec![
6140                    ChunkInsert {
6141                        id: "chunk-a".to_owned(),
6142                        node_logical_id: "node-1".to_owned(),
6143                        text_content: "intro".to_owned(),
6144                        byte_start: None,
6145                        byte_end: None,
6146                        content_hash: None,
6147                    },
6148                    ChunkInsert {
6149                        id: "chunk-b".to_owned(),
6150                        node_logical_id: "node-1".to_owned(),
6151                        text_content: "body".to_owned(),
6152                        byte_start: None,
6153                        byte_end: None,
6154                        content_hash: None,
6155                    },
6156                    ChunkInsert {
6157                        id: "chunk-c".to_owned(),
6158                        node_logical_id: "node-1".to_owned(),
6159                        text_content: "conclusion".to_owned(),
6160                        byte_start: None,
6161                        byte_end: None,
6162                        content_hash: None,
6163                    },
6164                ],
6165                runs: vec![],
6166                steps: vec![],
6167                actions: vec![],
6168                optional_backfills: vec![],
6169                vec_inserts: vec![],
6170                operational_writes: vec![],
6171            })
6172            .expect("write");
6173
6174        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6175        let count: i64 = conn
6176            .query_row(
6177                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6178                [],
6179                |row| row.get(0),
6180            )
6181            .expect("fts count");
6182
6183        assert_eq!(count, 3, "three chunks must produce three FTS rows");
6184    }
6185
6186    #[test]
6187    fn fts_resolves_mixed_fast_and_db_paths() {
6188        let db = NamedTempFile::new().expect("temporary db");
6189        let writer = WriterActor::start(
6190            db.path(),
6191            Arc::new(SchemaManager::new()),
6192            ProvenanceMode::Warn,
6193            Arc::new(TelemetryCounters::default()),
6194        )
6195        .expect("writer");
6196
6197        // Seed pre-existing node
6198        writer
6199            .submit(WriteRequest {
6200                label: "seed".to_owned(),
6201                nodes: vec![NodeInsert {
6202                    row_id: "row-existing".to_owned(),
6203                    logical_id: "existing-node".to_owned(),
6204                    kind: "Archive".to_owned(),
6205                    properties: "{}".to_owned(),
6206                    source_ref: Some("src-1".to_owned()),
6207                    upsert: false,
6208                    chunk_policy: ChunkPolicy::Preserve,
6209                    content_ref: None,
6210                }],
6211                node_retires: vec![],
6212                edges: vec![],
6213                edge_retires: vec![],
6214                chunks: vec![],
6215                runs: vec![],
6216                steps: vec![],
6217                actions: vec![],
6218                optional_backfills: vec![],
6219                vec_inserts: vec![],
6220                operational_writes: vec![],
6221            })
6222            .expect("seed");
6223
6224        // Mixed request: new node (fast path) + chunk for pre-existing node (DB path)
6225        writer
6226            .submit(WriteRequest {
6227                label: "mixed".to_owned(),
6228                nodes: vec![NodeInsert {
6229                    row_id: "row-new".to_owned(),
6230                    logical_id: "new-node".to_owned(),
6231                    kind: "Inbox".to_owned(),
6232                    properties: "{}".to_owned(),
6233                    source_ref: Some("src-2".to_owned()),
6234                    upsert: false,
6235                    chunk_policy: ChunkPolicy::Preserve,
6236                    content_ref: None,
6237                }],
6238                node_retires: vec![],
6239                edges: vec![],
6240                edge_retires: vec![],
6241                chunks: vec![
6242                    ChunkInsert {
6243                        id: "chunk-fast".to_owned(),
6244                        node_logical_id: "new-node".to_owned(),
6245                        text_content: "new content".to_owned(),
6246                        byte_start: None,
6247                        byte_end: None,
6248                        content_hash: None,
6249                    },
6250                    ChunkInsert {
6251                        id: "chunk-db".to_owned(),
6252                        node_logical_id: "existing-node".to_owned(),
6253                        text_content: "archive content".to_owned(),
6254                        byte_start: None,
6255                        byte_end: None,
6256                        content_hash: None,
6257                    },
6258                ],
6259                runs: vec![],
6260                steps: vec![],
6261                actions: vec![],
6262                optional_backfills: vec![],
6263                vec_inserts: vec![],
6264                operational_writes: vec![],
6265            })
6266            .expect("mixed write");
6267
6268        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6269        let fast_kind: String = conn
6270            .query_row(
6271                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-fast'",
6272                [],
6273                |row| row.get(0),
6274            )
6275            .expect("fast path fts row");
6276        let db_kind: String = conn
6277            .query_row(
6278                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-db'",
6279                [],
6280                |row| row.get(0),
6281            )
6282            .expect("db path fts row");
6283
6284        assert_eq!(fast_kind, "Inbox");
6285        assert_eq!(db_kind, "Archive");
6286    }
6287
6288    #[test]
6289    fn prepare_write_rejects_empty_chunk_text() {
6290        let db = NamedTempFile::new().expect("temporary db");
6291        let writer = WriterActor::start(
6292            db.path(),
6293            Arc::new(SchemaManager::new()),
6294            ProvenanceMode::Warn,
6295            Arc::new(TelemetryCounters::default()),
6296        )
6297        .expect("writer");
6298
6299        let result = writer.submit(WriteRequest {
6300            label: "test".to_owned(),
6301            nodes: vec![NodeInsert {
6302                row_id: "row-1".to_owned(),
6303                logical_id: "node-1".to_owned(),
6304                kind: "Note".to_owned(),
6305                properties: "{}".to_owned(),
6306                source_ref: None,
6307                upsert: false,
6308                chunk_policy: ChunkPolicy::Preserve,
6309                content_ref: None,
6310            }],
6311            node_retires: vec![],
6312            edges: vec![],
6313            edge_retires: vec![],
6314            chunks: vec![ChunkInsert {
6315                id: "chunk-1".to_owned(),
6316                node_logical_id: "node-1".to_owned(),
6317                text_content: String::new(),
6318                byte_start: None,
6319                byte_end: None,
6320                content_hash: None,
6321            }],
6322            runs: vec![],
6323            steps: vec![],
6324            actions: vec![],
6325            optional_backfills: vec![],
6326            vec_inserts: vec![],
6327            operational_writes: vec![],
6328        });
6329
6330        assert!(
6331            matches!(result, Err(EngineError::InvalidWrite(_))),
6332            "empty text_content must be rejected"
6333        );
6334    }
6335
6336    #[test]
6337    fn receipt_reports_zero_backfills_when_none_submitted() {
6338        let db = NamedTempFile::new().expect("temporary db");
6339        let writer = WriterActor::start(
6340            db.path(),
6341            Arc::new(SchemaManager::new()),
6342            ProvenanceMode::Warn,
6343            Arc::new(TelemetryCounters::default()),
6344        )
6345        .expect("writer");
6346
6347        let receipt = writer
6348            .submit(WriteRequest {
6349                label: "test".to_owned(),
6350                nodes: vec![NodeInsert {
6351                    row_id: "row-1".to_owned(),
6352                    logical_id: "node-1".to_owned(),
6353                    kind: "Note".to_owned(),
6354                    properties: "{}".to_owned(),
6355                    source_ref: Some("src-1".to_owned()),
6356                    upsert: false,
6357                    chunk_policy: ChunkPolicy::Preserve,
6358                    content_ref: None,
6359                }],
6360                node_retires: vec![],
6361                edges: vec![],
6362                edge_retires: vec![],
6363                chunks: vec![],
6364                runs: vec![],
6365                steps: vec![],
6366                actions: vec![],
6367                optional_backfills: vec![],
6368                vec_inserts: vec![],
6369                operational_writes: vec![],
6370            })
6371            .expect("write");
6372
6373        assert_eq!(receipt.optional_backfill_count, 0);
6374    }
6375
6376    #[test]
6377    fn receipt_reports_correct_backfill_count() {
6378        let db = NamedTempFile::new().expect("temporary db");
6379        let writer = WriterActor::start(
6380            db.path(),
6381            Arc::new(SchemaManager::new()),
6382            ProvenanceMode::Warn,
6383            Arc::new(TelemetryCounters::default()),
6384        )
6385        .expect("writer");
6386
6387        let receipt = writer
6388            .submit(WriteRequest {
6389                label: "test".to_owned(),
6390                nodes: vec![NodeInsert {
6391                    row_id: "row-1".to_owned(),
6392                    logical_id: "node-1".to_owned(),
6393                    kind: "Note".to_owned(),
6394                    properties: "{}".to_owned(),
6395                    source_ref: Some("src-1".to_owned()),
6396                    upsert: false,
6397                    chunk_policy: ChunkPolicy::Preserve,
6398                    content_ref: None,
6399                }],
6400                node_retires: vec![],
6401                edges: vec![],
6402                edge_retires: vec![],
6403                chunks: vec![],
6404                runs: vec![],
6405                steps: vec![],
6406                actions: vec![],
6407                optional_backfills: vec![
6408                    OptionalProjectionTask {
6409                        target: ProjectionTarget::Fts,
6410                        payload: "p1".to_owned(),
6411                    },
6412                    OptionalProjectionTask {
6413                        target: ProjectionTarget::Vec,
6414                        payload: "p2".to_owned(),
6415                    },
6416                    OptionalProjectionTask {
6417                        target: ProjectionTarget::All,
6418                        payload: "p3".to_owned(),
6419                    },
6420                ],
6421                vec_inserts: vec![],
6422                operational_writes: vec![],
6423            })
6424            .expect("write");
6425
6426        assert_eq!(receipt.optional_backfill_count, 3);
6427    }
6428
6429    #[test]
6430    fn backfill_tasks_are_not_executed_during_write() {
6431        let db = NamedTempFile::new().expect("temporary db");
6432        let writer = WriterActor::start(
6433            db.path(),
6434            Arc::new(SchemaManager::new()),
6435            ProvenanceMode::Warn,
6436            Arc::new(TelemetryCounters::default()),
6437        )
6438        .expect("writer");
6439
6440        // Write a node + chunk. Submit a backfill task targeting FTS.
6441        // The write path must not create any extra FTS rows beyond the required one.
6442        writer
6443            .submit(WriteRequest {
6444                label: "test".to_owned(),
6445                nodes: vec![NodeInsert {
6446                    row_id: "row-1".to_owned(),
6447                    logical_id: "node-1".to_owned(),
6448                    kind: "Note".to_owned(),
6449                    properties: "{}".to_owned(),
6450                    source_ref: Some("src-1".to_owned()),
6451                    upsert: false,
6452                    chunk_policy: ChunkPolicy::Preserve,
6453                    content_ref: None,
6454                }],
6455                node_retires: vec![],
6456                edges: vec![],
6457                edge_retires: vec![],
6458                chunks: vec![ChunkInsert {
6459                    id: "chunk-1".to_owned(),
6460                    node_logical_id: "node-1".to_owned(),
6461                    text_content: "required text".to_owned(),
6462                    byte_start: None,
6463                    byte_end: None,
6464                    content_hash: None,
6465                }],
6466                runs: vec![],
6467                steps: vec![],
6468                actions: vec![],
6469                optional_backfills: vec![OptionalProjectionTask {
6470                    target: ProjectionTarget::Fts,
6471                    payload: "backfill-payload".to_owned(),
6472                }],
6473                vec_inserts: vec![],
6474                operational_writes: vec![],
6475            })
6476            .expect("write");
6477
6478        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6479        let count: i64 = conn
6480            .query_row(
6481                "SELECT COUNT(*) FROM fts_nodes WHERE node_logical_id = 'node-1'",
6482                [],
6483                |row| row.get(0),
6484            )
6485            .expect("fts count");
6486
6487        assert_eq!(count, 1, "backfill task must not create extra FTS rows");
6488    }
6489
6490    #[test]
6491    fn fts_row_uses_new_kind_after_node_replace() {
6492        let db = NamedTempFile::new().expect("temporary db");
6493        let writer = WriterActor::start(
6494            db.path(),
6495            Arc::new(SchemaManager::new()),
6496            ProvenanceMode::Warn,
6497            Arc::new(TelemetryCounters::default()),
6498        )
6499        .expect("writer");
6500
6501        // Write original node as "Note"
6502        writer
6503            .submit(WriteRequest {
6504                label: "v1".to_owned(),
6505                nodes: vec![NodeInsert {
6506                    row_id: "row-1".to_owned(),
6507                    logical_id: "node-1".to_owned(),
6508                    kind: "Note".to_owned(),
6509                    properties: "{}".to_owned(),
6510                    source_ref: Some("src-1".to_owned()),
6511                    upsert: false,
6512                    chunk_policy: ChunkPolicy::Preserve,
6513                    content_ref: None,
6514                }],
6515                node_retires: vec![],
6516                edges: vec![],
6517                edge_retires: vec![],
6518                chunks: vec![ChunkInsert {
6519                    id: "chunk-v1".to_owned(),
6520                    node_logical_id: "node-1".to_owned(),
6521                    text_content: "original".to_owned(),
6522                    byte_start: None,
6523                    byte_end: None,
6524                    content_hash: None,
6525                }],
6526                runs: vec![],
6527                steps: vec![],
6528                actions: vec![],
6529                optional_backfills: vec![],
6530                vec_inserts: vec![],
6531                operational_writes: vec![],
6532            })
6533            .expect("v1 write");
6534
6535        // Replace with "Meeting" kind + new chunk using ChunkPolicy::Replace
6536        writer
6537            .submit(WriteRequest {
6538                label: "v2".to_owned(),
6539                nodes: vec![NodeInsert {
6540                    row_id: "row-2".to_owned(),
6541                    logical_id: "node-1".to_owned(),
6542                    kind: "Meeting".to_owned(),
6543                    properties: "{}".to_owned(),
6544                    source_ref: Some("src-2".to_owned()),
6545                    upsert: true,
6546                    chunk_policy: ChunkPolicy::Replace,
6547                    content_ref: None,
6548                }],
6549                node_retires: vec![],
6550                edges: vec![],
6551                edge_retires: vec![],
6552                chunks: vec![ChunkInsert {
6553                    id: "chunk-v2".to_owned(),
6554                    node_logical_id: "node-1".to_owned(),
6555                    text_content: "updated".to_owned(),
6556                    byte_start: None,
6557                    byte_end: None,
6558                    content_hash: None,
6559                }],
6560                runs: vec![],
6561                steps: vec![],
6562                actions: vec![],
6563                optional_backfills: vec![],
6564                vec_inserts: vec![],
6565                operational_writes: vec![],
6566            })
6567            .expect("v2 write");
6568
6569        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6570
6571        // Old FTS row must be gone
6572        let old_count: i64 = conn
6573            .query_row(
6574                "SELECT COUNT(*) FROM fts_nodes WHERE chunk_id = 'chunk-v1'",
6575                [],
6576                |row| row.get(0),
6577            )
6578            .expect("old fts count");
6579        assert_eq!(old_count, 0, "ChunkPolicy::Replace must remove old FTS row");
6580
6581        // New FTS row must use the new kind
6582        let new_kind: String = conn
6583            .query_row(
6584                "SELECT kind FROM fts_nodes WHERE chunk_id = 'chunk-v2'",
6585                [],
6586                |row| row.get(0),
6587            )
6588            .expect("new fts row");
6589        assert_eq!(new_kind, "Meeting", "FTS row must use updated node kind");
6590    }
6591
6592    // --- Item 3: VecInsert tests ---
6593
6594    #[test]
6595    fn vec_insert_empty_chunk_id_is_rejected() {
6596        let db = NamedTempFile::new().expect("temporary db");
6597        let writer = WriterActor::start(
6598            db.path(),
6599            Arc::new(SchemaManager::new()),
6600            ProvenanceMode::Warn,
6601            Arc::new(TelemetryCounters::default()),
6602        )
6603        .expect("writer");
6604        let result = writer.submit(WriteRequest {
6605            label: "vec-test".to_owned(),
6606            nodes: vec![],
6607            node_retires: vec![],
6608            edges: vec![],
6609            edge_retires: vec![],
6610            chunks: vec![],
6611            runs: vec![],
6612            steps: vec![],
6613            actions: vec![],
6614            optional_backfills: vec![],
6615            vec_inserts: vec![VecInsert {
6616                chunk_id: String::new(),
6617                embedding: vec![0.1, 0.2, 0.3],
6618            }],
6619            operational_writes: vec![],
6620        });
6621        assert!(
6622            matches!(result, Err(EngineError::InvalidWrite(_))),
6623            "empty chunk_id in VecInsert must be rejected"
6624        );
6625    }
6626
6627    #[test]
6628    fn vec_insert_empty_embedding_is_rejected() {
6629        let db = NamedTempFile::new().expect("temporary db");
6630        let writer = WriterActor::start(
6631            db.path(),
6632            Arc::new(SchemaManager::new()),
6633            ProvenanceMode::Warn,
6634            Arc::new(TelemetryCounters::default()),
6635        )
6636        .expect("writer");
6637        let result = writer.submit(WriteRequest {
6638            label: "vec-test".to_owned(),
6639            nodes: vec![],
6640            node_retires: vec![],
6641            edges: vec![],
6642            edge_retires: vec![],
6643            chunks: vec![],
6644            runs: vec![],
6645            steps: vec![],
6646            actions: vec![],
6647            optional_backfills: vec![],
6648            vec_inserts: vec![VecInsert {
6649                chunk_id: "chunk-1".to_owned(),
6650                embedding: vec![],
6651            }],
6652            operational_writes: vec![],
6653        });
6654        assert!(
6655            matches!(result, Err(EngineError::InvalidWrite(_))),
6656            "empty embedding in VecInsert must be rejected"
6657        );
6658    }
6659
6660    #[test]
6661    fn vec_insert_noop_without_feature() {
6662        // Without the sqlite-vec feature, a well-formed VecInsert must succeed
6663        // (no error) but not write to any vec table.
6664        let db = NamedTempFile::new().expect("temporary db");
6665        let writer = WriterActor::start(
6666            db.path(),
6667            Arc::new(SchemaManager::new()),
6668            ProvenanceMode::Warn,
6669            Arc::new(TelemetryCounters::default()),
6670        )
6671        .expect("writer");
6672        let result = writer.submit(WriteRequest {
6673            label: "vec-noop".to_owned(),
6674            nodes: vec![],
6675            node_retires: vec![],
6676            edges: vec![],
6677            edge_retires: vec![],
6678            chunks: vec![],
6679            runs: vec![],
6680            steps: vec![],
6681            actions: vec![],
6682            optional_backfills: vec![],
6683            vec_inserts: vec![VecInsert {
6684                chunk_id: "chunk-noop".to_owned(),
6685                embedding: vec![1.0, 2.0, 3.0],
6686            }],
6687            operational_writes: vec![],
6688        });
6689        #[cfg(not(feature = "sqlite-vec"))]
6690        result.expect("noop VecInsert without feature must succeed");
6691        // The result variable is used above; silence unused warning for cfg-on path.
6692        #[cfg(feature = "sqlite-vec")]
6693        let _ = result;
6694    }
6695
6696    #[cfg(feature = "sqlite-vec")]
6697    #[test]
6698    fn node_retire_preserves_vec_rows_for_later_restore() {
6699        use crate::sqlite::open_connection_with_vec;
6700
6701        let db = NamedTempFile::new().expect("temporary db");
6702        let schema_manager = Arc::new(SchemaManager::new());
6703
6704        {
6705            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6706            schema_manager.bootstrap(&conn).expect("bootstrap");
6707        }
6708
6709        let writer = WriterActor::start(
6710            db.path(),
6711            Arc::clone(&schema_manager),
6712            ProvenanceMode::Warn,
6713            Arc::new(TelemetryCounters::default()),
6714        )
6715        .expect("writer");
6716
6717        // Insert node + chunk + vec row
6718        writer
6719            .submit(WriteRequest {
6720                label: "setup".to_owned(),
6721                nodes: vec![NodeInsert {
6722                    row_id: "row-retire-vec".to_owned(),
6723                    logical_id: "node-retire-vec".to_owned(),
6724                    kind: "Doc".to_owned(),
6725                    properties: "{}".to_owned(),
6726                    source_ref: Some("src".to_owned()),
6727                    upsert: false,
6728                    chunk_policy: ChunkPolicy::Preserve,
6729                    content_ref: None,
6730                }],
6731                node_retires: vec![],
6732                edges: vec![],
6733                edge_retires: vec![],
6734                chunks: vec![ChunkInsert {
6735                    id: "chunk-retire-vec".to_owned(),
6736                    node_logical_id: "node-retire-vec".to_owned(),
6737                    text_content: "text".to_owned(),
6738                    byte_start: None,
6739                    byte_end: None,
6740                    content_hash: None,
6741                }],
6742                runs: vec![],
6743                steps: vec![],
6744                actions: vec![],
6745                optional_backfills: vec![],
6746                vec_inserts: vec![VecInsert {
6747                    chunk_id: "chunk-retire-vec".to_owned(),
6748                    embedding: vec![0.1, 0.2, 0.3],
6749                }],
6750                operational_writes: vec![],
6751            })
6752            .expect("setup write");
6753
6754        // Retire the node
6755        writer
6756            .submit(WriteRequest {
6757                label: "retire".to_owned(),
6758                nodes: vec![],
6759                node_retires: vec![NodeRetire {
6760                    logical_id: "node-retire-vec".to_owned(),
6761                    source_ref: Some("src".to_owned()),
6762                }],
6763                edges: vec![],
6764                edge_retires: vec![],
6765                chunks: vec![],
6766                runs: vec![],
6767                steps: vec![],
6768                actions: vec![],
6769                optional_backfills: vec![],
6770                vec_inserts: vec![],
6771                operational_writes: vec![],
6772            })
6773            .expect("retire write");
6774
6775        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6776        let count: i64 = conn
6777            .query_row(
6778                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-retire-vec'",
6779                [],
6780                |row| row.get(0),
6781            )
6782            .expect("count");
6783        assert_eq!(
6784            count, 1,
6785            "vec rows must remain available while the node is retired so restore can re-establish vector behavior"
6786        );
6787    }
6788
6789    #[cfg(feature = "sqlite-vec")]
6790    #[test]
6791    #[allow(clippy::too_many_lines)]
6792    fn vec_cleanup_on_chunk_replace_removes_old_vec_rows() {
6793        use crate::sqlite::open_connection_with_vec;
6794
6795        let db = NamedTempFile::new().expect("temporary db");
6796        let schema_manager = Arc::new(SchemaManager::new());
6797
6798        {
6799            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6800            schema_manager.bootstrap(&conn).expect("bootstrap");
6801        }
6802
6803        let writer = WriterActor::start(
6804            db.path(),
6805            Arc::clone(&schema_manager),
6806            ProvenanceMode::Warn,
6807            Arc::new(TelemetryCounters::default()),
6808        )
6809        .expect("writer");
6810
6811        // Insert node + chunk-A + vec-A
6812        writer
6813            .submit(WriteRequest {
6814                label: "v1".to_owned(),
6815                nodes: vec![NodeInsert {
6816                    row_id: "row-replace-v1".to_owned(),
6817                    logical_id: "node-replace-vec".to_owned(),
6818                    kind: "Doc".to_owned(),
6819                    properties: "{}".to_owned(),
6820                    source_ref: Some("src".to_owned()),
6821                    upsert: false,
6822                    chunk_policy: ChunkPolicy::Preserve,
6823                    content_ref: None,
6824                }],
6825                node_retires: vec![],
6826                edges: vec![],
6827                edge_retires: vec![],
6828                chunks: vec![ChunkInsert {
6829                    id: "chunk-replace-A".to_owned(),
6830                    node_logical_id: "node-replace-vec".to_owned(),
6831                    text_content: "version one".to_owned(),
6832                    byte_start: None,
6833                    byte_end: None,
6834                    content_hash: None,
6835                }],
6836                runs: vec![],
6837                steps: vec![],
6838                actions: vec![],
6839                optional_backfills: vec![],
6840                vec_inserts: vec![VecInsert {
6841                    chunk_id: "chunk-replace-A".to_owned(),
6842                    embedding: vec![0.1, 0.2, 0.3],
6843                }],
6844                operational_writes: vec![],
6845            })
6846            .expect("v1 write");
6847
6848        // Upsert with Replace + chunk-B + vec-B
6849        writer
6850            .submit(WriteRequest {
6851                label: "v2".to_owned(),
6852                nodes: vec![NodeInsert {
6853                    row_id: "row-replace-v2".to_owned(),
6854                    logical_id: "node-replace-vec".to_owned(),
6855                    kind: "Doc".to_owned(),
6856                    properties: "{}".to_owned(),
6857                    source_ref: Some("src".to_owned()),
6858                    upsert: true,
6859                    chunk_policy: ChunkPolicy::Replace,
6860                    content_ref: None,
6861                }],
6862                node_retires: vec![],
6863                edges: vec![],
6864                edge_retires: vec![],
6865                chunks: vec![ChunkInsert {
6866                    id: "chunk-replace-B".to_owned(),
6867                    node_logical_id: "node-replace-vec".to_owned(),
6868                    text_content: "version two".to_owned(),
6869                    byte_start: None,
6870                    byte_end: None,
6871                    content_hash: None,
6872                }],
6873                runs: vec![],
6874                steps: vec![],
6875                actions: vec![],
6876                optional_backfills: vec![],
6877                vec_inserts: vec![VecInsert {
6878                    chunk_id: "chunk-replace-B".to_owned(),
6879                    embedding: vec![0.4, 0.5, 0.6],
6880                }],
6881                operational_writes: vec![],
6882            })
6883            .expect("v2 write");
6884
6885        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6886        let count_a: i64 = conn
6887            .query_row(
6888                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-A'",
6889                [],
6890                |row| row.get(0),
6891            )
6892            .expect("count A");
6893        let count_b: i64 = conn
6894            .query_row(
6895                "SELECT count(*) FROM vec_doc WHERE chunk_id = 'chunk-replace-B'",
6896                [],
6897                |row| row.get(0),
6898            )
6899            .expect("count B");
6900        assert_eq!(
6901            count_a, 0,
6902            "old vec row (chunk-A) must be deleted on Replace"
6903        );
6904        assert_eq!(
6905            count_b, 1,
6906            "new vec row (chunk-B) must be present after Replace"
6907        );
6908    }
6909
6910    #[cfg(feature = "sqlite-vec")]
6911    #[test]
6912    fn vec_insert_is_persisted_when_feature_enabled() {
6913        use crate::sqlite::open_connection_with_vec;
6914
6915        let db = NamedTempFile::new().expect("temporary db");
6916        let schema_manager = Arc::new(SchemaManager::new());
6917
6918        // Bootstrap the schema (vec table is auto-created on first VecInsert).
6919        {
6920            let conn = open_connection_with_vec(db.path()).expect("vec connection");
6921            schema_manager.bootstrap(&conn).expect("bootstrap");
6922        }
6923
6924        let writer = WriterActor::start(
6925            db.path(),
6926            Arc::clone(&schema_manager),
6927            ProvenanceMode::Warn,
6928            Arc::new(TelemetryCounters::default()),
6929        )
6930        .expect("writer");
6931
6932        // Submit a node + chunk + vec insert so kind can be resolved from the request.
6933        writer
6934            .submit(WriteRequest {
6935                label: "vec-insert".to_owned(),
6936                nodes: vec![NodeInsert {
6937                    row_id: "row-vec".to_owned(),
6938                    logical_id: "node-vec".to_owned(),
6939                    kind: "Document".to_owned(),
6940                    properties: "{}".to_owned(),
6941                    source_ref: Some("test".to_owned()),
6942                    upsert: false,
6943                    chunk_policy: ChunkPolicy::Preserve,
6944                    content_ref: None,
6945                }],
6946                node_retires: vec![],
6947                edges: vec![],
6948                edge_retires: vec![],
6949                chunks: vec![ChunkInsert {
6950                    id: "chunk-vec".to_owned(),
6951                    node_logical_id: "node-vec".to_owned(),
6952                    text_content: "test text".to_owned(),
6953                    byte_start: None,
6954                    byte_end: None,
6955                    content_hash: None,
6956                }],
6957                runs: vec![],
6958                steps: vec![],
6959                actions: vec![],
6960                optional_backfills: vec![],
6961                vec_inserts: vec![VecInsert {
6962                    chunk_id: "chunk-vec".to_owned(),
6963                    embedding: vec![0.1, 0.2, 0.3],
6964                }],
6965                operational_writes: vec![],
6966            })
6967            .expect("vec insert write");
6968
6969        let conn = rusqlite::Connection::open(db.path()).expect("conn");
6970        let count: i64 = conn
6971            .query_row(
6972                "SELECT count(*) FROM vec_document WHERE chunk_id = 'chunk-vec'",
6973                [],
6974                |row| row.get(0),
6975            )
6976            .expect("count");
6977        assert_eq!(count, 1, "VecInsert must persist a row in vec_document");
6978    }
6979
6980    // --- WriteRequest size validation tests ---
6981
6982    #[test]
6983    fn write_request_exceeding_node_limit_is_rejected() {
6984        let nodes: Vec<NodeInsert> = (0..10_001)
6985            .map(|i| NodeInsert {
6986                row_id: format!("row-{i}"),
6987                logical_id: format!("lg-{i}"),
6988                kind: "Note".to_owned(),
6989                properties: "{}".to_owned(),
6990                source_ref: None,
6991                upsert: false,
6992                chunk_policy: ChunkPolicy::Preserve,
6993                content_ref: None,
6994            })
6995            .collect();
6996
6997        let request = WriteRequest {
6998            label: "too-many-nodes".to_owned(),
6999            nodes,
7000            node_retires: vec![],
7001            edges: vec![],
7002            edge_retires: vec![],
7003            chunks: vec![],
7004            runs: vec![],
7005            steps: vec![],
7006            actions: vec![],
7007            optional_backfills: vec![],
7008            vec_inserts: vec![],
7009            operational_writes: vec![],
7010        };
7011
7012        let result = prepare_write(request, ProvenanceMode::Warn)
7013            .map(|_| ())
7014            .map_err(|e| format!("{e}"));
7015        assert!(
7016            matches!(result, Err(ref msg) if msg.contains("too many nodes")),
7017            "exceeding node limit must return InvalidWrite: got {result:?}"
7018        );
7019    }
7020
    #[test]
    fn write_request_exceeding_total_limit_is_rejected() {
        // Spread items across fields to exceed 100_000 total
        // without exceeding any single per-field limit.
        // nodes(10k) + edges(10k) + chunks(50k) + vec_inserts(20001) + operational(10k) = 100_001
        let request = WriteRequest {
            label: "too-many-total".to_owned(),
            nodes: (0..10_000)
                .map(|i| NodeInsert {
                    row_id: format!("row-{i}"),
                    logical_id: format!("lg-{i}"),
                    kind: "Note".to_owned(),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                })
                .collect(),
            node_retires: vec![],
            // Edge targets chain lg-i -> lg-(i+1); referential validity is
            // irrelevant here because the request must be rejected up front.
            edges: (0..10_000)
                .map(|i| EdgeInsert {
                    row_id: format!("edge-row-{i}"),
                    logical_id: format!("edge-lg-{i}"),
                    kind: "link".to_owned(),
                    source_logical_id: format!("lg-{i}"),
                    target_logical_id: format!("lg-{}", i + 1),
                    properties: "{}".to_owned(),
                    source_ref: None,
                    upsert: false,
                })
                .collect(),
            edge_retires: vec![],
            chunks: (0..50_000)
                .map(|i| ChunkInsert {
                    id: format!("chunk-{i}"),
                    node_logical_id: "lg-0".to_owned(),
                    text_content: "text".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                })
                .collect(),
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            // 20_001 (not a round 20_000) is what tips the total to 100_001.
            vec_inserts: (0..20_001)
                .map(|i| VecInsert {
                    chunk_id: format!("vec-chunk-{i}"),
                    embedding: vec![0.1],
                })
                .collect(),
            operational_writes: (0..10_000)
                .map(|i| OperationalWrite::Append {
                    collection: format!("col-{i}"),
                    record_key: format!("key-{i}"),
                    payload_json: "{}".to_owned(),
                    source_ref: None,
                })
                .collect(),
        };

        // prepare_write is called directly — no WriterActor/db needed to
        // exercise request-size validation.
        let result = prepare_write(request, ProvenanceMode::Warn)
            .map(|_| ())
            .map_err(|e| format!("{e}"));
        assert!(
            matches!(result, Err(ref msg) if msg.contains("too many total items")),
            "exceeding total item limit must return InvalidWrite: got {result:?}"
        );
    }
7092
7093    #[test]
7094    fn write_request_within_limits_succeeds() {
7095        let db = NamedTempFile::new().expect("temporary db");
7096        let writer = WriterActor::start(
7097            db.path(),
7098            Arc::new(SchemaManager::new()),
7099            ProvenanceMode::Warn,
7100            Arc::new(TelemetryCounters::default()),
7101        )
7102        .expect("writer");
7103
7104        let result = writer.submit(WriteRequest {
7105            label: "within-limits".to_owned(),
7106            nodes: vec![NodeInsert {
7107                row_id: "row-1".to_owned(),
7108                logical_id: "lg-1".to_owned(),
7109                kind: "Note".to_owned(),
7110                properties: "{}".to_owned(),
7111                source_ref: None,
7112                upsert: false,
7113                chunk_policy: ChunkPolicy::Preserve,
7114                content_ref: None,
7115            }],
7116            node_retires: vec![],
7117            edges: vec![],
7118            edge_retires: vec![],
7119            chunks: vec![],
7120            runs: vec![],
7121            steps: vec![],
7122            actions: vec![],
7123            optional_backfills: vec![],
7124            vec_inserts: vec![],
7125            operational_writes: vec![],
7126        });
7127
7128        assert!(
7129            result.is_ok(),
7130            "write request within limits must succeed: got {result:?}"
7131        );
7132    }
7133
7134    #[test]
7135    fn property_fts_rows_created_on_node_insert() {
7136        let db = NamedTempFile::new().expect("temporary db");
7137        // Bootstrap and register a property schema before starting the writer.
7138        let schema = Arc::new(SchemaManager::new());
7139        {
7140            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7141            schema.bootstrap(&conn).expect("bootstrap");
7142            conn.execute(
7143                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
7144                 VALUES ('Goal', '[\"$.name\", \"$.description\"]', ' ')",
7145                [],
7146            )
7147            .expect("register schema");
7148        }
7149        let writer = WriterActor::start(
7150            db.path(),
7151            Arc::clone(&schema),
7152            ProvenanceMode::Warn,
7153            Arc::new(TelemetryCounters::default()),
7154        )
7155        .expect("writer");
7156
7157        writer
7158            .submit(WriteRequest {
7159                label: "goal-insert".to_owned(),
7160                nodes: vec![NodeInsert {
7161                    row_id: "row-1".to_owned(),
7162                    logical_id: "goal-1".to_owned(),
7163                    kind: "Goal".to_owned(),
7164                    properties: r#"{"name":"Ship v2","description":"Launch the redesign"}"#
7165                        .to_owned(),
7166                    source_ref: Some("src-1".to_owned()),
7167                    upsert: false,
7168                    chunk_policy: ChunkPolicy::Preserve,
7169                    content_ref: None,
7170                }],
7171                node_retires: vec![],
7172                edges: vec![],
7173                edge_retires: vec![],
7174                chunks: vec![],
7175                runs: vec![],
7176                steps: vec![],
7177                actions: vec![],
7178                optional_backfills: vec![],
7179                vec_inserts: vec![],
7180                operational_writes: vec![],
7181            })
7182            .expect("write");
7183
7184        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7185        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
7186        let text: String = conn
7187            .query_row(
7188                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
7189                [],
7190                |row| row.get(0),
7191            )
7192            .expect("property FTS row must exist");
7193        assert_eq!(text, "Ship v2 Launch the redesign");
7194    }
7195
    #[test]
    fn property_fts_rows_replaced_on_upsert() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register a property-FTS schema for kind "Goal" indexing $.name.
        // The block scope drops the setup connection before the writer starts.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Initial insert.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Upsert with changed property.
        // Same logical_id ("goal-1") but a fresh row_id and a new name value.
        writer
            .submit(WriteRequest {
                label: "upsert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-2".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Beta"}"#.to_owned(),
                    source_ref: Some("src-2".to_owned()),
                    upsert: true,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("upsert");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let goal_table = fathomdb_schema::fts_kind_table_name("Goal");
        // Upsert must replace, not append: exactly one FTS row remains.
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(
            count, 1,
            "must have exactly one property FTS row after upsert"
        );

        // And the surviving row must carry the updated property text.
        let text: String = conn
            .query_row(
                &format!("SELECT text_content FROM {goal_table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("text");
        assert_eq!(text, "Beta", "property FTS must reflect updated properties");
    }
7295
    #[test]
    fn property_fts_rows_deleted_on_retire() {
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register a property-FTS schema for kind "Goal" indexing $.name.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert a node.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire the node.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        // Retiring the node must also clean up its property-FTS row.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let table = fathomdb_schema::fts_kind_table_name("Goal");
        let count: i64 = conn
            .query_row(
                &format!("SELECT count(*) FROM {table} WHERE node_logical_id = 'goal-1'"),
                [],
                |row| row.get(0),
            )
            .expect("count");
        assert_eq!(count, 0, "property FTS row must be deleted on retire");
    }
7377
7378    #[test]
7379    fn no_property_fts_row_for_unregistered_kind() {
7380        let db = NamedTempFile::new().expect("temporary db");
7381        let schema = Arc::new(SchemaManager::new());
7382        {
7383            let conn = rusqlite::Connection::open(db.path()).expect("conn");
7384            schema.bootstrap(&conn).expect("bootstrap");
7385            // No schema registered for "Note" kind.
7386        }
7387        let writer = WriterActor::start(
7388            db.path(),
7389            Arc::clone(&schema),
7390            ProvenanceMode::Warn,
7391            Arc::new(TelemetryCounters::default()),
7392        )
7393        .expect("writer");
7394
7395        writer
7396            .submit(WriteRequest {
7397                label: "insert".to_owned(),
7398                nodes: vec![NodeInsert {
7399                    row_id: "row-1".to_owned(),
7400                    logical_id: "note-1".to_owned(),
7401                    kind: "Note".to_owned(),
7402                    properties: r#"{"title":"hello"}"#.to_owned(),
7403                    source_ref: Some("src-1".to_owned()),
7404                    upsert: false,
7405                    chunk_policy: ChunkPolicy::Preserve,
7406                    content_ref: None,
7407                }],
7408                node_retires: vec![],
7409                edges: vec![],
7410                edge_retires: vec![],
7411                chunks: vec![],
7412                runs: vec![],
7413                steps: vec![],
7414                actions: vec![],
7415                optional_backfills: vec![],
7416                vec_inserts: vec![],
7417                operational_writes: vec![],
7418            })
7419            .expect("insert");
7420
7421        let conn = rusqlite::Connection::open(db.path()).expect("conn");
7422        let table = fathomdb_schema::fts_kind_table_name("Note");
7423        let exists: bool = conn
7424            .query_row(
7425                "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1",
7426                rusqlite::params![table],
7427                |row| row.get::<_, i64>(0),
7428            )
7429            .map(|c| c > 0)
7430            .expect("exists check");
7431        assert!(
7432            !exists,
7433            "no per-kind FTS table should be created for unregistered kind"
7434        );
7435    }
7436
7437    // --- A-6 per-kind FTS table tests ---
7438
    #[test]
    fn property_fts_write_goes_to_per_kind_table() {
        // After A-6: writes go to fts_props_goal, NOT fts_node_properties.
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register the property schema for "Goal" and pre-create the
        // per-kind FTS5 table that the writer is expected to target.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            // Create per-kind table (migration 21 would do this for existing schemas).
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Ship v2"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        // Per-kind table must have the row.
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 1,
            "per-kind table fts_props_goal must have the row"
        );
    }
7510
    #[test]
    fn property_fts_retire_removes_from_per_kind_table() {
        // After A-6: retire removes from fts_props_goal, NOT fts_node_properties.
        let db = NamedTempFile::new().expect("temporary db");
        let schema = Arc::new(SchemaManager::new());
        // Register the "Goal" property schema and pre-create the per-kind
        // FTS5 table (mirrors the setup in property_fts_write_goes_to_per_kind_table).
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema.bootstrap(&conn).expect("bootstrap");
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES ('Goal', '[\"$.name\"]', ' ')",
                [],
            )
            .expect("register schema");
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_goal USING fts5(\
                    node_logical_id UNINDEXED, text_content, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )",
            )
            .expect("create per-kind table");
        }
        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Insert.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "goal-1".to_owned(),
                    kind: "Goal".to_owned(),
                    properties: r#"{"name":"Alpha"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("insert");

        // Retire.
        writer
            .submit(WriteRequest {
                label: "retire".to_owned(),
                nodes: vec![],
                node_retires: vec![NodeRetire {
                    logical_id: "goal-1".to_owned(),
                    source_ref: Some("forget-1".to_owned()),
                }],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("retire");

        // The per-kind FTS row must be gone after the retire write.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let per_kind_count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_goal WHERE node_logical_id = 'goal-1'",
                [],
                |row| row.get(0),
            )
            .expect("per-kind count");
        assert_eq!(
            per_kind_count, 0,
            "per-kind table row must be removed on retire"
        );
    }
7602
7603    mod extract_json_path_tests {
7604        use super::super::extract_json_path;
7605        use serde_json::json;
7606
7607        #[test]
7608        fn string_value() {
7609            let v = json!({"name": "alice"});
7610            assert_eq!(extract_json_path(&v, "$.name"), vec!["alice"]);
7611        }
7612
7613        #[test]
7614        fn number_value() {
7615            let v = json!({"age": 42});
7616            assert_eq!(extract_json_path(&v, "$.age"), vec!["42"]);
7617        }
7618
7619        #[test]
7620        fn bool_value() {
7621            let v = json!({"active": true});
7622            assert_eq!(extract_json_path(&v, "$.active"), vec!["true"]);
7623        }
7624
7625        #[test]
7626        fn null_value() {
7627            let v = json!({"x": null});
7628            assert!(extract_json_path(&v, "$.x").is_empty());
7629        }
7630
7631        #[test]
7632        fn missing_path() {
7633            let v = json!({"name": "a"});
7634            assert!(extract_json_path(&v, "$.missing").is_empty());
7635        }
7636
7637        #[test]
7638        fn nested_path() {
7639            let v = json!({"address": {"city": "NYC"}});
7640            assert_eq!(extract_json_path(&v, "$.address.city"), vec!["NYC"]);
7641        }
7642
7643        #[test]
7644        fn array_of_strings() {
7645            let v = json!({"tags": ["a", "b", "c"]});
7646            assert_eq!(extract_json_path(&v, "$.tags"), vec!["a", "b", "c"]);
7647        }
7648
7649        #[test]
7650        fn array_mixed_scalars() {
7651            let v = json!({"vals": ["x", 1, true]});
7652            assert_eq!(extract_json_path(&v, "$.vals"), vec!["x", "1", "true"]);
7653        }
7654
7655        #[test]
7656        fn array_only_objects_returns_empty() {
7657            let v = json!({"data": [{"k": "v"}]});
7658            assert!(extract_json_path(&v, "$.data").is_empty());
7659        }
7660
7661        #[test]
7662        fn array_mixed_objects_and_scalars() {
7663            let v = json!({"data": ["keep", {"skip": true}, "also"]});
7664            assert_eq!(extract_json_path(&v, "$.data"), vec!["keep", "also"]);
7665        }
7666
7667        #[test]
7668        fn object_returns_empty() {
7669            let v = json!({"meta": {"k": "v"}});
7670            assert!(extract_json_path(&v, "$.meta").is_empty());
7671        }
7672
7673        #[test]
7674        fn no_prefix_returns_empty() {
7675            let v = json!({"name": "a"});
7676            assert!(extract_json_path(&v, "name").is_empty());
7677        }
7678    }
7679
7680    mod recursive_extraction_tests {
7681        use super::super::{
7682            LEAF_SEPARATOR, MAX_EXTRACTED_BYTES, MAX_RECURSIVE_DEPTH, PositionEntry,
7683            PropertyFtsSchema, PropertyPathEntry, extract_property_fts,
7684        };
7685        use serde_json::json;
7686
7687        fn schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
7688            PropertyFtsSchema {
7689                paths,
7690                separator: " ".to_owned(),
7691                exclude_paths: Vec::new(),
7692            }
7693        }
7694
7695        fn schema_with_excludes(
7696            paths: Vec<PropertyPathEntry>,
7697            excludes: Vec<String>,
7698        ) -> PropertyFtsSchema {
7699            PropertyFtsSchema {
7700                paths,
7701                separator: " ".to_owned(),
7702                exclude_paths: excludes,
7703            }
7704        }
7705
7706        #[test]
7707        fn recursive_extraction_walks_nested_objects_in_stable_lex_order() {
7708            let props = json!({"payload": {"b": "two", "a": "one"}});
7709            let (blob, positions, _stats) = extract_property_fts(
7710                &props,
7711                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7712            );
7713            let blob = blob.expect("blob emitted");
7714            let idx_one = blob.find("one").expect("contains 'one'");
7715            let idx_two = blob.find("two").expect("contains 'two'");
7716            assert!(
7717                idx_one < idx_two,
7718                "lex order: 'one' (key a) before 'two' (key b)"
7719            );
7720            assert_eq!(positions.len(), 2);
7721            assert_eq!(positions[0].leaf_path, "$.payload.a");
7722            assert_eq!(positions[1].leaf_path, "$.payload.b");
7723        }
7724
7725        #[test]
7726        fn recursive_extraction_walks_arrays_of_scalars() {
7727            let props = json!({"tags": ["red", "blue"]});
7728            let (_blob, positions, _stats) = extract_property_fts(
7729                &props,
7730                &schema(vec![PropertyPathEntry::recursive("$.tags")]),
7731            );
7732            assert_eq!(positions.len(), 2);
7733            assert_eq!(positions[0].leaf_path, "$.tags[0]");
7734            assert_eq!(positions[1].leaf_path, "$.tags[1]");
7735        }
7736
7737        #[test]
7738        fn recursive_extraction_walks_arrays_of_objects() {
7739            let props = json!({"items": [{"name": "a"}, {"name": "b"}]});
7740            let (_blob, positions, _stats) = extract_property_fts(
7741                &props,
7742                &schema(vec![PropertyPathEntry::recursive("$.items")]),
7743            );
7744            assert_eq!(positions.len(), 2);
7745            assert_eq!(positions[0].leaf_path, "$.items[0].name");
7746            assert_eq!(positions[1].leaf_path, "$.items[1].name");
7747        }
7748
7749        #[test]
7750        fn recursive_extraction_stringifies_numbers_and_bools() {
7751            let props = json!({"root": {"n": 42, "ok": true}});
7752            let (blob, _positions, _stats) = extract_property_fts(
7753                &props,
7754                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7755            );
7756            let blob = blob.expect("blob emitted");
7757            assert!(blob.contains("42"));
7758            assert!(blob.contains("true"));
7759        }
7760
        #[test]
        fn recursive_extraction_skips_nulls_and_missing() {
            // A JSON null leaf must contribute neither blob text nor a
            // position entry.
            let props = json!({"root": {"x": null, "y": "present"}});
            let (blob, positions, _stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            let blob = blob.expect("blob emitted");
            // "null" must not be stringified into the searchable text.
            assert!(!blob.contains("null"));
            // Only the non-null leaf ($.root.y) is recorded.
            assert_eq!(positions.len(), 1);
            assert_eq!(positions[0].leaf_path, "$.root.y");
        }
7773
        #[test]
        fn recursive_extraction_respects_max_depth_guardrail() {
            // Build nested object 10 levels deep. MAX_RECURSIVE_DEPTH = 8.
            let mut inner = json!("leaf-value");
            for _ in 0..10 {
                inner = json!({ "k": inner });
            }
            let props = json!({ "root": inner });
            let (blob, positions, stats) = extract_property_fts(
                &props,
                &schema(vec![PropertyPathEntry::recursive("$.root")]),
            );
            // The stats counter is the observable proof the clamp fired.
            assert!(stats.depth_cap_hit > 0, "depth cap guardrail must engage");
            // The walk should stop before reaching the leaf (depth 8).
            assert!(
                blob.is_none() || !blob.as_deref().unwrap_or("").contains("leaf-value"),
                "walk must not emit leaves past MAX_RECURSIVE_DEPTH"
            );
            // Row is still indexed with whatever was emitted — even if
            // that's nothing. The contract is only that the walk clamped.
            // (Bindings kept alive deliberately so the test documents the
            // full return shape without asserting on it.)
            let _ = positions;
            let _ = MAX_RECURSIVE_DEPTH;
        }
7797
7798        #[test]
7799        fn recursive_extraction_respects_max_bytes_guardrail() {
7800            // Build an array of 4KB strings; total blob > 64KB.
7801            let leaves: Vec<String> = (0..40)
7802                .map(|i| format!("chunk-{i}-{}", "x".repeat(4096)))
7803                .collect();
7804            let props = json!({ "root": leaves });
7805            let (blob, positions, stats) = extract_property_fts(
7806                &props,
7807                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7808            );
7809            assert!(stats.byte_cap_reached, "byte cap guardrail must engage");
7810            let blob = blob.expect("blob must still be emitted");
7811            assert!(
7812                blob.len() <= MAX_EXTRACTED_BYTES,
7813                "blob must not exceed MAX_EXTRACTED_BYTES"
7814            );
7815            // No partial leaves: every position end must fall inside blob length.
7816            for pos in &positions {
7817                assert!(pos.end_offset <= blob.len());
7818                let slice = &blob[pos.start_offset..pos.end_offset];
7819                // Slice must equal some original leaf value; we only
7820                // verify it's not empty and doesn't straddle a separator.
7821                assert!(!slice.is_empty());
7822                assert!(!slice.contains(LEAF_SEPARATOR));
7823            }
7824            // Blob cannot end in a dangling separator.
7825            assert!(!blob.ends_with(LEAF_SEPARATOR));
7826        }
7827
7828        #[test]
7829        fn recursive_extraction_respects_exclude_paths() {
7830            let props = json!({"payload": {"pub": "yes", "priv": "no"}});
7831            let (blob, positions, stats) = extract_property_fts(
7832                &props,
7833                &schema_with_excludes(
7834                    vec![PropertyPathEntry::recursive("$.payload")],
7835                    vec!["$.payload.priv".to_owned()],
7836                ),
7837            );
7838            let blob = blob.expect("blob emitted");
7839            assert!(blob.contains("yes"));
7840            assert!(!blob.contains("no"));
7841            assert_eq!(positions.len(), 1);
7842            assert_eq!(positions[0].leaf_path, "$.payload.pub");
7843            assert!(stats.excluded_subtree > 0);
7844        }
7845
7846        #[test]
7847        fn position_map_entries_match_emitted_leaves_in_order() {
7848            let props = json!({"root": {"a": "alpha", "b": "bravo", "c": "charlie"}});
7849            let (blob, positions, _stats) = extract_property_fts(
7850                &props,
7851                &schema(vec![PropertyPathEntry::recursive("$.root")]),
7852            );
7853            let blob = blob.expect("blob emitted");
7854            assert_eq!(positions.len(), 3);
7855            // Leaf paths in lexicographic order.
7856            assert_eq!(positions[0].leaf_path, "$.root.a");
7857            assert_eq!(positions[1].leaf_path, "$.root.b");
7858            assert_eq!(positions[2].leaf_path, "$.root.c");
7859            // Monotonic, non-overlapping offsets.
7860            let mut prev_end: usize = 0;
7861            for (i, pos) in positions.iter().enumerate() {
7862                assert!(pos.start_offset >= prev_end);
7863                assert!(pos.end_offset > pos.start_offset);
7864                assert!(pos.end_offset <= blob.len());
7865                let slice = &blob[pos.start_offset..pos.end_offset];
7866                match i {
7867                    0 => assert_eq!(slice, "alpha"),
7868                    1 => assert_eq!(slice, "bravo"),
7869                    2 => assert_eq!(slice, "charlie"),
7870                    _ => unreachable!(),
7871                }
7872                prev_end = pos.end_offset;
7873            }
7874        }
7875
7876        #[test]
7877        fn scalar_only_schema_produces_empty_position_map() {
7878            let props = json!({"name": "alpha", "title": "beta"});
7879            let (blob, positions, _stats) = extract_property_fts(
7880                &props,
7881                &schema(vec![
7882                    PropertyPathEntry::scalar("$.name"),
7883                    PropertyPathEntry::scalar("$.title"),
7884                ]),
7885            );
7886            assert_eq!(blob.as_deref(), Some("alpha beta"));
7887            assert!(
7888                positions.is_empty(),
7889                "scalar-only schema must emit no position entries"
7890            );
7891            // Sanity-check the type so the test fails loudly if the
7892            // signature changes.
7893            let _: Vec<PositionEntry> = positions;
7894        }
7895
7896        fn assert_unique_start_offsets(positions: &[PositionEntry]) {
7897            let mut seen = std::collections::HashSet::new();
7898            for pos in positions {
7899                let start = pos.start_offset;
7900                assert!(
7901                    seen.insert(start),
7902                    "duplicate start_offset {start} in positions {positions:?}"
7903                );
7904            }
7905        }
7906
7907        #[test]
7908        fn recursive_extraction_empty_then_nonempty_in_array() {
7909            let props = json!({"payload": {"xs": ["", "x"]}});
7910            let (combined, positions, _stats) = extract_property_fts(
7911                &props,
7912                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7913            );
7914            assert_eq!(combined.as_deref(), Some("x"));
7915            assert_eq!(positions.len(), 1);
7916            assert_eq!(positions[0].leaf_path, "$.payload.xs[1]");
7917            assert_eq!(positions[0].start_offset, 0);
7918            assert_eq!(positions[0].end_offset, 1);
7919            assert_unique_start_offsets(&positions);
7920        }
7921
7922        #[test]
7923        fn recursive_extraction_two_empties_then_nonempty_in_array() {
7924            let props = json!({"payload": {"xs": ["", "", "x"]}});
7925            let (combined, positions, _stats) = extract_property_fts(
7926                &props,
7927                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7928            );
7929            assert_eq!(combined.as_deref(), Some("x"));
7930            assert_eq!(positions.len(), 1);
7931            assert_eq!(positions[0].leaf_path, "$.payload.xs[2]");
7932            assert_eq!(positions[0].start_offset, 0);
7933            assert_eq!(positions[0].end_offset, 1);
7934            assert_unique_start_offsets(&positions);
7935        }
7936
7937        #[test]
7938        fn recursive_extraction_empty_then_nonempty_sibling_keys() {
7939            let props = json!({"payload": {"a": "", "b": "x"}});
7940            let (combined, positions, _stats) = extract_property_fts(
7941                &props,
7942                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7943            );
7944            assert_eq!(combined.as_deref(), Some("x"));
7945            assert_eq!(positions.len(), 1);
7946            assert_eq!(positions[0].leaf_path, "$.payload.b");
7947            assert_eq!(positions[0].start_offset, 0);
7948            assert_eq!(positions[0].end_offset, 1);
7949            assert_unique_start_offsets(&positions);
7950        }
7951
7952        #[test]
7953        fn recursive_extraction_nested_empty_then_nonempty_sibling_keys() {
7954            let props = json!({"payload": {"inner": {"a": "", "b": "x"}}});
7955            let (combined, positions, _stats) = extract_property_fts(
7956                &props,
7957                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7958            );
7959            assert_eq!(combined.as_deref(), Some("x"));
7960            assert_eq!(positions.len(), 1);
7961            assert_eq!(positions[0].leaf_path, "$.payload.inner.b");
7962            assert_eq!(positions[0].start_offset, 0);
7963            assert_eq!(positions[0].end_offset, 1);
7964            assert_unique_start_offsets(&positions);
7965        }
7966
7967        #[test]
7968        fn recursive_extraction_descent_past_empty_sibling_into_nested_subtree() {
7969            let props = json!({"payload": {"a": "", "b": {"c": "x"}}});
7970            let (combined, positions, _stats) = extract_property_fts(
7971                &props,
7972                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7973            );
7974            assert_eq!(combined.as_deref(), Some("x"));
7975            assert_eq!(positions.len(), 1);
7976            assert_eq!(positions[0].leaf_path, "$.payload.b.c");
7977            assert_eq!(positions[0].start_offset, 0);
7978            assert_eq!(positions[0].end_offset, 1);
7979            assert_unique_start_offsets(&positions);
7980        }
7981
7982        #[test]
7983        fn recursive_extraction_all_empty_shapes_emit_no_positions() {
7984            let cases = vec![
7985                json!({"payload": {}}),
7986                json!({"payload": {"a": ""}}),
7987                json!({"payload": {"xs": []}}),
7988                json!({"payload": {"xs": [""]}}),
7989                json!({"payload": {"xs": ["", ""]}}),
7990                json!({"payload": {"xs": ["", "", ""]}}),
7991            ];
7992            for case in cases {
7993                let (combined, positions, _stats) = extract_property_fts(
7994                    &case,
7995                    &schema(vec![PropertyPathEntry::recursive("$.payload")]),
7996                );
7997                assert!(
7998                    combined.is_none(),
7999                    "all-empty payload {case:?} must produce no combined text, got {combined:?}"
8000                );
8001                assert!(
8002                    positions.is_empty(),
8003                    "all-empty payload {case:?} must produce no positions, got {positions:?}"
8004                );
8005            }
8006        }
8007
8008        #[test]
8009        fn recursive_extraction_nonempty_then_empty_then_nonempty() {
8010            let props = json!({"payload": {"xs": ["x", "", "y"]}});
8011            let (combined, positions, _stats) = extract_property_fts(
8012                &props,
8013                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8014            );
8015            let blob = combined.expect("blob emitted");
8016            assert_eq!(positions.len(), 2);
8017            assert_eq!(positions[0].leaf_path, "$.payload.xs[0]");
8018            assert_eq!(positions[1].leaf_path, "$.payload.xs[2]");
8019            assert_eq!(positions[0].start_offset, 0);
8020            assert_eq!(positions[0].end_offset, 1);
8021            // The two non-empty leaves are separated by exactly one
8022            // LEAF_SEPARATOR; the empty middle leaf contributes neither
8023            // bytes nor a position.
8024            assert_eq!(positions[1].start_offset, 1 + LEAF_SEPARATOR.len());
8025            assert_eq!(positions[1].end_offset, 2 + LEAF_SEPARATOR.len());
8026            assert_eq!(
8027                &blob[positions[0].start_offset..positions[0].end_offset],
8028                "x"
8029            );
8030            assert_eq!(
8031                &blob[positions[1].start_offset..positions[1].end_offset],
8032                "y"
8033            );
8034            assert_unique_start_offsets(&positions);
8035        }
8036
8037        #[test]
8038        fn recursive_extraction_null_leaves_unchanged() {
8039            let props = json!({"payload": {"xs": [null, null]}});
8040            let (combined, positions, _stats) = extract_property_fts(
8041                &props,
8042                &schema(vec![PropertyPathEntry::recursive("$.payload")]),
8043            );
8044            assert!(combined.is_none());
8045            assert!(positions.is_empty());
8046        }
8047    }
8048
8049    mod parse_property_schema_json_tests {
8050        use super::super::parse_property_schema_json;
8051
8052        #[test]
8053        fn parse_property_schema_json_preserves_weight() {
8054            let json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar"}]"#;
8055            let schema = parse_property_schema_json(json, " ");
8056            assert_eq!(schema.paths.len(), 2);
8057            let title = &schema.paths[0];
8058            assert_eq!(title.path, "$.title");
8059            assert_eq!(title.weight, Some(2.0_f32));
8060            let body = &schema.paths[1];
8061            assert_eq!(body.path, "$.body");
8062            assert_eq!(body.weight, None);
8063        }
8064    }
8065
8066    // --- B-3: extract_property_fts_columns tests ---
8067
8068    mod extract_property_fts_columns_tests {
8069        use serde_json::json;
8070
8071        use super::super::{PropertyFtsSchema, PropertyPathEntry, extract_property_fts_columns};
8072
8073        fn weighted_schema(paths: Vec<PropertyPathEntry>) -> PropertyFtsSchema {
8074            PropertyFtsSchema {
8075                paths,
8076                separator: " ".to_owned(),
8077                exclude_paths: Vec::new(),
8078            }
8079        }
8080
8081        #[test]
8082        fn extract_property_fts_columns_returns_per_spec_text() {
8083            let props = json!({"title": "Hello", "body": "World"});
8084            let mut title_entry = PropertyPathEntry::scalar("$.title");
8085            title_entry.weight = Some(2.0);
8086            let mut body_entry = PropertyPathEntry::scalar("$.body");
8087            body_entry.weight = Some(1.0);
8088            let schema = weighted_schema(vec![title_entry, body_entry]);
8089            let cols = extract_property_fts_columns(&props, &schema);
8090            assert_eq!(cols.len(), 2);
8091            assert_eq!(cols[0].0, "title");
8092            assert_eq!(cols[0].1, "Hello");
8093            assert_eq!(cols[1].0, "body");
8094            assert_eq!(cols[1].1, "World");
8095        }
8096
8097        #[test]
8098        fn extract_property_fts_columns_missing_field_returns_empty_string() {
8099            let props = json!({"title": "Hello"});
8100            let mut title_entry = PropertyPathEntry::scalar("$.title");
8101            title_entry.weight = Some(2.0);
8102            let mut body_entry = PropertyPathEntry::scalar("$.body");
8103            body_entry.weight = Some(1.0);
8104            let schema = weighted_schema(vec![title_entry, body_entry]);
8105            let cols = extract_property_fts_columns(&props, &schema);
8106            assert_eq!(cols.len(), 2);
8107            assert_eq!(cols[0].1, "Hello");
8108            assert_eq!(cols[1].1, "", "missing field yields empty string");
8109        }
8110    }
8111
8112    // --- B-3: writer per-column INSERT for weighted schemas ---
8113
    /// End-to-end check that a node write against a kind with a weighted FTS
    /// schema lands one row in the per-kind FTS table with each path's text
    /// in its own column.
    #[test]
    fn writer_inserts_per_column_for_weighted_schema() {
        use fathomdb_schema::fts_column_name;

        let db = NamedTempFile::new().expect("temporary db");
        let schema_manager = Arc::new(SchemaManager::new());

        // Bootstrap and create the weighted per-kind table. The setup
        // connection is scoped so it is closed before the writer starts.
        {
            let conn = rusqlite::Connection::open(db.path()).expect("conn");
            schema_manager.bootstrap(&conn).expect("bootstrap");

            // Column names are derived from the JSON paths by the shared
            // helper so they match what the writer will target.
            let title_col = fts_column_name("$.title", false);
            let body_col = fts_column_name("$.body", false);

            // Register schema with weights.
            let paths_json = r#"[{"path":"$.title","mode":"scalar","weight":2.0},{"path":"$.body","mode":"scalar","weight":1.0}]"#.to_string();
            conn.execute(
                "INSERT INTO fts_property_schemas (kind, property_paths_json, separator) \
                 VALUES (?1, ?2, ' ')",
                rusqlite::params!["Article", paths_json],
            )
            .expect("register schema");

            // Create the per-kind FTS table with per-column layout.
            conn.execute_batch(&format!(
                "CREATE VIRTUAL TABLE IF NOT EXISTS fts_props_article USING fts5(\
                    node_logical_id UNINDEXED, {title_col}, {body_col}, \
                    tokenize = 'porter unicode61 remove_diacritics 2'\
                )"
            ))
            .expect("create weighted per-kind table");
        }

        let writer = WriterActor::start(
            db.path(),
            Arc::clone(&schema_manager),
            ProvenanceMode::Warn,
            Arc::new(TelemetryCounters::default()),
        )
        .expect("writer");

        // Submit a single canonical node of the registered kind; all other
        // request channels are left empty.
        writer
            .submit(WriteRequest {
                label: "insert".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "row-1".to_owned(),
                    logical_id: "article-1".to_owned(),
                    kind: "Article".to_owned(),
                    properties: r#"{"title":"Hello World","body":"Foo Bar"}"#.to_owned(),
                    source_ref: Some("src-1".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("write");

        // Reopen the database to inspect what the writer persisted.
        let conn = rusqlite::Connection::open(db.path()).expect("conn");
        let title_col = fts_column_name("$.title", false);
        let body_col = fts_column_name("$.body", false);

        // The per-kind table must have one row for article-1.
        let count: i64 = conn
            .query_row(
                "SELECT count(*) FROM fts_props_article WHERE node_logical_id = 'article-1'",
                [],
                |r| r.get(0),
            )
            .expect("count");
        assert_eq!(count, 1, "per-kind FTS table must have the row");

        // Verify per-column values via direct SELECT (not MATCH).
        let (title_val, body_val): (String, String) = conn
            .query_row(
                &format!(
                    "SELECT {title_col}, {body_col} FROM fts_props_article \
                     WHERE node_logical_id = 'article-1'"
                ),
                [],
                |r| Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?)),
            )
            .expect("select per-column");
        assert_eq!(
            title_val, "Hello World",
            "title column must have correct value"
        );
        assert_eq!(body_val, "Foo Bar", "body column must have correct value");
    }
8213}